异步编程
- 并行性Parallelism:同时执行多个操作
- 并发性Concurrency:在操作间进行切换
- 阻塞操作Blocking Operations:阻止程序继续执行直到操作完成
- 非阻塞操作Non-blocking Operations:允许程序在等待时执行其他任务
操作类型
- CPU密集型(CPU-bound):受处理器能力限制(如视频导出)
- IO密集型O-bound):受输入/输出速度限制(如文件下载)
并行与并发
- 并发性(Concurrency):一个执行单元处理多个任务,通过任务切换实现
- 并行性(Parallelism):多个执行单元同时处理不同任务
- 串行性(Serial Work):任务必须按特定顺序一个接一个地完成
Futures 和 异步语法
核心元素
- Future(未来量):一个可能现在还未准备好(就绪),但将来会准备好(就绪)的值
- 在其他语言中也称为task或promise
- 在Rust中,Future是实现了Future trait的类型
- async关键字:用于代码块或函数,来表示可被中断和恢复
- await关键字:用于等待Future就绪
- 提供暂停和恢复执行的点
- “轮询”(polling)是检查Future值是否可用的过程
Future的特点
- Rust编译器将async/await代码转换为使用Future trait的等效代码
- 类似于for循环被转换为使用Iterator trait
- 开发者可以为自定义数据类型实现Future trait
trpl一整合了我们需要的类型、trait和函数,主要来自futures和tokio这两个核心异步库
目标:专注于异步编程学习,避免生态系统干扰
工具:使用trpl库(The Rust Programming Language)
- 整合futures和tokio的核心功能
- futures:异步实验的官方家园,定义了Future特性
- tokio:最流行的异步运行时,广泛用于Web开发
设计:
- trpl重导出类型、函数和trait,简化学习
- 隐藏复杂细节,专注于异步核心
使用异步编程获得某网页的title
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| use trpl::Html;
fn main() { let args: Vec<String> = std::env::args().collect(); trpl::run( async { let url = &args[1]; match page_title(url).await { Some(title) => println!("The title for {url} was {title}"), None => println!("{url} hand no title") } } ) }
async fn page_title(url: &str) -> Option<String> { let response_text = trpl::get(url).await.text().await; Html::parse(&response_text) .select_first("title") .map(|title_element| title_element.inner_html()) }
|
异步编程抓取两个网页的title,谁先抓取成功先显示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| use trpl::{Either, Html};
fn main() { let args: Vec<String> = std::env::args().collect();
trpl::run( async { let title_fut_1 = page_title(&args[1]); let title_fut_2 = page_title(&args[2]);
let (url, mayber_title) = match trpl::race(title_fut_1, title_fut_2).await { Either::Left(left) => left, Either::Right(right) => right }; println!("{url} returned first"); match mayber_title { Some(title) => println!("Its page title is: {title}"), None => println!("Someting Error") } } ) }
async fn page_title(url: &str) -> (&str, Option<String>) { let text = trpl::get(url).await.text().await; let title = Html::parse(&text) .select_first("title") .map(|title| title.inner_html()); (url, title) }
|
使用Async实现并发
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| use std::time::Duration;
fn main() { trpl::run( async { let handle = trpl::spawn_task( async { for i in 1..10 { println!("hi numver {i} from the first task"); trpl::sleep(Duration::from_millis(500)).await; } } );
for i in 1..5 { println!("hi number {i} from the second task"); trpl::sleep(Duration::from_millis(500)).await; }
handle.await.unwrap(); } ) }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| use std::time::Duration;
fn main() { trpl::run( async { let fut1 = async { for i in 1..10 { println!("hi numver {i} from the first task"); trpl::sleep(Duration::from_millis(500)).await; } };
let fut2 = async{ for i in 1..5 { println!("hi number {i} from the second task"); trpl::sleep(Duration::from_millis(500)).await; } };
trpl::join(fut1, fut2).await; } ) }
|
使用消息传递在两个任务上计数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| use std::time::Duration;
fn main() { trpl::run( async { let (tx, mut rx) = trpl::channel();
let tx_fut = async move { let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("future") ];
for val in vals { tx.send(val).unwrap(); trpl::sleep(Duration::from_millis(500)).await; }
};
let rx_fut = async { while let Some(value) = rx.recv().await { println!("received {value}"); } };
trpl::join(tx_fut, rx_fut).await; } ); }
|
处理任意数量的Future
1 2
| trpl::join!(tx1_fut, tx2_fut, tx3_fut).await; trpl::join_all(futures).await;
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| use std::time::Duration; use std::pin::{pin, Pin};
fn main() { trpl::run( async { let (tx, mut rx) = trpl::channel(); let tx1 = tx.clone();
let tx_fut = pin!(async move { let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("future") ];
for val in vals { tx.send(val).unwrap(); trpl::sleep(Duration::from_millis(500)).await; }
});
let rx_fut = pin!(async { while let Some(value) = rx.recv().await { println!("received {value}"); } });
let tx1_fut = pin!(async move { let vals = vec![ String::from("some"), String::from("thing"), String::from("doing"), String::from("now") ];
for val in vals { tx1.send(val).unwrap(); trpl::sleep(Duration::from_millis(1500)).await; }
});
let futures: Vec<Pin<&mut dyn Future<Output = ()>>> = vec![tx1_fut, tx_fut, rx_fut]; trpl::join_all(futures).await; } ); }
|
1 2 3 4 5 6 7 8 9 10 11 12 13
| fn main() { trpl::run( async { let a = async { 1u32 }; let b = async { "Hello!" }; let c = async { true };
let (a_result, b_result, c_result) = trpl::join!(a, b, c); println!("{a_result}, {b_result}, {c_result}"); } ); }
|
竞争的Futures
例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| use std::time::Duration;
fn main() { trpl::run( async { let slow = async { println!("slow started"); trpl::sleep(Duration::from_millis(100)).await; println!("slow finished"); };
let fast = async { println!("fast started"); trpl::sleep(Duration::from_millis(50)).await; println!("fast finished"); };
trpl::race(slow, fast).await; } ); }
|
异步抽象
将多个Future组合在一起创建一种新的模式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| use std::time::Duration; use trpl::Either;
fn main() { trpl::run( async { let slow = async { trpl::sleep(Duration::from_secs(5)).await; "I finished" };
match timeout(slow, Duration::from_secs(2)).await { Ok(message) => println!("Succeded with {message}"), Err(duration) => { println!("Failed after {} seconds", duration.as_secs()) } } } ); }
async fn timeout<F: Future>( future_to_try: F, max_time: Duration ) -> Result<F::Output, Duration> { match trpl::race(future_to_try, trpl::sleep(max_time)).await { Either::Left(output) => Ok(output), Either::Right(_) => Err(max_time) } }
|