学习rust_day14

Streams

“消息传递”中异步的recv方法会随着时间产生一系列项目,这称为stream

StreamExt

Ext是Rust社区中使用另外一个trait扩展某个trait的常见模式

  • 简单来说
    • Stream trait定义了一个低级接口,有效地结合了Iterator和Future traits
    • StreamExt在Stream之上提供了一组更高级的APl,包括next方法以及类似于Iterator trait提供的其他实用方法
  • Stream和StreamExt尚未成为Rust标准库的一部分,但大多数生态系统中的crate使用相同的定义

例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
use trpl::StreamExt;

fn main() {
trpl::run(
async{
let values = [1, 2, 3, 4, 5, 6,7, 8, 9, 10];
let iter = values.iter().map(|n| n*2);
let mut stream = trpl::stream_from_iter(iter);

while let Some(value) = stream.next().await {
println!("{value}");
}
});
}

Composing Streams
组合流

  • 许多概念天然适合用stream来表示:
    • 队列中逐渐可用的项目
    • 文件系统中逐步拉取的数据块(数据集太大时)
    • 网络上随时间到达的数据
    • 实时通信(如VebSocket)
  • Streams其实就是Futures,可以与任意类型的Future组合着使用

例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
trpl::run(
async {
let mut messages = get_messages();

while let Some(message) = messages.next().await {
println!("{message}");
}
}
);
}

fn get_messages() -> impl Stream<Item = String> {
let (tx, rx) = trpl::channel();

let messages = ["a", "b", "c" , "d", "e", "f", "g", "h", "i", "j"];
for message in messages {
tx.send(format!("Message: {message}")).unwrap();
}

ReceiverStream::new(rx)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
use trpl::{ReceiverStream, Stream, StreamExt};
use std::{pin::pin, time::Duration};

fn main() {
trpl::run(
async {
let mut messages = pin!(get_messages().timeout(Duration::from_millis(200)));

while let Some(result) = messages.next().await {
match result {
Ok(message) => println!("{message}"),
Err(reason) => eprintln!("Problem: {reason:?}")
}
}
}
);
}

fn get_messages() -> impl Stream<Item = String> {
let (tx, rx) = trpl::channel();

trpl::spawn_task(
async move {
let messages = ["a", "b", "c" , "d", "e", "f", "g", "h", "i", "j"];
for (index, message) in messages.into_iter().enumerate() {
let time_to_sleep = if index %2 == 0 {100} else {300};
trpl::sleep(Duration::from_millis(time_to_sleep)).await;

tx.send(format!("Message: {message}")).unwrap();
}
}
);

ReceiverStream::new(rx)
}

合并流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
use trpl::{ReceiverStream, Stream, StreamExt};
use std::{pin::pin, time::Duration};

fn main() {
trpl::run(
async {
let messages = get_messages().timeout(Duration::from_millis(200));
let intervals = get_intervals()
.map(|count| format!("Interval: {count}"))
.throttle(Duration::from_millis(100)) //限制该函数100ms调用一次
.timeout(Duration::from_secs(10));
let merged = messages.merge(intervals).take(20); //merge将两个流合并,take限制只从流中拉取20个
let mut stream = pin!(merged);
while let Some(result) = stream.next().await {
match result {
Ok(message) => println!("{message}"),
Err(reason) => eprintln!("Problem: {reason:?}")
}
}
}
);
}

fn get_intervals() -> impl Stream<Item = u32> {
let (tx, rx) = trpl::channel();

trpl::spawn_task(
async move {
let mut count = 0;
loop {
trpl::sleep(Duration::from_millis(1)).await;
count += 1;

if let Err(send_error) = tx.send(count) {
eprint!("Error: {count} : {send_error}");
break;
}
}
}
);

ReceiverStream::new(rx)
}

fn get_messages() -> impl Stream<Item = String> {
let (tx, rx) = trpl::channel();

trpl::spawn_task(
async move {
let messages = ["a", "b", "c" , "d", "e", "f", "g", "h", "i", "j"];
for (index, message) in messages.into_iter().enumerate() {
let time_to_sleep = if index %2 == 0 {100} else {300};
trpl::sleep(Duration::from_millis(time_to_sleep)).await;

if let Err(send_error) = tx.send(format!("Message: {message}")) {
eprintln!("Error: {message} : {send_error}");
break;
}
}
}
);

ReceiverStream::new(rx)
}

Traits for Async

异步主要的Traits

Pin

  • Pin是针对(类)指针类型(如&、&mut、Box和 Rc)的包装器
    • 从技术上讲,Pin适用于实现Deref或DerefMut特性的类型,但这实际上等同于只适用于指针
  • Pin本身不是指针,也没有像Rc和Arc那样具有引用计数等自身行为
  • 它纯粹是编译器可以用来强制约束指针使用的工具

简单来说,使用Pin修饰某对象使其无法在内存中移动以防止该对象对其自身的引用失效。

Unpin & !Unpin

  • Unpin是一个标记特性(marker trait),它本身没有功能
    • 标记特性的存在只是为了告诉编译器,在特定上下文中使用实现给定trait的类型是安全的
    • Unpi通知编译器,给定类型不需要维持关于“这个值是否可以安全移动”的任何保证
  • 就像Send和Sync一样,编译器会自动为所有可以证明安全的类型实现Unpin

学习rust_day14
https://zlsf-zl.github.io/2026/03/24/学习rust-day14/
作者
ZLSF
发布于
2026年3月24日
许可协议