底层探秘: Future 执行器与任务调度
异步编程背后到底藏有什么秘密?究竟是哪只幕后之手在操纵这一切?如果你对这些感兴趣,就继续看下去,否则可以直接跳过,因为本章节的内容对于一个 API 工程师并没有太多帮助。
但是如果你希望能深入理解 Rust
的 async/.await
代码是如何工作、理解运行时和性能,甚至未来想要构建自己的 async
运行时或相关工具,那么本章节终究不会辜负于你。
Future 特征
Future
特征是 Rust 异步编程的核心,毕竟异步函数是异步编程的核心,而 Future
恰恰是异步函数的返回值和被执行的关键。
首先,来给出 Future
的定义:它是一个能产出值的异步计算(虽然该值可能为空,例如 ()
)。光看这个定义,可能会觉得很空洞,我们来看看一个简化版的 Future
特征:
在上一章中,我们提到过 Future
需要被执行器poll
(轮询)后才能运行,诺,这里 poll
就来了,通过调用该方法,可以推进 Future
的进一步执行,直到被切走为止( 这里不好理解,但是你只需要知道 Future
并不能保证在一次 poll
中就被执行完,后面会详解介绍)。
若在当前 poll
中, Future
可以被完成,则会返回 Poll::Ready(result)
,反之则返回 Poll::Pending
, 并且安排一个 wake
函数:当未来 Future
准备好进一步执行时, 该函数会被调用,然后管理该 Future
的执行器(例如上一章节中的block_on
函数)会再次调用 poll
方法,此时 Future
就可以继续执行了。
如果没有 wake
方法,那执行器无法知道某个 Future
是否可以继续被执行,除非执行器定期的轮询每一个 Future
,确认它是否能被执行,但这种作法效率较低。而有了 wake
,Future
就可以主动通知执行器,然后执行器就可以精确的执行该 Future
。 这种“事件通知 -> 执行”的方式要远比定期对所有 Future
进行一次全遍历来的高效。
也许大家还是迷迷糊糊的,没事,我们用一个例子来说明下。考虑一个需要从 socket
读取数据的场景:如果有数据,可以直接读取数据并返回 Poll::Ready(data)
, 但如果没有数据,Future
会被阻塞且不会再继续执行,此时它会注册一个 wake
函数,当 socket
数据准备好时,该函数将被调用以通知执行器:我们的 Future
已经准备好了,可以继续执行。
下面的 SocketRead
结构体就是一个 Future
:
这种 Future
模型允许将多个异步操作组合在一起,同时还无需任何内存分配。不仅仅如此,如果你需要同时运行多个 Future
或链式调用多个 Future
,也可以通过无内存分配的状态机实现,例如:
上面代码展示了如何同时运行多个 Future
, 且在此过程中没有任何内存分配,让并发编程更加高效。 类似的,多个Future
也可以一个接一个的连续运行:
这些例子展示了在不需要内存对象分配以及深层嵌套回调的情况下,该如何使用 Future
特征去表达异步控制流。 在了解了基础的控制流后,我们再来看看真实的 Future
特征有何不同之处。
首先这里多了一个 Pin
,关于它我们会在后面章节详细介绍,现在你只需要知道使用它可以创建一个无法被移动的 Future
,因为无法被移动,所以它将具有固定的内存地址,意味着我们可以存储它的指针(如果内存地址可能会变动,那存储指针地址将毫无意义!),也意味着可以实现一个自引用数据结构: struct MyFut { a: i32, ptr_to_a: *const i32 }
。 而对于 async/await
来说,Pin
是不可或缺的关键特性。
其次,从 wake: fn()
变成了 &mut Context<'_>
。意味着 wake
函数可以携带数据了,为何要携带数据?考虑一个真实世界的场景,一个复杂应用例如 web 服务器可能有数千连接同时在线,那么同时就有数千 Future
在被同时管理着,如果不能携带数据,当一个 Future
调用 wake
后,执行器该如何知道是哪个 Future
调用了 wake
,然后进一步去 poll
对应的 Future
?没有办法!那之前的例子为啥就可以使用没有携带数据的 wake
? 因为足够简单,不存在歧义性。
总之,在正式场景要进行 wake
,就必须携带上数据。 而 Context
类型通过提供一个 Waker
类型的值,就可以用来唤醒特定的的任务。
使用 Waker 来唤醒任务
对于 Future
来说,第一次被 poll
时无法完成任务是很正常的。但它需要确保在未来一旦准备好时,可以通知执行器再次对其进行 poll
进而继续往下执行,该通知就是通过 Waker
类型完成的。
Waker
提供了一个 wake()
方法可以用于告诉执行器:相关的任务可以被唤醒了,此时执行器就可以对相应的 Future
再次进行 poll
操作。
构建一个定时器
下面一起来实现一个简单的定时器 Future
。为了让例子尽量简单,当计时器创建时,我们会启动一个线程接着让该线程进入睡眠,等睡眠结束后再通知给 Future
。
注意本例子还会在后面继续使用,因此我们重新创建一个工程来演示:使用 cargo new --lib timer_future
来创建一个新工程,在 lib
包的根路径 src/lib.rs
中添加以下内容:
继续来实现 Future
定时器,之前提到: 新建线程在睡眠结束后会需要将状态同步给定时器 Future
,由于是多线程环境,我们需要使用 Arc<Mutex<T>>
来作为一个共享状态,用于在新线程和 Future
定时器间共享。
下面给出 Future
的具体实现:
代码很简单,只要新线程设置了 shared_state.completed = true
,那任务就能顺利结束。如果没有设置,会为当前的任务克隆一份 Waker
,这样新线程就可以使用它来唤醒当前的任务。
最后,再来创建一个 API 用于构建定时器和启动计时线程:
至此,一个简单的定时器 Future
就已创建成功,那么该如何使用它呢?相信部分爱动脑筋的读者已经猜到了:我们需要创建一个执行器,才能让程序动起来。
执行器 Executor
Rust 的 Future
是惰性的:只有屁股上拍一拍,它才会努力动一动。其中一个推动它的方式就是在 async
函数中使用 .await
来调用另一个 async
函数,但是这个只能解决 async
内部的问题,那么这些最外层的 async
函数,谁来推动它们运行呢?答案就是我们之前多次提到的执行器 executor
。
执行器会管理一批 Future
(最外层的 async
函数),然后通过不停地 poll
推动它们直到完成。 最开始,执行器会先 poll
一次 Future
,后面就不会主动去 poll
了,而是等待 Future
通过调用 wake
函数来通知它可以继续,它才会继续去 poll
。这种 wake 通知然后 poll 的方式会不断重复,直到 Future
完成。
构建执行器
下面我们将实现一个简单的执行器,它可以同时并发运行多个 Future
。例子中,需要用到 futures
包的 ArcWake
特征,它可以提供一个方便的途径去构建一个 Waker
。编辑 Cargo.toml
,添加下面依赖:
在之前的内容中,我们在 src/lib.rs
中创建了定时器 Future
,现在在 src/main.rs
中来创建程序的主体内容,开始之前,先引入所需的包:
执行器需要从一个消息通道( channel
)中拉取事件,然后运行它们。当一个任务准备好后(可以继续执行),它会将自己放入消息通道中,然后等待执行器 poll
。
下面再来添加一个方法用于生成 Future
, 然后将它放入任务通道中:
在执行器 poll
一个 Future
之前,首先需要调用 wake
方法进行唤醒,然后再由 Waker
负责调度该任务并将其放入任务通道中。创建 Waker
的最简单的方式就是实现 ArcWake
特征,先来为我们的任务实现 ArcWake
特征,这样它们就能被转变成 Waker
然后被唤醒:
当任务实现了 ArcWake
特征后,它就变成了 Waker
,在调用 wake()
对其唤醒后会将任务复制一份所有权( Arc
),然后将其发送到任务通道中。最后我们的执行器将从通道中获取任务,然后进行 poll
执行:
恭喜!我们终于拥有了自己的执行器,下面再来写一段代码使用该执行器去运行之前的定时器 Future
:
fn main() {
let (executor, spawner) = new_executor_and_spawner();
// 生成一个任务
spawner.spawn(async {
println!("howdy!");
// 创建定时器Future,并等待它完成
TimerFuture::new(Duration::new(2, 0)).await;
println!("done!");
});
// drop掉任务,这样执行器就知道任务已经完成,不会再有新的任务进来
drop(spawner);
// 运行执行器直到任务队列为空
// 任务运行后,会先打印`howdy!`, 暂停2秒,接着打印 `done!`
executor.run();
}
执行器和系统 IO
前面我们一起看过一个使用 Future
从 Socket
中异步读取数据的例子:
该例子中,Future
将从 Socket
读取数据,若当前还没有数据,则会让出当前线程的所有权,允许执行器去执行其它的 Future
。当数据准备好后,会调用 wake()
函数将该 Future
的任务放入任务通道中,等待执行器的 poll
。
关于该流程已经反复讲了很多次,相信大家应该非常清楚了。然而该例子中还有一个疑问没有解决:
set_readable_callback
方法到底是怎么工作的?怎么才能知道socket
中的数据已经可以被读取了?
关于第二点,其中一个简单粗暴的方法就是使用一个新线程不停的检查 socket
中是否有了数据,当有了后,就调用 wake()
函数。该方法确实可以满足需求,但是性能着实太低了,需要为每个阻塞的 Future
都创建一个单独的线程!
在现实世界中,该问题往往是通过操作系统提供的 IO
多路复用机制来完成,例如 Linux
中的 epoll
,FreeBSD
和 macOS
中的 kqueue
,Windows
中的 IOCP
, Fuchisa
中的 ports
等(可以通过 Rust 的跨平台包 mio
来使用它们)。借助 IO 多路复用机制,可以实现一个线程同时阻塞地去等待多个异步 IO 事件,一旦某个事件完成就立即退出阻塞并返回数据。相关实现类似于以下代码:
这样,我们只需要一个执行器线程,它会接收 IO 事件并将其分发到对应的 Waker
中,接着后者会唤醒相关的任务,最终通过执行器 poll
后,任务可以顺利地继续执行, 这种 IO 读取流程可以不停的循环,直到 socket
关闭。