Stream
大家有没有想过, Rust 中的迭代器在迭代时能否异步进行?若不可以,是不是有相应的解决方案?
以上的问题其实很重要,因为在实际场景中,迭代一个集合,然后异步的去执行是很常见的需求,好在 Tokio 为我们提供了 stream
,我们可以在异步函数中对其进行迭代,甚至和迭代器 Iterator
一样,stream
还能使用适配器,例如 map
! Tokio 在 StreamExt
特征上定义了常用的适配器。
要使用 stream
,目前还需要手动引入对应的包:
#![allow(unused)] fn main() { tokio-stream = "0.1" }
stream 没有放在
tokio
包的原因在于标准库中的Stream
特征还没有稳定,一旦稳定后,stream
将移动到tokio
中来
迭代
目前, Rust 语言还不支持异步的 for
循环,因此我们需要 while let
循环和 StreamExt::next()
一起使用来实现迭代的目的:
use tokio_stream::StreamExt; #[tokio::main] async fn main() { let mut stream = tokio_stream::iter(&[1, 2, 3]); while let Some(v) = stream.next().await { println!("GOT = {:?}", v); } }
和迭代器 Iterator
类似,next()
方法返回一个 Option<T>
,其中 T
是从 stream
中获取的值的类型。若收到 None
则意味着 stream
迭代已经结束。
mini-redis 广播
下面我们来实现一个复杂一些的 mini-redis 客户端,完整代码见这里。
在开始之前,首先启动一下完整的 mini-redis 服务器端:
$ mini-redis-server
use tokio_stream::StreamExt; use mini_redis::client; async fn publish() -> mini_redis::Result<()> { let mut client = client::connect("127.0.0.1:6379").await?; // 发布一些数据 client.publish("numbers", "1".into()).await?; client.publish("numbers", "two".into()).await?; client.publish("numbers", "3".into()).await?; client.publish("numbers", "four".into()).await?; client.publish("numbers", "five".into()).await?; client.publish("numbers", "6".into()).await?; Ok(()) } async fn subscribe() -> mini_redis::Result<()> { let client = client::connect("127.0.0.1:6379").await?; let subscriber = client.subscribe(vec!["numbers".to_string()]).await?; let messages = subscriber.into_stream(); tokio::pin!(messages); while let Some(msg) = messages.next().await { println!("got = {:?}", msg); } Ok(()) } #[tokio::main] async fn main() -> mini_redis::Result<()> { tokio::spawn(async { publish().await }); subscribe().await?; println!("DONE"); Ok(()) }
上面生成了一个异步任务专门用于发布消息到 min-redis 服务器端的 numbers
消息通道中。然后,在 main
中,我们订阅了 numbers
消息通道,并且打印从中接收到的消息。
还有几点值得注意的:
into_stream
会将Subscriber
变成一个stream
- 在
stream
上调用next
方法要求该stream
被固定住(pinned
),因此需要调用tokio::pin!
关于 Pin 的详细解读,可以阅读这篇文章
大家可以去掉 pin!
的调用,然后观察下报错,若以后你遇到这种错误,可以尝试使用下 pin!
。
此时,可以运行下我们的客户端代码看看效果(别忘了先启动前面提到的 mini-redis 服务端):
got = Ok(Message { channel: "numbers", content: b"1" })
got = Ok(Message { channel: "numbers", content: b"two" })
got = Ok(Message { channel: "numbers", content: b"3" })
got = Ok(Message { channel: "numbers", content: b"four" })
got = Ok(Message { channel: "numbers", content: b"five" })
got = Ok(Message { channel: "numbers", content: b"6" })
在了解了 stream
的基本用法后,我们再来看看如何使用适配器来扩展它。
适配器
在前面章节中,我们了解了迭代器有两种适配器:
- 迭代器适配器,会将一个迭代器转变成另一个迭代器,例如
map
,filter
等 - 消费者适配器,会消费掉一个迭代器,最终生成一个值,例如
collect
可以将迭代器收集成一个集合
与迭代器类似,stream
也有适配器,例如一个 stream
适配器可以将一个 stream
转变成另一个 stream
,例如 map
、take
和 filter
。
在之前的客户端中,subscribe
订阅一直持续下去,直到程序被关闭。现在,让我们来升级下,让它在收到三条消息后就停止迭代,最终结束。
#![allow(unused)] fn main() { let messages = subscriber .into_stream() .take(3); }
这里关键就在于 take
适配器,它会限制 stream
只能生成最多 n
条消息。运行下看看结果:
got = Ok(Message { channel: "numbers", content: b"1" })
got = Ok(Message { channel: "numbers", content: b"two" })
got = Ok(Message { channel: "numbers", content: b"3" })
程序终于可以正常结束了。现在,让我们过滤 stream
中的消息,只保留数字类型的值:
#![allow(unused)] fn main() { let messages = subscriber .into_stream() .filter(|msg| match msg { Ok(msg) if msg.content.len() == 1 => true, _ => false, }) .take(3); }
运行后输出:
got = Ok(Message { channel: "numbers", content: b"1" })
got = Ok(Message { channel: "numbers", content: b"3" })
got = Ok(Message { channel: "numbers", content: b"6" })
需要注意的是,适配器的顺序非常重要,.filter(...).take(3)
和 .take(3).filter(...)
的结果可能大相径庭,大家可以自己尝试下。
现在,还有一件事要做,咱们的消息被不太好看的 Ok(...)
所包裹,现在通过 map
适配器来简化下:
#![allow(unused)] fn main() { let messages = subscriber .into_stream() .filter(|msg| match msg { Ok(msg) if msg.content.len() == 1 => true, _ => false, }) .map(|msg| msg.unwrap().content) .take(3); }
注意到 msg.unwrap
了吗?大家可能会以为我们是出于示例的目的才这么用,实际上并不是,由于 filter
的先执行, map
中的 msg
只能是 Ok(...)
,因此 unwrap
非常安全。
got = b"1"
got = b"3"
got = b"6"
还有一点可以改进的地方:当 filter
和 map
一起使用时,你往往可以用一个统一的方法来实现 filter_map
。
#![allow(unused)] fn main() { let messages = subscriber .into_stream() .filter_map(|msg| match msg { Ok(msg) if msg.content.len() == 1 => Some(msg.content), _ => None, }) .take(3); }
想要学习更多的适配器,可以看看 StreamExt
特征。
实现 Stream 特征
如果大家还没忘记 Future
特征,那 Stream
特征相信你也会很快记住,因为它们非常类似:
#![allow(unused)] fn main() { use std::pin::Pin; use std::task::{Context, Poll}; pub trait Stream { type Item; fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Option<Self::Item>>; fn size_hint(&self) -> (usize, Option<usize>) { (0, None) } } }
Stream::poll_next()
函数跟 Future::poll
很相似,区别就是前者为了从 stream
收到多个值需要重复的进行调用。 就像在 深入async
章节提到的那样,当一个 stream
没有做好返回一个值的准备时,它将返回一个 Poll::Pending
,同时将任务的 waker
进行注册。一旦 stream
准备好后, waker
将被调用。
通常来说,如果想要手动实现一个 Stream
,需要组合 Future
和其它 Stream
。下面,还记得在深入async
中构建的 Delay Future
吗?现在让我们来更进一步,将它转换成一个 stream
,每 10 毫秒生成一个值,总共生成 3 次:
#![allow(unused)] fn main() { use tokio_stream::Stream; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Duration; struct Interval { rem: usize, delay: Delay, } impl Stream for Interval { type Item = (); fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>> { if self.rem == 0 { // 去除计时器实现 return Poll::Ready(None); } match Pin::new(&mut self.delay).poll(cx) { Poll::Ready(_) => { let when = self.delay.when + Duration::from_millis(10); self.delay = Delay { when }; self.rem -= 1; Poll::Ready(Some(())) } Poll::Pending => Poll::Pending, } } } }
async-stream
手动实现 Stream
特征实际上是相当麻烦的事,不幸地是,Rust 语言的 async/await
语法目前还不能用于定义 stream
,虽然相关的工作已经在进行中。
作为替代方案,async-stream
包提供了一个 stream!
宏,它可以将一个输入转换成 stream
,使用这个包,上面的代码可以这样实现:
#![allow(unused)] fn main() { use async_stream::stream; use std::time::{Duration, Instant}; stream! { let mut when = Instant::now(); for _ in 0..3 { let delay = Delay { when }; delay.await; yield (); when += Duration::from_millis(10); } } }
嗯,看上去还是相当不错的,代码可读性大幅提升!
是不是发现了一个关键字 yield
,他是用来配合生成器使用的。详见原文