学习rust_day13

异步编程

  • 并行性Parallelism:同时执行多个操作
  • 并发性Concurrency:在操作间进行切换
  • 阻塞操作Blocking Operations:阻止程序继续执行直到操作完成
  • 非阻塞操作Non-blocking Operations:允许程序在等待时执行其他任务

操作类型

  • CPU密集型(CPU-bound):受处理器能力限制(如视频导出)
  • IO密集型O-bound):受输入/输出速度限制(如文件下载)

并行与并发

  • 并发性(Concurrency):一个执行单元处理多个任务,通过任务切换实现
  • 并行性(Parallelism):多个执行单元同时处理不同任务
  • 串行性(Serial Work):任务必须按特定顺序一个接一个地完成

Futures 和 异步语法

核心元素

  • Future(未来量):一个可能现在还未准备好(就绪),但将来会准备好(就绪)的值
    • 在其他语言中也称为task或promise
    • 在Rust中,Future是实现了Future trait的类型
  • async关键字:用于代码块或函数,来表示可被中断和恢复
    • 将函数或代码块转换为返回Future的形式
  • await关键字:用于等待Future就绪
    • 提供暂停和恢复执行的点
    • “轮询”(polling)是检查Future值是否可用的过程

Future的特点

  • Rust编译器将async/await代码转换为使用Future trait的等效代码
    • 类似于for循环被转换为使用Iterator trait
  • 开发者可以为自定义数据类型实现Future trait
    • 提供统一接口但允许不同的异步操作实现

trpl一整合了我们需要的类型、trait和函数,主要来自futures和tokio这两个核心异步库

目标:专注于异步编程学习,避免生态系统干扰

工具:使用trpl库(The Rust Programming Language)

  • 整合futures和tokio的核心功能
  • futures:异步实验的官方家园,定义了Future特性
  • tokio:最流行的异步运行时,广泛用于Web开发

设计:

  • trpl重导出类型、函数和trait,简化学习
  • 隐藏复杂细节,专注于异步核心

使用异步编程获得某网页的title

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
use trpl::Html;

fn main() {
let args: Vec<String> = std::env::args().collect();

trpl::run(
async {
let url = &args[1];
match page_title(url).await {
Some(title) => println!("The title for {url} was {title}"),
None => println!("{url} hand no title")
}
}
)
}

async fn page_title(url: &str) -> Option<String> {
//get方法是async的,所以需要await来进行等待,或者说future是惰性的,需要await强制执行
//当await等待响应时,程序不会阻塞而可以执行其他任务
let response_text = trpl::get(url).await.text().await;
Html::parse(&response_text)
.select_first("title")
.map(|title_element| title_element.inner_html())
}

异步编程抓取两个网页的title,谁先抓取成功先显示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
use trpl::{Either, Html};

fn main() {
let args: Vec<String> = std::env::args().collect();

trpl::run(
async {
let title_fut_1 = page_title(&args[1]);
let title_fut_2 = page_title(&args[2]);

let (url, mayber_title) =
match trpl::race(title_fut_1, title_fut_2).await { //race是竞争函数,那个参数先完成就返回谁
Either::Left(left) => left,
Either::Right(right) => right
};

println!("{url} returned first");
match mayber_title {
Some(title) => println!("Its page title is: {title}"),
None => println!("Someting Error")
}
}
)
}

async fn page_title(url: &str) -> (&str, Option<String>) {
let text = trpl::get(url).await.text().await;
let title = Html::parse(&text)
.select_first("title")
.map(|title| title.inner_html());
(url, title)
}

使用Async实现并发

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
use std::time::Duration;

fn main() {
trpl::run(
async {
let handle = trpl::spawn_task(
async {
for i in 1..10 {
println!("hi numver {i} from the first task");
trpl::sleep(Duration::from_millis(500)).await;
}
}
);

for i in 1..5 {
println!("hi number {i} from the second task");
trpl::sleep(Duration::from_millis(500)).await;
}

handle.await.unwrap(); //防止主task运行完成后副task运行未完成
}
)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
use std::time::Duration;

fn main() {
trpl::run(
async {
let fut1 = async {
for i in 1..10 {
println!("hi numver {i} from the first task");
trpl::sleep(Duration::from_millis(500)).await;
}
};

let fut2 = async{
for i in 1..5 {
println!("hi number {i} from the second task");
trpl::sleep(Duration::from_millis(500)).await;
}
};

trpl::join(fut1, fut2).await; //防止主task运行完成后副task运行未完成
}
)
}

使用消息传递在两个任务上计数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
use std::time::Duration;

fn main() {
trpl::run(
async {
let (tx, mut rx) = trpl::channel();

let tx_fut = async move {

let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("future")
];

for val in vals {
tx.send(val).unwrap();
trpl::sleep(Duration::from_millis(500)).await;
}

};

let rx_fut = async {
while let Some(value) = rx.recv().await {
println!("received {value}");
}
};

trpl::join(tx_fut, rx_fut).await;
}
);
}

处理任意数量的Future

1
2
trpl::join!(tx1_fut, tx2_fut, tx3_fut).await; //传入任意数量的Future
trpl::join_all(futures).await; //传入一个集合的Future
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
//处理动态数量相同类型的Future
use std::time::Duration;
use std::pin::{pin, Pin};

fn main() {
trpl::run(
async {
let (tx, mut rx) = trpl::channel();
let tx1 = tx.clone();

let tx_fut = pin!(async move {

let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("future")
];

for val in vals {
tx.send(val).unwrap();
trpl::sleep(Duration::from_millis(500)).await;
}

});

let rx_fut = pin!(async {
while let Some(value) = rx.recv().await {
println!("received {value}");
}
});

let tx1_fut = pin!(async move {

let vals = vec![
String::from("some"),
String::from("thing"),
String::from("doing"),
String::from("now")
];

for val in vals {
tx1.send(val).unwrap();
trpl::sleep(Duration::from_millis(1500)).await;
}

});

//trpl::join3(tx_fut, tx1_fut, rx_fut).await;

let futures: Vec<Pin<&mut dyn Future<Output = ()>>> =
vec![tx1_fut, tx_fut, rx_fut];

trpl::join_all(futures).await;
}
);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
//处理不同类型固定数量的Future
fn main() {
trpl::run(
async {
let a = async { 1u32 };
let b = async { "Hello!" };
let c = async { true };

let (a_result, b_result, c_result) = trpl::join!(a, b, c);
println!("{a_result}, {b_result}, {c_result}");
}
);
}

竞争的Futures

例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
use std::time::Duration;

fn main() {
trpl::run(
async {
let slow = async {
println!("slow started");
trpl::sleep(Duration::from_millis(100)).await;
//使用trpl::yield_now().await;可以将控制权交还给运行时,你最好这么做
println!("slow finished");
};

let fast = async {
println!("fast started");
trpl::sleep(Duration::from_millis(50)).await;
println!("fast finished");
};

trpl::race(slow, fast).await; //其参数顺序决定异步块启动的顺序
}
);
}

异步抽象

将多个Future组合在一起创建一种新的模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
use std::time::Duration;
use trpl::Either;

fn main() {
trpl::run(
async {
let slow = async {
trpl::sleep(Duration::from_secs(5)).await;
"I finished"
};

match timeout(slow, Duration::from_secs(2)).await {
Ok(message) => println!("Succeded with {message}"),
Err(duration) => {
println!("Failed after {} seconds", duration.as_secs())
}
}
}
);
}

async fn timeout<F: Future>(
future_to_try: F,
max_time: Duration
) -> Result<F::Output, Duration> {
match trpl::race(future_to_try, trpl::sleep(max_time)).await {
Either::Left(output) => Ok(output),
Either::Right(_) => Err(max_time)
}
}

学习rust_day13
https://zlsf-zl.github.io/2026/03/22/学习rust-day13/
作者
ZLSF
发布于
2026年3月22日
许可协议