Async Rust: From Futures to Production / Async Rust:从 Future 到生产实践

Speaker Intro / 讲师简介

  • Principal Firmware Architect in Microsoft SCHIE (Silicon and Cloud Hardware Infrastructure Engineering) team / Microsoft SCHIE(Silicon and Cloud Hardware Infrastructure Engineering)团队首席固件架构师
  • Industry veteran with expertise in security, systems programming (firmware, operating systems, hypervisors), CPU and platform architecture, and C++ systems / 在安全、系统编程(固件、操作系统、虚拟机监控器)、CPU 与平台架构以及 C++ 系统方面拥有丰富经验
  • Started programming in Rust in 2017 (@AWS EC2), and have been in love with the language ever since / 2017 年在 AWS EC2 开始使用 Rust,此后一直深度投入并持续使用这门语言

A deep-dive guide to asynchronous programming in Rust. Unlike most async tutorials that start with tokio::main and hand-wave the internals, this guide builds understanding from first principles - the Future trait, polling, state machines - then progresses to real-world patterns, runtime selection, and production pitfalls.

这是一本关于 Rust 异步编程的深度指南。不同于许多从 tokio::main 直接入门、对内部机制一笔带过的教程,本书从第一性原理展开:Future trait、轮询、状态机,然后逐步过渡到真实世界中的模式、运行时选型以及生产环境常见陷阱。

Who This Is For / 适合谁阅读

  • Rust developers who can write synchronous Rust but find async confusing / 能写同步 Rust,但对 async 仍感到困惑的 Rust 开发者
  • Developers from C#, Go, Python, or JavaScript who know async/await but not Rust’s model / 熟悉 C#、Go、Python 或 JavaScript 中 async/await,但不了解 Rust 模型的开发者
  • Anyone who’s been bitten by Future is not Send, Pin<Box<dyn Future>>, or “why does my program hang?” / 被 Future is not Send、Pin<Box<dyn Future>> 或“程序为什么卡住了”这些问题困扰过的人

Prerequisites / 前置知识

You should be comfortable with:

你应当熟悉以下内容:

  • Ownership, borrowing, and lifetimes / 所有权、借用和生命周期
  • Traits and generics (including impl Trait) / Trait 与泛型(包括 impl Trait)
  • Using Result<T, E> and the ? operator / 使用 Result<T, E> 与 ? 操作符
  • Basic multi-threading (std::thread::spawn, Arc, Mutex) / 基础多线程(std::thread::spawn、Arc、Mutex)

No prior async Rust experience is needed.

不需要事先具备 async Rust 经验。

How to Use This Book / 如何使用本书

Read linearly the first time. Parts I-II build on each other. Each chapter has:

第一次阅读建议按顺序进行。 第一部分和第二部分是逐层递进的。每章都包含:

| Symbol / 标记 | Meaning / 含义 |
|---|---|
| 🟢 | Beginner - foundational concept / 初级:基础概念 |
| 🟡 | Intermediate - requires earlier chapters / 中级:依赖前文内容 |
| 🔶 | Advanced - deep internals or production patterns / 高级:深入内部机制或生产模式 |

Each chapter includes:

每章包括:

  • A “What you’ll learn” block at the top / 顶部的 “你将学到什么” 区块
  • Mermaid diagrams for visual learners / 适合视觉学习者的 Mermaid 图示
  • An inline exercise with a hidden solution / 带隐藏答案的 内联练习
  • Key Takeaways summarizing the core ideas / 总结核心概念的 关键要点
  • Cross-references to related chapters / 指向相关章节的 交叉引用

Pacing Guide / 学习节奏建议

| Chapters / 章节 | Topic / 主题 | Suggested Time / 建议时间 | Checkpoint / 检查点 |
|---|---|---|---|
| 1-5 | How Async Works / Async 如何工作 | 6-8 hours / 6-8 小时 | You can explain Future, Poll, Pin, and why Rust has no built-in runtime / 你可以解释 Future、Poll、Pin,以及 Rust 为什么没有内建运行时 |
| 6-10 | The Ecosystem / 生态系统 | 6-8 hours / 6-8 小时 | You can build futures by hand, choose a runtime, and use tokio’s API / 你可以手写 future、选择运行时并使用 tokio API |
| 11-13 | Production Async / 生产级 Async | 6-8 hours / 6-8 小时 | You can write production-grade async code with streams, proper error handling, and graceful shutdown / 你可以编写包含流、正确错误处理和优雅关闭能力的生产级异步代码 |
| Capstone / 综合项目 | Chat Server / 聊天服务器 | 4-6 hours / 4-6 小时 | You’ve built a real async application integrating all concepts / 你已经构建出整合所有概念的真实异步应用 |

Total estimated time: 22-30 hours

预计总时长:22-30 小时

Working Through Exercises / 练习建议

Every content chapter has an inline exercise. The capstone (Ch 16) integrates everything into a single project. For maximum learning:

每个内容章节都包含内联练习。综合项目(第 16 章)会把所有内容整合进一个项目。为了获得最佳学习效果:

  1. Try the exercise before expanding the solution - struggling is where learning happens / 先做题,再看答案,真正的学习往往发生在卡住的时候
  2. Type the code, don’t copy-paste - muscle memory matters for Rust’s syntax / 手敲代码,不要复制粘贴,Rust 语法需要肌肉记忆
  3. Run every example - cargo new async-exercises and test as you go / 运行每个示例,可以先用 cargo new async-exercises 边学边试

Table of Contents / 目录

Part I: How Async Works / 第一部分:Async 如何工作

Part II: The Ecosystem / 第二部分:生态系统

Part III: Production Async / 第三部分:生产级 Async

Appendices / 附录


1. Why Async is Different in Rust / 1. 为什么 Rust 中的 Async 与众不同 🟢

What you’ll learn / 你将学到:

  • Why Rust has no built-in async runtime (and what that means for you) / 为什么 Rust 没有内建 async 运行时,以及这对你意味着什么
  • The three key properties: lazy execution, no runtime, zero-cost abstraction / 三个关键特性:惰性执行、无内建运行时、零成本抽象
  • When async is the right tool (and when it’s slower) / 什么时候应该用 async,什么时候它反而更慢
  • How Rust’s model compares to C#, Go, Python, and JavaScript / Rust 的模型与 C#、Go、Python、JavaScript 的对比

The Fundamental Difference / 根本差异

Most languages with async/await hide the machinery. C# has the CLR thread pool. JavaScript has the event loop. Go has goroutines and a scheduler built into the runtime. Python has asyncio.

大多数带有 async/await 的语言都会把底层机制隐藏起来。C# 有 CLR 线程池,JavaScript 有事件循环,Go 在运行时中内建 goroutine 与调度器,Python 有 asyncio

Rust has nothing.

Rust 什么都没有。

There is no built-in runtime, no thread pool, no event loop. The async keyword is a zero-cost compilation strategy - it transforms your function into a state machine that implements the Future trait. Someone else (an executor) must drive that state machine forward.

Rust 没有内建运行时、没有线程池、没有事件循环。async 关键字本质上是一种零成本编译策略,它会把函数转换成一个实现了 Future trait 的状态机。这个状态机必须由别的东西(也就是 executor,执行器)来驱动前进。

Three Key Properties of Rust Async / Rust Async 的三个关键特性

graph LR
    subgraph "C# / JS / Go"
        EAGER["Eager Execution<br/>Task starts immediately"]
        BUILTIN["Built-in Runtime<br/>Thread pool included"]
        GC["GC-Managed<br/>No lifetime concerns"]
    end

    subgraph "Rust (and Python*)"
        LAZY["Lazy Execution<br/>Nothing happens until polled/awaited"]
        BYOB["Bring Your Own Runtime<br/>You choose the executor"]
        OWNED["Ownership Applies<br/>Lifetimes, Send, Sync matter"]
    end

    EAGER -. "opposite" .-> LAZY
    BUILTIN -. "opposite" .-> BYOB
    GC -. "opposite" .-> OWNED

    style LAZY fill:#e8f5e8,color:#000
    style BYOB fill:#e8f5e8,color:#000
    style OWNED fill:#e8f5e8,color:#000
    style EAGER fill:#e3f2fd,color:#000
    style BUILTIN fill:#e3f2fd,color:#000
    style GC fill:#e3f2fd,color:#000

* Python coroutines are lazy like Rust futures - they don’t execute until awaited or scheduled. However, Python still uses GC and has no ownership/lifetime concerns.

* Python 的 coroutine 和 Rust 的 future 一样也是惰性的,只有在 await 或被调度时才会执行。但 Python 仍然依赖 GC,也没有所有权和生命周期问题。

No Built-In Runtime / 没有内建运行时

// This compiles but does NOTHING:
// 这段代码可以编译,但什么都不会发生:
async fn fetch_data() -> String {
    "hello".to_string()
}

fn main() {
    let future = fetch_data(); // Creates the Future, but doesn't execute it
    // future is just a struct sitting on the stack
    // No output, no side effects, nothing happens
    // 这里只是创建了 Future,但并没有执行
    // future 只是一个放在栈上的结构体
    // 没有输出,没有副作用,什么都没发生
    drop(future); // Silently dropped - work was never started
    // 被静默丢弃,工作从未真正开始
}

Compare with C# where Task starts eagerly:

对比 C#,Task 是急切启动的:

// C# - this immediately starts executing:
// C# 中,这里会立刻开始执行:
async Task<string> FetchData() => "hello";

var task = FetchData(); // Already running!
var result = await task; // Just waits for completion

Lazy Futures vs Eager Tasks / 惰性 Future 与急切 Task

This is the single most important mental shift:

这是最重要的思维转变:

| | C# / JavaScript | Python | Go | Rust |
|---|---|---|---|---|
| Creation / 创建 | Task starts executing immediately / Task 会立刻执行 | Coroutine is lazy - returns an object, doesn’t run until awaited or scheduled / Coroutine 是惰性的,返回对象但不会立刻执行 | Goroutine starts immediately / Goroutine 立即开始 | Future does nothing until polled / Future 在被 poll 前什么都不做 |
| Dropping / 丢弃 | Detached task continues running / 脱离引用后任务仍继续 | Unawaited coroutine is garbage-collected (with a warning) / 未 await 的 coroutine 会被 GC 回收(伴随警告) | Goroutine runs until return / Goroutine 一直运行到返回 | Dropping a Future cancels it / 丢弃 Future 就等于取消它 |
| Runtime / 运行时 | Built into the language/VM / 内建于语言或虚拟机 | asyncio event loop (must be explicitly started) / asyncio 事件循环(需显式启动) | Built into the binary (M:N scheduler) / 内建于二进制(M:N 调度) | You choose (tokio, smol, etc.) / 由你选择(tokio、smol 等) |
| Scheduling / 调度 | Automatic (thread pool) / 自动(线程池) | Event loop + await or create_task() / 事件循环加 await 或 create_task() | Automatic (GMP scheduler) / 自动(GMP 调度器) | Explicit (spawn, block_on) / 显式(spawn、block_on) |
| Cancellation / 取消 | CancellationToken (cooperative) / CancellationToken(协作式) | Task.cancel() (cooperative, raises CancelledError) / Task.cancel()(协作式,会抛 CancelledError) | context.Context (cooperative) / context.Context(协作式) | Drop the future (immediate) / 直接丢弃 future(立即取消) |
// To actually RUN a future, you need an executor:
// 真正要运行 future,需要执行器:
#[tokio::main]
async fn main() {
    let result = fetch_data().await; // NOW it executes
    println!("{result}");
}

When to Use Async (and When Not To) / 什么时候该用 Async,什么时候不该用

graph TD
    START["What kind of work?"]

    IO["I/O-bound?<br/>(network, files, DB)"]
    CPU["CPU-bound?<br/>(computation, parsing)"]
    MANY["Many concurrent connections?<br/>(100+)"]
    FEW["Few concurrent tasks?<br/>(<10)"]

    USE_ASYNC["Use async/await"]
    USE_THREADS["Use std::thread or rayon"]
    USE_SPAWN_BLOCKING["Use spawn_blocking()"]
    MAYBE_SYNC["Consider synchronous code<br/>(simpler, less overhead)"]

    START -->|Network, files, DB| IO
    START -->|Computation| CPU
    IO -->|Yes, many| MANY
    IO -->|Just a few| FEW
    MANY --> USE_ASYNC
    FEW --> MAYBE_SYNC
    CPU -->|Parallelize| USE_THREADS
    CPU -->|Inside async context| USE_SPAWN_BLOCKING

    style USE_ASYNC fill:#c8e6c9,color:#000
    style USE_THREADS fill:#c8e6c9,color:#000
    style USE_SPAWN_BLOCKING fill:#c8e6c9,color:#000
    style MAYBE_SYNC fill:#fff3e0,color:#000

Rule of thumb: Async is for I/O concurrency (doing many things at once while waiting), not CPU parallelism (making one thing faster). If you have 10,000 network connections, async shines. If you’re crunching numbers, use rayon or OS threads.

经验法则:Async 适用于 I/O 并发,也就是“等待时同时做很多事”,而不是 CPU 并行,也就是“把一件计算做得更快”。如果你要处理 10,000 个网络连接,async 会非常合适;如果你是在做重计算,请用 rayon 或操作系统线程。
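To make the CPU-parallelism half of this rule concrete, here is a minimal sketch using only std::thread; the parallel_sum function and its chunking scheme are illustrative choices (rayon’s par_iter would express the same idea more ergonomically):

```rust
use std::thread;

// Illustrative sketch: CPU-bound work split across plain OS threads.
// No async machinery is involved — there is no I/O to wait on.
fn parallel_sum(chunks: Vec<Vec<u64>>) -> u64 {
    let handles: Vec<_> = chunks
        .into_iter()
        .map(|chunk| thread::spawn(move || chunk.iter().sum::<u64>()))
        .collect();
    // Joining blocks the caller until every worker finishes.
    handles.into_iter().map(|h| h.join().unwrap()).sum()
}
```

Each chunk gets its own OS thread and the caller simply joins them; an async runtime would add nothing here because no task ever waits on external events.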

When Async Can Be Slower / Async 什么时候可能更慢

Async isn’t free. For low-concurrency workloads, synchronous code can outperform async:

Async 不是免费的。对于低并发工作负载,同步代码有时反而比 async 更快:

| Cost / 成本 | Why / 原因 |
|---|---|
| State machine overhead / 状态机开销 | Each .await adds an enum variant; deeply nested futures produce large, complex state machines / 每个 .await 都会增加状态,深层嵌套会生成很大、很复杂的状态机 |
| Dynamic dispatch / 动态分发 | Box<dyn Future> adds indirection and kills inlining / Box<dyn Future> 会增加间接层并阻碍内联优化 |
| Context switching / 上下文切换 | Cooperative scheduling still has cost - the executor must manage a task queue, wakers, and I/O registrations / 协作式调度仍有代价,执行器需要维护任务队列、waker 与 I/O 注册 |
| Compile time / 编译时间 | Async code generates more complex types, slowing down compilation / Async 代码会生成更复杂的类型,从而拖慢编译 |
| Debuggability / 可调试性 | Stack traces through state machines are harder to read (see Ch. 12) / 穿过状态机的调用栈更难读(见第 12 章) |

Benchmarking guidance: If fewer than ~10 concurrent I/O operations, profile before committing to async. A simple std::thread::spawn per connection scales fine to hundreds of threads on modern Linux.

性能建议:如果并发 I/O 操作少于大约 10 个,在决定使用 async 之前先做性能分析。对于现代 Linux,简单地每个连接一个 std::thread::spawn,扩展到几百个线程通常也没有问题。
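As a concrete illustration of that thread-per-connection baseline, here is a std-only echo server sketch; the serve function, the let-else skip on failed accepts, and the 1 KiB buffer are illustrative choices, not a production design:

```rust
use std::io::{Read, Write};
use std::net::TcpListener;
use std::thread;

// One OS thread per connection — perfectly serviceable at low
// connection counts, no async runtime required.
fn serve(listener: TcpListener) {
    for stream in listener.incoming() {
        let Ok(mut stream) = stream else { continue };
        thread::spawn(move || {
            let mut buf = [0u8; 1024];
            // Echo bytes back until the peer closes the connection.
            while let Ok(n) = stream.read(&mut buf) {
                if n == 0 {
                    break;
                }
                if stream.write_all(&buf[..n]).is_err() {
                    break;
                }
            }
        });
    }
}
```

Each connection costs one thread stack; that is exactly the overhead async eliminates at high concurrency, and exactly the overhead that doesn’t matter at low concurrency.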

Exercise: When Would You Use Async? / 练习:你会在什么场景下使用 Async?

Exercise / 练习(点击展开)

For each scenario, decide whether async is appropriate and explain why:

请判断以下每个场景是否适合使用 async,并说明原因:

  1. A web server handling 10,000 concurrent WebSocket connections / 一个需要处理 10,000 个并发 WebSocket 连接的 Web 服务器
  2. A CLI tool that compresses a single large file / 一个压缩单个大文件的命令行工具
  3. A service that queries 5 different databases and merges results / 一个需要查询 5 个不同数据库并合并结果的服务
  4. A game engine running a physics simulation at 60 FPS / 一个以 60 FPS 运行物理模拟的游戏引擎
Solution / 参考答案
  1. Async - I/O-bound with massive concurrency. Each connection spends most time waiting for data. Threads would require 10K stacks.
    适合 Async:典型 I/O 密集型且并发极高。每个连接大多数时间都在等待数据,若用线程则需要 10K 个线程栈。
  2. Sync/threads - CPU-bound, single task. Async adds overhead with no benefit. Use rayon for parallel compression.
    适合同步或线程:这是 CPU 密集型单任务。Async 只会增加开销,没有收益。若要并行压缩,使用 rayon
  3. Async - Five concurrent I/O waits. tokio::join! runs all five queries simultaneously.
    适合 Async:这里有五个可并发等待的 I/O 操作,可以用 tokio::join! 同时发起并等待。
  4. Sync/threads - CPU-bound, latency-sensitive. Async’s cooperative scheduling could introduce frame jitter.
    适合同步或线程:这是 CPU 密集型且对延迟敏感的工作,协作式调度可能引入帧抖动。

Key Takeaways - Why Async is Different / 关键要点:为什么 Async 在 Rust 中不同

  • Rust futures are lazy - they do nothing until polled by an executor / Rust future 是惰性的,在被执行器 poll 之前什么都不会做
  • There is no built-in runtime - you choose (or build) your own / Rust 没有内建运行时,你需要自己选择(甚至自己实现)运行时
  • Async is a zero-cost compilation strategy that produces state machines / Async 是一种零成本编译策略,最终会生成状态机
  • Async shines for I/O-bound concurrency; for CPU-bound work, use threads or rayon / Async 擅长 I/O 并发;对于 CPU 密集型任务,请使用线程或 rayon

See also / 延伸阅读: Ch 2 - The Future Trait / 第 2 章:Future Trait, Ch 7 - Executors and Runtimes / 第 7 章:执行器与运行时


2. The Future Trait / 2. Future Trait 🟡

What you’ll learn / 你将学到:

  • The Future trait: Output, poll(), Context, Waker / Future trait 的组成:Outputpoll()ContextWaker
  • How a waker tells the executor “poll me again” / Waker 如何告知执行器“请再次轮询我”
  • The contract: never call wake() = your program silently hangs / 契约:如果不调用 wake(),程序就会静默挂起
  • Implementing a real future by hand (Delay) / 亲手实现一个真实的 future(Delay)

Anatomy of a Future / Future 的解剖

Everything in async Rust ultimately implements this trait:

Async Rust 中的一切最终都实现了这个 trait:

#![allow(unused)]
fn main() {
pub trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

pub enum Poll<T> {
    Ready(T),   // The future has completed with value T
    Pending,    // The future is not ready yet — call me back later
}
}

That’s it. A Future is anything that can be polled — asked “are you done yet?” — and responds with either “yes, here’s the result” or “not yet, I’ll wake you up when I’m ready.”

就这么简单。Future 是任何可以被 poll(轮询)的对象 —— 即被询问“你做完了吗?” —— 并回答“做完了,这是结果”或“还没呢,等我准备好了会叫醒你”。

Output, poll(), Context, Waker / Output、poll()、Context 与 Waker

sequenceDiagram
    participant E as Executor
    participant F as Future
    participant R as Resource (I/O)

    E->>F: poll(cx)
    F->>R: Check: is data ready?
    R-->>F: Not yet
    F->>R: Register waker from cx
    F-->>E: Poll::Pending

    Note over R: ... time passes, data arrives ...

    R->>E: waker.wake() — "I'm ready!"
    E->>F: poll(cx) — try again
    F->>R: Check: is data ready?
    R-->>F: Yes! Here's the data
    F-->>E: Poll::Ready(data)

Let’s break down each piece:

让我们分解每个部分:

#![allow(unused)]
fn main() {
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

// A future that returns 42 immediately
struct Ready42;

impl Future for Ready42 {
    type Output = i32; // What the future eventually produces

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<i32> {
        Poll::Ready(42) // Always ready — no waiting
    }
}
}

The components / 组成部分:

  • Output — the type of value produced when the future completes / Future 完成时产生的值的类型
  • poll() — called by the executor to check progress; returns Ready(value) or Pending / 由执行器调用以检查进度;返回 Ready(value)Pending
  • Pin<&mut Self> — ensures the future won’t be moved in memory (we’ll cover why in Ch. 4) / 确保 future 不会在内存中被移动(我们将在第 4 章解释原因)
  • Context — carries the Waker so the future can signal the executor when it’s ready to make progress / 携带 Waker,以便 future 在准备好继续时通知执行器

The Waker Contract / Waker 契约

The Waker is the callback mechanism. When a future returns Pending, it must arrange for waker.wake() to be called later — otherwise the executor will never poll it again and the program hangs.

Waker 是一种回调机制。当 future 返回 Pending 时,它 必须 安排在之后调用 waker.wake() —— 否则执行器永远不会再次轮询它,程序就会挂起。

#![allow(unused)]
fn main() {
use std::task::{Context, Poll, Waker};
use std::pin::Pin;
use std::future::Future;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

/// A future that completes after a delay (toy implementation)
struct Delay {
    completed: Arc<Mutex<bool>>,
    waker_stored: Arc<Mutex<Option<Waker>>>,
    duration: Duration,
    started: bool,
}

impl Delay {
    fn new(duration: Duration) -> Self {
        Delay {
            completed: Arc::new(Mutex::new(false)),
            waker_stored: Arc::new(Mutex::new(None)),
            duration,
            started: false,
        }
    }
}

impl Future for Delay {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        // Check if already completed
        if *self.completed.lock().unwrap() {
            return Poll::Ready(());
        }

        // Store the waker so the background thread can wake us
        *self.waker_stored.lock().unwrap() = Some(cx.waker().clone());

        // Start the background timer on first poll
        if !self.started {
            self.started = true;
            let completed = Arc::clone(&self.completed);
            let waker = Arc::clone(&self.waker_stored);
            let duration = self.duration;

            thread::spawn(move || {
                thread::sleep(duration);
                *completed.lock().unwrap() = true;

                // CRITICAL: wake the executor so it polls us again
                if let Some(w) = waker.lock().unwrap().take() {
                    w.wake(); // "Hey executor, I'm ready — poll me again!"
                }
            });
        }

        Poll::Pending // Not done yet
    }
}
}

Key insight: In C#, the TaskScheduler handles waking automatically. In Rust, you (or the I/O library you use) are responsible for calling waker.wake(). Forget it, and your program silently hangs.

关键洞察:在 C# 中,TaskScheduler 会自动处理唤醒。而在 Rust 中,(或者你使用的 I/O 库)负责调用 waker.wake()。如果忘了这一步,你的程序就会静默挂起。

Exercise: Implement a CountdownFuture / 练习:实现一个倒计时 Future

🏋️ Exercise / 练习(点击展开)

Challenge: Implement a CountdownFuture that counts down from N to 0, printing the current count each time it’s polled. When it reaches 0, it completes with Ready("Liftoff!").

挑战:实现一个 CountdownFuture,从 N 倒数到 0,每次被轮询时打印当前数值。当达到 0 时,返回 Ready("Liftoff!") 完成。

Hint: The future needs to store the current count and decrement it on each poll. Remember to always re-register the waker!

提示:Future 需要存储当前计数并在每次轮询时递减。记得一定要重新注册 waker!

🔑 Solution / 参考答案
#![allow(unused)]
fn main() {
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

struct CountdownFuture {
    count: u32,
}

impl CountdownFuture {
    fn new(start: u32) -> Self {
        CountdownFuture { count: start }
    }
}

impl Future for CountdownFuture {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.count == 0 {
            println!("Liftoff!");
            Poll::Ready("Liftoff!")
        } else {
            println!("{}...", self.count);
            self.count -= 1;
            cx.waker().wake_by_ref(); // Schedule re-poll immediately
            Poll::Pending
        }
    }
}
}

Key takeaway: This future is polled once per count. Each time it returns Pending, it immediately wakes itself to be polled again. In production, you’d use a timer instead of busy-polling.

关键点:这个 future 每个计数值都会被轮询一次。每次返回 Pending 时,它都会立即唤醒自己以便再次被轮询。在生产环境中,你会使用定时器,而不是这种忙轮询。

Key Takeaways — The Future Trait / 关键要点:Future Trait

  • Future::poll() returns Poll::Ready(value) or Poll::Pending / Future::poll() 返回 Poll::Ready(value)Poll::Pending
  • A future must register a Waker before returning Pending — the executor uses it to know when to re-poll / Future 在返回 Pending 之前必须注册一个 Waker —— 执行器通过它知道何时重新轮询
  • Pin<&mut Self> guarantees the future won’t be moved in memory (needed for self-referential state machines — see Ch 4) / Pin<&mut Self> 保证 future 不会在内存中移动(自引用状态机需要此特性 —— 见第 4 章)
  • Everything in async Rust — async fn, .await, combinators — is built on this one trait / Async Rust 中的一切 —— async fn.await、组合器 —— 都构建在这一 trait 之上

See also / 延伸阅读: Ch 3 — How Poll Works / 第 3 章:poll 的工作机制 for the executor loop, Ch 6 — Building Futures by Hand / 第 6 章:手写 Future for more complex implementations


3. How Poll Works / 3. poll 的工作机制 🟡

What you’ll learn / 你将学到:

  • The executor’s poll loop: poll → pending → wake → poll again / 执行器的轮询循环:poll → pending → wake → poll again
  • How to build a minimal executor from scratch / 如何从零构建一个最小执行器
  • Spurious wake rules and why they matter / 虚假唤醒规则及其重要性
  • Utility functions: poll_fn() and yield_now() / 实用函数:poll_fn()yield_now()

The Polling State Machine / 轮询状态机

The executor runs a loop: poll a future, if it’s Pending, park it until its waker fires, then poll again. This is fundamentally different from OS threads where the kernel handles scheduling.

执行器运行一个循环:轮询一个 future,如果返回 Pending,就将其挂起,直到其 waker 被触发,然后再次轮询。这与操作系统线程有本质不同,后者由内核处理调度。

stateDiagram-v2
    [*] --> Idle : Future created
    Idle --> Polling : executor calls poll()
    Polling --> Complete : Ready(value)
    Polling --> Waiting : Pending
    Waiting --> Polling : waker.wake() called
    Complete --> [*] : Value returned

Important / 重要提示: While in the Waiting state the future must have registered the waker with an I/O source. No registration = hang forever.

当处于 Waiting(等待)状态时,future 必须 已经向某个 I/O 源注册了 waker。如果没有注册,程序将永远挂起。

A Minimal Executor / 一个最小执行器

To demystify executors, let’s build the simplest possible one:

为了揭开执行器的神秘面纱,让我们构建一个最简单的执行器:

use std::future::Future;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
use std::pin::Pin;

/// The simplest possible executor: busy-loop poll until Ready
fn block_on<F: Future>(mut future: F) -> F::Output {
    // Pin the future on the stack
    // SAFETY: `future` is never moved after this point — we only
    // access it through the pinned reference until it completes.
    let mut future = unsafe { Pin::new_unchecked(&mut future) };

    // Create a no-op waker (just keeps polling — inefficient but simple)
    fn noop_raw_waker() -> RawWaker {
        fn no_op(_: *const ()) {}
        fn clone(_: *const ()) -> RawWaker { noop_raw_waker() }
        let vtable = &RawWakerVTable::new(clone, no_op, no_op, no_op);
        RawWaker::new(std::ptr::null(), vtable)
    }

    // SAFETY: noop_raw_waker() returns a valid RawWaker with a correct vtable.
    let waker = unsafe { Waker::from_raw(noop_raw_waker()) };
    let mut cx = Context::from_waker(&waker);

    // Busy-loop until the future completes
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            Poll::Pending => {
                // A real executor would park the thread here
                // and wait for waker.wake() — we just spin
                std::thread::yield_now();
            }
        }
    }
}

// Usage:
fn main() {
    let result = block_on(async {
        println!("Hello from our mini executor!");
        42
    });
    println!("Got: {result}");
}

Don’t use this in production! It busy-loops, wasting CPU. Real executors (tokio, smol) use epoll/kqueue/io_uring to sleep until I/O is ready. But this shows the core idea: an executor is just a loop that calls poll().

不要在生产环境中使用它! 它由于忙碌循环(busy-loop)会浪费 CPU。真实的执行器(如 tokio、smol)会使用 epoll/kqueue/io_uring 在 I/O 就绪前保持休眠。但这个例子展示了核心思想:执行器本质上就是一个调用 poll() 的循环。

Wake-Up Notifications / 唤醒通知

A real executor is event-driven. When all futures are Pending, the executor sleeps. The waker is an interrupt mechanism:

真实的执行器是事件驱动的。当所有 future 都处于 Pending 状态时,执行器会进入休眠。Waker 则是一种中断机制:

#![allow(unused)]
fn main() {
// Conceptual model of a real executor's main loop:
fn executor_loop(tasks: &mut TaskQueue) {
    loop {
        // 1. Poll all tasks that have been woken
        while let Some(task) = tasks.get_woken_task() {
            match task.poll() {
                Poll::Ready(result) => task.complete(result),
                Poll::Pending => { /* task stays in queue, waiting for wake */ }
            }
        }

        // 2. Sleep until something wakes us up (epoll_wait, kevent, etc.)
        //    This is where mio/polling does the heavy lifting
        tasks.wait_for_events(); // blocks until an I/O event or waker fires
    }
}
}
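The “sleep until something wakes us up” step can be sketched in miniature without any I/O: the spinning block_on from earlier becomes event-driven if the waker simply unparks the blocked thread. The ThreadWaker type below is an illustrative stand-in for epoll-style wakeups, built on std’s Wake trait and pin! macro:

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Waking a task = unparking the thread blocked inside block_on.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// A block_on that sleeps between polls instead of spinning.
fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = pin!(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            // park() returns once wake() unparks us (or spuriously —
            // which is fine, because we simply poll again).
            Poll::Pending => thread::park(),
        }
    }
}
```

This is the same loop as before, but Pending now costs a parked (sleeping) thread rather than a spinning CPU core — the essential difference between a toy and a real executor.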

Spurious Wakes / 虚假唤醒

A future may be polled even when its I/O isn’t ready. This is called a spurious wake. Futures must handle this correctly:

即使 I/O 尚未就绪,future 也可能会被轮询。这被称为 spurious wake(虚假唤醒)。Future 必须正确处理这种情况:

#![allow(unused)]
fn main() {
impl Future for MyFuture {
    type Output = Data;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Data> {
        // ✅ CORRECT: Always re-check the actual condition
        if let Some(data) = self.try_read_data() {
            Poll::Ready(data)
        } else {
            // Re-register the waker (it might have changed!)
            self.register_waker(cx.waker());
            Poll::Pending
        }

        // ❌ WRONG: Assuming poll means data is ready
        // let data = self.read_data(); // might block or panic
        // Poll::Ready(data)
    }
}
}

Rules for implementing poll() / 实现 poll() 的规则:

  1. Never block — return Pending immediately if not ready / 绝不阻塞 —— 如果未就绪,立即返回 Pending
  2. Always re-register the waker — it may have changed between polls / 始终重新注册 waker —— 它在轮询之间可能会发生变化
  3. Handle spurious wakes — check the actual condition, don’t assume readiness / 处理虚假唤醒 —— 检查实际条件,不要假设已就绪
  4. Don’t poll after Ready — behavior is unspecified (may panic, return Pending, or repeat Ready). Only FusedFuture guarantees safe post-completion polling / 不要在 Ready 之后继续轮询 —— 其行为是未指定的(可能会 panic、返回 Pending 或重复 Ready)。只有 FusedFuture 保证在完成后轮询是安全的。
🏋️ Exercise: Implement a CountdownFuture / 练习:实现一个倒计时 Future (点击展开)

Challenge: Implement a CountdownFuture that counts down from N to 0, printing the current count as a side-effect each time it’s polled. When it reaches 0, it completes with Ready("Liftoff!"). (Note: a Future produces only one final value — the printing is a side-effect, not a yielded value. For multiple async values, see Stream in Ch. 11.)

挑战:实现一个 CountdownFuture,从 N 倒数到 0,并在每次轮询时通过副作用 打印 当前计数。当达到 0 时,返回 Ready("Liftoff!") 完成。(注:一个 Future 只产生 一个 最终值 —— 打印是副作用,而不是产出的值。关于多个异步值,请参见第 11 章中的 Stream。)

Hint: This doesn’t need a real I/O source — it can wake itself immediately with cx.waker().wake_by_ref() after each decrement.

提示:这不需要真实的 I/O 源 —— 它可以每次递减后使用 cx.waker().wake_by_ref() 立即唤醒自己。

🔑 Solution / 参考答案
#![allow(unused)]
fn main() {
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

struct CountdownFuture {
    count: u32,
}

impl CountdownFuture {
    fn new(start: u32) -> Self {
        CountdownFuture { count: start }
    }
}

impl Future for CountdownFuture {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.count == 0 {
            Poll::Ready("Liftoff!")
        } else {
            println!("{}...", self.count);
            self.count -= 1;
            // Wake immediately — we're always ready to make progress
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

// Usage with our mini executor or tokio:
// let msg = block_on(CountdownFuture::new(5));
// prints: 5... 4... 3... 2... 1...
// msg == "Liftoff!"
}

Key takeaway: Even though this future is always ready to progress, it returns Pending to yield control between steps. It calls wake_by_ref() immediately so the executor re-polls it right away. This is the basis of cooperative multitasking — each future voluntarily yields.

关键点:尽管这个 future 总是可以继续推进,但它仍返回 Pending 以便在步骤之间转让控制权。它立即调用 wake_by_ref(),因此执行器会马上再次轮询它。这是协作式多任务的基础 —— 每个 future 都会主动让出执行权。

Handy Utilities: poll_fn and yield_now / 实用工具:poll_fnyield_now

Two utilities from the standard library and tokio that avoid writing full Future impls:

来自标准库和 tokio 的两个实用工具,可以避免编写完整的 Future 实现:

#![allow(unused)]
use std::future::poll_fn;
use std::task::Poll;

// poll_fn: create a one-off future from a closure
// (.await is only legal inside an async context, so we wrap it in one)
// poll_fn: 从闭包创建一个一次性的 future
// (.await 只能出现在 async 上下文中,因此这里用 async fn 包裹)
async fn demo() -> i32 {
    poll_fn(|cx| {
        // Do something with cx.waker(), return Ready or Pending
        Poll::Ready(42)
    }).await
}

// Real-world use: bridge a callback-based API into async
// 实际用途:将基于回调的 API 桥接到 async
async fn read_when_ready(source: &MySource) -> Data {
    poll_fn(|cx| source.poll_read(cx)).await
}
#![allow(unused)]
fn main() {
// yield_now: voluntarily yield control to the executor
// Useful in CPU-heavy async loops to avoid starving other tasks
// yield_now: 主动向执行器让出控制权
// 在计算密集型的异步循环中非常有用,可以避免其他任务被“饿死”
async fn cpu_heavy_work(items: &[Item]) {
    for (i, item) in items.iter().enumerate() {
        process(item); // CPU work

        // Every 100 items, yield to let other tasks run
        // 每处理 100 个条目,让出执行权给其他任务
        if i % 100 == 0 {
            tokio::task::yield_now().await;
        }
    }
}
}

When to use yield_now(): If your async function does CPU work in a loop without any .await points, it monopolizes the executor thread. Insert yield_now().await periodically to enable cooperative multitasking.

何时使用 yield_now():如果你的异步函数在循环中执行 CPU 操作且没有任何 .await 点,它将独占执行器线程。定期插入 yield_now().await 可以启用协作式多任务。
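Under the hood, a yield is just “return Pending once, but wake immediately.” A hand-rolled sketch using std’s poll_fn — the yield_once and run names are illustrative, and the spinning run driver exists only to make the example runnable stand-alone:

```rust
use std::future::{poll_fn, Future};
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Return Pending exactly once, waking ourselves so the executor
// re-polls right away — other ready tasks get to run in between.
async fn yield_once() {
    let mut yielded = false;
    poll_fn(move |cx| {
        if yielded {
            Poll::Ready(())
        } else {
            yielded = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    })
    .await;
}

// Minimal spinning driver so the sketch runs without a runtime.
fn run<F: Future>(future: F) -> F::Output {
    struct Noop;
    impl Wake for Noop {
        fn wake(self: Arc<Self>) {}
    }
    let waker = Waker::from(Arc::new(Noop));
    let mut cx = Context::from_waker(&waker);
    let mut future = pin!(future);
    loop {
        if let Poll::Ready(value) = future.as_mut().poll(&mut cx) {
            return value;
        }
    }
}
```

tokio’s yield_now() follows the same shape, except that it re-queues the task at the back of the run queue so other woken tasks genuinely run first.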

Key Takeaways — How Poll Works / 关键要点:poll 的工作机制

  • An executor repeatedly calls poll() on futures that have been woken / 执行器会对被唤醒的 future 反复调用 poll()
  • Futures must handle spurious wakes — always re-check the actual condition / Future 必须处理虚假唤醒 —— 始终重新检查实际条件
  • poll_fn() lets you create ad-hoc futures from closures / poll_fn() 允许你从闭包创建临时的 future
  • yield_now() is a cooperative scheduling escape hatch for CPU-heavy async code / yield_now() 是计算密集型异步代码的协作式调度“逃生口”

See also / 延伸阅读: Ch 2 — The Future Trait / 第 2 章:Future Trait for the trait definition, Ch 5 — The State Machine Reveal / 第 5 章:状态机真相 for what the compiler generates


4. Pin and Unpin / 4. PinUnpin 🔴

What you’ll learn / 你将学到:

  • Why self-referential structs break when moved in memory / 为什么自引用结构体在内存中移动时会崩溃
  • What Pin<P> guarantees and how it prevents moves / Pin<P> 保证了什么,以及它是如何防止移动的
  • The three practical pinning patterns: Box::pin(), tokio::pin!(), Pin::new() / 三种实用的 Pin 模式:Box::pin()tokio::pin!()Pin::new()
  • When Unpin gives you an escape hatch / 什么时候 Unpin 可以作为“逃生口”

Why Pin Exists / 为什么需要 Pin

This is the most confusing concept in async Rust. Let’s build the intuition step by step.

这是 async Rust 中最令人困惑的概念。让我们循序渐进地建立直觉。

The Problem: Self-Referential Structs / 问题所在:自引用结构体

When the compiler transforms an async fn into a state machine, that state machine may contain references to its own fields. This creates a self-referential struct — and moving it in memory would invalidate those internal references.

当编译器将 async fn 转换为状态机时,该状态机可能包含对其自身字段的引用。这创建了一个 self-referential struct(自引用结构体)—— 在内存中移动它会导致这些内部引用失效。

#![allow(unused)]
fn main() {
// What the compiler generates (simplified) for:
// async fn example() {
//     let data = vec![1, 2, 3];
//     let reference = &data;       // Points to data above
//     use_ref(reference).await;
// }

// Becomes something like:
enum ExampleStateMachine {
    State0 {
        data: Vec<i32>,
        // reference: &Vec<i32>,  // PROBLEM: points to `data` above
        //                        // If this struct moves, the pointer is dangling!
    },
    State1 {
        data: Vec<i32>,
        reference: *const Vec<i32>, // Internal pointer to data field
    },
    Complete,
}
}
graph LR
    subgraph "Before Move (Valid)"
        A["data: [1,2,3]<br/>at addr 0x1000"]
        B["reference: 0x1000<br/>(points to data)"]
        B -->|"valid"| A
    end

    subgraph "After Move (INVALID)"
        C["data: [1,2,3]<br/>at addr 0x2000"]
        D["reference: 0x1000<br/>(still points to OLD location!)"]
        D -->|"dangling!"| E["💥 0x1000<br/>(freed/garbage)"]
    end

    style E fill:#ffcdd2,color:#000
    style D fill:#ffcdd2,color:#000
    style B fill:#c8e6c9,color:#000

Self-Referential Structs / 自引用结构体

This isn’t an academic concern. Every async fn that holds a reference across an .await point creates a self-referential state machine:

这不仅仅是一个理论问题。每一个跨越 .await 点持有引用的 async fn 都会创建一个自引用的状态机:

#![allow(unused)]
fn main() {
async fn problematic() {
    let data = String::from("hello");
    let slice = &data[..]; // slice borrows data
    
    some_io().await; // <-- .await point: state machine stores both data AND slice
    
    println!("{slice}"); // uses the reference after await
}
// The generated state machine has `data: String` and `slice: &str`
// where slice points INTO data. Moving the state machine = dangling pointer.
// 生成的状态机包含 `data: String` 和 `slice: &str`,其中 slice 指向 data 内部。
// 移动该状态机将导致指针悬空。
}
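To make the danger concrete, here is a small, std-only sketch (not the compiler's actual output) that simulates a self-referential struct with a raw pointer. It never dereferences the stale pointer, which would be undefined behavior; it only compares addresses to show that a move leaves the stored pointer aimed at the old location:

```rust
// A hypothetical self-referential struct, simulated with a raw pointer.
struct SelfRef {
    data: String,
    ptr: *const String, // meant to point at `data` above
}

pub fn demo() -> bool {
    let mut s = SelfRef { data: String::from("hello"), ptr: std::ptr::null() };
    s.ptr = &s.data; // self-reference established
    let old = s.ptr;
    let moved = Box::new(s); // move to the heap: `data` now lives at a new address
    old != &moved.data as *const String // the stored pointer is now stale
}

fn main() {
    assert!(demo()); // moving invalidated the internal pointer
}
```

Pin exists precisely to make the move on the `Box::new(s)` line impossible for `!Unpin` types, so the stored pointer can never go stale.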

Pin in Practice / Pin 的实践

Pin<P> is a wrapper that prevents moving the value behind the pointer:

Pin<P> 是一个包装器,用于防止移动指针所指向的值:

#![allow(unused)]
fn main() {
use std::pin::Pin;

let mut data = String::from("hello");

// Pin it. For a !Unpin type this would prevent moves; String is Unpin,
// so here the pin is effectively a no-op, but the API is the same.
// 固定它。对 !Unpin 类型而言这会阻止移动;String 是 Unpin,
// 所以这里的固定实际上没有效果,但 API 是一样的。
let pinned: Pin<&mut String> = Pin::new(&mut data);

// Can still use it:
println!("{}", pinned.as_ref().get_ref()); // "hello"

// Getting the &mut back (which would allow mem::swap) requires Unpin:
// 拿回 &mut(那将允许 mem::swap)需要 Unpin:
let mutable: &mut String = Pin::into_inner(pinned); // OK: String is Unpin
// For self-referential state machines (which are !Unpin), into_inner
// is unavailable; the value behind the Pin genuinely cannot be moved.
// 对于自引用状态机(!Unpin),into_inner 不可用;Pin 背后的值确实无法被移动。
}

In real code, you mostly encounter Pin in three places:

在实际代码中,你主要在三个地方遇到 Pin:

#![allow(unused)]
fn main() {
// 1. poll() signature — all futures are polled through Pin
// 1. poll() 签名 —— 所有 future 都是通过 Pin 进行轮询的
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;

// 2. Box::pin() — heap-allocate and pin a future
// 2. Box::pin() —— 在堆上分配并固定一个 future
let future: Pin<Box<dyn Future<Output = i32>>> = Box::pin(async { 42 });

// 3. tokio::pin!() — pin a future on the stack
// 3. tokio::pin!() —— 在栈上固定一个 future
tokio::pin!(my_future);
// Now my_future: Pin<&mut impl Future>
}

The Unpin Escape Hatch / Unpin 逃生口

Most types in Rust are Unpin — they don’t contain self-references, so pinning is a no-op. Only compiler-generated state machines (from async fn) are !Unpin.

Rust 中的大多数类型都是 Unpin —— 它们不包含自引用,因此固定(pinning)操作对它们没有实际影响。只有编译器生成的(来自 async fn)状态机是 !Unpin

#![allow(unused)]
fn main() {
// These are all Unpin — pinning them does nothing special:
// 这些都是 Unpin —— 固定它们没有什么特别之处:
// i32, String, Vec<T>, HashMap<K,V>, Box<T>, &T, &mut T

// These are !Unpin — they MUST be pinned before polling:
// 这些是 !Unpin —— 它们在轮询之前必须被固定:
// The state machines generated by `async fn` and `async {}`

// Practical implication:
// A hand-written Future whose fields are all Unpin is Unpin
// automatically (Unpin is an auto trait). A manual impl is only
// needed when some field is !Unpin but your type is still safe to move:
// 实际影响:如果你手写的 Future 的字段都是 Unpin,它会自动实现 Unpin
// (Unpin 是 auto trait)。只有当某个字段是 !Unpin 而你的类型整体
// 仍可安全移动时,才需要手动实现:
impl Unpin for MySimpleFuture {} // "I'm safe to move, trust me"
}
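This split can be probed at compile time. A minimal sketch, assuming only the standard library; `assert_unpin` is a hypothetical helper whose trait bound only accepts `Unpin` types:

```rust
use std::future::Future;
use std::pin::Pin;

// Compile-time probe: this only accepts types that implement Unpin.
fn assert_unpin<T: Unpin>(_: &T) {}

pub fn demo() -> bool {
    // Everyday types are Unpin; pinning them is a no-op.
    assert_unpin(&5i32);
    assert_unpin(&String::from("hi"));
    assert_unpin(&vec![1, 2, 3]);

    // A pinned Box is itself Unpin even when the future inside is not:
    // moving the Box moves the pointer; the pointee stays put.
    let fut: Pin<Box<dyn Future<Output = i32>>> = Box::pin(async { 42 });
    assert_unpin(&fut);

    // This line would NOT compile, because async blocks are !Unpin:
    // assert_unpin(&async { 42 });
    true
}

fn main() {
    assert!(demo());
}
```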

Quick Reference / 快速参考

| What / 内容 | When / 场景 | How / 方式 |
|---|---|---|
| Pin a future on the heap / 在堆上固定 future | Storing in a collection, returning from a function / 存储在集合中、从函数返回 | Box::pin(future) |
| Pin a future on the stack / 在栈上固定 future | Local use in select! or manual polling / 在 select! 中局部使用或手动轮询 | tokio::pin!(future) or pin_mut! from pin-utils |
| Pin in function signature / 函数签名中的 Pin | Accepting pinned futures / 接收已固定的 future | future: Pin<&mut F> |
| Require Unpin / 要求 Unpin | When you need to move a future after creation / 需要在创建后移动 future 时 | F: Future + Unpin |
🏋️ Exercise: Pin and Move / 练习:Pin 与移动

Challenge: Which of these code snippets compile? For each one that doesn’t, explain why and fix it.

挑战:以下哪些代码片段可以编译?对于不能编译的,解释原因并修复它。

#![allow(unused)]
fn main() {
// Snippet A
let fut = async { 42 };
let pinned = Box::pin(fut);
let moved = pinned; // Move the Box
let result = moved.await;

// Snippet B
let fut = async { 42 };
tokio::pin!(fut);
let moved = fut; // Move the pinned future
let result = moved.await;

// Snippet C
use std::pin::Pin;
let mut fut = async { 42 };
let pinned = Pin::new(&mut fut);
}
🔑 Solution / 参考答案

Snippet A: ✅ Compiles / 可编译。 Box::pin() puts the future on the heap. Moving the Box moves the pointer, not the future itself. The future stays pinned in its heap location. Box::pin() 将 future 放在堆上。移动 Box 只是移动了 指针,而不是 future 本身。Future 在其堆位置保持固定。

Snippet B: ✅ Compiles / 可编译。 tokio::pin! pins the future to the stack and rebinds fut as Pin<&mut ...>. Moving that pinned reference moves the pointer, not the future itself; the future stays pinned in its stack slot, and Pin<&mut F> is itself Unpin, so the move is allowed and moved.await works. What you cannot do is recover the original future or move the pinned value out of its slot. tokio::pin! 将 future 固定在栈上,并将 fut 重新绑定为 Pin<&mut ...>。移动这个已固定的引用只是移动了指针,而不是 future 本身;future 仍固定在它的栈槽中,而 Pin<&mut F> 本身是 Unpin 的,因此这个移动是允许的,moved.await 也能正常工作。你无法做到的是取回原始的 future,或将被固定的值从栈槽中移出。

#![allow(unused)]
fn main() {
let fut = async { 42 };
tokio::pin!(fut);
// fut: Pin<&mut impl Future>; moving this Pin moves only the pointer
// fut: Pin<&mut impl Future>;移动这个 Pin 只移动指针
let moved = fut;
let result = moved.await; // OK: the future itself never moved
}

Snippet C: ❌ Does not compile / 不可编译。 Pin::new() requires T: Unpin. Async blocks generate !Unpin types. Fix: Use Box::pin() or unsafe Pin::new_unchecked(): Pin::new() 要求 T: Unpin。异步块生成的是 !Unpin 类型。修复方案:使用 Box::pin()unsafe Pin::new_unchecked()

#![allow(unused)]
fn main() {
let fut = async { 42 };
let pinned = Box::pin(fut); // Heap-pin — works with !Unpin
}

Key takeaway: Box::pin() is the safe, easy way to pin !Unpin futures. tokio::pin!() pins on the stack but the future can’t be moved after. Pin::new() only works with Unpin types. 关键点Box::pin() 是固定 !Unpin future 的安全简便方法。tokio::pin!() 在栈上固定,但之后 future 就不能再移动。Pin::new() 仅适用于 Unpin 类型。

Key Takeaways — Pin and Unpin / 关键要点:Pin 与 Unpin

  • Pin<P> is a wrapper that prevents the pointee from being moved — essential for self-referential state machines / Pin<P> 是一个包装器,防止所指对象被移动 —— 这对于自引用状态机至关重要
  • Box::pin() is the safe, easy default for pinning futures on the heap / Box::pin() 是在堆上固定 future 的安全简便默认选择
  • tokio::pin!() pins on the stack — cheaper but the future can’t be moved afterward / tokio::pin!() 在栈上固定 —— 开销更小,但之后 future 无法移动
  • Unpin is an auto trait: most types implement it and can be moved even when pinned; compiler-generated async state machines opt out and are !Unpin / Unpin 是一个自动 trait:大多数类型都实现了它,即使被固定也可以移动;编译器生成的异步状态机则选择退出,是 !Unpin 的

See also / 延伸阅读: Ch 2 — The Future Trait / 第 2 章:Future Trait for Pin<&mut Self> in poll, Ch 5 — The State Machine Reveal / 第 5 章:状态机真相 for why async state machines are self-referential


5. The State Machine Reveal / 5. 状态机真相 🟢

What you’ll learn / 你将学到:

  • How the compiler transforms async fn into an enum state machine / 编译器如何将 async fn 转换为枚举状态机
  • Side-by-side comparison: source code vs generated states / 源码与生成的各状态之间的对比
  • Why large stack allocations in async fn blow up future sizes / 为什么 async fn 中巨大的栈分配会导致 future 体积膨胀
  • The drop optimization: values drop as soon as they’re no longer needed / Drop 优化:不再需要的值会立即被释放

What the Compiler Actually Generates / 编译器究竟生成了什么

When you write async fn, the compiler transforms your sequential-looking code into an enum-based state machine. Understanding this transformation is the key to understanding async Rust’s performance characteristics and many of its quirks.

当你编写 async fn 时,编译器会将你看起来像是顺序执行的代码转换为基于枚举的状态机。理解这一转换过程是掌握 async Rust 性能特性及其许多“怪癖”的关键。

Side-by-Side: async fn vs State Machine / 对比:async fn 与状态机

#![allow(unused)]
fn main() {
// What you write:
// 你写的代码:
async fn fetch_two_pages() -> String {
    let page1 = http_get("https://example.com/a").await;
    let page2 = http_get("https://example.com/b").await;
    format!("{page1}\n{page2}")
}
}

The compiler generates something conceptually like this:

编译器会生成概念上类似于以下的代码:

#![allow(unused)]
fn main() {
enum FetchTwoPagesStateMachine {
    // State 0: About to call http_get for page1
    // 状态 0:准备为 page1 调用 http_get
    Start,

    // State 1: Waiting for page1, holding the future
    // 状态 1:等待 page1,持有相应的 future
    WaitingPage1 {
        fut1: HttpGetFuture,
    },

    // State 2: Got page1, waiting for page2
    // 状态 2:拿到 page1,等待 page2
    WaitingPage2 {
        page1: String,
        fut2: HttpGetFuture,
    },

    // Terminal state
    // 终止状态
    Complete,
}

impl Future for FetchTwoPagesStateMachine {
    type Output = String;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<String> {
        loop {
            match self.as_mut().get_mut() {
                Self::Start => {
                    let fut1 = http_get("https://example.com/a");
                    *self.as_mut().get_mut() = Self::WaitingPage1 { fut1 };
                }
                Self::WaitingPage1 { fut1 } => {
                    let page1 = match Pin::new(fut1).poll(cx) {
                        Poll::Ready(v) => v,
                        Poll::Pending => return Poll::Pending,
                    };
                    let fut2 = http_get("https://example.com/b");
                    *self.as_mut().get_mut() = Self::WaitingPage2 { page1, fut2 };
                }
                Self::WaitingPage2 { page1, fut2 } => {
                    let page2 = match Pin::new(fut2).poll(cx) {
                        Poll::Ready(v) => v,
                        Poll::Pending => return Poll::Pending,
                    };
                    let result = format!("{page1}\n{page2}");
                    *self.as_mut().get_mut() = Self::Complete;
                    return Poll::Ready(result);
                }
                Self::Complete => panic!("polled after completion"),
            }
        }
    }
}
}

Note: This desugaring is conceptual. The real compiler output uses unsafe pin projections — the get_mut() calls shown here require Unpin, but async state machines are !Unpin. The goal is to illustrate state transitions, not produce compilable code.

注意:这种语法糖还原(desugaring)是 概念性 的。编译器实际生成的代码使用 unsafe 的 pin 投影 —— 这里显示的 get_mut() 调用要求 Unpin,但异步状态机是 !Unpin 的。这里的目的是演示状态转换,而不是生成可编译的代码。

stateDiagram-v2
    [*] --> Start
    Start --> WaitingPage1: Create http_get future #1
    WaitingPage1 --> WaitingPage1: poll() → Pending
    WaitingPage1 --> WaitingPage2: poll() → Ready(page1)
    WaitingPage2 --> WaitingPage2: poll() → Pending
    WaitingPage2 --> Complete: poll() → Ready(page2)
    Complete --> [*]: Return format!("{page1}\\n{page2}")

State contents / 状态内容:

  • WaitingPage1 — stores fut1: HttpGetFuture (page2 not yet allocated) / 存储 fut1: HttpGetFuture(page2 尚未分配)
  • WaitingPage2 — stores page1: String, fut2: HttpGetFuture (fut1 has been dropped) / 存储 page1: Stringfut2: HttpGetFuture(fut1 已被释放)

Why This Matters for Performance / 为什么这对性能很重要

Zero-cost / 零成本: The state machine is a plain enum value that lives wherever you store it (the stack, or inline inside a parent future). No per-future heap allocation, no garbage collector, no boxing — unless you explicitly use Box::pin().

零成本:状态机是一个普通的枚举值,存放在你放置它的位置(栈上,或内联在父 future 中)。没有按 future 的堆分配,没有垃圾回收,没有 boxing —— 除非你显式使用 Box::pin()

Size / 尺寸: The enum’s size is the maximum of all its variants. Each .await point creates a new variant. This means:

尺寸:枚举的大小是其所有变体中的最大值。每个 .await 点都会创建一个新的变体。这意味着:

#![allow(unused)]
fn main() {
async fn small() {
    let a: u8 = 0;
    yield_now().await;
    let b: u8 = 0;
    yield_now().await;
}
// Size ≈ max(size_of(u8), size_of(u8)) + discriminant + future sizes
//      ≈ small!
// 尺寸 ≈ 变体最大值 + 判别码 + 内部 future 大小,依然很小!

async fn big() {
    let buf: [u8; 1_000_000] = [0; 1_000_000]; // 1MB on the stack!
    some_io().await;
    process(&buf);
}
// Size ≈ 1MB + inner future sizes
// ⚠️ Don't stack-allocate huge buffers in async functions!
// Use Vec<u8> or Box<[u8]> instead.
// ⚠️ 不要在异步函数中在栈上分配巨大的缓冲区!请改用 Vec<u8> 或 Box<[u8]>。
}

Drop optimization / Drop 优化: When a state machine transitions, it drops values no longer needed. In the example above, fut1 is dropped when we transition from WaitingPage1 to WaitingPage2 — the compiler inserts the drop automatically.

Drop 优化:当状态机发生迁移时,它会释放(drop)不再需要的值。在上面的例子中,当我们从 WaitingPage1 迁移到 WaitingPage2 时,fut1 会被释放 —— 编译器会自动插入释放操作。

Practical rule: Large stack allocations in async fn blow up the future’s size. If you see stack overflows in async code, check for large arrays or deeply nested futures. Use Box::pin() to heap-allocate sub-futures if needed.

实践法则:在 async fn 中进行巨大的栈分配会使 future 的体积飙升。如果你在异步代码中遇到栈溢出,请检查是否有大数组或深度嵌套的 future。必要时使用 Box::pin() 来堆分配子 future。
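You can observe the size difference directly with std::mem::size_of_val. A small, std-only sketch (the 4096-byte array is an arbitrary stand-in for a large buffer):

```rust
use std::mem::size_of_val;

async fn small() {
    let _a: u8 = 0; // not used after the await, so not stored in the future
    std::future::ready(()).await;
}

async fn big() {
    let buf = [0u8; 4096]; // created before the await...
    std::future::ready(()).await;
    let _ = buf.len(); // ...used after it, so it must live inside the state machine
}

pub fn sizes() -> (usize, usize) {
    // Futures report their full state-machine size before ever being polled.
    (size_of_val(&small()), size_of_val(&big()))
}

fn main() {
    let (s, b) = sizes();
    println!("small future: {s} bytes, big future: {b} bytes");
    assert!(b >= 4096); // the buffer is embedded in an enum variant
    assert!(s < b);
}
```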

Exercise: Predict the State Machine / 练习:预测状态机

🏋️ Exercise / 练习

Challenge: Given this async function, sketch the state machine the compiler generates. How many states (enum variants) does it have? What values are stored in each?

挑战:给定这个异步函数,勾勒出编译器生成的状态机。它有多少个状态(枚举变体)?每个状态中存储了什么值?

#![allow(unused)]
fn main() {
async fn pipeline(url: &str) -> Result<usize, Error> {
    let response = fetch(url).await?;
    let body = response.text().await?;
    let parsed = parse(body).await?;
    Ok(parsed.len())
}
}
🔑 Solution / 参考答案

Five states:

五个状态:

  1. Start — stores url / Start —— 存储 url
  2. WaitingFetch — stores url, fetch future / WaitingFetch —— 存储 urlfetch 的 future
  3. WaitingText — stores response, text() future / WaitingText —— 存储 responsetext() 的 future
  4. WaitingParse — stores body, parse future / WaitingParse —— 存储 bodyparse 的 future
  5. Done — returned Ok(parsed.len()) / Done —— 返回了 Ok(parsed.len())

Each .await creates a yield point = a new enum variant. The ? adds early-exit paths but doesn’t add extra states — it’s just a match on the Result inside the Poll::Ready value.

每个 .await 都会创建一个 yield 点,即一个新的枚举变体。? 增加了提前退出的路径,但并不会增加额外的状态 —— 它仅仅是对 Poll::Ready 中 Result 值的一个 match 操作。

Key Takeaways — The State Machine Reveal / 关键要点:状态机真相

  • async fn compiles to an enum with one variant per .await point / async fn 会被编译为一个枚举,每个 .await 点对应一个变体
  • The future’s size = max of all variant sizes — large stack values blow it up / Future 的尺寸 = 所有变体尺寸的最大值 —— 巨大的栈分配会使其剧增
  • The compiler inserts drops at state transitions automatically / 编译器在状态转换时会自动插入 drop 操作
  • Use Box::pin() or heap allocation when future size becomes a problem / 当 future 尺寸成为问题时,请使用 Box::pin() 或堆分配

See also / 延伸阅读: Ch 4 — Pin and Unpin / 第 4 章:Pin 与 Unpin for why the generated enum needs pinning, Ch 6 — Building Futures by Hand / 第 6 章:手动构建 Future to build these state machines yourself


6. Building Futures by Hand / 6. 手写 Future 🟡

What you’ll learn / 你将学到:

  • Implementing a TimerFuture with thread-based waking / 基于线程唤醒实现 TimerFuture
  • Building a Join combinator: run two futures concurrently / 构建 Join 组合器:并发运行两个 future
  • Building a Select combinator: race two futures / 构建 Select 组合器:让两个 future 竞速
  • How combinators compose — futures all the way down / 组合器如何嵌套 —— 万物皆可 Future

A Simple Timer Future / 一个简单的定时器 Future

Now let’s build real, useful futures from scratch. This cements the theory from chapters 2-5.

现在让我们从零开始构建真实且有用的 future。这将巩固第 2 至第 5 章中的理论。

TimerFuture: A Complete Example / TimerFuture:完整示例

#![allow(unused)]
fn main() {
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::thread;
use std::time::{Duration, Instant};

pub struct TimerFuture {
    shared_state: Arc<Mutex<SharedState>>,
}

struct SharedState {
    completed: bool,
    waker: Option<Waker>,
}

impl TimerFuture {
    pub fn new(duration: Duration) -> Self {
        let shared_state = Arc::new(Mutex::new(SharedState {
            completed: false,
            waker: None,
        }));

        // Spawn a thread that sets completed=true after the duration
        // 派生一个线程,在指定时长后设置 completed=true
        let thread_shared_state = Arc::clone(&shared_state);
        thread::spawn(move || {
            thread::sleep(duration);
            let mut state = thread_shared_state.lock().unwrap();
            state.completed = true;
            if let Some(waker) = state.waker.take() {
                waker.wake(); // Notify the executor
                              // 通知执行器
            }
        });

        TimerFuture { shared_state }
    }
}

impl Future for TimerFuture {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut state = self.shared_state.lock().unwrap();
        if state.completed {
            Poll::Ready(())
        } else {
            // Store the waker so the timer thread can wake us
            // IMPORTANT: Always update the waker — the executor may
            // have changed it between polls
            // 存储 waker 以便计时器线程可以唤醒我们
            // 重要:一定要更新 waker —— 执行器在两次轮询之间可能会更改它
            state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

// Usage:
// async fn example() {
//     println!("Starting timer...");
//     TimerFuture::new(Duration::from_secs(2)).await;
//     println!("Timer done!");
// }
//
// ⚠️ This spawns an OS thread per timer — fine for learning, but in
// production use `tokio::time::sleep` which is backed by a shared
// timer wheel and requires zero extra threads.
// ⚠️ 这种做法会为每个计时器派生一个操作系统线程 —— 仅供学习。
// 在生产环境中请使用 `tokio::time::sleep`,它由共享的时间轮驱动,不需要额外线程。
}

Join: Running Two Futures Concurrently / Join:并发运行两个 Future

Join polls two futures and completes when both finish. This is how tokio::join! works internally:

Join 会轮询两个 future,并在 两者 都完成后才算完成。这正是 tokio::join! 内部的工作原理:

#![allow(unused)]
fn main() {
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

/// Polls two futures concurrently, returns both results as a tuple
/// 并发轮询两个 future,并以元组形式返回两个结果
pub struct Join<A, B>
where
    A: Future,
    B: Future,
{
    a: MaybeDone<A>,
    b: MaybeDone<B>,
}

enum MaybeDone<F: Future> {
    Pending(F),
    Done(F::Output),
    Taken, // Output has been taken
}

impl<A, B> Join<A, B>
where
    A: Future,
    B: Future,
{
    pub fn new(a: A, b: B) -> Self {
        Join {
            a: MaybeDone::Pending(a),
            b: MaybeDone::Pending(b),
        }
    }
}

impl<A, B> Future for Join<A, B>
where
    A: Future + Unpin,
    B: Future + Unpin,
{
    type Output = (A::Output, B::Output);

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Poll A if not done
        if let MaybeDone::Pending(ref mut fut) = self.a {
            if let Poll::Ready(val) = Pin::new(fut).poll(cx) {
                self.a = MaybeDone::Done(val);
            }
        }

        // Poll B if not done
        if let MaybeDone::Pending(ref mut fut) = self.b {
            if let Poll::Ready(val) = Pin::new(fut).poll(cx) {
                self.b = MaybeDone::Done(val);
            }
        }

        // Both done?
        match (&self.a, &self.b) {
            (MaybeDone::Done(_), MaybeDone::Done(_)) => {
                // Take both outputs
                let a_val = match std::mem::replace(&mut self.a, MaybeDone::Taken) {
                    MaybeDone::Done(v) => v,
                    _ => unreachable!(),
                };
                let b_val = match std::mem::replace(&mut self.b, MaybeDone::Taken) {
                    MaybeDone::Done(v) => v,
                    _ => unreachable!(),
                };
                Poll::Ready((a_val, b_val))
            }
            _ => Poll::Pending, // At least one is still pending
        }
    }
}

// Usage:
// let (page1, page2) = Join::new(
//     http_get("https://example.com/a"),
//     http_get("https://example.com/b"),
// ).await;
// Both requests run concurrently!
}

Key insight: “Concurrent” here means interleaved on the same thread. Join doesn’t spawn threads — it polls both futures in the same poll() call. This is cooperative concurrency, not parallelism.

关键洞察:这里的“并发”是指 在同一线程上交替执行Join 并不会派生新线程 —— 它在同一次 poll() 调用中轮询两个 future。这是协作式并发,而不是并行。

graph LR
    subgraph "Future Combinators"
        direction TB
        TIMER["TimerFuture<br/>Single future, wake after delay"]
        JOIN["Join&lt;A, B&gt;<br/>Wait for BOTH"]
        SELECT["Select&lt;A, B&gt;<br/>Wait for FIRST"]
        RETRY["RetryFuture<br/>Re-create on failure"]
    end

    TIMER --> JOIN
    TIMER --> SELECT
    SELECT --> RETRY

    style TIMER fill:#d4efdf,stroke:#27ae60,color:#000
    style JOIN fill:#e8f4f8,stroke:#2980b9,color:#000
    style SELECT fill:#fef9e7,stroke:#f39c12,color:#000
    style RETRY fill:#fadbd8,stroke:#e74c3c,color:#000

Select: Racing Two Futures / Select:让两个 Future 竞速

Select completes when either future finishes first (the other is dropped):

Select其中任意一个 future 先完成时即告完成(另一个会被丢弃):

#![allow(unused)]
fn main() {
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

pub enum Either<A, B> {
    Left(A),
    Right(B),
}

/// Returns whichever future completes first; drops the other
/// 返回最先完成的那个 future,并丢弃另一个
pub struct Select<A, B> {
    a: A,
    b: B,
}

impl<A, B> Select<A, B>
where
    A: Future + Unpin,
    B: Future + Unpin,
{
    pub fn new(a: A, b: B) -> Self {
        Select { a, b }
    }
}

impl<A, B> Future for Select<A, B>
where
    A: Future + Unpin,
    B: Future + Unpin,
{
    type Output = Either<A::Output, B::Output>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Poll A first
        if let Poll::Ready(val) = Pin::new(&mut self.a).poll(cx) {
            return Poll::Ready(Either::Left(val));
        }

        // Then poll B
        if let Poll::Ready(val) = Pin::new(&mut self.b).poll(cx) {
            return Poll::Ready(Either::Right(val));
        }

        Poll::Pending
    }
}
}

Fairness note: Our Select always polls A first — if both are ready, A always wins. Tokio’s select! macro randomizes the poll order for fairness.

公平性说明:我们的 Select 总是先轮询 A —— 如果两者都就绪,A 总是获胜。Tokio 的 select! 宏会随机化轮询顺序以确保公平。
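The bias is easy to observe. A condensed, std-only version of the Select above, raced over two already-ready futures (std::future::ready stands in for real work, and a no-op waker is enough for a single poll):

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

pub enum Either<A, B> { Left(A), Right(B) }

// The chapter's Select, condensed: it always polls `a` first.
struct Select<A, B> { a: A, b: B }
impl<A: Future + Unpin, B: Future + Unpin> Future for Select<A, B> {
    type Output = Either<A::Output, B::Output>;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if let Poll::Ready(v) = Pin::new(&mut self.a).poll(cx) {
            return Poll::Ready(Either::Left(v));
        }
        if let Poll::Ready(v) = Pin::new(&mut self.b).poll(cx) {
            return Poll::Ready(Either::Right(v));
        }
        Poll::Pending
    }
}

struct NoopWaker;
impl Wake for NoopWaker { fn wake(self: Arc<Self>) {} }

pub fn race_ready() -> &'static str {
    // Both futures are immediately ready, so the bias shows: A always wins.
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut sel = Select { a: std::future::ready("A"), b: std::future::ready("B") };
    match Pin::new(&mut sel).poll(&mut cx) {
        Poll::Ready(Either::Left(v)) => v,
        Poll::Ready(Either::Right(v)) => v,
        Poll::Pending => unreachable!("both futures are ready"),
    }
}

fn main() {
    assert_eq!(race_ready(), "A"); // deterministic bias toward A
}
```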

🏋️ Exercise: Build a RetryFuture / 练习:构建一个 RetryFuture

Challenge: Build a RetryFuture<F, Fut> that takes a closure F: Fn() -> Fut and retries up to N times if the inner future returns Err. It should return the first Ok result or the last Err.

挑战:构建一个 RetryFuture<F, Fut>,它接收一个闭包 F: Fn() -> Fut,并在内部 future 返回 Err 时重试最多 N 次。它应该返回第一个 Ok 结果或最后一个 Err

Hint: You’ll need states for “running attempt” and “all attempts exhausted.”

提示:你需要“运行尝试中”和“尝试已耗尽”的状态。

🔑 Solution / 参考答案
#![allow(unused)]
fn main() {
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

pub struct RetryFuture<F, Fut, T, E>
where
    F: Fn() -> Fut,
    Fut: Future<Output = Result<T, E>> + Unpin,
{
    factory: F,
    current: Option<Fut>,
    remaining: usize,
    last_error: Option<E>,
}

impl<F, Fut, T, E> RetryFuture<F, Fut, T, E>
where
    F: Fn() -> Fut,
    Fut: Future<Output = Result<T, E>> + Unpin,
{
    pub fn new(max_attempts: usize, factory: F) -> Self {
        let current = Some((factory)());
        RetryFuture {
            factory,
            current,
            remaining: max_attempts.saturating_sub(1),
            last_error: None,
        }
    }
}

impl<F, Fut, T, E> Future for RetryFuture<F, Fut, T, E>
where
    F: Fn() -> Fut + Unpin,
    Fut: Future<Output = Result<T, E>> + Unpin,
    T: Unpin,
    E: Unpin,
{
    type Output = Result<T, E>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        loop {
            if let Some(ref mut fut) = self.current {
                match Pin::new(fut).poll(cx) {
                    Poll::Ready(Ok(val)) => return Poll::Ready(Ok(val)),
                    Poll::Ready(Err(e)) => {
                        self.last_error = Some(e);
                        if self.remaining > 0 {
                            self.remaining -= 1;
                            self.current = Some((self.factory)());
                            // Loop to poll the new future immediately
                        } else {
                            return Poll::Ready(Err(self.last_error.take().unwrap()));
                        }
                    }
                    Poll::Pending => return Poll::Pending,
                }
            } else {
                return Poll::Ready(Err(self.last_error.take().unwrap()));
            }
        }
    }
}
}

Key takeaway: The retry future is itself a state machine: it holds the current attempt and creates new inner futures on failure. This is how combinators compose — futures all the way down.

关键点:Retry future 本身也是一个状态机:它持有当前的尝试并在失败时创建新的内部 future。这就是组合器的组合方式 —— 万物皆可 Future。

Key Takeaways — Building Futures by Hand / 关键要点:手写 Future

  • A future needs three things: state, a poll() implementation, and a waker registration / 一个 future 需要三样东西:状态、poll() 实现和 waker 注册
  • Join polls both sub-futures; Select returns whichever finishes first / Join 轮询两个子 future;Select 返回最先完成的那个
  • Combinators are themselves futures wrapping other futures — it’s turtles all the way down / 组合器本身也是包装其他 future 的 future —— 它是递归嵌套的
  • Building futures by hand gives deep insight, but in production use tokio::join!/select! / 手写 future 能让你获得深刻洞见,但在生产环境中建议使用 tokio::join!/select!

See also / 延伸阅读: Ch 2 — The Future Trait / 第 2 章:Future Trait for the trait definition, Ch 8 — Tokio Deep Dive / 第 8 章:Tokio 深入解析 for production-grade equivalents


7. Executors and Runtimes / 7. 执行器与运行时 🟡

What you’ll learn / 你将学到:

  • What an executor does: poll + sleep efficiently / 执行器的作用:轮询 + 高效休眠
  • The six major runtimes: mio, io_uring, tokio, async-std, smol, embassy / 六大主要运行时:mio、io_uring、tokio、async-std、smol、embassy
  • A decision tree for choosing the right runtime / 选择合适运行时的决策树
  • Why runtime-agnostic library design matters / 为什么运行时无关的库设计很重要

What an Executor Does / 执行器是做什么的

An executor has two jobs:

一个执行器有两项工作:

  1. Poll futures when they’re ready to make progress / 在 future 准备好继续推进时对其进行 Poll(轮询)
  2. Sleep efficiently when no futures are ready (using OS I/O notification APIs) / 在没有 future 就绪时 高效休眠(利用操作系统的 I/O 通知 API)
graph TB
    subgraph Executor["Executor (e.g., tokio)"]
        QUEUE["Task Queue"]
        POLLER["I/O Poller<br/>(epoll/kqueue/io_uring)"]
        THREADS["Worker Thread Pool"]
    end

    subgraph Tasks
        T1["Task 1<br/>(HTTP request)"]
        T2["Task 2<br/>(DB query)"]
        T3["Task 3<br/>(File read)"]
    end

    subgraph OS["Operating System"]
        NET["Network Stack"]
        DISK["Disk I/O"]
    end

    T1 --> QUEUE
    T2 --> QUEUE
    T3 --> QUEUE
    QUEUE --> THREADS
    THREADS -->|"poll()"| T1
    THREADS -->|"poll()"| T2
    THREADS -->|"poll()"| T3
    POLLER <-->|"register/notify"| NET
    POLLER <-->|"register/notify"| DISK
    POLLER -->|"wake tasks"| QUEUE

    style Executor fill:#e3f2fd,color:#000
    style OS fill:#f3e5f5,color:#000
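Those two jobs fit in a surprisingly small amount of code. Below is a minimal, std-only block_on in the spirit of futures::executor::block_on — a sketch, not tokio's actual scheduler: waking unparks the executor thread, and parking is the "sleep efficiently" half:

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Job 2: sleep efficiently. Waking simply unparks the executor's thread.
struct ThreadWaker(Thread);
impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Job 1: poll the future whenever it may be able to make progress.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut); // heap-pin: works for !Unpin futures
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(v) => return v,
            // park() consumes a pending "unpark token", so a wake that
            // lands just before we park is not lost.
            Poll::Pending => thread::park(),
        }
    }
}

fn main() {
    assert_eq!(block_on(async { 40 + 2 }), 42);
    println!("block_on drove the future to completion");
}
```

Real runtimes replace the park/unpark pair with an I/O poller (epoll, kqueue, IOCP), so one sleeping thread can be woken by any of thousands of pending I/O events.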

mio: The Foundation Layer / mio:底层基石

mio (Metal I/O) is not an executor — it’s the lowest-level cross-platform I/O notification library. It wraps epoll (Linux), kqueue (macOS/BSD), and IOCP (Windows).

mio (Metal I/O) 并不是一个执行器 —— 它是最底层的跨平台 I/O 通知库。它封装了 epoll (Linux)、kqueue (macOS/BSD) 和 IOCP (Windows)。

#![allow(unused)]
fn main() {
// Conceptual mio usage (simplified):
// mio 的概念性用法(简化版):
use mio::{Events, Interest, Poll, Token};
use mio::net::TcpListener;

let mut poll = Poll::new()?;
let mut events = Events::with_capacity(128);

let mut server = TcpListener::bind("0.0.0.0:8080")?;
poll.registry().register(&mut server, Token(0), Interest::READABLE)?;

// Event loop — blocks until something happens
// 事件循环 —— 阻塞直到有事件发生
loop {
    poll.poll(&mut events, None)?; // Sleeps until I/O event
    for event in events.iter() {
        match event.token() {
            Token(0) => { /* server has a new connection */ }
            _ => { /* other I/O ready */ }
        }
    }
}
}

Most developers never touch mio directly — tokio and smol build on top of it.

大多数开发者永远不会直接接触 mio —— tokio 和 smol 都是构建在它之上的。

io_uring: The Completion-Based Future / io_uring:基于完成通知的 Future

Linux’s io_uring (kernel 5.1+) represents a fundamental shift from the readiness-based I/O model that mio/epoll use:

Linux 的 io_uring(内核 5.1+)代表了从 mio/epoll 使用的“就绪通知(readiness-based)”模型到“完成通知(completion-based)”模型的根本转变:

Readiness-based (epoll / mio / tokio):
  1. Ask: "Is this socket readable?"     → epoll_wait()
  2. Kernel: "Yes, it's ready"           → EPOLLIN event
  3. App:   read(fd, buf)                → might still block briefly!

就绪模型 (epoll / mio / tokio):
  1. 询问:“这个 socket 可读吗?”         → epoll_wait()
  2. 内核:“是的,就绪了”                → EPOLLIN 事件
  3. 应用: read(fd, buf)                → 仍可能发生短暂阻塞!

Completion-based (io_uring):
  1. Submit: "Read from this socket into this buffer"  → SQE
  2. Kernel: does the read asynchronously
  3. App:   gets completed result with data            → CQE

完成模型 (io_uring):
  1. 提交:“从这个 socket 读取数据到这个缓冲区”  → SQE
  2. 内核: 异步执行读取操作
  3. 应用: 获取包含数据的完成结果              → CQE
graph LR
    subgraph "Readiness Model (epoll)"
        A1["App: is it ready?"] --> K1["Kernel: yes"]
        K1 --> A2["App: now read()"]
        A2 --> K2["Kernel: here's data"]
    end

    subgraph "Completion Model (io_uring)"
        B1["App: read this for me"] --> K3["Kernel: working..."]
        K3 --> B2["App: got result + data"]
    end

    style B1 fill:#c8e6c9,color:#000
    style B2 fill:#c8e6c9,color:#000

The ownership challenge / 所有权挑战: io_uring requires the kernel to own the buffer until the operation completes. This conflicts with Rust’s standard AsyncRead trait which borrows the buffer. That’s why tokio-uring has different I/O traits:

io_uring 要求内核在操作完成前拥有缓冲区的所有权。这与 Rust 标准的 AsyncRead trait(它借用缓冲区)相冲突。因此 tokio-uring 使用了不同的 I/O trait:

#![allow(unused)]
fn main() {
// Standard tokio (readiness-based) — borrows the buffer:
// 标准 tokio (基于就绪) —— 借用缓冲区:
let n = stream.read(&mut buf).await?;  // buf is borrowed

// tokio-uring (completion-based) — takes ownership of the buffer:
// tokio-uring (基于完成) —— 获取缓冲区的所有权:
let (result, buf) = stream.read(buf).await;  // buf is moved in, returned back
let n = result?;
}
| Aspect / 维度 | epoll (tokio) | io_uring (tokio-uring) |
|---|---|---|
| Model / 模型 | Readiness notification / 就绪通知 | Completion notification / 完成通知 |
| Syscalls / 系统调用 | epoll_wait + read/write | Batched SQE/CQE ring / 批处理环 |
| Buffer ownership / 缓冲区所有权 | App retains (&mut buf) / 应用持有借用 | Ownership transfer (move buf) / 所有权转移 (move) |
| Platform / 平台 | Linux, macOS, Windows | Linux 5.1+ only / 仅限 Linux 5.1+ |
| Zero-copy / 零拷贝 | No (userspace copy) / 无(用户态拷贝) | Yes (registered buffers) / 有(注册缓冲区) |
| Maturity / 成熟度 | Production-ready / 生产就绪 | Experimental / 实验性 |

When to use io_uring: High-throughput file I/O or networking where syscall overhead is the bottleneck (databases, storage engines, proxies serving 100k+ connections). For most applications, standard tokio with epoll is the right choice.

何时使用 io_uring:高吞吐量的文件 I/O 或网络场景,且系统调用开销是瓶颈时(如数据库、存储引擎、需要处理 10 万+连接的代理)。对于大多数应用,使用 epoll 的标准 tokio 才是正确选择。

tokio: The Batteries-Included Runtime / tokio:功能完备的运行时

The dominant async runtime in the Rust ecosystem. Used by Axum, Hyper, Tonic, and most production Rust servers.

Rust 生态系统中占主导地位的异步运行时。Axum、Hyper、Tonic 以及大多数生产级 Rust 服务器都在使用它。

// Cargo.toml:
// [dependencies]
// tokio = { version = "1", features = ["full"] }

#[tokio::main]
async fn main() {
    // Spawns a multi-threaded runtime with work-stealing scheduler
    // 派生一个带有工作窃取调度器的多线程运行时
    let handle = tokio::spawn(async {
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
        "done"
    });

    let result = handle.await.unwrap();
    println!("{result}");
}

tokio features / 特性: Timer, I/O, TCP/UDP, Unix sockets, signal handling, sync primitives (Mutex, RwLock, Semaphore, channels), fs, process, tracing integration.

tokio 特性:计时器、I/O、TCP/UDP、Unix 域套接字、信号处理、同步原语(Mutex、RwLock、Semaphore、通道)、文件系统、进程、tracing 集成。

async-std: The Standard Library Mirror / async-std:标准库镜像

Mirrors the std API with async versions. Less popular than tokio but simpler for beginners.

用异步版本镜像了 std API。虽然不如 tokio 流行,但对初学者来说更简单。

// Cargo.toml:
// [dependencies]
// async-std = { version = "1", features = ["attributes"] }

#[async_std::main]
async fn main() {
    use async_std::fs;
    let content = fs::read_to_string("hello.txt").await.unwrap();
    println!("{content}");
}

smol: The Minimalist Runtime / smol:极简主义运行时

Small, zero-dependency async runtime. Great for libraries that want async without pulling in tokio.

小型、零依赖的异步运行时。非常适合那些想要异步功能但不愿引入整个 tokio 的库。

// Cargo.toml:
// [dependencies]
// smol = "2"

fn main() {
    smol::block_on(async {
        let result = smol::unblock(|| {
            // Runs blocking code on a thread pool
            // 在线程池上运行阻塞代码
            std::fs::read_to_string("hello.txt")
        }).await.unwrap();
        println!("{result}");
    });
}

embassy: Async for Embedded (no_std) / embassy:嵌入式异步 (no_std)

Async runtime for embedded systems. No heap allocation, no std required.

为嵌入式系统设计的异步运行时。无需堆分配,无需 std

// Runs on microcontrollers (e.g., STM32, nRF52, RP2040)
// 运行在微控制器上(如 STM32, nRF52, RP2040)
// Assumes the embassy-stm32 HAL: Output/Level/Speed come from
// embassy_stm32::gpio, Timer/Duration from embassy_time
// 假设使用 embassy-stm32 HAL:Output/Level/Speed 来自 embassy_stm32::gpio,
// Timer/Duration 来自 embassy_time
#[embassy_executor::main]
async fn main(spawner: embassy_executor::Spawner) {
    let p = embassy_stm32::init(Default::default()); // Take the peripherals
                                                     // 获取外设
    // Blink an LED with async/await — no RTOS needed!
    // 使用 async/await 闪烁 LED —— 无需 RTOS!
    let mut led = Output::new(p.PA5, Level::Low, Speed::Low);
    loop {
        led.set_high();
        Timer::after(Duration::from_millis(500)).await;
        led.set_low();
        Timer::after(Duration::from_millis(500)).await;
    }
}

Runtime Decision Tree / 运行时决策树

graph TD
    START["Choosing a Runtime"]

    Q1{"Building a<br/>network server?"}
    Q2{"Need tokio ecosystem<br/>(Axum, Tonic, Hyper)?"}
    Q3{"Building a library?"}
    Q4{"Embedded /<br/>no_std?"}
    Q5{"Want minimal<br/>dependencies?"}

    TOKIO["🟢 tokio<br/>Best ecosystem, most popular"]
    SMOL["🔵 smol<br/>Minimal, no ecosystem lock-in"]
    EMBASSY["🟠 embassy<br/>Embedded-first, no alloc"]
    ASYNC_STD["🟣 async-std<br/>std-like API, good for learning"]
    AGNOSTIC["🔵 runtime-agnostic<br/>Use futures crate only"]

    START --> Q1
    Q1 -->|Yes| Q2
    Q1 -->|No| Q3
    Q2 -->|Yes| TOKIO
    Q2 -->|No| Q5
    Q3 -->|Yes| AGNOSTIC
    Q3 -->|No| Q4
    Q4 -->|Yes| EMBASSY
    Q4 -->|No| Q5
    Q5 -->|Yes| SMOL
    Q5 -->|No| ASYNC_STD

    style TOKIO fill:#c8e6c9,color:#000
    style SMOL fill:#bbdefb,color:#000
    style EMBASSY fill:#ffe0b2,color:#000
    style ASYNC_STD fill:#e1bee7,color:#000
    style AGNOSTIC fill:#bbdefb,color:#000

Runtime Comparison Table / 运行时对比表

| Feature / 特性 | tokio | async-std | smol | embassy |
|---|---|---|---|---|
| Ecosystem / 生态 | Dominant / 主导 | Small / 较小 | Minimal / 极小 | Embedded / 嵌入式 |
| Multi-threaded / 多线程 | ✅ Work-stealing / 工作窃取 | ✅ | ✅ | ❌ (single-core / 单核) |
| no_std | ❌ | ❌ | ❌ | ✅ |
| Timer / 计时器 | ✅ Built-in / 内建 | ✅ Built-in / 内建 | Via async-io | ✅ HAL-based / 基于 HAL |
| I/O | ✅ Own abstractions / 自有抽象 | ✅ std mirror / std 镜像 | ✅ Via async-io | ✅ HAL drivers / HAL 驱动 |
| Learning curve / 学习曲线 | Medium / 中等 | Low / 低 | Low / 低 | High (HW) / 高(涉及硬件) |
| Binary size / 二进制大小 | Large / 较大 | Medium / 中等 | Small / 较小 | Tiny / 极微 |
🏋️ Exercise: Runtime Comparison / 练习:运行时对比 (点击展开)

Challenge: Write the same program using three different runtimes (tokio, smol, and async-std).

挑战:使用三种不同的运行时(tokio、smol 和 async-std)编写相同的程序。

🔑 Solution / 参考答案
// ----- tokio version -----
#[tokio::main]
async fn main() {
    let (url_result, file_result) = tokio::join!(
        async {
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
            "Response from URL"
        },
        async {
            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
            "Contents of file"
        },
    );
    println!("URL: {url_result}, File: {file_result}");
}

// ----- smol version -----
fn main() {
    smol::block_on(async {
        let (url_result, file_result) = futures_lite::future::zip(
            async {
                smol::Timer::after(std::time::Duration::from_millis(100)).await;
                "Response from URL"
            },
            async {
                smol::Timer::after(std::time::Duration::from_millis(50)).await;
                "Contents of file"
            },
        ).await;
        println!("URL: {url_result}, File: {file_result}");
    });
}

// ----- async-std version -----
#[async_std::main]
async fn main() {
    let (url_result, file_result) = futures::future::join(
        async {
            async_std::task::sleep(std::time::Duration::from_millis(100)).await;
            "Response from URL"
        },
        async {
            async_std::task::sleep(std::time::Duration::from_millis(50)).await;
            "Contents of file"
        },
    ).await;
    println!("URL: {url_result}, File: {file_result}");
}

Key takeaway: The async business logic is identical across runtimes. Only the entry point and timer/IO APIs differ. This is why writing runtime-agnostic libraries (using only std::future::Future) is valuable.

关键点:异步业务逻辑在不同运行时之间是完全相同的。唯一的区别在于入口点和计时器/IO API。这就是为什么编写运行时无关的库(仅使用 std::future::Future)非常有价值。

Key Takeaways — Executors and Runtimes / 关键要点:执行器与运行时

  • An executor’s job: poll futures when woken, sleep efficiently using OS I/O APIs / 执行器的工作:在被唤醒时轮询 future,利用操作系统 I/O API 高效休眠
  • tokio is the default for servers; smol for minimal footprint; embassy for embedded / tokio 是服务器默认选型;smol 适用于极小占用;embassy 用于嵌入式
  • Your business logic should depend on std::future::Future, not a specific runtime / 你的业务逻辑应该依赖 std::future::Future,而不是特定的运行时
  • io_uring (Linux 5.1+) is the future of high-perf I/O but the ecosystem is still maturing / io_uring (Linux 5.1+) 是高性能 I/O 的未来,但生态系统仍在成熟中

See also / 延伸阅读: Ch 8 — Tokio Deep Dive / 第 8 章:Tokio 深入解析 for tokio specifics, Ch 9 — When Tokio Isn’t the Right Fit / 第 9 章:Tokio 不适用的场景 for alternatives


8. Tokio Deep Dive / 8. Tokio 深入解析 🟡

What you’ll learn / 你将学到:

  • Runtime flavors: multi-thread vs current-thread and when to use each / 运行时变体:多线程与单线程,以及各自的适用场景
  • tokio::spawn, the 'static requirement, and JoinHandle / tokio::spawn'static 约束以及 JoinHandle
  • Task cancellation semantics (cancel-on-drop) / 任务取消语义(drop 时取消)
  • Sync primitives: Mutex, RwLock, Semaphore, and all four channel types / 同步原语:Mutex、RwLock、Semaphore 以及四种通道类型

Runtime Flavors: Multi-Thread vs Current-Thread / 运行时变体:多线程与单线程

Tokio offers two runtime configurations:

Tokio 提供了两种运行时配置:

// Multi-threaded (default with #[tokio::main])
// Uses a work-stealing thread pool — tasks can move between threads
// 多线程(#[tokio::main] 默认配置)
// 使用工作窃取线程池 —— 任务可以在不同线程间移动
#[tokio::main]
async fn main() {
    // N worker threads (default = number of CPU cores)
    // Tasks are Send + 'static
    // N 个工作线程(默认 = CPU 核心数)
    // 任务必须满足 Send + 'static
}

// Current-thread — everything runs on one thread
// 单线程 —— 所有内容都在一个线程上运行
#[tokio::main(flavor = "current_thread")]
async fn main() {
    // Single-threaded — tasks don't need to be Send
    // Lighter weight, good for simple tools or WASM
    // 单线程 —— 任务不需要满足 Send
    // 更轻量,适用于简单工具或 WASM
}

// Manual runtime construction:
// 手动构建运行时:
let rt = tokio::runtime::Builder::new_multi_thread()
    .worker_threads(4)
    .enable_all()
    .build()
    .unwrap();

rt.block_on(async {
    println!("Running on custom runtime");
});
graph TB
    subgraph "Multi-Thread (default)"
        MT_Q1["Thread 1<br/>Task A, Task D"]
        MT_Q2["Thread 2<br/>Task B"]
        MT_Q3["Thread 3<br/>Task C, Task E"]
        STEAL["Work Stealing:<br/>idle threads steal from busy ones"]
        MT_Q1 <--> STEAL
        MT_Q2 <--> STEAL
        MT_Q3 <--> STEAL
    end

    subgraph "Current-Thread"
        ST_Q["Single Thread<br/>Task A → Task B → Task C → Task D"]
    end

    style MT_Q1 fill:#c8e6c9,color:#000
    style MT_Q2 fill:#c8e6c9,color:#000
    style MT_Q3 fill:#c8e6c9,color:#000
    style ST_Q fill:#bbdefb,color:#000

tokio::spawn and the ’static Requirement / tokio::spawn 与 ’static 约束

tokio::spawn puts a future onto the runtime’s task queue. Because it might run on any worker thread at any time, the future must be Send + 'static:

tokio::spawn 将一个 future 放入运行时的任务队列。由于它可能在 任何 时间、在 任何一个 工作线程上运行,因此该 future 必须满足 Send + 'static

#![allow(unused)]
fn main() {
use tokio::task;

async fn example() {
    let data = String::from("hello");

    // ✅ Works: move ownership into the task
    // ✅ 正常工作:将所有权 move 进入任务中
    let handle = task::spawn(async move {
        println!("{data}");
        data.len()
    });

    let len = handle.await.unwrap();
    println!("Length: {len}");
}

async fn problem() {
    let data = String::from("hello");

    // ❌ FAILS: data is borrowed, not 'static
    // ❌ 失败:data 是借用的,不是 'static
    // task::spawn(async {
    //     println!("{data}"); // borrows `data` — not 'static
    // });

    // ❌ FAILS: Rc is not Send
    // ❌ 失败:Rc 不是 Send
    // let rc = std::rc::Rc::new(42);
    // task::spawn(async move {
    //     println!("{rc}"); // Rc is !Send — can't cross thread boundary
    // });
}
}

Why 'static? / 为什么需要 'static The spawned task runs independently — it might outlive the scope that created it. The compiler can’t prove the references will remain valid, so it requires owned data.

为什么需要 'static 被派生的任务独立运行 —— 它可能会比创建它的作用域活得更久。编译器无法证明其引用的有效性,因此要求使用拥有所有权的数据。

Why Send? / 为什么需要 Send The task might be resumed on a different thread than where it was suspended. All data held across .await points must be safe to send between threads.

为什么需要 Send 任务可能会在与挂起时不同的线程上恢复执行。所有跨越 .await 点持有的数据都必须能够安全地在线程间发送。

#![allow(unused)]
fn main() {
use std::sync::Arc;

// Common pattern: clone shared data into the task
// (`config` and `process_item` are placeholders)
// 常见模式:将共享数据克隆到任务中
// (`config` 与 `process_item` 为示意占位)
let shared = Arc::new(config);

for i in 0..10 {
    let shared = Arc::clone(&shared); // Clone the Arc, not the data
                                       // 克隆 Arc,而不是克隆数据
    tokio::spawn(async move {
        process_item(i, &shared).await;
    });
}
}

JoinHandle and Task Cancellation / JoinHandle 与任务取消

#![allow(unused)]
fn main() {
use tokio::task::JoinHandle;
use tokio::time::{sleep, Duration};

async fn cancellation_example() {
    let handle: JoinHandle<String> = tokio::spawn(async {
        sleep(Duration::from_secs(10)).await;
        "completed".to_string()
    });

    // Cancel the task by dropping the handle? NO — task keeps running!
    // 通过 drop 这个 handle 来取消任务?不 —— 任务会继续运行!
    // drop(handle); // Task continues in the background

    // To actually cancel, call abort():
    // 若要真正取消,请调用 abort():
    handle.abort();

    // Awaiting an aborted task returns JoinError
    // 等待一个被中止的任务会返回 JoinError
    match handle.await {
        Ok(val) => println!("Got: {val}"),
        Err(e) if e.is_cancelled() => println!("Task was cancelled"),
        Err(e) => println!("Task panicked: {e}"),
    }
}
}

Important / 重要:Dropping a JoinHandle does NOT cancel the task in tokio. The task becomes detached and keeps running. You must explicitly call .abort() to cancel it. This is different from dropping a Future directly, which does cancel/drop the underlying computation.

重要:在 tokio 中,丢弃(drop) JoinHandle不会 取消任务。任务会进入 分离(detached) 状态并继续运行。你必须显式调用 .abort() 才能取消它。这与直接丢弃一个 Future 不同,后者确实会取消/停止底层的计算。

Tokio Sync Primitives / Tokio 同步原语

Tokio provides async-aware synchronization primitives. The key principle: don’t use std::sync::Mutex across .await points.

Tokio 提供了异步感知的同步原语。核心原则是:不要跨越 .await 点使用 std::sync::Mutex

#![allow(unused)]
fn main() {
use std::sync::Arc;
use tokio::sync::{Mutex, RwLock, Semaphore, mpsc, oneshot, broadcast, watch};

// --- Mutex / 互斥锁 ---
// Async mutex: the lock() method is async and won't block the thread
// 异步互斥锁:lock() 方法是异步的,不会阻塞当前线程
let data = Arc::new(Mutex::new(vec![1, 2, 3]));
{
    let mut guard = data.lock().await; // Non-blocking lock
                                       // 非阻塞加锁
    guard.push(4);
} // Guard dropped here — lock released
  // guard 在此处 drop —— 锁被释放

// --- Channels / 通道 ---
// mpsc: Multiple producer, single consumer
// mpsc:多生产者,单消费者
let (tx, mut rx) = mpsc::channel::<String>(100); // Bounded buffer
                                                 // 有界缓冲区

tokio::spawn(async move {
    tx.send("hello".into()).await.unwrap();
});

let msg = rx.recv().await.unwrap();

// oneshot: Single value, single consumer
// oneshot:单值,单消费者
let (tx, rx) = oneshot::channel::<i32>();
tx.send(42).unwrap(); // No await needed — either sends or fails
let val = rx.await.unwrap();

// broadcast: Multiple producers, multiple consumers (all get every message)
// broadcast:多生产者,多消费者(所有人都会收到每一条消息)
let (tx, _) = broadcast::channel::<String>(100);
let mut rx1 = tx.subscribe();
let mut rx2 = tx.subscribe();

// watch: Single value, multiple consumers (only latest value)
// watch:单值,多消费者(只保留最新值)
let (tx, rx) = watch::channel(0u64);
tx.send(42).unwrap();
println!("Latest: {}", *rx.borrow());
}

Note: .unwrap() is used for brevity throughout these channel examples. In production, handle send/receive errors gracefully — a failed .send() means the receiver was dropped, and a failed .recv() means the channel is closed.

注意:在这些通道示例中,为了简洁使用了 .unwrap()。在生产环境中,请优雅地处理发送/接收错误 —— 发送失败意味着接收端已释放,接收失败意味着通道已关闭。

graph LR
    subgraph "Channel Types"
        direction TB
        MPSC["mpsc<br/>N→1<br/>Buffered queue"]
        ONESHOT["oneshot<br/>1→1<br/>Single value"]
        BROADCAST["broadcast<br/>N→N<br/>All receivers get all"]
        WATCH["watch<br/>1→N<br/>Latest value only"]
    end

    P1["Producer 1"] --> MPSC
    P2["Producer 2"] --> MPSC
    MPSC --> C1["Consumer"]

    P3["Producer"] --> ONESHOT
    ONESHOT --> C2["Consumer"]

    P4["Producer"] --> BROADCAST
    BROADCAST --> C3["Consumer 1"]
    BROADCAST --> C4["Consumer 2"]

    P5["Producer"] --> WATCH
    WATCH --> C5["Consumer 1"]
    WATCH --> C6["Consumer 2"]

Case Study: Choosing the Right Channel / 案例分析:选择合适的通道

| Requirement / 需求 | Channel / 通道 | Why / 原因 |
|---|---|---|
| API handlers → Batcher | mpsc (bounded) | N producers, 1 consumer. Bounded for backpressure / N 生产者,1 消费者。通过有界缓冲实现背压,防止 OOM |
| Config watcher → Rate limiter | watch | Only the latest config matters / 只有最新的配置才有意义。多个读取者(每个 worker)都能看到当前值 |
| Shutdown signal → All components | broadcast | Every component must receive the notification independently / 每个组件必须能独立收到关机通知 |
| Single health-check response | oneshot | Request/response pattern — one value, then done / 请求/响应模式 —— 一个值,发完即结束 |
graph LR
    subgraph "Notification Service"
        direction TB
        API1["API Handler 1"] -->|mpsc| BATCH["Batcher"]
        API2["API Handler 2"] -->|mpsc| BATCH
        CONFIG["Config Watcher"] -->|watch| RATE["Rate Limiter"]
        CTRL["Ctrl+C"] -->|broadcast| API1
        CTRL -->|broadcast| BATCH
        CTRL -->|broadcast| RATE
    end

    style API1 fill:#d4efdf,stroke:#27ae60,color:#000
    style API2 fill:#d4efdf,stroke:#27ae60,color:#000
    style BATCH fill:#e8f4f8,stroke:#2980b9,color:#000
    style CONFIG fill:#fef9e7,stroke:#f39c12,color:#000
    style RATE fill:#fef9e7,stroke:#f39c12,color:#000
    style CTRL fill:#fadbd8,stroke:#e74c3c,color:#000
🏋️ Exercise: Build a Task Pool / 练习:构建任务池 (点击展开)

Challenge: Build a function run_with_limit that accepts a list of async closures and a concurrency limit, executing at most N tasks simultaneously. Use tokio::sync::Semaphore.

挑战:构建一个 run_with_limit 函数,接收一系列异步闭包和一个并发限制,同时执行的任务数不超过 N。请使用 tokio::sync::Semaphore

🔑 Solution / 参考答案
#![allow(unused)]
fn main() {
use std::future::Future;
use std::sync::Arc;
use tokio::sync::Semaphore;

async fn run_with_limit<F, Fut, T>(tasks: Vec<F>, limit: usize) -> Vec<T>
where
    F: FnOnce() -> Fut + Send + 'static,
    Fut: Future<Output = T> + Send + 'static,
    T: Send + 'static,
{
    let semaphore = Arc::new(Semaphore::new(limit));
    let mut handles = Vec::new();

    for task in tasks {
        let semaphore = Arc::clone(&semaphore);
        let handle = tokio::spawn(async move {
            let _permit = semaphore.acquire().await.unwrap();
            // Permit is held while the task runs, then dropped
            // 任务运行时持有 permit,运行结束自动 drop
            task().await
        });
        handles.push(handle);
    }

    let mut results = Vec::new();
    for handle in handles {
        results.push(handle.await.unwrap());
    }
    results
}
}

Key takeaway: Semaphore is the standard way to limit concurrency in tokio. Each task acquires a permit before starting work. When the semaphore is full, new tasks wait asynchronously (non-blocking) until a slot opens.

关键点Semaphore 是 tokio 中限制并发的标准方式。每个任务在开始工作前都会获取一个许可证。当信号量满时,新任务会以异步(非阻塞)方式等待,直到有空槽位放出。

Key Takeaways — Tokio Deep Dive / 关键要点:Tokio 深入解析

  • Use multi_thread for servers (default); current_thread for CLI tools, tests, or !Send types / 服务器建议使用 multi_thread(默认);CLI 工具、测试或 !Send 类型建议使用 current_thread
  • tokio::spawn requires 'static futures — use Arc or channels to share data / tokio::spawn 要求 future 必须是 'static —— 使用 Arc 或通道共享数据
  • Dropping a JoinHandle does not cancel the task — call .abort() explicitly / 丢弃 JoinHandle 不会 取消任务 —— 必须显式调用 .abort()
  • Choose sync primitives by need: Mutex for shared state, Semaphore for concurrency limits, mpsc/oneshot/broadcast/watch for communication / 按需选择同步原语:Mutex 用于共享状态,Semaphore 用于并发限制,四种通道用于通信

See also / 延伸阅读: Ch 9 — When Tokio Isn’t the Right Fit / 第 9 章:Tokio 不适用的场景 for alternatives to spawn, Ch 12 — Common Pitfalls / 第 12 章:常见陷阱 for MutexGuard-across-await bugs


9. When Tokio Isn’t the Right Fit / 9. Tokio 不适用的场景 🟡

What you’ll learn / 你将学到:

  • The 'static problem: when tokio::spawn forces you into Arc everywhere / 'static 问题:当 tokio::spawn 强制你在各处使用 Arc
  • LocalSet for !Send futures / 针对 !Send future 的 LocalSet
  • FuturesUnordered for borrow-friendly concurrency (no spawn needed) / 借用友好的并发工具 FuturesUnordered(无需 spawn)
  • JoinSet for managed task groups / 用于任务组管理的 JoinSet
  • Writing runtime-agnostic libraries / 编写运行时无关的库
graph TD
    START["Need concurrent futures?"] --> STATIC{"Can futures be 'static?"}
    STATIC -->|Yes| SEND{"Are futures Send?"}
    STATIC -->|No| FU["FuturesUnordered<br/>Runs on current task"]
    SEND -->|Yes| SPAWN["tokio::spawn<br/>Multi-threaded"]
    SEND -->|No| LOCAL["LocalSet<br/>Single-threaded"]
    SPAWN --> MANAGE{"Need to track/abort tasks?"}
    MANAGE -->|Yes| JOINSET["JoinSet / TaskTracker"]
    MANAGE -->|No| HANDLE["JoinHandle"]

    style START fill:#f5f5f5,stroke:#333,color:#000
    style FU fill:#d4efdf,stroke:#27ae60,color:#000
    style SPAWN fill:#e8f4f8,stroke:#2980b9,color:#000
    style LOCAL fill:#fef9e7,stroke:#f39c12,color:#000
    style JOINSET fill:#e8daef,stroke:#8e44ad,color:#000
    style HANDLE fill:#e8f4f8,stroke:#2980b9,color:#000

The ’static Future Problem / ’static Future 问题

Tokio’s spawn requires 'static futures. This means you can’t borrow local data in spawned tasks:

Tokio 的 spawn 要求 future 必须是 'static。这意味着你不能在被派生的任务中借用局部数据:

#![allow(unused)]
fn main() {
use std::sync::Arc;

async fn process_items(items: &[String]) {
    // ❌ Can't do this — items is borrowed, not 'static
    // ❌ 不能这样做 —— items 是借用的,不是 'static
    // for item in items {
    //     tokio::spawn(async {
    //         process(item).await;
    //     });
    // }

    // 😐 Workaround 1: Clone everything
    // 😐 方案 1:克隆所有数据
    for item in items {
        let item = item.clone();
        tokio::spawn(async move {
            process(&item).await;
        });
    }

    // 😐 Workaround 2: Use Arc
    // 😐 方案 2:使用 Arc
    let items = Arc::new(items.to_vec());
    for i in 0..items.len() {
        let items = Arc::clone(&items);
        tokio::spawn(async move {
            process(&items[i]).await;
        });
    }
}
}

This is annoying! In Go, you can just go func() { use(item) } with a closure. In Rust, the ownership system forces you to think about who owns what and how long it lives.

这确实很烦人!在 Go 语言中,你可以直接通过闭包 go func() { use(item) }。但在 Rust 中,所有权系统强制你思考谁拥有什么,以及它能活多久。

Scoped Tasks and Alternatives / 作用域任务与替代方案

Several solutions exist for the 'static problem:

针对 'static 问题,有几种解决方案:

#![allow(unused)]
fn main() {
// 1. tokio::task::LocalSet — run !Send futures on current thread
// 1. tokio::task::LocalSet —— 在当前线程运行 !Send 的 future
use tokio::task::LocalSet;

async fn with_localset() {
    let local_set = LocalSet::new();
    local_set.run_until(async {
        tokio::task::spawn_local(async {
            // Can use Rc, Cell, and other !Send types here
            // 此处可以使用 Rc、Cell 和其他 !Send 类型
            let rc = std::rc::Rc::new(42);
            println!("{rc}");
        }).await.unwrap();
    }).await;
}

// 2. FuturesUnordered — concurrent without spawning
// 2. FuturesUnordered —— 无需 spawn 即可并发
use futures::stream::{FuturesUnordered, StreamExt};

async fn process_items(items: &[String]) {
    let futures: FuturesUnordered<_> = items
        .iter()
        .map(|item| async move {
            // ✅ Can borrow item — no spawn, no 'static needed!
            // ✅ 可以借用 item —— 无需 spawn,不需要满足 'static!
            process(item).await
        })
        .collect();

    // Drive all futures to completion
    // 推进所有 future 直至完成
    futures.for_each(|result| async {
        println!("Result: {result:?}");
    }).await;
}

// 3. tokio JoinSet (tokio 1.21+) — managed set of spawned tasks
// 3. tokio JoinSet (tokio 1.21+) —— 受管理的已派生任务集合
use tokio::task::JoinSet;
use tokio::time::Duration;

async fn with_joinset() {
    let mut set = JoinSet::new();

    for i in 0..10 {
        set.spawn(async move {
            tokio::time::sleep(Duration::from_millis(100)).await;
            i * 2
        });
    }

    while let Some(result) = set.join_next().await {
        println!("Task completed: {:?}", result.unwrap());
    }
}
}

Lightweight Runtimes for Libraries / 库的轻量级运行时

If you’re writing a library — don’t force users into tokio:

如果你正在编写一个库 —— 不要强迫用户使用 tokio:

#![allow(unused)]
fn main() {
use std::future::Future;
use std::time::Duration;
// ❌ BAD: Library forces tokio on users
// ❌ 错误做法:库强制用户依赖 tokio
pub async fn my_lib_function() {
    tokio::time::sleep(Duration::from_secs(1)).await;
    // Now your users MUST use tokio
    // 现在你的用户必须使用 tokio 了
}

// ✅ GOOD: Library is runtime-agnostic
// ✅ 正确做法:库是运行时无关的
pub async fn my_lib_function() {
    // Use only types from std::future and futures crate
    // 仅使用来自 std::future 和 futures crate 的类型
    do_computation().await;
}

// ✅ GOOD: Accept a generic future for I/O operations
// ✅ 正确做法:为 I/O 操作接收泛型 future
pub async fn fetch_with_retry<F, Fut, T, E>(
    operation: F,
    max_retries: usize,
) -> Result<T, E>
where
    F: Fn() -> Fut,
    Fut: Future<Output = Result<T, E>>,
{
    assert!(max_retries > 0, "max_retries must be at least 1");
    for attempt in 0..max_retries {
        match operation().await {
            Ok(val) => return Ok(val),
            Err(e) if attempt == max_retries - 1 => return Err(e),
            Err(_) => continue,
        }
    }
    unreachable!("the final attempt always returns above")
}
}

Rule of thumb: Libraries should depend on futures crate, not tokio. Applications should depend on tokio (or their chosen runtime). This keeps the ecosystem composable.

经验法则:库应该依赖 futures crate,而不是 tokio。应用程序应该依赖 tokio(或其选择的运行时)。这样可以保持生态系统的可组合性。

🏋️ Exercise: FuturesUnordered vs Spawn / 练习:FuturesUnordered 与 Spawn (点击展开)

Challenge: Write the same function two ways — once using tokio::spawn (requires 'static) and once using FuturesUnordered (borrows data).

挑战:用两种方式编写同一个函数 —— 一次使用 tokio::spawn(要求 'static),一次使用 FuturesUnordered(允许借用数据)。

🔑 Solution / 参考答案
#![allow(unused)]
fn main() {
use futures::stream::{FuturesUnordered, StreamExt};
use tokio::time::{sleep, Duration};

// Version 1: tokio::spawn — requires 'static, must clone
// 版本 1: tokio::spawn —— 要求 'static,必须克隆数据
async fn lengths_with_spawn(items: &[String]) -> Vec<usize> {
    let mut handles = Vec::new();
    for item in items {
        let owned = item.clone(); // Must clone — spawn requires 'static
        handles.push(tokio::spawn(async move {
            sleep(Duration::from_millis(10)).await;
            owned.len()
        }));
    }

    let mut results = Vec::new();
    for handle in handles {
        results.push(handle.await.unwrap());
    }
    results
}

// Version 2: FuturesUnordered — borrows data, no clone needed
// 版本 2: FuturesUnordered —— 借用数据,无需克隆
async fn lengths_without_spawn(items: &[String]) -> Vec<usize> {
    let futures: FuturesUnordered<_> = items
        .iter()
        .map(|item| async move {
            sleep(Duration::from_millis(10)).await;
            item.len() // ✅ Borrows item — no clone!
                       // ✅ 借用 item —— 无需克隆!
        })
        .collect();

    // Note: results arrive in completion order, not input order
    // 注意:结果按完成顺序产出,而非输入顺序
    futures.collect().await
}
}

Key takeaway: FuturesUnordered avoids the 'static requirement by running all futures on the current task (no thread migration). The trade-off: all futures share one task — if one blocks, the others stall. Use spawn for CPU-heavy work that should run on separate threads.

关键点FuturesUnordered 通过在当前任务中运行所有 future(不涉及线程迁移)来规避 'static 约束。权衡之处:所有 future 共享同一个任务 —— 如果其中一个阻塞,其他的也会停滞。对于应该在独立线程运行的 CPU 密集型工作,请使用 spawn

Key Takeaways — When Tokio Isn’t the Right Fit / 关键要点:Tokio 不适用的场景

  • FuturesUnordered runs futures concurrently on the current task — no 'static requirement / FuturesUnordered 在当前任务中并发运行 future —— 无需 'static 约束
  • LocalSet enables !Send futures on a single-threaded executor / LocalSet 允许在单线程执行器上运行 !Send 的 future
  • JoinSet (tokio 1.21+) provides managed task groups with automatic cleanup / JoinSet (tokio 1.21+) 提供了带自动清理功能的受管任务组
  • For libraries: depend only on std::future::Future + futures crate, not tokio directly / 对于库:仅依赖 std::future::Future + futures crate,不要直接依赖 tokio

See also / 延伸阅读: Ch 8 — Tokio Deep Dive / 第 8 章:Tokio 深入解析 for when spawn is the right tool, Ch 11 — Streams / 第 11 章:流 for buffer_unordered() as another concurrency limiter


10. Async Traits / 10. 异步 Trait 🟡

What you’ll learn / 你将学到:

  • Why async methods in traits took years to stabilize / 为什么 trait 中的异步方法花了数年才稳定
  • RPITIT: native async trait methods (Rust 1.75+) / RPITIT:原生异步 trait 方法 (Rust 1.75+)
  • The dyn dispatch challenge and trait_variant workaround / dyn 分发挑战与 trait_variant 变通方案
  • Async closures (Rust 1.85+): async Fn() and async FnOnce() / 异步闭包 (Rust 1.85+):async Fn()async FnOnce()
graph TD
    subgraph "Async Trait Approaches"
        direction TB
        RPITIT["RPITIT (Rust 1.75+)<br/>async fn in trait<br/>Static dispatch only"]
        VARIANT["trait_variant<br/>Auto-generates Send variant<br/>For spawn-friendly traits"]
        BOXED["Box&lt;dyn Future&gt;<br/>Manual boxing<br/>Works everywhere"]
        CLOSURE["Async Closures (1.85+)<br/>async Fn() / async FnOnce()<br/>Callbacks & middleware"]
    end

    RPITIT -->|"Need Send bounds?"| VARIANT
    RPITIT -->|"Need dyn or pre-1.75?"| BOXED
    CLOSURE -->|"Replaces"| BOXED

    style RPITIT fill:#d4efdf,stroke:#27ae60,color:#000
    style VARIANT fill:#e8f4f8,stroke:#2980b9,color:#000
    style BOXED fill:#fef9e7,stroke:#f39c12,color:#000
    style CLOSURE fill:#e8daef,stroke:#8e44ad,color:#000

The History: Why It Took So Long / 历史回顾:为何耗时弥久

Async methods in traits were Rust’s most requested feature for years. The problem:

Trait 中的异步方法多年来一直是 Rust 用户最渴望的特性。问题在于:

#![allow(unused)]
fn main() {
// This didn't compile until Rust 1.75 (Dec 2023):
// 在 Rust 1.75(2023 年 12 月)之前,这段代码无法编译:
trait DataStore {
    async fn get(&self, key: &str) -> Option<String>;
}
// Why? Because async fn returns `impl Future<Output = T>`,
// and `impl Trait` in trait return position wasn't supported.
// 为什么?因为 async fn 返回的是 `impl Future<Output = T>`,
// 而当时并不支持在 trait 的返回值位置使用 `impl Trait`。
}

The fundamental challenge: when a trait method returns impl Future, each implementor returns a different concrete type. Static dispatch handles this fine, but through dyn Trait the concrete return type (and therefore its size) isn't known at compile time.

根本挑战在于:当一个 trait 方法返回 impl Future 时,每个实现者返回的都是 不同的具体类型。静态分发下编译器可以处理,但通过 dyn Trait 动态分发时,编译器在编译期无法得知具体的返回类型及其大小。

RPITIT: Return Position Impl Trait in Trait / RPITIT:即 Trait 中返回值位置的 Impl Trait

Since Rust 1.75, this just works for static dispatch:

从 Rust 1.75 开始,原生异步方法在静态分发下已经可以正常工作了:

#![allow(unused)]
fn main() {
trait DataStore {
    async fn get(&self, key: &str) -> Option<String>;
    // Desugars to:
    // 解糖后等价于:
    // fn get(&self, key: &str) -> impl Future<Output = Option<String>>;
}

struct InMemoryStore {
    data: std::collections::HashMap<String, String>,
}

impl DataStore for InMemoryStore {
    async fn get(&self, key: &str) -> Option<String> {
        self.data.get(key).cloned()
    }
}

// ✅ Works with generics (static dispatch):
// ✅ 适用于泛型(静态分发):
async fn lookup<S: DataStore>(store: &S, key: &str) {
    if let Some(val) = store.get(key).await {
        println!("{key} = {val}");
    }
}
}

dyn Dispatch and Send Bounds / dyn 分发与 Send 约束

The limitation: you can’t use dyn DataStore directly because the compiler doesn’t know the size of the returned future:

局限性:你不能直接使用 dyn DataStore,因为编译器不知道返回的 future 的大小:

#![allow(unused)]
fn main() {
use std::future::Future;
use std::pin::Pin;
// ❌ Doesn't work:
// ❌ 无法工作:
// async fn lookup_dyn(store: &dyn DataStore, key: &str) { ... }
// Error: the trait `DataStore` is not dyn-compatible because method `get`
//        is `async`

// ✅ Workaround: Return a boxed future
// ✅ 方案:返回手动装箱的 future
trait DynDataStore {
    fn get(&self, key: &str) -> Pin<Box<dyn Future<Output = Option<String>> + Send + '_>>;
}

// Or use the trait_variant macro (see below)
// 或者使用 trait_variant 宏(见后文)
}

The Send problem / Send 问题: In multi-threaded runtimes, spawned tasks must be Send. But async trait methods don’t automatically add Send bounds:

Send 问题:在多线程运行时中,派生的任务必须满足 Send。但异步 trait 方法并不会自动添加 Send 约束:

#![allow(unused)]
fn main() {
trait Worker {
    async fn run(&self); // Future might or might not be Send
                         // Future 可能满足也可能不满足 Send
}

struct MyWorker;

impl Worker for MyWorker {
    async fn run(&self) {
        // If this uses !Send types, the future is !Send
        // 如果这里使用了 !Send 类型,生成的 future 也是 !Send
        let rc = std::rc::Rc::new(42);
        some_work().await;
        println!("{rc}");
    }
}

// ❌ This fails if the future isn't Send:
// ❌ 如果 future 不是 Send,这行代码会报错:
// tokio::spawn(worker.run()); // Requires Send + 'static
}

The trait_variant Crate / trait_variant 库

The trait_variant crate (from the Rust async working group) generates a Send variant automatically:

trait_variant 库(由 Rust 异步小组开发)可以自动生成一个满足 Send 约束的变体:

#![allow(unused)]
fn main() {
// Cargo.toml: trait-variant = "0.1"
use std::sync::Arc;

#[trait_variant::make(SendDataStore: Send)]
trait DataStore {
    async fn get(&self, key: &str) -> Option<String>;
    async fn set(&self, key: &str, value: String);
}

// Now you have two traits:
// - DataStore: no Send bound on the futures
// - SendDataStore: all futures are Send
// 现在你有了两个 trait:
// - DataStore:对 future 没有 Send 约束
// - SendDataStore:所有 future 都强制满足 Send

// Use SendDataStore when you need to spawn. The generated trait is still
// RPITIT-based and thus not dyn-compatible, so use generics (static dispatch):
// 当你需要 spawn 时使用 SendDataStore。生成的 trait 仍基于 RPITIT,
// 不具备 dyn 兼容性,因此这里使用泛型(静态分发):
async fn spawn_lookup<S>(store: Arc<S>)
where
    S: SendDataStore + Send + Sync + 'static,
{
    tokio::spawn(async move {
        store.get("key").await;
    });
}
}

Quick Reference: Async Traits / 速查表:异步 Trait

| Approach / 方案 | Static Dispatch / 静态分发 | Dynamic Dispatch / 动态分发 | Send | Syntax Overhead / 语法开销 |
|---|---|---|---|---|
| Native async fn in trait | ✅ | ❌ | Implicit / 隐式 | None / 无 |
| trait_variant | ✅ | ❌ (still RPITIT / 仍为 RPITIT) | Explicit / 显式 | #[trait_variant::make] |
| Manual Box::pin | ✅ | ✅ | Explicit / 显式 | High / 较高 |
| async-trait crate | ✅ | ✅ | ✅ | #[async_trait], Medium / 中等 |

Recommendation: For new code (Rust 1.75+), use native async traits with generics, adding trait_variant when spawned tasks need Send futures. If you need dyn dispatch, box the futures manually or fall back to the async-trait crate; it is still widely used but boxes every future, while the native approach is zero-cost for static dispatch.

建议:对于新项目(Rust 1.75+),请优先使用原生异步 trait 搭配泛型;当派生任务需要 Send future 时,配合 trait_variant 使用。如果需要 dyn 分发,可以手动装箱 future,或退回到 async-trait 库。该库虽然仍被广泛使用,但会装箱每个 future,而原生方案在静态分发下是零成本的。

Async Closures (Rust 1.85+) / 异步闭包 (Rust 1.85+)

Since Rust 1.85, async closures are stable — closures that capture their environment and return a future:

从 Rust 1.85 开始,异步闭包 已稳定 —— 即捕获环境并返回 future 的闭包:

#![allow(unused)]
fn main() {
// After 1.85: async closures just work
// 1.85 之后:异步闭包正式可用
let urls = ["https://example.com/a", "https://example.com/b"];
let fetchers: Vec<_> = urls.iter().map(|url| {
    async move || { reqwest::get(*url).await }
    // ↑ This is an async closure — captures url, returns a Future
    // ↑ 这是一个异步闭包 —— 捕获 url,返回一个 Future
}).collect();
}

Async closures implement the new AsyncFn, AsyncFnMut, and AsyncFnOnce traits, which mirror Fn, FnMut, FnOnce:

异步闭包实现了新的 AsyncFnAsyncFnMutAsyncFnOnce trait,它们镜像了传统的 FnFnMutFnOnce

#![allow(unused)]
fn main() {
// Generic function accepting an async closure
// 接收异步闭包的泛型函数
async fn retry<F>(max: usize, f: F) -> Result<String, Error>
where
    F: AsyncFn() -> Result<String, Error>,
{
    // At most `max` attempts in total: the final attempt's result is returned as-is
    // 总共最多 `max` 次尝试:最后一次的结果原样返回
    for _ in 1..max {
        if let Ok(val) = f().await {
            return Ok(val);
        }
    }
    f().await
}
}

Migration tip: If you have code using Fn() -> impl Future<Output = T>, consider switching to AsyncFn() -> T for cleaner signatures.

迁移建议:如果你的代码使用了 Fn() -> impl Future<Output = T>,可以考虑切换到 AsyncFn() -> T 以获得更简洁的签名。

🏋️ Exercise: Design an Async Service Trait / 练习:设计一个异步服务 Trait (点击展开)

Challenge: Design a Cache trait with async get and set methods. Implement it twice: once with a HashMap (in-memory) and once with a simulated Redis backend.

挑战:设计一个包含异步 getset 方法的 Cache trait。分别实现两次:一次使用 HashMap(内存版),一次模拟 Redis 后端。

🔑 Solution / 参考答案
#![allow(unused)]
fn main() {
use std::collections::HashMap;
use tokio::sync::Mutex;
use tokio::time::{sleep, Duration};

trait Cache {
    async fn get(&self, key: &str) -> Option<String>;
    async fn set(&self, key: &str, value: String);
}

// --- In-memory implementation / 内存实现 ---
struct MemoryCache {
    store: Mutex<HashMap<String, String>>,
}

impl Cache for MemoryCache {
    async fn get(&self, key: &str) -> Option<String> {
        self.store.lock().await.get(key).cloned()
    }

    async fn set(&self, key: &str, value: String) {
        self.store.lock().await.insert(key.to_string(), value);
    }
}

// --- Simulated Redis implementation / 模拟 Redis 实现 ---
struct RedisCache {
    store: Mutex<HashMap<String, String>>,
    latency: Duration,
}

impl Cache for RedisCache {
    async fn get(&self, key: &str) -> Option<String> {
        sleep(self.latency).await; // Simulate network round-trip / 模拟网络往返
        self.store.lock().await.get(key).cloned()
    }

    async fn set(&self, key: &str, value: String) {
        sleep(self.latency).await;
        self.store.lock().await.insert(key.to_string(), value);
    }
}
}

Key takeaway: The same generic function works with both implementations through static dispatch. No boxing, no allocation overhead.

关键点:同一个泛型函数可以通过静态分发与两种实现协同工作。没有装箱,也没有额外的分配开销。
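The takeaway mentions a generic function, which the solution above does not actually show. A std-only sketch of that idea follows (assumptions: the trait is simplified to `&mut self` with a plain `HashMap` so it runs without tokio, and `block_on` is a minimal demo executor, not a real runtime):

上面的关键点提到了泛型函数,但参考答案中并未真正展示。下面是该思路的一个仅依赖标准库的草图(假设:trait 被简化为 `&mut self` 并使用普通 `HashMap`,以便脱离 tokio 运行;`block_on` 是用于演示的极简执行器):

```rust
use std::collections::HashMap;
use std::future::Future;
use std::pin::pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

fn noop_waker() -> Waker {
    fn raw() -> RawWaker {
        fn clone(_: *const ()) -> RawWaker { raw() }
        fn noop(_: *const ()) {}
        static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    unsafe { Waker::from_raw(raw()) }
}

// Minimal executor: enough for futures that never return Pending.
fn block_on<F: Future>(f: F) -> F::Output {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(f);
    loop {
        if let Poll::Ready(v) = fut.as_mut().poll(&mut cx) {
            return v;
        }
    }
}

// Native async-fn-in-trait (Rust 1.75+), simplified to &mut self.
trait Cache {
    async fn get(&self, key: &str) -> Option<String>;
    async fn set(&mut self, key: &str, value: String);
}

struct MemoryCache {
    store: HashMap<String, String>,
}

impl Cache for MemoryCache {
    async fn get(&self, key: &str) -> Option<String> {
        self.store.get(key).cloned()
    }
    async fn set(&mut self, key: &str, value: String) {
        self.store.insert(key.to_string(), value);
    }
}

// The generic function: monomorphized per cache type, no boxing, no dyn.
async fn roundtrip<C: Cache>(cache: &mut C, key: &str, value: &str) -> Option<String> {
    cache.set(key, value.to_string()).await;
    cache.get(key).await
}

fn main() {
    let mut cache = MemoryCache { store: HashMap::new() };
    assert_eq!(block_on(roundtrip(&mut cache, "k", "v")), Some("v".to_string()));
}
```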

Key Takeaways — Async Traits / 关键要点:异步 Trait

  • Since Rust 1.75, you can write async fn directly in traits (no #[async_trait] crate needed) / 从 Rust 1.75 开始,你可以直接在 trait 中编写 async fn(无需 #[async_trait] 库)
  • trait_variant::make auto-generates a Send variant for dynamic dispatch / trait_variant::make 可为动态分发自动生成 Send 变体
  • Async closures (async || {}, with matching AsyncFn bounds) stabilized in 1.85 — use for callbacks and middleware / 异步闭包 (async || {},配套 AsyncFn 约束) 在 1.85 稳定 —— 适用于回调和中间件
  • Prefer static dispatch (<S: Service>) over dyn for performance-critical code / 对于性能敏感的代码,优先选择静态分发 (<S: Service>) 而非 dyn

See also / 延伸阅读: Ch 13 — Production Patterns / 第 13 章:生产模式 for Tower’s Service trait, Ch 6 — Building Futures by Hand / 第 6 章:手动构建 Future for manual trait implementations


11. Streams and AsyncIterator / 11. 流与异步迭代器 🟡

What you’ll learn / 你将学到:

  • The Stream trait: async iteration over multiple values / Stream trait:对多个值进行异步迭代
  • Creating streams: stream::iter, async_stream, unfold / 创建流:stream::iterasync_streamunfold
  • Stream combinators: map, filter, buffer_unordered, fold / 流组合器:mapfilterbuffer_unorderedfold
  • Async I/O traits: AsyncRead, AsyncWrite, AsyncBufRead / 异步 I/O trait:AsyncReadAsyncWriteAsyncBufRead

Stream Trait Overview / 流 Trait 概览

A Stream is to Iterator what Future is to a single value — it yields multiple values asynchronously:

Stream 之于 Iterator,正如 Future 之于单个值 —— 它异步地产生多个值:

#![allow(unused)]
fn main() {
// std::iter::Iterator (synchronous, multiple values)
// std::iter::Iterator(同步,多个值)
trait Iterator {
    type Item;
    fn next(&mut self) -> Option<Self::Item>;
}

// futures::Stream (async, multiple values)
// futures::Stream(异步,多个值)
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}
}
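The Ready(Some)/Ready(None) protocol above can be exercised without the futures crate. The sketch below uses a simplified local stand-in for the real `Stream` trait (no `Pin`, names `SimpleStream`/`Counter`/`drain` are illustrative only) and drains it the way `StreamExt::collect` conceptually would:

上面的 Ready(Some)/Ready(None) 协议可以在不依赖 futures 库的情况下演示。下面的草图使用了一个简化的本地替身来模拟真实的 `Stream` trait(没有 `Pin`,`SimpleStream`/`Counter`/`drain` 等名称仅作示意),并以 `StreamExt::collect` 的概念方式将其耗尽:

```rust
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

fn noop_waker() -> Waker {
    fn raw() -> RawWaker {
        fn clone(_: *const ()) -> RawWaker { raw() }
        fn noop(_: *const ()) {}
        static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    unsafe { Waker::from_raw(raw()) }
}

// Simplified, local stand-in for futures::Stream (no Pin, illustration only).
trait SimpleStream {
    type Item;
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

struct Counter {
    next: u32,
    limit: u32,
}

impl SimpleStream for Counter {
    type Item = u32;
    fn poll_next(&mut self, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if self.next < self.limit {
            self.next += 1;
            Poll::Ready(Some(self.next - 1)) // yield one item
        } else {
            Poll::Ready(None) // stream is exhausted
        }
    }
}

// Poll until Ready(None), collecting items as StreamExt::collect would.
fn drain<S: SimpleStream>(mut s: S) -> Vec<S::Item> {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut out = Vec::new();
    loop {
        match s.poll_next(&mut cx) {
            Poll::Ready(Some(item)) => out.push(item),
            Poll::Ready(None) => return out,
            Poll::Pending => {} // a real executor would park until woken
        }
    }
}

fn main() {
    assert_eq!(drain(Counter { next: 0, limit: 3 }), vec![0, 1, 2]);
}
```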
graph LR
    subgraph "Sync"
        VAL["Value<br/>(T)"]
        ITER["Iterator<br/>(multiple T)"]
    end

    subgraph "Async"
        FUT["Future<br/>(async T)"]
        STREAM["Stream<br/>(async multiple T)"]
    end

    VAL -->|"make async"| FUT
    ITER -->|"make async"| STREAM
    VAL -->|"make multiple"| ITER
    FUT -->|"make multiple"| STREAM

    style VAL fill:#e3f2fd,color:#000
    style ITER fill:#e3f2fd,color:#000
    style FUT fill:#c8e6c9,color:#000
    style STREAM fill:#c8e6c9,color:#000

Creating Streams / 创建流

#![allow(unused)]
fn main() {
use futures::stream::{self, StreamExt};
use tokio::time::{interval, Duration};
use tokio_stream::wrappers::IntervalStream;

// 1. From an iterator / 从迭代器转换
let s = stream::iter(vec![1, 2, 3]);

// 2. From an async generator (using async_stream crate) / 从异步生成器(使用 async-stream 库)
// Cargo.toml: async-stream = "0.3"
use async_stream::stream;

fn countdown(from: u32) -> impl futures::Stream<Item = u32> {
    stream! {
        for i in (0..=from).rev() {
            tokio::time::sleep(Duration::from_millis(500)).await;
            yield i;
        }
    }
}

// 3. From a tokio interval / 从 tokio 定时器
let tick_stream = IntervalStream::new(interval(Duration::from_secs(1)));

// 4. From a channel receiver (tokio_stream::wrappers) / 从通道接收端
let (tx, rx) = tokio::sync::mpsc::channel::<String>(100);
let rx_stream = tokio_stream::wrappers::ReceiverStream::new(rx);

// 5. From unfold (generate from async state) / 从 unfold(从异步状态生成)
let s = stream::unfold(0u32, |state| async move {
    if state >= 5 {
        None // Stream ends / 流结束
    } else {
        let next = state + 1;
        Some((state, next)) // yield `state`, new state is `next` / 产出 `state`,新状态为 `next`
    }
});
}

Consuming Streams / 消费流

#![allow(unused)]
fn main() {
use futures::stream::{self, StreamExt};
use tokio::time::Duration;

async fn stream_examples() {
    let s = stream::iter(vec![1, 2, 3, 4, 5]);

    // for_each — process each item / 处理每一项
    s.for_each(|x| async move {
        println!("{x}");
    }).await;

    // map + collect / 映射 + 收集
    let doubled: Vec<i32> = stream::iter(vec![1, 2, 3])
        .map(|x| x * 2)
        .collect()
        .await;

    // filter / 过滤
    let evens: Vec<i32> = stream::iter(1..=10)
        .filter(|x| futures::future::ready(x % 2 == 0))
        .collect()
        .await;

    // buffer_unordered — process N items concurrently / 并发处理 N 个项
    let results: Vec<_> = stream::iter(vec!["url1", "url2", "url3"])
        .map(|url| async move {
            // Simulate HTTP fetch / 模拟 HTTP 请求
            tokio::time::sleep(Duration::from_millis(100)).await;
            format!("response from {url}")
        })
        .buffer_unordered(10) // Up to 10 concurrent fetches / 最多 10 个并发请求
        .collect()
        .await;

    // take, skip, zip, chain — just like Iterator / 与迭代器类似
    let first_three: Vec<i32> = stream::iter(1..=100)
        .take(3)
        .collect()
        .await;
}
}

Comparison with C# IAsyncEnumerable / 与 C# IAsyncEnumerable 的比较

| Feature / 特性 | Rust Stream | C# IAsyncEnumerable<T> |
|---|---|---|
| Syntax / 语法 | stream! { yield x; } | await foreach / yield return |
| Cancellation / 取消 | Drop the stream / 丢弃流 | CancellationToken |
| Backpressure / 背压 | Consumer controls poll rate / 消费者控制轮询速率 | Consumer controls MoveNextAsync / 消费者控制 MoveNextAsync |
| Built-in / 内置支持 | No (needs futures crate) / 否(需要 futures 库) | Yes (since C# 8.0) / 是(自 C# 8.0 起) |
| Combinators / 组合器 | .map(), .filter(), .buffer_unordered() | LINQ + System.Linq.Async |
| Error handling / 错误处理 | Stream<Item = Result<T, E>> | Throw in async iterator / 异步迭代器中抛异常 |
#![allow(unused)]
fn main() {
// Rust: Stream of database rows / Rust:数据库行流
// NOTE: try_stream! (not stream!) is required when using ? inside the body.
// 注意:如果在函数体中使用 ?,需要使用 try_stream! 而不是 stream!。
use async_stream::try_stream;
use futures::{Stream, StreamExt};
use std::pin::pin;

fn get_users(db: &Database) -> impl Stream<Item = Result<User, DbError>> + '_ {
    try_stream! {
        let mut cursor = db.query("SELECT * FROM users").await?;
        while let Some(row) = cursor.next().await {
            yield User::from_row(row?);
        }
    }
}

// Consume: / 消费:
let mut users = pin!(get_users(&db));
while let Some(result) = users.next().await {
    match result {
        Ok(user) => println!("{}", user.name),
        Err(e) => eprintln!("Error: {e}"),
    }
}
}
🏋️ Exercise: Build an Async Stats Aggregator / 练习:构建异步统计聚合器 (点击展开)

Challenge: Given a stream of sensor readings Stream<Item = f64>, write an async function that consumes the stream and returns (count, min, max, average). Use StreamExt combinators — don’t just collect into a Vec.

挑战:给定一个传感器读数流 Stream<Item = f64>,编写一个异步函数来消费该流,并返回 (count, min, max, average)(计数、最小值、最大值、平均值)。请使用 StreamExt 组合器 —— 不要只是简单地 collect 到 Vec 中。

🔑 Solution / 参考答案
#![allow(unused)]
fn main() {
use futures::stream::{self, StreamExt};

struct Stats {
    count: usize,
    min: f64,
    max: f64,
    sum: f64,
}

impl Stats {
    fn average(&self) -> f64 {
        if self.count == 0 { 0.0 } else { self.sum / self.count as f64 }
    }
}

async fn compute_stats<S: futures::Stream<Item = f64> + Unpin>(stream: S) -> Stats {
    stream
        .fold(
            Stats { count: 0, min: f64::INFINITY, max: f64::NEG_INFINITY, sum: 0.0 },
            |mut acc, value| async move {
                acc.count += 1;
                acc.min = acc.min.min(value);
                acc.max = acc.max.max(value);
                acc.sum += value;
                acc
            },
        )
        .await
}
}

Key takeaway: Stream combinators like .fold() process items one-at-a-time without collecting into memory — essential for processing large or unbounded data streams.

关键点:像 .fold() 这样的流组合器会逐项处理数据,而无需将其全部载入内存 —— 这对于处理大规模或无界数据流至关重要。
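The async fold above mirrors `Iterator::fold` exactly. A synchronous sketch of the same accumulator makes the one-pass, constant-memory shape easy to see (`compute_stats_sync` is an illustrative name, not part of the exercise):

上面的异步 fold 与 `Iterator::fold` 完全对应。下面用同样的累加器写一个同步草图,可以直观看出单趟、常量内存的处理形态(`compute_stats_sync` 仅为示意命名,并非练习的一部分):

```rust
// Sync analog of the stream fold: same accumulator, same single pass.
#[derive(Debug, PartialEq)]
struct Stats {
    count: usize,
    min: f64,
    max: f64,
    sum: f64,
}

fn compute_stats_sync(values: impl Iterator<Item = f64>) -> Stats {
    values.fold(
        Stats { count: 0, min: f64::INFINITY, max: f64::NEG_INFINITY, sum: 0.0 },
        |mut acc, v| {
            acc.count += 1;
            acc.min = acc.min.min(v);
            acc.max = acc.max.max(v);
            acc.sum += v;
            acc
        },
    )
}

fn main() {
    let s = compute_stats_sync([1.0, 4.0, 2.0].into_iter());
    assert_eq!((s.count, s.min, s.max, s.sum), (3, 1.0, 4.0, 7.0));
}
```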

Async I/O Traits: AsyncRead, AsyncWrite, AsyncBufRead / 异步 I/O Trait

Just as std::io::Read/Write are the foundation of synchronous I/O, their async counterparts are the foundation of async I/O. These traits are provided by tokio::io (or futures::io for runtime-agnostic code):

就像 std::io::Read/Write 是同步 I/O 的基石,其对应的异步版本则是异步 I/O 的核心。这些 trait 由 tokio::io 提供(或在运行时无关的代码中使用 futures::io):

#![allow(unused)]
fn main() {
// tokio::io — the async versions of std::io traits
// tokio::io —— std::io trait 的异步版本

/// Read bytes from a source asynchronously / 从源异步读取字节
pub trait AsyncRead {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,  // Tokio's safe wrapper around uninitialized memory
                                  // Tokio 用于处理未初始化内存的安全封装
    ) -> Poll<io::Result<()>>;
}

/// Write bytes to a sink asynchronously / 向接收端异步写入字节
pub trait AsyncWrite {
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>>;

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>;
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>;
}
}

In practice, you rarely call these poll_* methods directly. Instead, use the extension traits AsyncReadExt and AsyncWriteExt which provide .await-friendly helper methods:

在实践中,你很少直接调用这些 poll_* 方法。相反,你会使用扩展 trait AsyncReadExtAsyncWriteExt,它们提供了支持 .await 的便捷方法:

#![allow(unused)]
fn main() {
use tokio::io::{AsyncReadExt, AsyncWriteExt, AsyncBufReadExt};
use tokio::net::TcpStream;
use tokio::io::BufReader;

async fn io_examples() -> tokio::io::Result<()> {
    let mut stream = TcpStream::connect("127.0.0.1:8080").await?;

    // AsyncWriteExt: write_all, write_u32, write_buf, etc.
    stream.write_all(b"GET / HTTP/1.0\r\n\r\n").await?;

    // AsyncReadExt: read, read_exact, read_to_end, read_to_string
    let mut response = Vec::new();
    stream.read_to_end(&mut response).await?;

    // AsyncBufReadExt: read_line, lines(), split()
    let file = tokio::fs::File::open("config.txt").await?;
    let reader = BufReader::new(file);
    let mut lines = reader.lines();
    while let Some(line) = lines.next_line().await? {
        println!("{line}");
    }

    Ok(())
}
}
| Sync Trait / 同步 Trait | Async Trait (tokio) / 异步 (tokio) | Async Trait (futures) / 异步 (futures) | Extension Trait / 扩展 Trait |
|---|---|---|---|
| std::io::Read | tokio::io::AsyncRead | futures::io::AsyncRead | AsyncReadExt |
| std::io::Write | tokio::io::AsyncWrite | futures::io::AsyncWrite | AsyncWriteExt |
| std::io::BufRead | tokio::io::AsyncBufRead | futures::io::AsyncBufRead | AsyncBufReadExt |
| std::io::Seek | tokio::io::AsyncSeek | futures::io::AsyncSeek | AsyncSeekExt |

tokio vs futures I/O traits: They’re similar but not identical — tokio’s AsyncRead uses ReadBuf (handles uninitialized memory safely), while futures::AsyncRead uses &mut [u8]. Use tokio_util::compat to convert between them.

tokio 对比 futures I/O trait:它们相似但不完全相同 —— tokio 的 AsyncRead 使用 ReadBuf(能安全处理未初始化的内存),而 futures::AsyncRead 使用 &mut [u8]。可以使用 tokio_util::compat 在两者之间进行转换。

🏋️ Exercise: Build an Async Line Counter / 练习:构建异步行计数器 (点击展开)

Challenge: Write an async function that takes any AsyncBufRead source and returns the number of non-empty lines.

挑战:编写一个异步函数,接收任何 AsyncBufRead 源,并返回其中非空行的数量。

🔑 Solution / 参考答案
#![allow(unused)]
fn main() {
use tokio::io::AsyncBufReadExt;

async fn count_non_empty_lines<R: tokio::io::AsyncBufRead + Unpin>(
    reader: R,
) -> tokio::io::Result<usize> {
    let mut lines = reader.lines();
    let mut count = 0;
    while let Some(line) = lines.next_line().await? {
        if !line.is_empty() {
            count += 1;
        }
    }
    Ok(count)
}
}

Key takeaway: By programming against AsyncBufRead instead of a concrete type, your I/O code is reusable across files, sockets, pipes, and even in-memory buffers.

关键点:通过面向 AsyncBufRead 编程而不是具体类型,你的 I/O 代码可以在文件、socket、管道甚至内存缓冲区之间无缝复用。
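The same generic-over-a-trait pattern already exists in synchronous `std::io`; the exercise solution above is essentially a line-for-line translation of this sync sketch. `Cursor` turns an in-memory string into a `BufRead`, which is why the pattern is so easy to test:

同样的面向 trait 的泛型模式在同步的 `std::io` 中早已存在;上面练习的答案基本上就是下面这个同步草图的逐行翻译。`Cursor` 可以把内存中的字符串变成 `BufRead`,因此这种模式非常易于测试:

```rust
use std::io::{BufRead, Cursor};

// Generic over any BufRead source: files, sockets, or in-memory buffers.
fn count_non_empty_lines_sync<R: BufRead>(reader: R) -> std::io::Result<usize> {
    let mut count = 0;
    for line in reader.lines() {
        if !line?.is_empty() {
            count += 1;
        }
    }
    Ok(count)
}

fn main() {
    let input = Cursor::new("alpha\n\nbeta\n\n\ngamma\n");
    assert_eq!(count_non_empty_lines_sync(input).unwrap(), 3);
}
```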

Key Takeaways — Streams and AsyncIterator / 关键要点:流与异步迭代器

  • Stream is the async equivalent of Iterator — yields Poll::Ready(Some(item)) or Poll::Ready(None) / StreamIterator 的异步等价版本 —— 它产出 Poll::Ready(Some(item))Poll::Ready(None)
  • .buffer_unordered(N) processes N stream items concurrently — the key concurrency tool for streams / .buffer_unordered(N) 并发处理 N 个流项 —— 这是流并发的核心工具
  • async_stream::stream! is the easiest way to create custom streams (uses yield) / async_stream::stream! 是创建自定义流最简单的方式(使用 yield
  • AsyncRead/AsyncBufRead enable generic, reusable I/O code across files, sockets, and pipes / AsyncRead/AsyncBufRead 使得在文件、socket 和管道间编写通用、可复用的 I/O 代码成为可能

See also / 延伸阅读: Ch 9 — When Tokio Isn’t the Right Fit / 第 9 章:Tokio 不适用的场景 for FuturesUnordered (related pattern), Ch 13 — Production Patterns / 第 13 章:生产模式 for backpressure with bounded channels


12. Common Pitfalls / 12. 常见陷阱 🔴

What you’ll learn / 你将学到:

  • 9 common async Rust bugs and how to fix each one / 9 种常见的异步 Rust Bug 及其修复方法
  • Why blocking the executor is the #1 mistake (and how spawn_blocking fixes it) / 为什么阻塞执行器是头号错误(以及 spawn_blocking 如何修复它)
  • Cancellation hazards: what happens when a future is dropped mid-await / 取消隐患:当 future 在 await 中途被 drop 时会发生什么
  • Debugging: tokio-console, tracing, #[instrument] / 调试工具:tokio-consoletracing#[instrument]
  • Testing: #[tokio::test], time::pause(), trait-based mocking / 测试:#[tokio::test]time::pause()、基于 trait 的 Mock

Blocking the Executor / 阻塞执行器

The #1 mistake in async Rust: running blocking code on the async executor thread. This starves other tasks.

异步 Rust 中的头号错误:在异步执行器线程上运行阻塞代码。这会导致其他任务由于得不到调度而“饿死”。

#![allow(unused)]
fn main() {
// ❌ WRONG: Blocks the entire executor thread
// ❌ 错误:阻塞了整个执行器线程
async fn bad_handler() -> String {
    let data = std::fs::read_to_string("big_file.txt").unwrap(); // BLOCKS!
                                                                    // 阻塞!
    process(&data)
}

// ✅ CORRECT: Offload blocking work to a dedicated thread pool
// ✅ 正确:将阻塞工作卸载到专门的线程池
async fn good_handler() -> String {
    let data = tokio::task::spawn_blocking(|| {
        std::fs::read_to_string("big_file.txt").unwrap()
    }).await.unwrap();
    process(&data)
}

// ✅ ALSO CORRECT: Use tokio's async fs
// ✅ 同样正确:使用 tokio 的异步文件系统接口
async fn also_good_handler() -> String {
    let data = tokio::fs::read_to_string("big_file.txt").await.unwrap();
    process(&data)
}
}
graph TB
    subgraph "❌ Blocking Call on Executor"
        T1_BAD["Thread 1: std::fs::read()<br/>🔴 BLOCKED for 500ms"]
        T2_BAD["Thread 2: handling requests<br/>🟢 Working alone"]
        TASKS_BAD["100 pending tasks<br/>⏳ Starved"]
        T1_BAD -->|"can't poll"| TASKS_BAD
    end

    subgraph "✅ spawn_blocking"
        T1_GOOD["Thread 1: polling futures<br/>🟢 Available"]
        T2_GOOD["Thread 2: polling futures<br/>🟢 Available"]
        BT["Blocking pool thread:<br/>std::fs::read()<br/>🔵 Separate pool"]
        TASKS_GOOD["100 tasks<br/>✅ All making progress"]
        T1_GOOD -->|"polls"| TASKS_GOOD
        T2_GOOD -->|"polls"| TASKS_GOOD
    end

std::thread::sleep vs tokio::time::sleep

#![allow(unused)]
fn main() {
use std::time::Duration;

// ❌ WRONG: Blocks the executor thread for 5 seconds
// ❌ 错误:阻塞执行器线程 5 秒钟
async fn bad_delay() {
    std::thread::sleep(Duration::from_secs(5)); // Thread can't poll anything else!
                                                // 线程无法轮询任何其他任务!
}

// ✅ CORRECT: Yields to the executor, other tasks can run
// ✅ 正确:礼让执行器,允许其他任务运行
async fn good_delay() {
    tokio::time::sleep(Duration::from_secs(5)).await; // Non-blocking!
                                                       // 非阻塞!
}
}

Holding MutexGuard Across .await / 跨越 .await 持有 MutexGuard

#![allow(unused)]
fn main() {
use std::sync::Mutex; // std Mutex — NOT async-aware
                       // 标准库 Mutex —— 无法感知异步

// ❌ WRONG: MutexGuard held across .await
// ❌ 错误:跨越 .await 持有 MutexGuard
async fn bad_mutex(data: &Mutex<Vec<String>>) {
    let mut guard = data.lock().unwrap();
    guard.push("item".into());
    some_io().await; // 💥 Guard is held here — blocks other threads from locking!
                     // 💥 此处仍持有 guard —— 会阻止其他线程加锁!
    guard.push("another".into());
}
// Also: std::sync::MutexGuard is !Send, so this won't compile
// with tokio's multi-threaded runtime.

// ✅ FIX 1: Scope the guard to drop before .await
// ✅ 修复 1:缩减 guard 作用域,使其在 .await 前释放
async fn good_mutex_scoped(data: &Mutex<Vec<String>>) {
    {
        let mut guard = data.lock().unwrap();
        guard.push("item".into());
    } // Guard dropped here / Guard 在此释放
    some_io().await; // Safe — lock is released / 安全 —— 锁已释放
    {
        let mut guard = data.lock().unwrap();
        guard.push("another".into());
    }
}

// ✅ FIX 2: Use tokio::sync::Mutex (async-aware)
// ✅ 修复 2:使用 tokio::sync::Mutex (支持异步)
use tokio::sync::Mutex as AsyncMutex;

async fn good_async_mutex(data: &AsyncMutex<Vec<String>>) {
    let mut guard = data.lock().await; // Async lock — doesn't block the thread
                                       // 异步锁 —— 不会阻塞线程
    guard.push("item".into());
    some_io().await; // OK — tokio Mutex guard is Send
                     // 没问题 —— tokio Mutex 的 guard 是满足 Send 的
    guard.push("another".into());
}
}

When to use which Mutex / 何时使用哪种 Mutex

  • std::sync::Mutex: Short critical sections with no .await inside / 临界区非常短,且内部没有 .await
  • tokio::sync::Mutex: When you must hold the lock across .await points / 必须跨越 .await 点持有锁时
  • parking_lot::Mutex: Drop-in std replacement, faster, smaller, still no .await / 标准库的直接替代方案,更快,更小,但仍不能在内部使用 .await

Cancellation Hazards / 取消隐患

Dropping a future cancels it — but this can leave things in an inconsistent state:

丢弃(Drop)一个 future 即意味着取消它 —— 但这可能会导致状态不一致:

#![allow(unused)]
fn main() {
// ❌ DANGEROUS: Resource leak on cancellation
// ❌ 危险:取消时可能发生资源泄漏/数据丢失
async fn transfer(from: &Account, to: &Account, amount: u64) {
    from.debit(amount).await;  // If cancelled HERE... / 如果在此处被取消...
    to.credit(amount).await;   // ...money vanishes! / ...钱就不翼而飞了!
}

// ✅ SAFE: Make operations atomic or use compensation
// ✅ 安全:使操作具有原子性或使用补偿机制
async fn safe_transfer(from: &Account, to: &Account, amount: u64) -> Result<(), Error> {
    // Use a database transaction (all-or-nothing)
    // 使用数据库事务(全成功或全失败)
    let tx = db.begin_transaction().await?;
    tx.debit(from, amount).await?;
    tx.credit(to, amount).await?;
    tx.commit().await?; // Only commits if everything succeeded
                        // 仅当所有操作都成功时才提交
    Ok(())
}

// ✅ ALSO SAFE: Wrap the transfer in tokio::select! with cancellation awareness
// ✅ 同样安全:使用 tokio::select! 并具备取消感知能力
async fn transfer_with_shutdown(from: &Account, to: &Account, amount: u64) {
    tokio::select! {
        _ = transfer(from, to, amount) => {
            // Transfer completed
            // 传输完成
        }
        _ = shutdown_signal() => {
            // Don't cancel mid-transfer — let it finish
            // Or: roll back explicitly
            // 不要在中途取消传输 —— 让它完成
            // 或者:显式回滚
        }
    }
}
}
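To see the hazard concretely: the sketch below hand-writes a two-step future that suspends once in the middle (like the `.await` between debit and credit), polls it past the first step, then drops it. The second step simply never runs. The `debited`/`credited` flags and all names here are illustrative stand-ins, not the account API above.

为了更直观地看到这个隐患:下面的草图手写了一个两步的 future,它在中间挂起一次(就像 debit 与 credit 之间的 `.await`),先将其轮询过第一步,然后将其 drop。第二步根本不会执行。这里的 `debited`/`credited` 标志及所有名称都只是示意,并非上文的账户 API。

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

fn noop_waker() -> Waker {
    fn raw() -> RawWaker {
        fn clone(_: *const ()) -> RawWaker { raw() }
        fn noop(_: *const ()) {}
        static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    unsafe { Waker::from_raw(raw()) }
}

// A two-step future: pends once in the middle, like an .await point.
struct TwoStep {
    state: u8,
    debited: Rc<Cell<bool>>,
    credited: Rc<Cell<bool>>,
}

impl Future for TwoStep {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        match self.state {
            0 => {
                self.debited.set(true); // first half ran ("debit")
                self.state = 1;
                Poll::Pending // suspended at the simulated .await point
            }
            _ => {
                self.credited.set(true); // second half ("credit")
                Poll::Ready(())
            }
        }
    }
}

fn main() {
    let debited = Rc::new(Cell::new(false));
    let credited = Rc::new(Cell::new(false));
    let mut fut = TwoStep { state: 0, debited: debited.clone(), credited: credited.clone() };

    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    assert_eq!(Pin::new(&mut fut).poll(&mut cx), Poll::Pending);

    drop(fut); // cancellation: the future is dropped mid-"transfer"

    assert!(debited.get()); // the first half ran...
    assert!(!credited.get()); // ...the second half never will
}
```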

No Async Drop / 没有异步 Drop

Rust’s Drop trait is synchronous — you cannot .await inside drop(). This is a frequent source of confusion:

Rust 的 Drop trait 是同步的 —— 你 不能drop() 内部使用 .await。这是非常典型的困惑来源:

#![allow(unused)]
fn main() {
struct DbConnection { /* ... */ }

impl Drop for DbConnection {
    fn drop(&mut self) {
        // ❌ Can't do this — drop() is sync!
        // ❌ 不能这样做 —— drop() 是同步的!
        // self.connection.shutdown().await;

        // ✅ Workaround 1: Spawn a cleanup task (fire-and-forget)
        // ✅ 方案 1:派生一个清理任务(发完不理)
        let conn = self.connection.take();
        tokio::spawn(async move {
            let _ = conn.shutdown().await;
        });

        // ✅ Workaround 2: Use a synchronous close
        // ✅ 方案 2:使用同步关闭
        // self.connection.blocking_close();
    }
}
}

Best practice / 最佳实践: Provide an explicit async fn close(self) method and document that callers should use it; treat Drop only as a last-resort safety net, never as the primary cleanup path. / 提供一个显式的 async fn close(self) 方法,并在文档中说明调用者应当使用它。仅将 Drop 用作兜底的安全网,而不要将其作为主要的清理路径。

select! Fairness and Starvation / select! 公平性与饿死

#![allow(unused)]
fn main() {
use tokio::sync::mpsc;

// ❌ UNFAIR: if `fast` is always ready, `slow` starves
// ❌ 不公平:若 `fast` 总是就绪,`slow` 会饿死
async fn unfair(mut fast: mpsc::Receiver<i32>, mut slow: mpsc::Receiver<i32>) {
    loop {
        tokio::select! {
            Some(v) = fast.recv() => println!("fast: {v}"),
            Some(v) = slow.recv() => println!("slow: {v}"),
            // If both are ready, tokio randomly picks one.
            // But if `fast` is ALWAYS ready, `slow` rarely gets polled.
            // 如果两者都准备就绪,tokio 会随机选择一个。
            // 但如果 `fast` 总是准备就绪,`slow` 很少有机会被轮询。
        }
    }
}

// ✅ FAIR: Use biased select or drain in batches
// ✅ 公平:使用 biased 模式或分批处理
async fn fair(mut fast: mpsc::Receiver<i32>, mut slow: mpsc::Receiver<i32>) {
    loop {
        tokio::select! {
            biased; // Always check in order — explicit priority
                    // 总是按顺序检查 —— 显式优先级

            Some(v) = slow.recv() => println!("slow: {v}"),  // Priority! / 优先!
            Some(v) = fast.recv() => println!("fast: {v}"),
        }
    }
}
}

Accidental Sequential Execution / 无意的顺序执行

#![allow(unused)]
fn main() {
// ❌ SEQUENTIAL: Takes 2 seconds total
// ❌ 顺序执行:总共耗时 2 秒
async fn slow() {
    let a = fetch("url_a").await; // 1 second
                                  // 1 秒
    let b = fetch("url_b").await; // 1 second (waits for a to finish first!)
                                  // 1 秒(等待 a 完成后才开始!)
}

// ✅ CONCURRENT: Takes 1 second total
// ✅ 并发执行:总共耗时 1 秒
async fn fast() {
    let (a, b) = tokio::join!(
        fetch("url_a"), // Both start immediately / 两者同时开始
        fetch("url_b"),
    );
}

// ✅ ALSO CONCURRENT: Using let + join
// ✅ 同样并发:使用 let + join
async fn also_fast() {
    let fut_a = fetch("url_a"); // Create future (lazy — not started yet)
                                // 创建 future(惰性 —— 尚未开始)
    let fut_b = fetch("url_b"); // Create future
                                // 创建 future
    let (a, b) = tokio::join!(fut_a, fut_b); // NOW both run concurrently
                                            // 现在两者并发运行
}
}

Trap / 陷阱let a = fetch(url).await; let b = fetch(url).await; 是串行的! The second .await doesn’t start until the first finishes. Use join! or spawn for concurrency. 第二个 .await 直到第一个完成后才会开始。请使用 join!spawn 来实现真正的并发。

Case Study: Debugging a Hung Production Service / 案例分析:调试卡死的生产服务

A real-world scenario: a service handles requests fine for 10 minutes, then stops responding. No errors in logs. CPU at 0%.

真实场景:一个服务在前 10 分钟运行良好,然后突然停止响应。日志中没有错误,CPU 占用率为 0%。

Diagnosis steps / 诊断步骤:

  1. Attach tokio-console — reveals 200+ tasks stuck in Pending state / 挂载 tokio-console —— 发现 200 多个任务卡在 Pending 状态
  2. Check task details — all waiting on the same Mutex::lock().await / 检查任务详情 —— 全部在等待同一个 Mutex::lock().await
  3. Root cause — one task held a std::sync::MutexGuard across an .await and panicked, poisoning the mutex. All other tasks now fail on lock().unwrap() 根因 —— 某个任务在跨越 .await 时持有了 std::sync::MutexGuard 并发生了 panic,导致 mutex 被投毒(poisoning)。所有其他任务现在在 lock().unwrap() 上失败。

The fix / 修复方案:

| Before (broken) / 之前(有问题) | After (fixed) / 之后(已修复) |
|---|---|
| std::sync::Mutex | tokio::sync::Mutex |
| .lock().unwrap() across .await | Scope lock before .await / 在 .await 之前缩小锁的作用域 |
| No timeout on lock acquisition / 加锁操作无超时 | tokio::time::timeout(dur, mutex.lock()) / 为加锁操作添加超时 |
| No recovery on poisoned mutex / 对投毒 mutex 无恢复手段 | tokio::sync::Mutex doesn't poison / tokio::sync::Mutex 不会投毒 |

Prevention checklist / 预防清单:

  • Use tokio::sync::Mutex if the guard crosses any .await / 如果 guard 跨越任何 .await,请使用 tokio::sync::Mutex
  • Add #[tracing::instrument] to async functions for span tracking / 为异步函数添加 #[tracing::instrument] 以进行 span 跟踪
  • Run tokio-console in staging to catch hung tasks early / 在 staging 环境中运行 tokio-console 以尽早发现卡住的任务
  • Add health check endpoints that verify task responsiveness / 添加健康检查端点以验证任务响应能力
🏋️ Exercise: Spot the Bugs / 练习:找 Bug (click to expand / 点击展开)

Challenge: Find all the async pitfalls in this code and fix them.

挑战:找出这段代码中所有的异步陷阱并修复它们。

#![allow(unused)]
fn main() {
use std::sync::Mutex;

async fn process_requests(urls: Vec<String>) -> Vec<String> {
    let results = Mutex::new(Vec::new());
    
    for url in &urls {
        let response = reqwest::get(url).await.unwrap().text().await.unwrap();
        std::thread::sleep(std::time::Duration::from_millis(100)); // Rate limit
                                                                    // 限速
        let mut guard = results.lock().unwrap();
        guard.push(response);
        expensive_parse(&guard).await; // Parse all results so far
                                       // 解析目前所有结果
    }
    
    results.into_inner().unwrap()
}
}
🔑 Solution / 参考答案

Bugs found / 发现的 Bug:

  1. Sequential fetches — URLs are fetched one at a time instead of concurrently 顺序抓取 —— URL 是一个接一个抓取的,而不是并发的
  2. std::thread::sleep — Blocks the executor thread std::thread::sleep —— 阻塞了执行器线程
  3. MutexGuard held across .awaitguard is alive when expensive_parse is awaited MutexGuard 跨越 .await 持有 —— 在 await expensive_parse 时,guard 依然存活
  4. No concurrency — Should use join! or FuturesUnordered 没有并发 —— 应该使用 join!FuturesUnordered
#![allow(unused)]
fn main() {
use tokio::sync::Mutex;
use std::sync::Arc;
use futures::stream::{self, StreamExt};

async fn process_requests(urls: Vec<String>) -> Vec<String> {
    // Fix 4: Process URLs concurrently with buffer_unordered
    // 修复 4:使用 buffer_unordered 并发处理 URL
    let results: Vec<String> = stream::iter(urls)
        .map(|url| async move {
            let response = reqwest::get(&url).await.unwrap().text().await.unwrap();
            // Fix 2: Use tokio::time::sleep instead of std::thread::sleep
            // 修复 2:使用 tokio::time::sleep 替代 std::thread::sleep
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
            response
        })
        .buffer_unordered(10) // Up to 10 concurrent requests
                              // 最多 10 个并发请求
        .collect()
        .await;

    // Fix 3: Parse after collecting — no mutex needed at all!
    // 修复 3:收集后解析 —— 完全不需要 mutex!
    for result in &results {
        expensive_parse(result).await;
    }

    results
}
}

Key takeaway / 关键要点:Often you can restructure async code to eliminate mutexes entirely. Collect results with streams/join, then process. Simpler, faster, no deadlock risk. 通常你可以重构异步代码以完全消除 mutex。使用 streams/join 收集结果,然后进行处理。这样更简单、更快,没有死锁风险。


Debugging Async Code / 调试异步代码

Async stack traces are notoriously cryptic — they show the executor’s poll loop rather than your logical call chain. Here are the essential debugging tools.

异步调用栈的跟踪记录是出了名的难懂 —— 它们显示的是执行器的轮询循环,而不是你的逻辑调用链。以下是必不可少的调试工具。

tokio-console: Real-Time Task Inspector / 任务实时检查器

tokio-console gives you an htop-like view of every spawned task: its state, poll duration, waker activity, and resource usage.

tokio-console 为你提供了一个类似于 htop 的界面,可以查看每一个派生的任务:其状态、轮询持续时间、唤醒器活动和资源使用情况。

# Cargo.toml
[dependencies]
console-subscriber = "0.4"
tokio = { version = "1", features = ["full", "tracing"] }
#[tokio::main]
async fn main() {
    console_subscriber::init(); // Replaces the default tracing subscriber
                                // 替换默认的 tracing 订阅者
    // ... rest of your application
    // ... 应用程序的其余部分
}

Then in another terminal:

然后在另一个终端中:

$ RUSTFLAGS="--cfg tokio_unstable" cargo run   # Required compile-time flag
                                               # 必需的编译时标志
$ tokio-console                                # Connects to 127.0.0.1:6669
                                               # 连接到 127.0.0.1:6669

tracing + #[instrument]: Structured Logging for Async / 异步结构化日志

The tracing crate understands Future lifetimes. Spans stay open across .await points, giving you a logical call stack even when the OS thread has moved on:

tracing 库能够理解 Future 的生命周期。Span 会跨越 .await 点保持开启状态,即使操作系统线程已经转移,也能为你提供逻辑调用栈:

#![allow(unused)]
fn main() {
use tracing::{info, instrument};

#[instrument(skip(db_pool), fields(user_id = %user_id))]
async fn handle_request(user_id: u64, db_pool: &Pool) -> Result<Response> {
    info!("looking up user");
    let user = db_pool.get_user(user_id).await?;  // span stays open across .await
                                                  // span 跨越 .await 保持开启
    info!(email = %user.email, "found user");
    let orders = fetch_orders(user_id).await?;     // still the same span
                                                   // 仍然是同一个 span
    Ok(build_response(user, orders))
}
}

Output (with tracing_subscriber::fmt::json()):

输出(使用 tracing_subscriber::fmt::json()):

{"timestamp":"...","level":"INFO","span":{"name":"handle_request","user_id":"42"},"message":"looking up user"}
{"timestamp":"...","level":"INFO","span":{"name":"handle_request","user_id":"42"},"fields":{"email":"a@b.com"},"message":"found user"}

Debugging Checklist / 调试清单

| Symptom / 症状 | Likely Cause / 可能原因 | Tool / 工具 |
|---|---|---|
| Task hangs forever / 任务永久挂起 | Missing .await or deadlocked Mutex / 缺少 .await 或 Mutex 死锁 | tokio-console task view / tokio-console 任务视图 |
| Low throughput / 低吞吐量 | Blocking call on async thread / 异步线程上的阻塞调用 | tokio-console poll-time histogram / tokio-console 轮询时间直方图 |
| Future is not Send | Non-Send type held across .await / 跨越 .await 持有非 Send 类型 | Compiler error + #[instrument] to locate / 编译器错误 + #[instrument] 定位 |
| Mysterious cancellation / 神秘取消 | Parent select! dropped a branch / 父级 select! 丢弃了一个分支 | tracing span lifecycle events / tracing span 生命周期事件 |

Tip / 提示:Enable RUSTFLAGS="--cfg tokio_unstable" to get task-level metrics in tokio-console. This is a compile-time flag, not a runtime one. 启用 RUSTFLAGS="--cfg tokio_unstable" 以在 tokio-console 中获取任务级指标。这是一个编译时标志,而不是运行时标志。

Testing Async Code / 测试异步代码

Async code introduces unique testing challenges — you need a runtime, time control, and strategies for testing concurrent behavior.

异步代码带来了独特的测试挑战 —— 你需要运行时、时间控制以及测试并发行为的策略。

Basic async tests with #[tokio::test]:

使用 #[tokio::test] 编写基础异步测试:

#![allow(unused)]
fn main() {
// Cargo.toml
// [dev-dependencies]
// tokio = { version = "1", features = ["full", "test-util"] }

#[tokio::test]
async fn test_basic_async() {
    let result = fetch_data().await;
    assert_eq!(result, "expected");
}

// Single-threaded test (useful for !Send types):
// 单线程测试(对 !Send 类型有用):
#[tokio::test(flavor = "current_thread")]
async fn test_single_threaded() {
    let rc = std::rc::Rc::new(42);
    let val = async { *rc }.await;
    assert_eq!(val, 42);
}

// Multi-threaded with explicit worker count:
// 多线程测试,带显式工作线程数:
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_concurrent_behavior() {
    // Tests race conditions with real concurrency
    // 测试真实并发下的竞态条件
    let counter = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0));
    let c1 = counter.clone();
    let c2 = counter.clone();
    let (a, b) = tokio::join!(
        tokio::spawn(async move { c1.fetch_add(1, std::sync::atomic::Ordering::SeqCst) }),
        tokio::spawn(async move { c2.fetch_add(1, std::sync::atomic::Ordering::SeqCst) }),
    );
    a.unwrap();
    b.unwrap();
    assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 2);
}
}

Time manipulation / 时间操控 — test timeouts without actually waiting:

时间操控 —— 无需实际等待即可测试超时:

#![allow(unused)]
fn main() {
use tokio::time::{self, Duration, Instant};

#[tokio::test]
async fn test_timeout_behavior() {
    // Pause time — sleep() advances instantly, no real wall-clock delay
    // 暂停时间 —— sleep() 立即前进,没有实际的挂钟延迟
    time::pause();

    let start = Instant::now();
    time::sleep(Duration::from_secs(3600)).await; // "waits" 1 hour — takes 0ms
                                                  // “等待” 1 小时 —— 实际耗时 0ms
    assert!(start.elapsed() >= Duration::from_secs(3600));
    // Test ran in milliseconds, not an hour!
    // 测试在毫秒内运行,而不是一小时!
}

#[tokio::test]
async fn test_retry_timing() {
    time::pause();

    // Test that our retry logic waits the expected durations
    // 测试我们的重试逻辑等待了预期的持续时间
    let start = Instant::now();
    let result = retry_with_backoff(3, 1_000, || async {
        Err::<(), _>("simulated failure")
    })
    .await;

    assert!(result.is_err());
    // Sleeps between attempts: 1s + 2s = 3s of backoff (exponential)
    // 两次尝试之间的睡眠:1 秒 + 2 秒 = 3 秒的退避(指数级)
    assert!(start.elapsed() >= Duration::from_secs(3));
}

#[tokio::test]
async fn test_deadline_exceeded() {
    time::pause();

    let result = tokio::time::timeout(
        Duration::from_secs(5),
        async {
            // Simulate slow operation
            // 模拟慢速操作
            time::sleep(Duration::from_secs(10)).await;
            "done"
        }
    ).await;

    assert!(result.is_err()); // Timed out
                              // 超时
}
}

Mocking async dependencies — use trait objects or generics:

模拟异步依赖 —— 使用 trait 对象或泛型:

#![allow(unused)]
fn main() {
// Define a trait for the dependency:
// 定义依赖的 trait:
// Native async fn in traits requires Rust 1.75+ (static dispatch only;
// use the async-trait crate if you need dyn Storage).
// 原生 trait 异步方法需要 Rust 1.75+(仅支持静态分发;
// 如需 dyn Storage,请使用 async-trait 库)。
trait Storage {
    async fn get(&self, key: &str) -> Option<String>;
    async fn set(&self, key: &str, value: String);
}

// Production implementation:
// 生产实现:
struct RedisStorage { /* ... */ }
impl Storage for RedisStorage {
    async fn get(&self, key: &str) -> Option<String> {
        // Real Redis call
        // 真实的 Redis 调用
        todo!()
    }
    async fn set(&self, key: &str, value: String) {
        todo!()
    }
}

// Test mock:
// 测试 Mock:
struct MockStorage {
    data: std::sync::Mutex<std::collections::HashMap<String, String>>,
}

impl MockStorage {
    fn new() -> Self {
        MockStorage { data: std::sync::Mutex::new(std::collections::HashMap::new()) }
    }
}

impl Storage for MockStorage {
    async fn get(&self, key: &str) -> Option<String> {
        self.data.lock().unwrap().get(key).cloned()
    }
    async fn set(&self, key: &str, value: String) {
        self.data.lock().unwrap().insert(key.to_string(), value);
    }
}

// Tested function is generic over Storage:
// 被测函数是 Storage 的泛型:
async fn cache_lookup<S: Storage>(store: &S, key: &str) -> String {
    match store.get(key).await {
        Some(val) => val,
        None => {
            let val = "computed".to_string();
            store.set(key, val.clone()).await;
            val
        }
    }
}

#[tokio::test]
async fn test_cache_miss_then_hit() {
    let mock = MockStorage::new();

    // First call: miss → computes and stores
    // 第一次调用:未命中 → 计算并存储
    let val = cache_lookup(&mock, "key1").await;
    assert_eq!(val, "computed");

    // Second call: hit → returns stored value
    // 第二次调用:命中 → 返回存储的值
    let val = cache_lookup(&mock, "key1").await;
    assert_eq!(val, "computed");
    assert!(mock.data.lock().unwrap().contains_key("key1"));
}
}

Testing channels and task communication / 测试通道和任务通信

#![allow(unused)]
fn main() {
#[tokio::test]
async fn test_producer_consumer() {
    let (tx, mut rx) = tokio::sync::mpsc::channel(10);

    tokio::spawn(async move {
        for i in 0..5 {
            tx.send(i).await.unwrap();
        }
        // tx dropped here — channel closes
        // tx 在此被 drop —— 通道关闭
    });

    let mut received = Vec::new();
    while let Some(val) = rx.recv().await {
        received.push(val);
    }

    assert_eq!(received, vec![0, 1, 2, 3, 4]);
}
}
| Test Pattern / 测试模式 | When to Use / 何时使用 | Key Tool / 关键工具 |
|---|---|---|
| #[tokio::test] | All async tests / 所有异步测试 | tokio = { features = ["macros", "rt"] } |
| time::pause() | Testing timeouts, retries, periodic tasks / 测试超时、重试、周期性任务 | tokio::time::pause() |
| Trait mocking / Trait 模拟 | Testing business logic without I/O / 在没有 I/O 的情况下测试业务逻辑 | Generic <S: Storage> / 泛型 <S: Storage> |
| current_thread flavor / current_thread 模式 | Testing !Send types or deterministic scheduling / 测试 !Send 类型或确定性调度 | #[tokio::test(flavor = "current_thread")] |
| multi_thread flavor / multi_thread 模式 | Testing race conditions / 测试竞态条件 | #[tokio::test(flavor = "multi_thread")] |

Key Takeaways — Common Pitfalls / 关键要点:常见陷阱

  • Never block the executor — use spawn_blocking for CPU/sync work / 永远不要阻塞执行器 —— 对 CPU 密集型/同步工作使用 spawn_blocking
  • Never hold a MutexGuard across .await — scope locks tightly or use tokio::sync::Mutex / 永远不要跨越 .await 持有 MutexGuard —— 严格控制锁的作用域或使用 tokio::sync::Mutex
  • Cancellation drops the future instantly — use “cancel-safe” patterns for partial operations / 取消会立即 drop future —— 对部分操作使用“取消安全”模式
  • Use tokio-console and #[tracing::instrument] for debugging async code / 使用 tokio-console#[tracing::instrument] 调试异步代码
  • Test async code with #[tokio::test] and time::pause() for deterministic timing / 使用 #[tokio::test]time::pause() 测试异步代码以实现确定性计时

See also / 延伸阅读: Ch 8 — Tokio Deep Dive / 第 8 章:Tokio 深入解析 for sync primitives, Ch 13 — Production Patterns / 第 13 章:生产模式 for graceful shutdown and structured concurrency


13. Production Patterns / 13. 生产模式 🔴

What you’ll learn / 你将学到:

  • Graceful shutdown with watch channels and select! / 使用 watch 通道和 select! 实现优雅停机
  • Backpressure: bounded channels prevent OOM / 背压机制:有界通道防止内存溢出 (OOM)
  • Structured concurrency: JoinSet and TaskTracker / 结构化并发:JoinSetTaskTracker
  • Timeouts, retries, and exponential backoff / 超时、重试与指数级退避
  • Error handling: thiserror vs anyhow, the double-? pattern / 错误处理:thiserror 对比 anyhow,“双问号”模式
  • Tower: the middleware pattern used by axum, tonic, and hyper / Tower:axum、tonic 和 hyper 使用的中间件模式

Graceful Shutdown / 优雅停机

Production servers must shut down cleanly — finish in-flight requests, flush buffers, close connections:

生产环境的服务器必须能够干净地关闭 —— 完成正在进行的请求、刷新缓冲区、关闭连接:

use tokio::signal;
use tokio::sync::watch;

async fn main_server() {
    // Create a shutdown signal channel / 创建一个停机信号通道
    let (shutdown_tx, shutdown_rx) = watch::channel(false);

    // Spawn the server / 启动服务器
    let server_handle = tokio::spawn(run_server(shutdown_rx.clone()));

    // Wait for Ctrl+C / 等待 Ctrl+C 信号
    signal::ctrl_c().await.expect("Failed to listen for Ctrl+C");
    println!("Shutdown signal received, finishing in-flight requests...");

    // Notify all tasks to shut down / 通知所有任务关闭
    shutdown_tx.send(true).unwrap();

    // Wait for server to finish (with timeout) / 等待服务器完成(带超时限制)
    match tokio::time::timeout(
        std::time::Duration::from_secs(30),
        server_handle,
    ).await {
        Ok(Ok(())) => println!("Server shut down gracefully"),
        Ok(Err(e)) => eprintln!("Server error: {e}"),
        Err(_) => eprintln!("Server shutdown timed out — forcing exit"),
    }
}
sequenceDiagram
    participant OS as OS Signal
    participant Main as Main Task
    participant WCH as watch Channel
    participant W1 as Worker 1
    participant W2 as Worker 2

    OS->>Main: SIGINT (Ctrl+C)
    Main->>WCH: send(true)
    WCH-->>W1: changed()
    WCH-->>W2: changed()

    Note over W1: Finish current request
    Note over W2: Finish current request

    W1-->>Main: Task complete
    W2-->>Main: Task complete
    Main->>Main: All workers done → exit

Backpressure with Bounded Channels / 有界通道提供的背压机制

Unbounded channels can lead to OOM if the producer is faster than the consumer. Always use bounded channels in production:

如果不加限制,当生产者速度快于消费者时,无界通道会导致内存溢出(OOM)。在生产环境中,请务必使用有界通道:

#![allow(unused)]
fn main() {
use tokio::sync::mpsc;

struct WorkItem { id: u64 }

async fn process(_item: WorkItem) { /* simulate work / 模拟处理 */ }

async fn backpressure_example() {
    // Bounded channel: max 100 items buffered
    // 有界通道:缓冲区最多存放 100 个项
    let (tx, mut rx) = mpsc::channel::<WorkItem>(100);

    // Producer: slows down naturally when buffer is full
    // 生产者:当缓冲区满时会自动减速
    let producer = tokio::spawn(async move {
        for i in 0..1_000_000 {
            // send() is async — waits if buffer is full
            // send() 是异步的 —— 如果缓冲区满,它会等待
            // This creates natural backpressure! / 这就产生了天然的背压!
            tx.send(WorkItem { id: i }).await.unwrap();
        }
    });

    // Consumer: processes items at its own pace
    // 消费者:按自己的步调处理项
    let consumer = tokio::spawn(async move {
        while let Some(item) = rx.recv().await {
            process(item).await; // Slow processing is OK — producer waits
                                 // 即使处理很慢也没关系 —— 生产者会等待
        }
    });

    let _ = tokio::join!(producer, consumer);
}
}

Structured Concurrency: JoinSet and TaskTracker / 结构化并发:JoinSet 与 TaskTracker

JoinSet groups related tasks and ensures they all complete:

JoinSet 可将相关任务分组,并确保它们全部完成:

#![allow(unused)]
fn main() {
use tokio::task::JoinSet;
use tokio::time::{sleep, Duration};

async fn structured_concurrency() {
    let mut set = JoinSet::new();

    // Spawn a batch of tasks / 派生一批任务
    for url in get_urls() {
        set.spawn(async move {
            fetch_and_process(url).await
        });
    }

    // Collect all results (order not guaranteed) / 收集所有结果(不保证顺序)
    let mut results = Vec::new();
    while let Some(result) = set.join_next().await {
        match result {
            Ok(Ok(data)) => results.push(data),
            Ok(Err(e)) => eprintln!("Task error: {e}"),
            Err(e) => eprintln!("Task panicked: {e}"),
        }
    }

    // ALL tasks are done here / 所有任务在此处均已完成
    println!("Processed {} items", results.len());
}
}

Timeouts and Retries / 超时与重试

#![allow(unused)]
fn main() {
use tokio::time::{timeout, sleep, Duration};

// Simple timeout / 简单的超时处理
async fn with_timeout() -> Result<Response, Error> {
    match timeout(Duration::from_secs(5), fetch_data()).await {
        Ok(Ok(response)) => Ok(response),
        Ok(Err(e)) => Err(Error::Fetch(e)),
        Err(_) => Err(Error::Timeout),
    }
}

// Exponential backoff retry / 指数级退避重试
async fn retry_with_backoff<F, Fut, T, E>(
    max_attempts: u32,
    base_delay_ms: u64,
    operation: F,
) -> Result<T, E>
where
    F: Fn() -> Fut,
    Fut: std::future::Future<Output = Result<T, E>>,
    E: std::fmt::Display,
{
    let mut delay = Duration::from_millis(base_delay_ms);

    for attempt in 1..=max_attempts {
        match operation().await {
            Ok(result) => return Ok(result),
            Err(e) => {
                if attempt == max_attempts {
                    return Err(e);
                }
                sleep(delay).await;
                delay *= 2; // Exponential backoff / 指数级退避
            }
        }
    }
    unreachable!()
}
}

Production tip — add jitter / 生产环境提示 —— 加入抖动:The function above uses pure exponential backoff, but in production many clients failing simultaneously will all retry at the same intervals (thundering herd). Add random jitter so retries spread out over time.

生产建议:上述函数使用的是纯指数退避,但在生产环境中,如果许多客户端同时失败,它们可能会在相同的时间间隔重试(造成惊群效应)。请加入随机 抖动(jitter),使重试在时间上分散开。
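A minimal sketch of "full jitter" backoff. The random value is injected as a plain 0.0–1.0 number so the calculation stays deterministic and testable; in production you would draw it from an RNG such as the rand crate. The function name and signature are illustrative, not a prescribed API.

```rust
use std::time::Duration;

/// Full-jitter backoff: delay = random(0, min(cap, base * 2^attempt)).
/// `rand01` is an injected random value in [0.0, 1.0], so the function
/// is deterministic in tests; wire it to a real RNG in production.
/// 全抖动退避:delay = random(0, min(cap, base * 2^attempt))。
fn backoff_with_jitter(base: Duration, attempt: u32, cap: Duration, rand01: f64) -> Duration {
    // Saturating math avoids overflow for large attempt counts.
    // 使用饱和运算避免大尝试次数时溢出。
    let exp = base.saturating_mul(2u32.saturating_pow(attempt));
    let capped = exp.min(cap);
    capped.mul_f64(rand01.clamp(0.0, 1.0))
}
```

Because each client picks a different random fraction of the capped exponential delay, simultaneous failures no longer retry in lockstep.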

Error Handling in Async Code / 异步代码中的错误处理

Async introduces unique error propagation challenges — spawned tasks create error boundaries, timeout errors wrap inner errors, and ? interacts differently when futures cross task boundaries.

异步带来了独特的错误传播挑战 —— 派生的任务会创建错误边界,超时错误会包装内部错误,当 future 跨越任务边界时,? 的交互方式也会有所不同。

thiserror vs anyhow — choosing the right tool:

thiserror 对比 anyhow —— 选择合适的工具:

#![allow(unused)]
fn main() {
// thiserror: Define typed errors for libraries and public APIs
// thiserror:为库和公共 API 定义强类型错误
use thiserror::Error;

#[derive(Error, Debug)]
enum DiagError {
    #[error("IPMI command failed: {0}")]
    Ipmi(#[from] IpmiError),

    #[error("Sensor {sensor} out of range: {value}°C")]
    OverTemp { sensor: String, value: f64 },

    #[error("Operation timed out")]
    Timeout,
}

// anyhow: Quick error handling for applications and prototypes
// anyhow:用于应用程序和原型的快速错误处理
use anyhow::{Context, Result};

async fn run_diagnostics() -> Result<()> {
    let config = load_config()
        .await
        .context("Failed to load diagnostic config")?; // 添加上下文信息

    run_gpu_test(&config)
        .await
        .context("GPU diagnostic failed")?; // 链式添加上下文

    Ok(())
}
}
| Crate / 库 | Use When / 适用场景 | Error Type / 错误类型 | Matching / 匹配方式 |
|---|---|---|---|
| thiserror | Library code, public APIs / 库代码、公共 API | enum MyError { ... } | match err { MyError::Timeout => ... } |
| anyhow | Applications, CLI tools / 应用程序、命令行工具 | anyhow::Error | err.downcast_ref::<MyError>() |

The double-? pattern with tokio::spawn:

使用了 tokio::spawn“双问号”模式

#![allow(unused)]
fn main() {
async fn spawn_with_errors() -> Result<String, AppError> {
    let handle = tokio::spawn(async {
        let resp = reqwest::get("https://example.com").await?;
        Ok::<_, reqwest::Error>(resp.text().await?)
    });

    // Double ?: First ? unwraps JoinError (task panic), second ? unwraps inner Result
    // 双问号:第一个 ? 解开 JoinError(任务崩溃),第二个 ? 解开内部的 Result
    let result = handle.await??;
    Ok(result)
}
}

Tower: The Middleware Pattern / Tower:中间件模式

The Tower crate defines a composable Service trait — the backbone of async middleware in Rust:

Tower 库定义了一个可组合的 Service trait —— 这是 Rust 异步中间件的基石:

#![allow(unused)]
fn main() {
use tower::{ServiceBuilder, timeout::TimeoutLayer, limit::RateLimitLayer};
use std::time::Duration;

let service = ServiceBuilder::new()
    .layer(TimeoutLayer::new(Duration::from_secs(10)))       // Outermost: timeout / 最外层:超时
    .layer(RateLimitLayer::new(100, Duration::from_secs(1))) // Then: rate limit / 然后:限流
    .service(my_handler);                                     // Innermost: your code / 最内层:你的业务代码
}

Why this matters: If you’ve used ASP.NET or Express.js middleware, Tower is the Rust equivalent. It’s how production Rust services add cross-cutting concerns without code duplication.

为什么这很重要:如果你使用过 ASP.NET 或 Express.js 的中间件,Tower 就是 Rust 中的对应方案。它是生产级 Rust 服务在不重复代码的情况下添加横切关注点的方式。

Exercise: Graceful Shutdown with Worker Pool / 练习:带有工作池的优雅停机

🏋️ Exercise (click to expand / 点击展开)

Challenge: Build a task processor with a channel-based work queue, N worker tasks, and graceful shutdown on Ctrl+C. Workers should finish in-flight work before exiting.

挑战:构建一个任务处理器,包含基于通道的工作队列、N 个工作任务,并在收到 Ctrl+C 时实现优雅停机。工作任务应在退出前完成当前正在处理的工作。

🔑 Solution / 参考答案
use tokio::sync::{mpsc, watch};
use tokio::time::{sleep, Duration};

struct WorkItem { id: u64, payload: String }

#[tokio::main]
async fn main() {
    let (work_tx, work_rx) = mpsc::channel::<WorkItem>(100);
    let (shutdown_tx, shutdown_rx) = watch::channel(false);
    let work_rx = std::sync::Arc::new(tokio::sync::Mutex::new(work_rx));

    let mut handles = Vec::new();
    for id in 0..4 {
        let rx = work_rx.clone();
        let mut shutdown = shutdown_rx.clone();
        handles.push(tokio::spawn(async move {
            loop {
                let item = {
                    let mut rx = rx.lock().await;
                    tokio::select! {
                        item = rx.recv() => item,
                        _ = shutdown.changed() => {
                            if *shutdown.borrow() { None } else { continue }
                        }
                    }
                };
                match item {
                    Some(work) => {
                        println!("Worker {id}: processing {}", work.id);
                        sleep(Duration::from_millis(200)).await;
                    }
                    None => break,
                }
            }
        }));
    }

    // Submit work ... / 提交工作 ...

    // On Ctrl+C: signal shutdown, wait for workers
    // 在 Ctrl+C 时:发出停机信号,等待工作任务完成
    tokio::signal::ctrl_c().await.unwrap();
    shutdown_tx.send(true).unwrap();
    for h in handles { let _ = h.await; }
    println!("Shut down cleanly.");
}

Key Takeaways — Production Patterns / 关键要点:生产模式

  • Use a watch channel + select! for coordinated graceful shutdown / 使用 watch 通道 + select! 进行协调一致的优雅停机
  • Bounded channels (mpsc::channel(N)) provide backpressure / 有界通道(mpsc::channel(N))提供了 背压 (backpressure) 机制
  • JoinSet and TaskTracker provide structured concurrency / JoinSetTaskTracker 为 Rust 带来了 结构化并发 (structured concurrency)
  • Always add timeouts to network operations / 务必为网络操作添加超时处理
  • Tower’s Service trait is the standard middleware pattern / Tower 的 Service trait 是标准的中间件模式

See also / 延伸阅读: Ch 8 — Tokio Deep Dive / 第 8 章:Tokio 深入解析 for channels, Ch 12 — Common Pitfalls / 第 12 章:常见陷阱 for cancellation hazards


Exercises / 练习

Exercise 1: Async Echo Server / 练习 1:异步 Echo 服务器

Build a TCP echo server that handles multiple clients concurrently.

构建一个可以并发处理多个客户端的 TCP echo(回显)服务器。

Requirements / 要求

  • Listen on 127.0.0.1:8080 / 监听 127.0.0.1:8080
  • Accept connections and echo back each line / 接收连接并回显每一行内容
  • Handle client disconnections gracefully / 优雅地处理客户端断开连接
  • Print a log when clients connect/disconnect / 在客户端连接/断开时打印日志
🔑 Solution / 参考答案
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let listener = TcpListener::bind("127.0.0.1:8080").await?;
    println!("Echo server listening on :8080");

    loop {
        let (socket, addr) = listener.accept().await?;
        println!("[{addr}] Connected");

        tokio::spawn(async move {
            let (reader, mut writer) = socket.into_split();
            let mut reader = BufReader::new(reader);
            let mut line = String::new();

            loop {
                line.clear();
                match reader.read_line(&mut line).await {
                    Ok(0) => {
                        println!("[{addr}] Disconnected");
                        break;
                    }
                    Ok(_) => {
                        print!("[{addr}] Echo: {line}");
                        if writer.write_all(line.as_bytes()).await.is_err() {
                            println!("[{addr}] Write error, disconnecting");
                            break;
                        }
                    }
                    Err(e) => {
                        eprintln!("[{addr}] Read error: {e}");
                        break;
                    }
                }
            }
        });
    }
}

Exercise 2: Concurrent URL Fetcher with Rate Limiting / 练习 2:带限流的并发 URL 抓取器

Fetch a list of URLs concurrently, with at most 5 concurrent requests.

并发抓取一组 URL,且限制最多同时进行 5 个并发请求。

🔑 Solution / 参考答案
#![allow(unused)]
fn main() {
use futures::stream::{self, StreamExt};
use tokio::time::{sleep, Duration};

async fn fetch_urls(urls: Vec<String>) -> Vec<Result<String, String>> {
    // buffer_unordered(5) ensures at most 5 futures are polled
    // concurrently — no separate Semaphore needed here.
    // buffer_unordered(5) 确保最多同时轮询 5 个 future
    // —— 此处不需要额外的信号量 (Semaphore)。
    let results: Vec<_> = stream::iter(urls)
        .map(|url| {
            async move {
                println!("Fetching: {url}");

                match reqwest::get(&url).await {
                    Ok(resp) => match resp.text().await {
                        Ok(body) => Ok(body),
                        Err(e) => Err(format!("{url}: {e}")),
                    },
                    Err(e) => Err(format!("{url}: {e}")),
                }
            }
        })
        .buffer_unordered(5) // ← This alone limits concurrency to 5 / 仅靠此项即可限制并发数为 5
        .collect()
        .await;

    results
}

// NOTE: Use Semaphore when you need to limit concurrency across
// independently spawned tasks (tokio::spawn). Use buffer_unordered
// when processing a stream. Don't combine both for the same limit.
// 注意:如果需要限制独立派生任务 (tokio::spawn) 的并发,请使用信号量。
// 如果处理流,请使用 buffer_unordered。不要为了同一个限制而混用两者。
}

Exercise 3: Graceful Shutdown with Worker Pool / 练习 3:带有工作池的优雅停机

Build a task processor with:

  • A channel-based work queue
  • N worker tasks consuming from the queue
  • Graceful shutdown on Ctrl+C: stop accepting, finish in-flight work

构建一个任务处理器,包含:

  • 一个基于通道的工作队列
  • N 个从队列中提取任务的工作任务
  • 在收到 Ctrl+C 时实现优雅停机:停止接收新任务,并完成正在进行的工作

Exercise 4: Build a Simple Async Mutex from Scratch / 练习 4:从零开始构建一个简单的异步 Mutex

Implement an async-aware mutex without using tokio::sync::Mutex, for example on top of a semaphore or a capacity-1 channel.

不直接使用 tokio::sync::Mutex,实现一个支持异步的 Mutex,例如基于信号量或容量为 1 的通道。

Hint / 提示:A tokio::sync::Semaphore with a single permit (or a capacity-1 mpsc channel) provides the async wait mechanism; the solution below uses the semaphore. / 可以使用只有一个许可的 tokio::sync::Semaphore(或容量为 1 的 mpsc 通道)提供异步等待机制,参考答案采用信号量。

🔑 Solution / 参考答案
#![allow(unused)]
fn main() {
use std::cell::UnsafeCell;
use std::sync::Arc;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};

pub struct SimpleAsyncMutex<T> {
    data: Arc<UnsafeCell<T>>,
    semaphore: Arc<Semaphore>,
}

// SAFETY: Access to T is serialized by the semaphore (max 1 permit).
unsafe impl<T: Send> Send for SimpleAsyncMutex<T> {}
unsafe impl<T: Send> Sync for SimpleAsyncMutex<T> {}

pub struct SimpleGuard<T> {
    data: Arc<UnsafeCell<T>>,
    _permit: OwnedSemaphorePermit, // Dropped on guard drop → releases lock
                                    // Guard 释放时 drop,从而释放锁
}

impl<T> SimpleAsyncMutex<T> {
    pub fn new(value: T) -> Self {
        SimpleAsyncMutex {
            data: Arc::new(UnsafeCell::new(value)),
            semaphore: Arc::new(Semaphore::new(1)),
        }
    }

    pub async fn lock(&self) -> SimpleGuard<T> {
        let permit = self.semaphore.clone().acquire_owned().await.unwrap();
        SimpleGuard {
            data: self.data.clone(),
            _permit: permit,
        }
    }
}

impl<T> std::ops::Deref for SimpleGuard<T> {
    type Target = T;
    fn deref(&self) -> &T {
        // SAFETY: We hold the only semaphore permit.
        // 安全性:我们持有唯一的信号量许可。
        unsafe { &*self.data.get() }
    }
}

impl<T> std::ops::DerefMut for SimpleGuard<T> {
    fn deref_mut(&mut self) -> &mut T {
        unsafe { &mut *self.data.get() }
    }
}
}

Key takeaway: Async mutexes are typically built on top of semaphores. The semaphore provides the async wait mechanism.

关键要点:异步 Mutex 通常构建在信号量之上。信号量提供了异步等待机制。


Exercise 5: Stream Pipeline / 练习 5:流流水线

Build a data processing pipeline using streams:

  1. Generate numbers 1..=100
  2. Filter to even numbers
  3. Map each to its square
  4. Process 10 at a time concurrently
  5. Collect results

使用流构建一个数据处理流水线:

  1. 生成数字 1..=100
  2. 过滤出偶数
  3. 将每个数映射为其平方
  4. 同时并发处理 10 个项
  5. 收集结果
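No solution is shown for this exercise; as a starting point, the data flow can be checked synchronously. The sketch below models the pipeline with plain iterators, and each step's comment names the futures::StreamExt counterpart you would use in the async version (the function name pipeline_model is illustrative).

```rust
// Synchronous model of the exercise pipeline. Each step's comment
// names the futures::StreamExt counterpart for the async version.
// 练习流水线的同步模型;注释标注了异步版本对应的 StreamExt 组合子。
fn pipeline_model() -> Vec<u64> {
    (1..=100u64)                 // stream::iter(1..=100)
        .filter(|n| n % 2 == 0)  // .filter(|n| future::ready(n % 2 == 0))
        .map(|n| n * n)          // .map(|n| async move { n * n }).buffer_unordered(10)
        .collect()               // .collect::<Vec<_>>().await
}
```

Note that in the async version, buffer_unordered(10) yields results in completion order, not input order; use buffered(10) or sort afterwards if ordering matters.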

Exercise 6: Implement Select with Timeout / 练习 6:实现带超时的 Select

Without using tokio::select! or tokio::time::timeout, implement a function that races a future against a deadline.

不使用 tokio::select!tokio::time::timeout,实现一个让 future 与截止时间进行竞速的函数。

🔑 Solution / 参考答案
#![allow(unused)]
fn main() {
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

pub enum Either<L, R> {
    Left(L),
    Right(R),
}

pub struct Timeout<F> {
    future: F,
    // Box-pinned so Timeout<F> itself stays Unpin / Box 固定,使 Timeout<F> 自身保持 Unpin
    timer: Pin<Box<tokio::time::Sleep>>, // e.g. Box::pin(tokio::time::sleep(dur))
}

impl<F: Future + Unpin> Future for Timeout<F> {
    type Output = Either<F::Output, ()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Check if the main future is done / 检查主 future 是否完成
        if let Poll::Ready(val) = Pin::new(&mut self.future).poll(cx) {
            return Poll::Ready(Either::Left(val));
        }

        // Check if the timer expired / 检查定时器是否到期
        if let Poll::Ready(()) = self.timer.as_mut().poll(cx) {
            return Poll::Ready(Either::Right(()));
        }

        Poll::Pending
    }
}
}

Key takeaway: select/timeout is just polling two futures and seeing which completes first.

关键要点select/timeout 本质上就是轮询两个 future 并观察哪一个先完成。


Summary and Reference Card / 总结与速查表

Quick Reference Card / 快速速查表

Async Mental Model / 异步思维模型

┌─────────────────────────────────────────────────────┐
│  async fn → State Machine (enum) → impl Future     │
│  异步函数 → 状态机 (枚举值) → 实现 Future trait        │
│  .await   → poll() the inner future                 │
│  .await   → 轮询 (poll) 内部的 future                │
│  executor → loop { poll(); sleep_until_woken(); }   │
│  执行器   → 循环 { 轮询(); 睡眠直到被唤醒(); }         │
│  waker    → "hey executor, poll me again"           │
│  唤醒器   → “嘿执行器,请再次轮询我”                  │
│  Pin      → "promise I won't move in memory"        │
│  固定 (Pin) → “承诺我不会在内存中移动”                 │
└─────────────────────────────────────────────────────┘

Common Patterns Cheat Sheet / 常用模式备忘录

| Goal / 目标 | Use / 方案 |
|---|---|
| Run two futures concurrently / 并发运行两个 future | tokio::join!(a, b) |
| Race two futures / 让两个 future 竞速 | tokio::select! { ... } |
| Spawn a background task / 派生后台任务 | tokio::spawn(async { ... }) |
| Run blocking code in async / 在异步中运行阻塞代码 | tokio::task::spawn_blocking(..) |
| Limit concurrency / 限制并发数 | Semaphore::new(N) |
| Collect many task results / 收集多个任务结果 | JoinSet |
| Share state across tasks / 在任务间共享状态 | Arc<Mutex<T>> 或通道 |
| Graceful shutdown / 优雅停机 | watch::channel + select! |
| Process a stream N-at-a-time / 每次处理流中的 N 个项 | .buffer_unordered(N) |
| Timeout a future / 为 future 设置超时 | tokio::time::timeout(dur, fut) |
| Retry with backoff / 退避重试 | 自定义组合器(参见第 13 章) |

Pinning Quick Reference / 固定 (Pinning) 快速指南

| Situation / 场景 | Use / 方案 |
|---|---|
| Pin a future on the heap / 在堆上固定 future | Box::pin(fut) |
| Pin a future on the stack / 在栈上固定 future | tokio::pin!(fut) |
| Pin an Unpin type / 固定一个 Unpin 类型 | Pin::new(&mut val) — 安全且无开销 |
| Return a pinned trait object / 返回固定的 trait 对象 | -> Pin<Box<dyn Future<Output = T> + Send>> |

Channel Selection Guide / 通道选择指南

| Channel / 通道 | Producers / 生产者 | Consumers / 消费者 | Values / 值 | Use When / 适用场景 |
|---|---|---|---|---|
| mpsc | 多 (N) | 1 | 流 (Stream) | 工作队列、事件总线 |
| oneshot | 1 | 1 | 单个 (Single) | 请求/响应、完成通知 |
| broadcast | 多 (N) | 多 (N) | 全部接收全部 | 扇出通知、停机信号 |
| watch | 1 | 多 (N) | 仅最新值 | 配置更新、健康状态 |

Mutex Selection Guide / Mutex 选择指南

| Mutex / 互斥锁 | Use When / 适用场景 |
|---|---|
| std::sync::Mutex | 锁持有时间极短,且绝不跨越 .await |
| tokio::sync::Mutex | 必须跨越 .await 点持有锁 |
| parking_lot::Mutex | 高争用、无 .await、追求极致性能 |
| tokio::sync::RwLock | 多读少写,且锁需跨越 .await |

Decision Quick Reference / 决策指南

Need concurrency? / 需要并发吗?
├── I/O-bound → async/await / I/O 密集型 → async/await
├── CPU-bound → rayon / std::thread / CPU 密集型 → rayon 或 std::thread
└── Mixed → spawn_blocking for CPU parts / 混合型 → 对 CPU 部分使用 spawn_blocking

Choosing runtime? / 选择哪种运行时?
├── Server app → tokio / 服务器应用 → tokio
├── Library → runtime-agnostic (futures crate) / 库 → 运行时无关 (使用 futures 库)
├── Embedded → embassy / 嵌入式 → embassy
└── Minimal → smol / 最小化 → smol

Need concurrent futures? / 需要并发 future 吗?
├── Can be 'static + Send → tokio::spawn / 满足 'static + Send → tokio::spawn
├── Can be 'static + !Send → LocalSet / 满足 'static + !Send → LocalSet
├── Can't be 'static → FuturesUnordered / 不满足 'static → FuturesUnordered
└── Need to track/abort → JoinSet / 需要跟踪/中止 → JoinSet

Common Error Messages and Fixes / 常见错误消息与修复

| Error / 错误 | Cause / 原因 | Fix / 修复 |
|---|---|---|
| future is not Send | 跨越 .await 持有了 !Send 类型 | 缩小作用域使其在 .await 前被释放,或使用 current_thread 运行时 |
| borrowed value does not live long enough (in spawn) | tokio::spawn 要求 'static 生命周期 | 使用 Arc、clone() 或 FuturesUnordered |
| the trait Future is not implemented for () | 遗漏了 .await | 为异步调用添加 .await |
| cannot borrow as mutable (in poll) | 自引用借用 | 正确使用 Pin<&mut Self>(参见第 4 章) |
| Program hangs silently / 程序静默挂起 | 忘记调用 waker.wake() | 确保每个 Pending 路径都注册并触发了唤醒器 |

Further Reading / 延伸阅读

| Resource / 资源 | Why / 理由 |
|---|---|
| Tokio Tutorial | 官方上手指南 —— 对第一个项目来说非常棒 |
| Async Book (official) | 在语言层面涵盖 Future、Pin 和 Stream |
| Jon Gjengset — Crust of Rust: async/await | 2 小时的深入解析,包含现场编码 |
| Alice Ryhl — Actors with Tokio | 有状态生产服务的架构模式 |
| Without Boats — Pin, Unpin, and why Rust needs them | 语言设计者的原始设计思路 |
| Tokio mini-Redis | 完整的异步 Rust 项目 —— 研究级的生产代码 |
| Tower documentation | axum、tonic 和 hyper 使用的中间件/服务架构 |

End of Async Rust Training Guide / Async Rust 培训指南结束

Capstone Project: Async Chat Server / 终极项目:异步聊天服务器

This project integrates patterns from across the book into a single, production-style application. You’ll build a multi-room async chat server using tokio, channels, streams, graceful shutdown, and proper error handling.

本项目将本书中提到的各种模式整合到一个生产风格的应用中。你将使用 tokio、通道、流、优雅停机以及完善的错误处理来构建一个 多房间异步聊天服务器

Estimated time / 预计耗时:4–6 hours / 4-6 小时 | Difficulty / 难度:★★★

What you’ll practice / 你将练习:

  • tokio::spawn and the 'static requirement (Ch 8) / tokio::spawn 及其 'static 要求(第 8 章)
  • Channels: mpsc for messages, broadcast for rooms, watch for shutdown (Ch 8) / 通道:用于消息的 mpsc、用于房间的 broadcast、用于停机的 watch(第 8 章)
  • Streams: reading lines from TCP connections (Ch 11) / 流:从 TCP 连接中读取行(第 11 章)
  • Common pitfalls: cancellation safety, MutexGuard across .await (Ch 12) / 常见陷阱:取消安全性、跨 .await 持有 MutexGuard(第 12 章)
  • Production patterns: graceful shutdown, backpressure (Ch 13) / 生产模式:优雅停机、背压(第 13 章)
  • Async traits for pluggable backends (Ch 10) / 异步 trait 用于可插拔后端(第 10 章)

The Problem / 项目需求

Build a TCP chat server where:

构建一个满足以下要求的 TCP 聊天服务器:

  1. Clients connect via TCP and join named rooms / 客户端通过 TCP 连接并加入指定的房间
  2. Messages are broadcast to all clients in the same room / 消息会广播给同房间内的所有客户端
  3. Commands: /join <room>, /nick <name>, /rooms, /quit / 命令:加入房间、修改昵称、查看房间列表、退出
  4. The server shuts down gracefully on Ctrl+C / 服务器在收到 Ctrl+C 时可以优雅地停机
graph LR
    C1["Client 1<br/>(Alice)"] -->|TCP| SERVER["Chat Server"]
    C2["Client 2<br/>(Bob)"] -->|TCP| SERVER
    C3["Client 3<br/>(Carol)"] -->|TCP| SERVER

    SERVER --> R1["#general<br/>broadcast channel"]
    SERVER --> R2["#rust<br/>broadcast channel"]

    R1 -->|msg| C1
    R1 -->|msg| C2
    R2 -->|msg| C3

    CTRL["Ctrl+C"] -->|watch| SERVER

    style SERVER fill:#e8f4f8,stroke:#2980b9,color:#000
    style R1 fill:#d4efdf,stroke:#27ae60,color:#000
    style R2 fill:#d4efdf,stroke:#27ae60,color:#000
    style CTRL fill:#fadbd8,stroke:#e74c3c,color:#000

Step 1: Basic TCP Accept Loop / 第 1 步:基础 TCP 接收循环

Start with a server that accepts connections and echoes lines back:

首先实现一个能够接收连接并回显内容的服务器:

use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:8080").await?;
    println!("Chat server listening on :8080");

    loop {
        let (socket, addr) = listener.accept().await?;
        println!("[{addr}] Connected");

        tokio::spawn(async move {
            let (reader, mut writer) = socket.into_split();
            let mut reader = BufReader::new(reader);
            let mut line = String::new();

            loop {
                line.clear();
                match reader.read_line(&mut line).await {
                    Ok(0) | Err(_) => break,
                    Ok(_) => {
                        let _ = writer.write_all(line.as_bytes()).await;
                    }
                }
            }
            println!("[{addr}] Disconnected");
        });
    }
}

Your job / 你的任务:Verify this compiles and works with telnet localhost 8080. / 验证此代码可编译,并通过 telnet localhost 8080 进行测试。

Step 2: Room State with Broadcast Channels / 第 2 步:使用广播通道管理房间状态

Each room is a broadcast::Sender. All clients in a room subscribe to receive messages.

每个房间都是一个 broadcast::Sender。房间内的所有客户端都订阅该通道以接收消息。

#![allow(unused)]
fn main() {
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{broadcast, RwLock};

type RoomMap = Arc<RwLock<HashMap<String, broadcast::Sender<String>>>>;

fn get_or_create_room(rooms: &mut HashMap<String, broadcast::Sender<String>>, name: &str) -> broadcast::Sender<String> {
    rooms.entry(name.to_string())
        .or_insert_with(|| {
            let (tx, _) = broadcast::channel(100); // 100-message buffer / 100 条消息的缓冲区
            tx
        })
        .clone()
}
}

Your job / 你的任务:Implement room state so that clients can join rooms and messages are broadcast to the sender’s current room. / 实现房间状态管理,使客户端可以加入房间,且消息会广播给发送者所在的房间。

💡 Hint — Client task structure / 提示 —— 客户端任务结构

Each client task needs two concurrent loops / 每个客户端任务都需要两个并发循环:

  1. Read from TCP → parse commands or broadcast to room / 从 TCP 读取 → 解析命令或广播到房间
  2. Read from broadcast receiver → write to TCP / 从广播接收器读取 → 写入 TCP

Use tokio::select! to run both / 使用 tokio::select! 同时运行两者:

#![allow(unused)]
fn main() {
loop {
    tokio::select! {
        result = reader.read_line(&mut line) => { /* ... */ }
        result = room_rx.recv() => { /* ... */ }
    }
}
}

Step 3: Commands / 第 3 步:指令系统

Implement the command protocol / 实现指令协议:

| Command / 命令 | Action / 动作 |
|---|---|
| /join <room> | Leave current room, join new room / 离开当前房间,加入新房间 |
| /nick <name> | Change display name / 修改显示昵称 |
| /rooms | List all active rooms / 列出所有活跃房间 |
| /quit | Disconnect gracefully / 优雅断开连接 |
| Anything else / 其他内容 | Broadcast as a chat message / 作为聊天消息广播 |
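The dispatch above is plain string handling, so it can be written and tested before any networking exists. A sketch follows; the Command enum and parse_command are illustrative names, not a prescribed API.

```rust
// Hypothetical command parser for the protocol table above.
// 针对上面指令表的示例解析器(名称为示意,非固定 API)。
#[derive(Debug, PartialEq)]
enum Command {
    Join(String),
    Nick(String),
    Rooms,
    Quit,
    Message(String),
}

fn parse_command(line: &str) -> Command {
    let line = line.trim_end(); // strip the trailing newline from read_line / 去掉 read_line 的尾随换行
    if let Some(room) = line.strip_prefix("/join ") {
        Command::Join(room.trim().to_string())
    } else if let Some(name) = line.strip_prefix("/nick ") {
        Command::Nick(name.trim().to_string())
    } else if line == "/rooms" {
        Command::Rooms
    } else if line == "/quit" {
        Command::Quit
    } else {
        Command::Message(line.to_string())
    }
}
```

Keeping parsing pure like this lets the client task's select! loop stay small: it only maps each Command to a channel operation.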

Step 4: Graceful Shutdown / 第 4 步:优雅停机

Add Ctrl+C handling to stop accepting and exit cleanly. / 添加 Ctrl+C 处理逻辑以停止接收新连接并平稳退出。

#![allow(unused)]
fn main() {
use tokio::sync::watch;

let (shutdown_tx, shutdown_rx) = watch::channel(false);

// In the accept loop / 在接收循环中:
loop {
    tokio::select! {
        result = listener.accept() => { /* ... */ }
        _ = tokio::signal::ctrl_c() => {
            shutdown_tx.send(true)?;
            break;
        }
    }
}
}

Step 5: Error Handling and Edge Cases / 第 5 步:错误处理与边界情况

Production-harden the server / 强化服务器稳定性:

  1. Lagging receivers: Handle RecvError::Lagged(n) if a slow client misses messages. / 滞后接收者:如果慢速客户端遗漏了消息,请妥善处理 RecvError::Lagged(n)
  2. Backpressure: The broadcast channel buffer is bounded. / 背压:广播通道的缓冲区是有界的。
  3. Timeout: Disconnect clients that are idle for >5 minutes. / 超时:断开空闲超过 5 分钟的客户端。
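For the lagging-receiver case, the decision logic can be isolated from the runtime. The sketch below matches on a local enum that mirrors tokio::sync::broadcast::error::RecvError; the mirror and the Action names are illustrative so the logic is testable without tokio — real code matches on the tokio type directly.

```rust
// Local mirror of tokio::sync::broadcast::error::RecvError, used here
// only so the decision logic is testable without a runtime.
// 本地镜像 tokio 的 RecvError,仅为在无运行时的情况下测试决策逻辑。
#[derive(Debug)]
enum RecvError {
    Lagged(u64), // receiver fell behind and missed n messages / 接收者落后,遗漏了 n 条消息
    Closed,      // all senders dropped / 所有发送端已被 drop
}

#[derive(Debug, PartialEq)]
enum Action {
    Deliver(String), // forward to the client / 转发给客户端
    Warn(String),    // tell the client it lagged, keep the connection / 提示滞后,保持连接
    Disconnect,      // room gone: drop the client / 房间已关闭:断开客户端
}

fn on_room_recv(result: Result<String, RecvError>) -> Action {
    match result {
        Ok(msg) => Action::Deliver(msg),
        // Lagging is recoverable: the receiver resumes at the oldest
        // retained message, so warn instead of disconnecting.
        // 滞后是可恢复的:接收者会从最旧保留消息继续,因此警告而非断开。
        Err(RecvError::Lagged(n)) => Action::Warn(format!("<server> you missed {n} message(s)")),
        Err(RecvError::Closed) => Action::Disconnect,
    }
}
```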

Step 6: Integration Test / 第 6 步:集成测试

Write a test that starts the server, connects two clients, and verifies message delivery. / 编写一个测试,启动服务器并连接两个客户端,验证消息是否可以成功送达。

Evaluation Criteria / 评估标准

| Criterion / 准则 | Target / 目标 |
|---|---|
| Concurrency / 并发性 | Multiple clients in multiple rooms, no blocking / 多个房间、多个客户端,互不阻塞 |
| Correctness / 正确性 | Messages only go to clients in the same room / 消息仅发送给同房间的客户端 |
| Graceful shutdown / 优雅停机 | Ctrl+C drains messages and exits cleanly / Ctrl+C 后处理完消息并安全退出 |
| Error handling / 错误处理 | Lagged receivers, disconnections, timeouts handled / 处理好滞后、断连和超时 |

Extension Ideas / 扩展思路

  1. Persistent history: Store last N messages per room. / 持久化历史记录:存储每个房间最后 N 条消息。
  2. WebSocket support: Accept WebSocket clients using tokio-tungstenite. / WebSocket 支持:支持 WebSocket 客户端。
  3. TLS: Add tokio-rustls for encrypted connections. / TLS:添加加密连接支持。