Асинхронно програмиране
async/.await
16 декември 2025
Паралелизъм и concurrency
Паралелизъм и concurrency
- паралелизъм - изпълняваме няколко задачи едновременно върху няколко нишки
Паралелизъм и concurrency
- паралелизъм - изпълняваме няколко задачи едновременно върху няколко нишки
- concurrency - прогресираме изпълнението на няколко задачи едновременно, като позволяваме на всяка задача да се изпълнява за малък период от време и ги редуваме
Паралелизъм и concurrency
- паралелизъм - изпълняваме няколко задачи едновременно върху няколко нишки
- concurrency - прогресираме изпълнението на няколко задачи едновременно, като позволяваме на всяка задача да се изпълнява за малък период от време и ги редуваме
Двете понятия са ортогонални:
- concurrency без паралелизъм - върху една нишка
- concurrency с паралелизъм - върху множество нишки
Приложение
- паралелизъм - задачи с тежки сметки
Приложение
- паралелизъм - задачи с тежки сметки
- concurrency - множество леки задачи с много I/O операции
Приложение
- паралелизъм - задачи с тежки сметки
- concurrency - множество леки задачи с много I/O операции
- networking, уеб сървъри, …
Пример
TCP чат клиент (sync)
use std::io::{BufRead, BufReader, Write};
use std::net::TcpStream;
fn main() {
let mut args = std::env::args();
let _ = args.next(); // skip executable name
let name = args.next().unwrap_or_else(|| {
println!("Usage: ./client NAME");
std::process::exit(1);
});
let addr = "127.0.0.1:8080".to_string();
let mut socket = TcpStream::connect(addr).expect("failed to connect");
socket
.write_all(format!("{name}\n").as_bytes())
.expect("write failed");
let mut socket_writer = socket.try_clone().expect("clone failed");
let socket_reader = socket;
let h1 = std::thread::spawn(move || {
let stdin = std::io::stdin().lock();
for line in stdin.lines() {
let line = line.expect("stdin read failed");
socket_writer
.write_all(format!("{line}\n").as_bytes())
.expect("socket write failed");
}
});
let h2 = std::thread::spawn(move || {
let buf_reader = BufReader::new(socket_reader);
for line in buf_reader.lines() {
let line = line.expect("socket read failed");
println!("{}", line);
}
});
h1.join().unwrap();
h2.join().unwrap();
}
use std::io::{BufRead, BufReader, Write};
use std::net::TcpStream;
fn main() {
let mut args = std::env::args();
let _ = args.next(); // skip executable name
let name = args.next().unwrap_or_else(|| {
println!("Usage: ./client NAME");
std::process::exit(1);
});
let addr = "127.0.0.1:8080".to_string();
let mut socket = TcpStream::connect(addr).expect("failed to connect");
socket
.write_all(format!("{name}\n").as_bytes())
.expect("write failed");
let mut socket_writer = socket.try_clone().expect("clone failed");
let socket_reader = socket;
let h1 = std::thread::spawn(move || {
let stdin = std::io::stdin().lock();
for line in stdin.lines() {
let line = line.expect("stdin read failed");
socket_writer
.write_all(format!("{line}\n").as_bytes())
.expect("socket write failed");
}
});
let h2 = std::thread::spawn(move || {
let buf_reader = BufReader::new(socket_reader);
for line in buf_reader.lines() {
let line = line.expect("socket read failed");
println!("{}", line);
}
});
h1.join().unwrap();
h2.join().unwrap();
}
Пример
TCP чат сървър (sync)
use std::collections::HashMap;
use std::io::{BufRead, BufReader, Write};
use std::net::TcpStream;
use std::sync::mpsc;
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct ClientId(pub i32);
enum ClientMsg {
Connected { name: String, chan: mpsc::SyncSender<BroadcastMsg> },
Message { text: String },
}
struct BroadcastMsg {
from: String,
text: String,
}
fn main() {
let addr = "127.0.0.1:8080".to_string();
let listener = std::net::TcpListener::bind(&addr).expect("listen failed");
eprintln!("[log] listening on: {}", addr);
let (sender, receiver) = mpsc::sync_channel(100);
let mut next_id = 0;
std::thread::spawn(move || server(receiver));
loop {
let (socket, socket_addr) = listener.accept().expect("accept failed");
eprintln!("[log] client connected from {socket_addr}");
// "split" socket into reader and writer parts
let socket_writer = socket.try_clone().expect("clone failed");
let socket_reader = socket;
let sender = sender.clone();
let id = ClientId(next_id);
next_id += 1;
let (broadcast_sender, broadcast_receiver) = mpsc::sync_channel(10);
std::thread::spawn(move || client_reader(id, socket_reader, sender, broadcast_sender));
std::thread::spawn(move || client_writer(socket_writer, broadcast_receiver));
}
}
fn server(receiver: mpsc::Receiver<(ClientId, ClientMsg)>) {
struct Client {
name: String,
chan: mpsc::SyncSender<BroadcastMsg>,
}
let mut clients = HashMap::new();
for msg in receiver.iter() {
match msg {
(id, ClientMsg::Connected { name, chan }) => {
clients.insert(id, Client { name, chan });
},
(id, ClientMsg::Message { text }) => {
clients
.iter()
.filter(|(client_id, _client)| **client_id != id)
.for_each(|(_, client)| {
let _ = client.chan.try_send(BroadcastMsg { from: client.name.clone(), text: text.clone() });
});
}
}
}
}
fn client_reader(
id: ClientId,
socket_reader: TcpStream,
sender: mpsc::SyncSender<(ClientId, ClientMsg)>,
broadcast_sender: mpsc::SyncSender<BroadcastMsg>,
) {
let reader = BufReader::new(socket_reader);
let mut lines = reader.lines();
match lines.next() {
Some(Ok(first_line)) => {
eprintln!("[log] client({}) name={}", id.0, first_line);
sender.send((id, ClientMsg::Connected { name: first_line, chan: broadcast_sender })).unwrap()
},
_ => panic!("socket read failed"),
}
while let Some(Ok(line)) = lines.next() {
eprintln!("[log] client({}) message={}", id.0, line);
sender.send((id, ClientMsg::Message { text: line })).unwrap();
}
}
fn client_writer(
mut socket_writer: TcpStream,
broadcast_receiver: mpsc::Receiver<BroadcastMsg>
) {
for msg in broadcast_receiver.iter() {
let line = format!("{}> {}", msg.from, msg.text);
socket_writer.write_all(format!("{line}\n").as_bytes()).unwrap();
}
}
use std::collections::HashMap;
use std::io::{BufRead, BufReader, Write};
use std::net::TcpStream;
use std::sync::mpsc;
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct ClientId(pub i32);
enum ClientMsg {
Connected { name: String, chan: mpsc::SyncSender },
Message { text: String },
}
struct BroadcastMsg {
from: String,
text: String,
}
fn main() {
let addr = "127.0.0.1:8080".to_string();
let listener = std::net::TcpListener::bind(&addr).expect("listen failed");
eprintln!("[log] listening on: {}", addr);
let (sender, receiver) = mpsc::sync_channel(100);
let mut next_id = 0;
std::thread::spawn(move || server(receiver));
loop {
let (socket, socket_addr) = listener.accept().expect("accept failed");
eprintln!("[log] client connected from {socket_addr}");
// "split" socket into reader and writer parts
let socket_writer = socket.try_clone().expect("clone failed");
let socket_reader = socket;
let sender = sender.clone();
let id = ClientId(next_id);
next_id += 1;
let (broadcast_sender, broadcast_receiver) = mpsc::sync_channel(10);
std::thread::spawn(move || client_reader(id, socket_reader, sender, broadcast_sender));
std::thread::spawn(move || client_writer(socket_writer, broadcast_receiver));
}
}
fn server(receiver: mpsc::Receiver<(ClientId, ClientMsg)>) {
struct Client {
name: String,
chan: mpsc::SyncSender,
}
let mut clients = HashMap::new();
for msg in receiver.iter() {
match msg {
(id, ClientMsg::Connected { name, chan }) => {
clients.insert(id, Client { name, chan });
},
(id, ClientMsg::Message { text }) => {
clients
.iter()
.filter(|(client_id, _client)| **client_id != id)
.for_each(|(_, client)| {
let _ = client.chan.try_send(BroadcastMsg { from: client.name.clone(), text: text.clone() });
});
}
}
}
}
fn client_reader(
id: ClientId,
socket_reader: TcpStream,
sender: mpsc::SyncSender<(ClientId, ClientMsg)>,
broadcast_sender: mpsc::SyncSender,
) {
let reader = BufReader::new(socket_reader);
let mut lines = reader.lines();
match lines.next() {
Some(Ok(first_line)) => {
eprintln!("[log] client({}) name={}", id.0, first_line);
sender.send((id, ClientMsg::Connected { name: first_line, chan: broadcast_sender })).unwrap()
},
_ => panic!("socket read failed"),
}
while let Some(Ok(line)) = lines.next() {
eprintln!("[log] client({}) message={}", id.0, line);
sender.send((id, ClientMsg::Message { text: line })).unwrap();
}
}
fn client_writer(
mut socket_writer: TcpStream,
broadcast_receiver: mpsc::Receiver
) {
for msg in broadcast_receiver.iter() {
let line = format!("{}> {}", msg.from, msg.text);
socket_writer.write_all(format!("{line}\n").as_bytes()).unwrap();
}
}
Пример
TCP чат сървър (sync)
Кодът е коректен
(ако изключим липсата на error handling и това, че не поддържа откачане на клиенти)
Но не е оптимален - използва повече ресурси (нишки), отколкото е необходимо
- това не е проблем при малък брой клиенти
- но няма да скалира добре при стотици клиенти
Пример
TCP чат сървър (sync)
Наблюдение - през повечето време повечето нишки са в режим на изчакване:
- главна нишка - блокирана на системно извикване
accept - нишка "server" - блокирана на
recvот канал - нишка "client_reader" - блокирана на
readот socket - нишка "client_writer" - блокирана на
recvот канал илиwriteв socket
Можем ли да постигнем същата функционалност без блокиращи операции?
Async I/O
Това е често срещан проблем.
Всяка операционна система предоставя начин за асинхронно изпълнение на входно/изходни операции:
epollпод Линукс (иio_uring)kqueueпод BSD/MacOSIOCPпод Windows
epoll и kqueue позволяват:
- по даден списък файлови дескриптори
- чрез едно системно извикване (syscall)
- да се провери кои файлови дескриптори са готови за четене или писане
- т.е.
readилиwriteняма да блокира
- т.е.
IOCP и io_uring се използват за същото, но ползват различен интерфейс (completion based vs poll based).
Async I/O
epoll
Псевдокод:
let listener_fd = tcp_listener.as_fd();
let client0_fd = clients[0].tcp_stream.as_fd();
let client1_fd = clients[1].tcp_stream.as_fd();
epollfd := epoll_create();
epoll_ctl(epollfd, .CTL_ADD, listener_fd, {.IN}) // notify when fd is ready for READ operation
epoll_ctl(epollfd, .CTL_ADD, client0_fd, {.IN}) // notify when fd is ready for READ operation
epoll_ctl(epollfd, .CTL_ADD, client1_fd, {.IN}) // notify when fd is ready for READ operation
loop {
events := epoll_wait(epollfd);
for evt in events {
cond {
evt.fd == listener_fd => {
stream := tcp_listener.accept(); // will not block
// etc ...
}
evt.fd in [client0_fd, client1_fd] => {
id := fd_to_client(evt.fd);
msg := clients[id].tcp_stream.read(); // will not block
// etc ...
}
}
}
}
Async I/O
Езици и библиотеки, които поддържат асинхронен вход/изход, предоставят различни абстракции върху тази функционалност на операционната система
- Python, JavaScript, Go, …
- libuv (C), boost::asio (C++), …
Async I/O имплементации
Callback функции
- задаваме на библиотеката каква входно/изходна операция трябва да се извърши
- задаваме closure, който да се извика след като операцията е готова
// псевдокод (базирано на c++ boost::asio)
void process_request(shared_ptr<State> state) {
async_connect(io_executor, state->socket, bind(on_connected, state));
}
void on_connected(shared_ptr<State> state) {
state->request_buffer = prepare_request();
async_write(socket, state->request_buffer, bind(on_send_complete, state));
}
void on_send_complete(shared_ptr<State> state) {
asyn_read(socket, state->response_buffer, bind(on_read_complete, state));
}
void on_read_complete(shared_ptr<State> state) {
auto state->headers = parse_headers(state->response_buffer);
auto state->body = parse_body(state->response_buffer);
post(io_executor, bind(process, state));
}
void process(shared_ptr<State>) {
/* ... */
}
Имплементации
Callback функции
Недостатъци:
- по-трудна за проследяване логика
- няма как да пазим локални променливи между асинхронните операции
- нужна е голяма промяна на кода за рефакториране от синхронен вариант до вариант с callbacks
Имплементации
Coroutines
Предимства:
- кода изглежда като обикновена синхронна функция
- но изпълнението може да се прекъсне по всяко време
- и да се продължи по-късно
Недостатъци:
- изисква специален runtime
Rust използва вариант на корутини
Асинхронно програмиране в Rust
Пример
Директен превод на синхронния код.
Има място за още подобрения.
use std::collections::HashMap;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::sync::mpsc;
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct ClientId(pub i32);
enum ClientMsg {
Connected { name: String, chan: mpsc::Sender<BroadcastMsg> },
Message { text: String },
}
struct BroadcastMsg {
from: String,
text: String,
}
#[tokio::main]
async fn main() {
let addr = "127.0.0.1:8080".to_string();
let listener = tokio::net::TcpListener::bind(&addr).await.expect("listen failed");
eprintln!("[log] listening on: {}", addr);
let (sender, receiver) = mpsc::channel(100);
let mut next_id = 0;
tokio::task::spawn(server(receiver));
loop {
let (socket, socket_addr) = listener.accept().await.expect("accept failed");
eprintln!("[log] client connected from {socket_addr}");
// "split" socket into reader and writer parts
let (socket_reader, socket_writer) = socket.into_split();
let sender = sender.clone();
let id = ClientId(next_id);
next_id += 1;
let (broadcast_sender, broadcast_receiver) = mpsc::channel(10);
tokio::task::spawn(client_reader(id, socket_reader, sender, broadcast_sender));
tokio::task::spawn(client_writer(socket_writer, broadcast_receiver));
}
}
async fn server(mut receiver: mpsc::Receiver<(ClientId, ClientMsg)>) {
struct Client {
name: String,
chan: mpsc::Sender<BroadcastMsg>,
}
let mut clients = HashMap::new();
while let Some(msg) = receiver.recv().await {
match msg {
(id, ClientMsg::Connected { name, chan }) => {
clients.insert(id, Client { name, chan });
},
(id, ClientMsg::Message { text }) => {
clients
.iter()
.filter(|(client_id, _client)| **client_id != id)
.for_each(|(_, client)| {
let _ = client.chan.try_send(BroadcastMsg { from: client.name.clone(), text: text.clone() });
});
}
}
}
}
async fn client_reader(
id: ClientId,
socket_reader: tokio::net::tcp::OwnedReadHalf,
sender: mpsc::Sender<(ClientId, ClientMsg)>,
broadcast_sender: mpsc::Sender<BroadcastMsg>,
) {
let reader = BufReader::new(socket_reader);
let mut lines = reader.lines();
match lines.next_line().await {
Ok(Some(first_line)) => {
eprintln!("[log] client({}) name={}", id.0, first_line);
sender.send((id, ClientMsg::Connected { name: first_line, chan: broadcast_sender })).await.unwrap()
},
_ => panic!("socket read failed"),
}
while let Ok(Some(line)) = lines.next_line().await {
eprintln!("[log] client({}) message={}", id.0, line);
sender.send((id, ClientMsg::Message { text: line })).await.unwrap();
}
}
async fn client_writer(
mut socket_writer: tokio::net::tcp::OwnedWriteHalf,
mut broadcast_receiver: mpsc::Receiver<BroadcastMsg>
) {
while let Some(msg) = broadcast_receiver.recv().await {
let line = format!("{}> {}", msg.from, msg.text);
socket_writer.write_all(format!("{line}\n").as_bytes()).await.unwrap();
}
}
use std::collections::HashMap;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::sync::mpsc;
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct ClientId(pub i32);
enum ClientMsg {
Connected { name: String, chan: mpsc::Sender },
Message { text: String },
}
struct BroadcastMsg {
from: String,
text: String,
}
#[tokio::main]
async fn main() {
let addr = "127.0.0.1:8080".to_string();
let listener = tokio::net::TcpListener::bind(&addr).await.expect("listen failed");
eprintln!("[log] listening on: {}", addr);
let (sender, receiver) = mpsc::channel(100);
let mut next_id = 0;
tokio::task::spawn(server(receiver));
loop {
let (socket, socket_addr) = listener.accept().await.expect("accept failed");
eprintln!("[log] client connected from {socket_addr}");
// "split" socket into reader and writer parts
let (socket_reader, socket_writer) = socket.into_split();
let sender = sender.clone();
let id = ClientId(next_id);
next_id += 1;
let (broadcast_sender, broadcast_receiver) = mpsc::channel(10);
tokio::task::spawn(client_reader(id, socket_reader, sender, broadcast_sender));
tokio::task::spawn(client_writer(socket_writer, broadcast_receiver));
}
}
async fn server(mut receiver: mpsc::Receiver<(ClientId, ClientMsg)>) {
struct Client {
name: String,
chan: mpsc::Sender,
}
let mut clients = HashMap::new();
while let Some(msg) = receiver.recv().await {
match msg {
(id, ClientMsg::Connected { name, chan }) => {
clients.insert(id, Client { name, chan });
},
(id, ClientMsg::Message { text }) => {
clients
.iter()
.filter(|(client_id, _client)| **client_id != id)
.for_each(|(_, client)| {
let _ = client.chan.try_send(BroadcastMsg { from: client.name.clone(), text: text.clone() });
});
}
}
}
}
async fn client_reader(
id: ClientId,
socket_reader: tokio::net::tcp::OwnedReadHalf,
sender: mpsc::Sender<(ClientId, ClientMsg)>,
broadcast_sender: mpsc::Sender,
) {
let reader = BufReader::new(socket_reader);
let mut lines = reader.lines();
match lines.next_line().await {
Ok(Some(first_line)) => {
eprintln!("[log] client({}) name={}", id.0, first_line);
sender.send((id, ClientMsg::Connected { name: first_line, chan: broadcast_sender })).await.unwrap()
},
_ => panic!("socket read failed"),
}
while let Ok(Some(line)) = lines.next_line().await {
eprintln!("[log] client({}) message={}", id.0, line);
sender.send((id, ClientMsg::Message { text: line })).await.unwrap();
}
}
async fn client_writer(
mut socket_writer: tokio::net::tcp::OwnedWriteHalf,
mut broadcast_receiver: mpsc::Receiver
) {
while let Some(msg) = broadcast_receiver.recv().await {
let line = format!("{}> {}", msg.from, msg.text);
socket_writer.write_all(format!("{line}\n").as_bytes()).await.unwrap();
}
}
Асинхронно програмиране в Rust
async блокове
- декларира операция, която би могла да се изпълни асинхронно
- не се изпълнява код на момента
- само се конструира обект, който съдържа състояние
- подобно на ламбда функция
let future = async {
let answ = 42;
println!("the answer is {answ}");
answ
};
fn main() {
let future = async {
let answ = 42;
println!("the answer is {answ}");
answ
};
}
Асинхронно програмиране в Rust
async функции
- декларира се с
async fn - извикването на функцията не изпълнява код
- извикването на функцията връща обект, имплементиращ
trait Future
async fn forty_two() -> u32 {
let answ = 42;
println!("the answer is {answ}");
answ
}
async fn forty_two() -> u32 {
let answ = 42;
println!("the answer is {answ}");
answ
}
fn main() {}
се разгъва до
fn forty_two() -> impl Future<Output = u32> {
async {
let answ = 42;
println!("the answer is {answ}");
answ
}
}
Асинхронно програмиране в Rust
Future
Futureобектите съдържат автомат (state machine)- докъде е стигнало изпълнението на асинхронната операция
Асинхронно програмиране в Rust
Future
Futureобектите съдържат автомат (state machine)- докъде е стигнало изпълнението на асинхронната операция
Futureобектите са мързеливи- изпълняват асинхронната операция, когато им се извика
.await
Async/.await в Rust
.await
.awaitе постфиксен оператор.awaitможе да се използва само вasync fnилиasync {}
async fn five() -> u8 {
println!("getting five");
5
}
async fn ten() -> u8 {
five().await + five().await
}
fn main() {}
async fn five() -> u8 {
println!("getting five");
5
}
async fn ten() -> u8 {
five().await + five().await
}
Async/.await в Rust
.await
.awaitе постфиксен оператор.awaitможе да се използва само вasync fnилиasync {}
async fn five() -> u8 {
println!("getting five");
5
}
async fn ten() -> u8 {
five().await + five().await
}
fn main() {
let x = ::futures::executor::block_on(async {
ten().await
});
}
getting five getting five
async fn five() -> u8 {
println!("getting five");
5
}
async fn ten() -> u8 {
five().await + five().await
}
fn main() {
let x = ::futures::executor::block_on(async {
ten().await
});
}
Async/.await в Rust
runtime agnostic
Rust няма вграден async runtime
- Rust предоставя async/await синтаксиса
- Rust предоставя фундаменталните типове и trait-ове
- за изпълнение на асинхронен код е необходим async runtime
- който се предоставя от външни библиотеки
Executors
Tokio
- https://tokio.rs/
- https://docs.rs/tokio
- най-големия и най-често използвания framework
Executors
Tokio
- https://tokio.rs/
- https://docs.rs/tokio
- най-големия и най-често използвания framework
tokioе async екосистема- https://github.com/tokio-rs/tracing - logging, profiling
- и др.
Executors
Други
- https://docs.rs/pollster
- минимален executor
- функция
block_on - синхронно изпълнение на един future
- и нищо повече (не поддържа async io, task-ове, …)
Futures екосистемата
- преди да бъдат добавени към
stdfutures съществуваха в rust екосистемата като библиотеката futures
Futures екосистемата
- преди да бъдат добавени към
stdfutures съществуваха в rust екосистемата като библиотеката futures - в стандартната библиотека са стабилизирани част от
futures 0.3
Futures екосистемата
- преди да бъдат добавени към
stdfutures съществуваха в rust екосистемата като библиотеката futures - в стандартната библиотека са стабилизирани част от
futures 0.3 - но има още неща, които не са добавени
- композиране на futures
StreamиSinktraitsAsyncReadиAsyncWritetraitsselect!,join!block_on- и други полезни неща
Tokio
Нормално главната функция на програмата не може да е async.
Tokio предоставя макрос, който скрива това
#[tokio::main]
async fn main() {
}
#[tokio::main]
async fn main() {
}
Tokio
Не е нужно да се използва макроса, същото може да се постигне и на ръка.
Този начин също така позволява настройване на библиотеката
use tokio::runtime::Runtime;
fn main() {
let runtime = Runtime::new().unwrap();
match runtime.block_on(run()) {
Ok(()) => (),
Err(e) => {
eprintln!("An error occurred: {}", e);
std::process::exit(1);
}
}
}
async fn run() -> Result<(), Box<dyn std::error::Error>> {
/* ... */
}
use tokio::runtime::Runtime;
fn main() {
let runtime = Runtime::new().unwrap();
match runtime.block_on(run()) {
Ok(()) => (),
Err(e) => {
eprintln!("An error occurred: {}", e);
std::process::exit(1);
}
}
}
async fn run() -> Result<(), Box> {
/* ... */
Ok(())
}
Tokio
По подразбиране tokio стартира многонишков runtime.
Това в много случаи не е необходимо - tokio ни дава concurrency, много често не ни е нужен и паралелизъм. В такива случаи можем да настроим tokio да използва еднонишков runtime.
use tokio::runtime::Builder;
fn main() {
let runtime = Builder::new_current_thread()
.build()
.unwrap();
match runtime.block_on(run()) {
Ok(()) => (),
Err(e) => {
eprintln!("An error occurred: {}", e);
std::process::exit(1);
}
}
}
async fn run() -> Result<(), Box<dyn std::error::Error>> {
/* ... */
}
use tokio::runtime::Builder;
fn main() {
let runtime = Builder::new_current_thread()
.build()
.unwrap();
match runtime.block_on(run()) {
Ok(()) => (),
Err(e) => {
eprintln!("An error occurred: {}", e);
std::process::exit(1);
}
}
}
async fn run() -> Result<(), Box> {
/* ... */
Ok(())
}
Tokio
select
Можем да обединим сървърната имплементация в един task.
Със select! можем да реагираме на първото събитие, което се случи.
Това би позволило да имаме споделени mutable данни между двата ръкава - защото всичко се случва в тялото на една функция.
async fn run() {
let addr = "127.0.0.1:8080".to_string();
let listener = tokio::net::TcpListener::bind(&addr).await.expect("listen failed");
loop {
tokio::select! {
accept_result = listener.accept() => {
let (socket, socket_addr) = accept_result.expect("accept failed");
// etc..
}
msg = receiver.recv() => {
match msg {
Some((id, ClientMsg::Connected { name, chan })) => {
// etc..
},
Some((id, ClientMsg::Message { text })) => {
// etc..
}
None => unreachable!(),
}
}
}
}
}
Tokio
select
Прочетете внимателно докъментацията на select.
- при всяко завъртане на цикъла създаваме нови future-и
listener.accept()иreceiver.recv(). - единия future ще върне резултат
- другия ще бъде канцелиран
- за тези типове изрично пише в документацията, че това е ОК
- за други би могло да доведе до загуба на данни
Futures
cancellation
- няма задължение future да бъде изпълнен до края
- може да бъде канцелиран по всяко време като се унищожи future обекта
- това означава, че при канцелиране се извиква деструктора (
drop) на future-а - но деструктора е синхронна функция
- т.е. future-а няма възможност да направи асинхронно почистване
- това е проблем в Rust екосистемата, за който няма ясно решение
Оцветяване на функции
- What color is your function?
- синхронните функции са сини
- асинхронните функции са червени
- двете не се смесват много добре
Оцветяване на функции
- What color is your function?
- синхронните функции са сини
- асинхронните функции са червени
- двете не се смесват много добре
- от синхронен код не можем да използваме асинхронни функции
- от асинхронен код можем да използваме синхронни функции - но понякога това е грешно
Оцветяване на функции
Това води до раздвояване на екосистемата
std::net::TcpListenervstokio::net::TcpListenerstd::net::TcpStreamvstokio::net::TcpStreamstd::io::Readvstokio::io::AsyncRead/tokio::io::AsyncReadExtstd::io::BufReadervstokio::io::BufReaderstd::sync::mpsc::channelvstokio::sync::mpsc::channel
Оцветяване на функции
В асиннхронен код трябва да не използваме синхронни блокиращи операции.
Асинхронните операции блокират текущия task
Runtime-а може да продължи да изпълнява други задачи пред това време.
Синхронните операции блокират нишката на операционната система.
Това може да блокира целия runtime.
Особено ако използваме еднонишков runtime (concurrency without parallism).
Как работят Futures
Как работят Futures
По детайлен поглед над:
- как езика имплементира futures
- как runtime-ите изпълняват futures
Как работят Futures
Trait Future
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;
}
pub enum Poll<T> {
Ready(T),
Pending,
}
use std::task::Context;
use std::pin::Pin;
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll;
}
pub enum Poll {
Ready(T),
Pending,
}
fn main() {}
- функцията
pollсе извиква от runtime-а
Как работят Futures
State machine
async fnиasync {}генерират автомат, под формата наenum- кодът може да бъде прекъснат при всяко извикване на
.await - future-а пази информация до кой await point е стигнало изпълнението
- и какви са стойностите на локалните променливи
use tokio::sync::mpsc;
use tokio::io::AsyncWriteExt;
async fn write_one(
mut socket_writer: tokio::net::tcp::OwnedWriteHalf,
mut receiver: mpsc::Receiver<BroadcastMsg>,
) {
let recv_future = receiver.recv();
let opt_msg = recv_future.await; // <-- await point 1
if let Some(msg) = opt_msg {
let line = format!("{}> {}\n", msg.from, msg.text);
let write_future = socket_writer.write_all(line.as_bytes());
let write_result = write_future.await; // <-- await point 2
write_result.unwrap();
}
}
fn main() {}
struct BroadcastMsg { from: String, text: String, }
use tokio::sync::mpsc;
use tokio::io::AsyncWriteExt;
async fn write_one(
mut socket_writer: tokio::net::tcp::OwnedWriteHalf,
mut receiver: mpsc::Receiver,
) {
let recv_future = receiver.recv();
let opt_msg = recv_future.await; // <-- await point 1
if let Some(msg) = opt_msg {
let line = format!("{}> {}\n", msg.from, msg.text);
let write_future = socket_writer.write_all(line.as_bytes());
let write_result = write_future.await; // <-- await point 2
write_result.unwrap();
}
}
генерира future, подобен на следното
enum FutWriteOne {
Init,
AtAwait1 { recv_future: Fut1 },
AtAwait2 { line: String, write_future: Fut2 },
Done,
}
Как работят Futures
State machine
Някои езици имплементират корутини като "леки"/зелени нишки.
Всяка корутина си има свои:
- стек
- регистри
- instruction pointer
При спирането на една корутина и пускането на друга се прави еквивалентното на context switch, но изцяло в user space.
Rust не работи така.
Как работят Futures
State machine
В Rust всичката информация се съдържа във future структурата (включително локални променливи от стека).
Няма допълнителен стек и регистри.
Ако имаме
main_fut = FutWriteOne::AtAwait1 { recv_future }
Тогава
- runtime-а ще извика
main_fut.poll() FutWriteOne::pollще извикаrecv_future.poll()- и т.н., докато не се стигне до последния future
Как работят Futures
Pin
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;
}
pub enum Poll<T> {
Ready(T),
Pending,
}
use std::task::Context;
use std::pin::Pin;
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll;
}
pub enum Poll {
Ready(T),
Pending,
}
fn main() {}
- какво означава
Pin<&mut Self>?
Как работят Futures
Pin
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;
}
pub enum Poll<T> {
Ready(T),
Pending,
}
use std::task::Context;
use std::pin::Pin;
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll;
}
pub enum Poll {
Ready(T),
Pending,
}
fn main() {}
- какво означава
Pin<&mut Self>? Pinмаркира, че от този момент нататък стойносттаSelfняма да бъде премествана в паметта
Как работят Futures
Pin
- нормално в safe Rust не можем да създадем self-referential структура - тип, който държи указател към себе си
- но това се случва много често при future-и
Pinпозволява на компилатора да генерира такива типове и те да са безопасни за използване
async fn example() {
let data = 10;
let a = &data;
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
println!("{}", a);
}
fn main() {}
async fn example() {
let data = 10;
let a = &data;
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
println!("{}", a);
}
Би генерирало нещо подобно на
enum ExampleFut {
Init,
AtAwait1 { data: i32, a: &i32, sleep_fut: tokio::time::Sleep },
// ^^^^ - референция към `data`
Done,
}
Как работят Futures
Waker
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;
}
pub enum Poll<T> {
Ready(T),
Pending,
}
use std::task::Context;
use std::pin::Pin;
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll;
}
pub enum Poll {
Ready(T),
Pending,
}
fn main() {}
- какво е
std::task::Context?
Как работят Futures
Waker
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;
}
pub enum Poll<T> {
Ready(T),
Pending,
}
use std::task::Context;
use std::pin::Pin;
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll;
}
pub enum Poll {
Ready(T),
Pending,
}
fn main() {}
- какво е
std::task::Context? - стойност, създадена от runtime-а
- дава достъп до
std::task::Waker
Как работят Futures
Waker
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;
}
pub enum Poll<T> {
Ready(T),
Pending,
}
use std::task::Context;
use std::pin::Pin;
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll;
}
pub enum Poll {
Ready(T),
Pending,
}
fn main() {}
- какво е
std::task::Context? - стойност, създадена от runtime-а
- дава достъп до
std::task::Waker - какво е
std::task::Waker?
Как работят Futures
Waker
Как runtime-а изпълнява future-и?
Извиква fut.poll()
Как работят Futures
Waker
Как runtime-а изпълнява future-и?
Извиква fut.poll()
- ако върне
Poll::Ready(value)- future-а е изпълнен до край
Как работят Futures
Waker
Как runtime-а изпълнява future-и?
Извиква fut.poll()
- ако върне
Poll::Ready(value)- future-а е изпълнен до край - ако върне
Poll::Pending- future не е готов, трябва да се извикаpollпо-късно
Как работят Futures
Waker
Как runtime-а изпълнява future-и?
Извиква fut.poll()
- ако върне
Poll::Ready(value)- future-а е изпълнен до край - ако върне
Poll::Pending- future не е готов, трябва да се извикаpollпо-късно- но колко по-късно?
Как работят Futures
Waker
Как runtime-а изпълнява future-и?
Извиква fut.poll()
- ако върне
Poll::Ready(value)- future-а е изпълнен до край - ако върне
Poll::Pending- future не е готов, трябва да се извикаpollпо-късно- но колко по-късно?
- runtime-а не знае
Как работят Futures
Waker
Как runtime-а изпълнява future-и?
Извиква fut.poll()
- ако върне
Poll::Ready(value)- future-а е изпълнен до край - ако върне
Poll::Pending- future не е готов, трябва да се извикаpollпо-късно- но колко по-късно?
- runtime-а не знае
Wakerсе използва, за да може future-ът да каже на runtime-а кога има смисъл да бъде poll-нат
Как работят Futures
Waker
async fn example() {
let data = 10;
let a = &data;
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
println!("{}", a);
}
fn main() {}
async fn example() {
let data = 10;
let a = &data;
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
println!("{}", a);
}
sleep_fut = tokio::time::sleep(...);- извиква се
sleep_fut.poll(ctx); - то подава на tokio runtime
waker = ctx.waker()- и казва "извикай
waker().wake()след 1 секунда"
- и казва "извикай
- и връща
Poll::Pending
Как работят Futures
Waker
async fn example(
mut socket: tokio::net::TcpStream,
mut receiver: tokio::sync::mpsc::Receiver<String>,
) {
let text = receiver.recv().await.unwrap(); // <-- await point 1
socket.write_all(text.as_bytes()).await.unwrap(); // <-- await point 2
}
use tokio::io::AsyncWriteExt;
fn main() {}
async fn example(
mut socket: tokio::net::TcpStream,
mut receiver: tokio::sync::mpsc::Receiver,
) {
let text = receiver.recv().await.unwrap(); // <-- await point 1
socket.write_all(text.as_bytes()).await.unwrap(); // <-- await point 2
}
- await point 1
recv_fut = receiver.recv()- извиква се
recv_fut.poll(ctx)- то запазва
waker = ctx.waker()в канала - при следващото
sender.send(msg)ще се извикаwaker.wake()
- то запазва
- и връща
Poll::Pending
Как работят Futures
Waker
async fn example(
mut socket: tokio::net::TcpStream,
mut receiver: tokio::sync::mpsc::Receiver<String>,
) {
let text = receiver.recv().await.unwrap(); // <-- await point 1
socket.write_all(text.as_bytes()).await.unwrap(); // <-- await point 2
}
use tokio::io::AsyncWriteExt;
fn main() {}
async fn example(
mut socket: tokio::net::TcpStream,
mut receiver: tokio::sync::mpsc::Receiver,
) {
let text = receiver.recv().await.unwrap(); // <-- await point 1
socket.write_all(text.as_bytes()).await.unwrap(); // <-- await point 2
}
- await point 2
write_fut = socket.write_all(...)- извиква се
write_fut.poll(ctx)- то подава
waker = ctx.waker()на tokio runtime - и казва "извикай
waker.wake(), когато сокета е готов за четене"
- то подава
- и връща
Poll::Pending
Executors
Съвместимост
- не всички async библиотеки са съвместими една с друга!
Executors
Съвместимост
- не всички async библиотеки са съвместими една с друга!
- функционалности, които могат да работят под всеки executor:
- канали -
mpsc::channel, … - синхронизационни примитиви -
Mutex, … - комбинатори -
Join,Race, … - асинхронни задачи (task-ове), които не използват директно I/O
- код, който използва само
Futuretrait-а
- канали -
Executors
Съвместимост
- не всички async библиотеки са съвместими една с друга!
- функционалности, които могат да работят под всеки executor:
- канали -
mpsc::channel, … - синхронизационни примитиви -
Mutex, … - комбинатори -
Join,Race, … - асинхронни задачи (task-ове), които не използват директно I/O
- код, който използва само
Futuretrait-а
- канали -
- функционалности, които могат да работят само на определен executor
- асинхронен вход/изход (
AsyncRead,AsyncWrite, networking, …) - таймери
- код, който има тясно взаимодействие с event loop-а на executor-а
- асинхронен вход/изход (
Executors
Съвместимост
- не всички async библиотеки са съвместими една с друга!
- функционалности, които могат да работят под всеки executor:
- канали -
mpsc::channel, … - синхронизационни примитиви -
Mutex, … - комбинатори -
Join,Race, … - асинхронни задачи (task-ове), които не използват директно I/O
- код, който използва само
Futuretrait-а
- канали -
- функционалности, които могат да работят само на определен executor
- асинхронен вход/изход (
AsyncRead,AsyncWrite, networking, …) - таймери
- код, който има тясно взаимодействие с event loop-а на executor-а
- асинхронен вход/изход (
- https://rust-lang.github.io/async-book/08_ecosystem/00_chapter.html
Подводни камъни
Блокиращи операции
- когато ползваме async е важно да нямаме блокиращи операции
Подводни камъни
Блокиращи операции
- когато ползваме async е важно да нямаме блокиращи операции
- блокиращите операции блокират текущата нишка
Подводни камъни
Блокиращи операции
- когато ползваме async е важно да нямаме блокиращи операции
- блокиращите операции блокират текущата нишка
- async executor-ите ползват ограничено количество нишки за изпълнение на задачи
Подводни камъни
Блокиращи операции
- когато ползваме async е важно да нямаме блокиращи операции
- блокиращите операции блокират текущата нишка
- async executor-ите ползват ограничено количество нишки за изпълнение на задачи
- в резултат можем лесно да си забавим или тотално забием програмата
Подводни камъни
Примитиви за синхронизация
- примитивите в
std::syncблокират текущата нишка
Подводни камъни
Примитиви за синхронизация
- примитивите в
std::syncблокират текущата нишка - вместо това трябва да се използват async версии, които блокират само текущата задача
Подводни камъни
Примитиви за синхронизация
- примитивите в
std::syncблокират текущата нишка - вместо това трябва да се използват async версии, които блокират само текущата задача
tokio::sync
Подводни камъни
Тежки операции
- други блокиращи операции, като
- тежки изчисления
- блокиращ код, който не може да се промени
Подводни камъни
Тежки операции
- други блокиращи операции, като
- тежки изчисления
- блокиращ код, който не може да се промени
- трябва да се изпълняват на отделна ОС нишка
Подводни камъни
Тежки операции
- други блокиращи операции, като
- тежки изчисления
- блокиращ код, който не може да се промени
- трябва да се изпълняват на отделна ОС нишка
- може да се пуска нова нишка за всяка операция
Подводни камъни
Тежки операции
- други блокиращи операции, като
- тежки изчисления
- блокиращ код, който не може да се промени
- трябва да се изпълняват на отделна ОС нишка
- може да се пуска нова нишка за всяка операция
- или да се използва thread pool
Подводни камъни
Тежки операции
- други блокиращи операции, като
- тежки изчисления
- блокиращ код, който не може да се промени
- трябва да се изпълняват на отделна ОС нишка
- може да се пуска нова нишка за всяка операция
- или да се използва thread pool
- tokio има
tokio::task::spawn_blocking
Подводни камъни
Задържане на променливи между await състояния
async fn some_function() -> i32 {
let big_vec = vec![ /* ... */ ];
// Променливата `big_vec` се използва за последно тук
let val1 = some_async_op(&big_vec).await;
let val2 = other_async_op().await;
val1 + val2
// Деструктора на `big_vec` се извиква тук, в края на scope-а.
// Това означава, че променливата трябва да се пази жива през цялото време
// докато future-а е жив - ненужно заемане на памет
}
async fn some_function() -> i32 {
let big_vec = vec![ /* ... */ ];
// Променливата `big_vec` се използва за последно тук
let val1 = some_async_op(&big_vec).await;
let val2 = other_async_op().await;
val1 + val2
// Деструктора на `big_vec` се извиква тук, в края на scope-а.
// Това означава, че променливата трябва да се пази жива през цялото време
// докато future-а е жив - ненужно заемане на памет
}
async fn some_async_op(_: &[u8]) -> i32 { 0 }
async fn other_async_op() -> i32 { 0 }
fn main() {}
Подводни камъни
Задържане на променливи между await състояния
async fn some_function() -> i32 {
let big_vec = vec![ /* ... */ ];
// Променливата `big_vec` се използва за последно тука
let val1 = some_async_op(&big_vec).await;
// Ръчно деструктираме променливата.
// Така тя няма да се пази в състоянието на future-а при
// следващите await точки.
drop(big_vec);
let val2 = other_async_op().await;
val1 + val2
}
async fn some_function() -> i32 {
let big_vec = vec![ /* ... */ ];
// Променливата `big_vec` се използва за последно тука
let val1 = some_async_op(&big_vec).await;
// Ръчно деструктираме променливата.
// Така тя няма да се пази в състоянието на future-а при
// следващите await точки.
drop(big_vec);
let val2 = other_async_op().await;
val1 + val2
}
async fn some_async_op(_: &[u8]) -> i32 { 0 }
async fn other_async_op() -> i32 { 0 }
fn main() {}
Допълнителни материали
- https://rust-lang.github.io/async-book/ - официалната книга за асинхронен Ръст
- https://book.async.rs/ - книгата за async std, имат добър туториал за имплементация на чат сървър
- https://www.youtube.com/watch?v=L7X0vpAU-sU - презентация за async-std