Асинхронно програмиране

async/.await

16 декември 2025

Паралелизъм и concurrency

Паралелизъм и concurrency

Паралелизъм и concurrency

Паралелизъм и concurrency

Двете понятия са ортогонални:

Приложение

Приложение

Приложение

Пример

TCP чат клиент (sync)

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
use std::io::{BufRead, BufReader, Write};
use std::net::TcpStream;

fn main() {
    let mut args = std::env::args();
    let _ = args.next(); // skip executable name
    let name = args.next().unwrap_or_else(|| {
        println!("Usage: ./client NAME");
        std::process::exit(1);
    });

    let addr = "127.0.0.1:8080".to_string();
    let mut socket = TcpStream::connect(addr).expect("failed to connect");

    socket
        .write_all(format!("{name}\n").as_bytes())
        .expect("write failed");

    let mut socket_writer = socket.try_clone().expect("clone failed");
    let socket_reader = socket;

    let h1 = std::thread::spawn(move || {
        let stdin = std::io::stdin().lock();

        for line in stdin.lines() {
            let line = line.expect("stdin read failed");

            socket_writer
                .write_all(format!("{line}\n").as_bytes())
                .expect("socket write failed");
        }
    });

    let h2 = std::thread::spawn(move || {
        let buf_reader = BufReader::new(socket_reader);
        for line in buf_reader.lines() {
            let line = line.expect("socket read failed");
            println!("{}", line);
        }
    });

    h1.join().unwrap();
    h2.join().unwrap();
}
use std::io::{BufRead, BufReader, Write};
use std::net::TcpStream;

fn main() {
    let mut args = std::env::args();
    let _ = args.next(); // skip executable name
    let name = args.next().unwrap_or_else(|| {
        println!("Usage: ./client NAME");
        std::process::exit(1);
    });

    let addr = "127.0.0.1:8080".to_string();
    let mut socket = TcpStream::connect(addr).expect("failed to connect");

    socket
        .write_all(format!("{name}\n").as_bytes())
        .expect("write failed");

    let mut socket_writer = socket.try_clone().expect("clone failed");
    let socket_reader = socket;

    let h1 = std::thread::spawn(move || {
        let stdin = std::io::stdin().lock();

        for line in stdin.lines() {
            let line = line.expect("stdin read failed");

            socket_writer
                .write_all(format!("{line}\n").as_bytes())
                .expect("socket write failed");
        }
    });

    let h2 = std::thread::spawn(move || {
        let buf_reader = BufReader::new(socket_reader);
        for line in buf_reader.lines() {
            let line = line.expect("socket read failed");
            println!("{}", line);
        }
    });

    h1.join().unwrap();
    h2.join().unwrap();
}

Пример

TCP чат сървър (sync)

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
use std::collections::HashMap;
use std::io::{BufRead, BufReader, Write};
use std::net::TcpStream;
use std::sync::mpsc;

#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct ClientId(pub i32);

enum ClientMsg {
    Connected { name: String, chan: mpsc::SyncSender<BroadcastMsg> },
    Message { text: String },
}

struct BroadcastMsg {
    from: String,
    text: String,
}

fn main() {
    let addr = "127.0.0.1:8080".to_string();
    let listener = std::net::TcpListener::bind(&addr).expect("listen failed");
    eprintln!("[log] listening on: {}", addr);

    let (sender, receiver) = mpsc::sync_channel(100);
    let mut next_id = 0;

    std::thread::spawn(move || server(receiver));

    loop {
        let (socket, socket_addr) = listener.accept().expect("accept failed");
        eprintln!("[log] client connected from {socket_addr}");

        // "split" socket into reader and writer parts
        let socket_writer = socket.try_clone().expect("clone failed");
        let socket_reader = socket;

        let sender = sender.clone();
        let id = ClientId(next_id);
        next_id += 1;

        let (broadcast_sender, broadcast_receiver) = mpsc::sync_channel(10);

        std::thread::spawn(move || client_reader(id, socket_reader, sender, broadcast_sender));
        std::thread::spawn(move || client_writer(socket_writer, broadcast_receiver));
    }
}

fn server(receiver: mpsc::Receiver<(ClientId, ClientMsg)>) {
    struct Client {
        name: String,
        chan: mpsc::SyncSender<BroadcastMsg>,
    }

    let mut clients = HashMap::new();

    for msg in receiver.iter() {
        match msg {
            (id, ClientMsg::Connected { name, chan }) => {
                clients.insert(id, Client { name, chan });
            },
            (id, ClientMsg::Message { text }) => {
                clients
                    .iter()
                    .filter(|(client_id, _client)| **client_id != id)
                    .for_each(|(_, client)| {
                        let _ = client.chan.try_send(BroadcastMsg { from: client.name.clone(), text: text.clone() });
                    });
            }
        }
    }
}

fn client_reader(
    id: ClientId,
    socket_reader: TcpStream,
    sender: mpsc::SyncSender<(ClientId, ClientMsg)>,
    broadcast_sender: mpsc::SyncSender<BroadcastMsg>,
) {
    let reader = BufReader::new(socket_reader);
    let mut lines = reader.lines();

    match lines.next() {
        Some(Ok(first_line)) => {
            eprintln!("[log] client({}) name={}", id.0, first_line);
            sender.send((id, ClientMsg::Connected { name: first_line, chan: broadcast_sender })).unwrap()
        },
        _ => panic!("socket read failed"),
    }

    while let Some(Ok(line)) = lines.next() {
        eprintln!("[log] client({}) message={}", id.0, line);
        sender.send((id, ClientMsg::Message { text: line })).unwrap();
    }
}

fn client_writer(
    mut socket_writer: TcpStream,
    broadcast_receiver: mpsc::Receiver<BroadcastMsg>
) {
    for msg in broadcast_receiver.iter() {
        let line = format!("{}> {}", msg.from, msg.text);
        socket_writer.write_all(format!("{line}\n").as_bytes()).unwrap();
    }
}
use std::collections::HashMap;
use std::io::{BufRead, BufReader, Write};
use std::net::TcpStream;
use std::sync::mpsc;

#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct ClientId(pub i32);

enum ClientMsg {
    Connected { name: String, chan: mpsc::SyncSender },
    Message { text: String },
}

struct BroadcastMsg {
    from: String,
    text: String,
}

fn main() {
    let addr = "127.0.0.1:8080".to_string();
    let listener = std::net::TcpListener::bind(&addr).expect("listen failed");
    eprintln!("[log] listening on: {}", addr);

    let (sender, receiver) = mpsc::sync_channel(100);
    let mut next_id = 0;

    std::thread::spawn(move || server(receiver));

    loop {
        let (socket, socket_addr) = listener.accept().expect("accept failed");
        eprintln!("[log] client connected from {socket_addr}");

        // "split" socket into reader and writer parts
        let socket_writer = socket.try_clone().expect("clone failed");
        let socket_reader = socket;

        let sender = sender.clone();
        let id = ClientId(next_id);
        next_id += 1;

        let (broadcast_sender, broadcast_receiver) = mpsc::sync_channel(10);

        std::thread::spawn(move || client_reader(id, socket_reader, sender, broadcast_sender));
        std::thread::spawn(move || client_writer(socket_writer, broadcast_receiver));
    }
}

fn server(receiver: mpsc::Receiver<(ClientId, ClientMsg)>) {
    struct Client {
        name: String,
        chan: mpsc::SyncSender,
    }

    let mut clients = HashMap::new();

    for msg in receiver.iter() {
        match msg {
            (id, ClientMsg::Connected { name, chan }) => {
                clients.insert(id, Client { name, chan });
            },
            (id, ClientMsg::Message { text }) => {
                clients
                    .iter()
                    .filter(|(client_id, _client)| **client_id != id)
                    .for_each(|(_, client)| {
                        let _ = client.chan.try_send(BroadcastMsg { from: client.name.clone(), text: text.clone() });
                    });
            }
        }
    }
}

fn client_reader(
    id: ClientId,
    socket_reader: TcpStream,
    sender: mpsc::SyncSender<(ClientId, ClientMsg)>,
    broadcast_sender: mpsc::SyncSender,
) {
    let reader = BufReader::new(socket_reader);
    let mut lines = reader.lines();

    match lines.next() {
        Some(Ok(first_line)) => {
            eprintln!("[log] client({}) name={}", id.0, first_line);
            sender.send((id, ClientMsg::Connected { name: first_line, chan: broadcast_sender })).unwrap()
        },
        _ => panic!("socket read failed"),
    }

    while let Some(Ok(line)) = lines.next() {
        eprintln!("[log] client({}) message={}", id.0, line);
        sender.send((id, ClientMsg::Message { text: line })).unwrap();
    }
}

fn client_writer(
    mut socket_writer: TcpStream,
    broadcast_receiver: mpsc::Receiver
) {
    for msg in broadcast_receiver.iter() {
        let line = format!("{}> {}", msg.from, msg.text);
        socket_writer.write_all(format!("{line}\n").as_bytes()).unwrap();
    }
}

Пример

TCP чат сървър (sync)

Кодът е коректен
(ако изключим липсата на error handling и това, че не поддържа откачане на клиенти)

Но не е оптимален - използва повече ресурси (нишки), отколкото е необходимо

Пример

TCP чат сървър (sync)

Наблюдение - през повечето време повечето нишки са в режим на изчакване:

Можем ли да постигнем същата функционалност без блокиращи операции?

Async I/O

Това е често срещан проблем.
Всяка операционна система предоставя начин за асинхронно изпълнение на входно/изходни операции:

epoll и kqueue позволяват:

IOCP и io_uring се използват за същото, но ползват различен интерфейс (completion based vs poll based).

Async I/O

epoll

Псевдокод:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
let listener_fd = tcp_listener.as_fd();
let client0_fd = clients[0].tcp_stream.as_fd();
let client1_fd = clients[1].tcp_stream.as_fd();

epollfd := epoll_create();
epoll_ctl(epollfd, .CTL_ADD, listener_fd, {.IN}) // notify when fd is ready for READ operation
epoll_ctl(epollfd, .CTL_ADD, client0_fd, {.IN})  // notify when fd is ready for READ operation
epoll_ctl(epollfd, .CTL_ADD, client1_fd, {.IN})  // notify when fd is ready for READ operation

loop {
    events := epoll_wait(epollfd);

    for evt in events {
        cond {
            evt.fd == listener_fd => {
                stream := tcp_listener.accept();  // will not block
                // etc ...
            }
            evt.fd in [client0_fd, client1_fd] => {
                id := fd_to_client(evt.fd);
                msg := clients[id].tcp_stream.read(); // will not block
                // etc ...
            }
        }
    }
}

Async I/O

Езици и библиотеки, които поддържат асинхронен вход/изход, предоставят различни абстракции върху тази функционалност на операционната система

Async I/O имплементации

Callback функции

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
// псевдокод (базирано на c++ boost::asio)
void process_request(shared_ptr<State> state) {
    async_connect(io_executor, state->socket, bind(on_connected, state));
}

void on_connected(shared_ptr<State> state) {
    state->request_buffer = prepare_request();
    async_write(socket, state->request_buffer, bind(on_send_complete, state));
}

void on_send_complete(shared_ptr<State> state) { 
    asyn_read(socket, state->response_buffer, bind(on_read_complete, state));
}

void on_read_complete(shared_ptr<State> state) {
    auto state->headers = parse_headers(state->response_buffer);
    auto state->body = parse_body(state->response_buffer);

    post(io_executor, bind(process, state));
}

void process(shared_ptr<State>) {
    /* ... */
}

Имплементации

Callback функции

Недостатъци:

Имплементации

Coroutines

Предимства:

Недостатъци:

Rust използва вариант на корутини

Асинхронно програмиране в Rust

Пример

Директен превод на синхронния код.
Има място за още подобрения.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103
use std::collections::HashMap;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::sync::mpsc;

#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct ClientId(pub i32);

enum ClientMsg {
    Connected { name: String, chan: mpsc::Sender<BroadcastMsg> },
    Message { text: String },
}

struct BroadcastMsg {
    from: String,
    text: String,
}

#[tokio::main]
async fn main() {
    let addr = "127.0.0.1:8080".to_string();
    let listener = tokio::net::TcpListener::bind(&addr).await.expect("listen failed");
    eprintln!("[log] listening on: {}", addr);

    let (sender, receiver) = mpsc::channel(100);
    let mut next_id = 0;

    tokio::task::spawn(server(receiver));

    loop {
        let (socket, socket_addr) = listener.accept().await.expect("accept failed");
        eprintln!("[log] client connected from {socket_addr}");

        // "split" socket into reader and writer parts
        let (socket_reader, socket_writer) = socket.into_split();

        let sender = sender.clone();
        let id = ClientId(next_id);
        next_id += 1;

        let (broadcast_sender, broadcast_receiver) = mpsc::channel(10);

        tokio::task::spawn(client_reader(id, socket_reader, sender, broadcast_sender));
        tokio::task::spawn(client_writer(socket_writer, broadcast_receiver));
    }
}

async fn server(mut receiver: mpsc::Receiver<(ClientId, ClientMsg)>) {
    struct Client {
        name: String,
        chan: mpsc::Sender<BroadcastMsg>,
    }

    let mut clients = HashMap::new();

    while let Some(msg) = receiver.recv().await {
        match msg {
            (id, ClientMsg::Connected { name, chan }) => {
                clients.insert(id, Client { name, chan });
            },
            (id, ClientMsg::Message { text }) => {
                clients
                    .iter()
                    .filter(|(client_id, _client)| **client_id != id)
                    .for_each(|(_, client)| {
                        let _ = client.chan.try_send(BroadcastMsg { from: client.name.clone(), text: text.clone() });
                    });
            }
        }
    }
}

async fn client_reader(
    id: ClientId,
    socket_reader: tokio::net::tcp::OwnedReadHalf,
    sender: mpsc::Sender<(ClientId, ClientMsg)>,
    broadcast_sender: mpsc::Sender<BroadcastMsg>,
) {
    let reader = BufReader::new(socket_reader);
    let mut lines = reader.lines();

    match lines.next_line().await {
        Ok(Some(first_line)) => {
            eprintln!("[log] client({}) name={}", id.0, first_line);
            sender.send((id, ClientMsg::Connected { name: first_line, chan: broadcast_sender })).await.unwrap()
        },
        _ => panic!("socket read failed"),
    }

    while let Ok(Some(line)) = lines.next_line().await {
        eprintln!("[log] client({}) message={}", id.0, line);
        sender.send((id, ClientMsg::Message { text: line })).await.unwrap();
    }
}

async fn client_writer(
    mut socket_writer: tokio::net::tcp::OwnedWriteHalf,
    mut broadcast_receiver: mpsc::Receiver<BroadcastMsg>
) {
    while let Some(msg) = broadcast_receiver.recv().await {
        let line = format!("{}> {}", msg.from, msg.text);
        socket_writer.write_all(format!("{line}\n").as_bytes()).await.unwrap();
    }
}
use std::collections::HashMap;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::sync::mpsc;

#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct ClientId(pub i32);

enum ClientMsg {
    Connected { name: String, chan: mpsc::Sender },
    Message { text: String },
}

struct BroadcastMsg {
    from: String,
    text: String,
}

#[tokio::main]
async fn main() {
    let addr = "127.0.0.1:8080".to_string();
    let listener = tokio::net::TcpListener::bind(&addr).await.expect("listen failed");
    eprintln!("[log] listening on: {}", addr);

    let (sender, receiver) = mpsc::channel(100);
    let mut next_id = 0;

    tokio::task::spawn(server(receiver));

    loop {
        let (socket, socket_addr) = listener.accept().await.expect("accept failed");
        eprintln!("[log] client connected from {socket_addr}");

        // "split" socket into reader and writer parts
        let (socket_reader, socket_writer) = socket.into_split();

        let sender = sender.clone();
        let id = ClientId(next_id);
        next_id += 1;

        let (broadcast_sender, broadcast_receiver) = mpsc::channel(10);

        tokio::task::spawn(client_reader(id, socket_reader, sender, broadcast_sender));
        tokio::task::spawn(client_writer(socket_writer, broadcast_receiver));
    }
}

async fn server(mut receiver: mpsc::Receiver<(ClientId, ClientMsg)>) {
    struct Client {
        name: String,
        chan: mpsc::Sender,
    }

    let mut clients = HashMap::new();

    while let Some(msg) = receiver.recv().await {
        match msg {
            (id, ClientMsg::Connected { name, chan }) => {
                clients.insert(id, Client { name, chan });
            },
            (id, ClientMsg::Message { text }) => {
                clients
                    .iter()
                    .filter(|(client_id, _client)| **client_id != id)
                    .for_each(|(_, client)| {
                        let _ = client.chan.try_send(BroadcastMsg { from: client.name.clone(), text: text.clone() });
                    });
            }
        }
    }
}

async fn client_reader(
    id: ClientId,
    socket_reader: tokio::net::tcp::OwnedReadHalf,
    sender: mpsc::Sender<(ClientId, ClientMsg)>,
    broadcast_sender: mpsc::Sender,
) {
    let reader = BufReader::new(socket_reader);
    let mut lines = reader.lines();

    match lines.next_line().await {
        Ok(Some(first_line)) => {
            eprintln!("[log] client({}) name={}", id.0, first_line);
            sender.send((id, ClientMsg::Connected { name: first_line, chan: broadcast_sender })).await.unwrap()
        },
        _ => panic!("socket read failed"),
    }

    while let Ok(Some(line)) = lines.next_line().await {
        eprintln!("[log] client({}) message={}", id.0, line);
        sender.send((id, ClientMsg::Message { text: line })).await.unwrap();
    }
}

async fn client_writer(
    mut socket_writer: tokio::net::tcp::OwnedWriteHalf,
    mut broadcast_receiver: mpsc::Receiver
) {
    while let Some(msg) = broadcast_receiver.recv().await {
        let line = format!("{}> {}", msg.from, msg.text);
        socket_writer.write_all(format!("{line}\n").as_bytes()).await.unwrap();
    }
}

Асинхронно програмиране в Rust

async блокове

1 2 3 4 5
let future = async {
    let answ = 42;
    println!("the answer is {answ}");
    answ
};
fn main() {
let future = async {
    let answ = 42;
    println!("the answer is {answ}");
    answ
};
}

Асинхронно програмиране в Rust

async функции

1 2 3 4 5
async fn forty_two() -> u32 {
    let answ = 42;
    println!("the answer is {answ}");
    answ
}
async fn forty_two() -> u32 {
    let answ = 42;
    println!("the answer is {answ}");
    answ
}
fn main() {}

се разгъва до

1 2 3 4 5 6 7
fn forty_two() -> impl Future<Output = u32> {
    async {
        let answ = 42;
        println!("the answer is {answ}");
        answ
    }
}

Асинхронно програмиране в Rust

Future

Асинхронно програмиране в Rust

Future

Async/.await в Rust

.await

1 2 3 4 5 6 7 8
async fn five() -> u8 {
    println!("getting five");
    5
}

async fn ten() -> u8 {
    five().await + five().await
}
fn main() {}
async fn five() -> u8 {
    println!("getting five");
    5
}

async fn ten() -> u8 {
    five().await + five().await
}

Async/.await в Rust

.await

1 2 3 4 5 6 7 8 9 10 11 12 13 14
async fn five() -> u8 {
    println!("getting five");
    5
}

async fn ten() -> u8 {
    five().await + five().await
}

fn main() {
    let x = ::futures::executor::block_on(async {
        ten().await
    });
}
getting five getting five
async fn five() -> u8 {
    println!("getting five");
    5
}

async fn ten() -> u8 {
    five().await + five().await
}

fn main() {
    let x = ::futures::executor::block_on(async {
        ten().await
    });
}

Async/.await в Rust

runtime agnostic

Rust няма вграден async runtime

Executors

Tokio

Executors

Tokio

Executors

Други

Futures екосистемата

Futures екосистемата

Futures екосистемата

Tokio

Нормално главната функция на програмата не може да е async.
Tokio предоставя макрос, който скрива това

1 2 3
#[tokio::main]
async fn main() {
}
#[tokio::main]
async fn main() {
}

Tokio

Не е нужно да се използва макроса, същото може да се постигне и на ръка.
Този начин също така позволява настройване на библиотеката

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
use tokio::runtime::Runtime;

fn main() {
    let runtime = Runtime::new().unwrap();

    match runtime.block_on(run()) {
        Ok(()) => (),
        Err(e) => {
            eprintln!("An error occurred: {}", e);
            std::process::exit(1);
        }
    }
}

async fn run() -> Result<(), Box<dyn std::error::Error>> {
    /* ... */
}
use tokio::runtime::Runtime;

fn main() {
    let runtime = Runtime::new().unwrap();

    match runtime.block_on(run()) {
        Ok(()) => (),
        Err(e) => {
            eprintln!("An error occurred: {}", e);
            std::process::exit(1);
        }
    }
}

async fn run() -> Result<(), Box> {
    /* ... */
Ok(())
}

Tokio

По подразбиране tokio стартира многонишков runtime.
Това в много случаи не е необходимо - tokio ни дава concurrency, много често не ни е нужен и паралелизъм. В такива случаи можем да настроим tokio да използва еднонишков runtime.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
use tokio::runtime::Builder;

fn main() {
    let runtime = Builder::new_current_thread()
        .build()
        .unwrap();

    match runtime.block_on(run()) {
        Ok(()) => (),
        Err(e) => {
            eprintln!("An error occurred: {}", e);
            std::process::exit(1);
        }
    }
}

async fn run() -> Result<(), Box<dyn std::error::Error>> {
    /* ... */
}
use tokio::runtime::Builder;

fn main() {
    let runtime = Builder::new_current_thread()
        .build()
        .unwrap();

    match runtime.block_on(run()) {
        Ok(()) => (),
        Err(e) => {
            eprintln!("An error occurred: {}", e);
            std::process::exit(1);
        }
    }
}

async fn run() -> Result<(), Box> {
    /* ... */
Ok(())
}

Tokio

select

Можем да обединим сървърната имплементация в един task.
Със select! можем да реагираме на първото събитие, което се случи.
Това би позволило да имаме споделени mutable данни между двата ръкава - защото всичко се случва в тялото на една функция.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
async fn run() {
    let addr = "127.0.0.1:8080".to_string();
    let listener = tokio::net::TcpListener::bind(&addr).await.expect("listen failed");

    loop {
        tokio::select! {
            accept_result = listener.accept() => {
                let (socket, socket_addr) = accept_result.expect("accept failed");
                // etc..
            }
            msg = receiver.recv() => {
                match msg {
                    Some((id, ClientMsg::Connected { name, chan })) => {
                        // etc..
                    },
                    Some((id, ClientMsg::Message { text })) => {
                        // etc..
                    }
                    None => unreachable!(),
                }
            }
        }
    }
}

Tokio

select

Прочетете внимателно докъментацията на select.

Futures

cancellation

Оцветяване на функции

Оцветяване на функции

Оцветяване на функции

Това води до раздвояване на екосистемата

Оцветяване на функции

В асиннхронен код трябва да не използваме синхронни блокиращи операции.

Асинхронните операции блокират текущия task
Runtime-а може да продължи да изпълнява други задачи пред това време.

Синхронните операции блокират нишката на операционната система.
Това може да блокира целия runtime.
Особено ако използваме еднонишков runtime (concurrency without parallism).

Как работят Futures

Как работят Futures

По детайлен поглед над:

Как работят Futures

Trait Future

1 2 3 4 5 6 7 8 9 10
pub trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;
}

pub enum Poll<T> {
    Ready(T),
    Pending,
}
use std::task::Context;
use std::pin::Pin;

pub trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll;
}

pub enum Poll {
    Ready(T),
    Pending,
}

fn main() {}

Как работят Futures

State machine

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
use tokio::sync::mpsc;
use tokio::io::AsyncWriteExt;

async fn write_one(
    mut socket_writer: tokio::net::tcp::OwnedWriteHalf,
    mut receiver: mpsc::Receiver<BroadcastMsg>,
) {
    let recv_future = receiver.recv();
    let opt_msg = recv_future.await;            // <-- await point 1
    if let Some(msg) = opt_msg  {
        let line = format!("{}> {}\n", msg.from, msg.text);
        let write_future = socket_writer.write_all(line.as_bytes());
        let write_result = write_future.await;  // <-- await point 2

        write_result.unwrap();
    }
}
fn main() {}
struct BroadcastMsg { from: String, text: String, }
use tokio::sync::mpsc;
use tokio::io::AsyncWriteExt;

async fn write_one(
    mut socket_writer: tokio::net::tcp::OwnedWriteHalf,
    mut receiver: mpsc::Receiver,
) {
    let recv_future = receiver.recv();
    let opt_msg = recv_future.await;            // <-- await point 1
    if let Some(msg) = opt_msg  {
        let line = format!("{}> {}\n", msg.from, msg.text);
        let write_future = socket_writer.write_all(line.as_bytes());
        let write_result = write_future.await;  // <-- await point 2

        write_result.unwrap();
    }
}

генерира future, подобен на следното

1 2 3 4 5 6
enum FutWriteOne {
    Init,
    AtAwait1 { recv_future: Fut1 },
    AtAwait2 { line: String, write_future: Fut2 },
    Done,
}

Как работят Futures

State machine

Някои езици имплементират корутини като "леки"/зелени нишки.
Всяка корутина си има свои:

При спирането на една корутина и пускането на друга се прави еквивалентното на context switch, но изцяло в user space.

Rust не работи така.

Как работят Futures

State machine

В Rust всичката информация се съдържа във future структурата (включително локални променливи от стека).
Няма допълнителен стек и регистри.

Ако имаме

Тогава

Как работят Futures

Pin

1 2 3 4 5 6 7 8 9 10
pub trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;
}

pub enum Poll<T> {
    Ready(T),
    Pending,
}
use std::task::Context;
use std::pin::Pin;

pub trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll;
}

pub enum Poll {
    Ready(T),
    Pending,
}

fn main() {}

Как работят Futures

Pin

1 2 3 4 5 6 7 8 9 10
pub trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;
}

pub enum Poll<T> {
    Ready(T),
    Pending,
}
use std::task::Context;
use std::pin::Pin;

pub trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll;
}

pub enum Poll {
    Ready(T),
    Pending,
}

fn main() {}

Как работят Futures

Pin

1 2 3 4 5 6
async fn example() {
    let data = 10;
    let a = &data;
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    println!("{}", a);
}
fn main() {}
async fn example() {
    let data = 10;
    let a = &data;
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    println!("{}", a);
}

Би генерирало нещо подобно на

1 2 3 4 5 6
enum ExampleFut {
    Init,
    AtAwait1 { data: i32, a: &i32, sleep_fut: tokio::time::Sleep },
                          // ^^^^ - референция към `data`
    Done,
}

Как работят Futures

Waker

1 2 3 4 5 6 7 8 9 10
pub trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;
}

pub enum Poll<T> {
    Ready(T),
    Pending,
}
use std::task::Context;
use std::pin::Pin;

pub trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll;
}

pub enum Poll {
    Ready(T),
    Pending,
}

fn main() {}

Как работят Futures

Waker

1 2 3 4 5 6 7 8 9 10
pub trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;
}

pub enum Poll<T> {
    Ready(T),
    Pending,
}
use std::task::Context;
use std::pin::Pin;

pub trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll;
}

pub enum Poll {
    Ready(T),
    Pending,
}

fn main() {}

Как работят Futures

Waker

1 2 3 4 5 6 7 8 9 10
pub trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;
}

pub enum Poll<T> {
    Ready(T),
    Pending,
}
use std::task::Context;
use std::pin::Pin;

pub trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll;
}

pub enum Poll {
    Ready(T),
    Pending,
}

fn main() {}

Как работят Futures

Waker

Как runtime-а изпълнява future-и?
Извиква fut.poll()

Как работят Futures

Waker

Как runtime-а изпълнява future-и?
Извиква fut.poll()

Как работят Futures

Waker

Как runtime-а изпълнява future-и?
Извиква fut.poll()

Как работят Futures

Waker

Как runtime-а изпълнява future-и?
Извиква fut.poll()

Как работят Futures

Waker

Как runtime-а изпълнява future-и?
Извиква fut.poll()

Как работят Futures

Waker

Как runtime-а изпълнява future-и?
Извиква fut.poll()

Как работят Futures

Waker

1 2 3 4 5 6
async fn example() {
    let data = 10;
    let a = &data;
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    println!("{}", a);
}
fn main() {}
async fn example() {
    let data = 10;
    let a = &data;
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    println!("{}", a);
}

Как работят Futures

Waker

1 2 3 4 5 6 7
async fn example(
    mut socket: tokio::net::TcpStream,
    mut receiver: tokio::sync::mpsc::Receiver<String>,
) {
    let text = receiver.recv().await.unwrap();       // <-- await point 1
    socket.write_all(text.as_bytes()).await.unwrap();  // <-- await point 2
}
use tokio::io::AsyncWriteExt;
fn main() {}
async fn example(
    mut socket: tokio::net::TcpStream,
    mut receiver: tokio::sync::mpsc::Receiver,
) {
    let text = receiver.recv().await.unwrap();       // <-- await point 1
    socket.write_all(text.as_bytes()).await.unwrap();  // <-- await point 2
}

Как работят Futures

Waker

1 2 3 4 5 6 7
async fn example(
    mut socket: tokio::net::TcpStream,
    mut receiver: tokio::sync::mpsc::Receiver<String>,
) {
    let text = receiver.recv().await.unwrap();       // <-- await point 1
    socket.write_all(text.as_bytes()).await.unwrap();  // <-- await point 2
}
use tokio::io::AsyncWriteExt;
fn main() {}
async fn example(
    mut socket: tokio::net::TcpStream,
    mut receiver: tokio::sync::mpsc::Receiver,
) {
    let text = receiver.recv().await.unwrap();       // <-- await point 1
    socket.write_all(text.as_bytes()).await.unwrap();  // <-- await point 2
}

Executors

Съвместимост

Executors

Съвместимост

Executors

Съвместимост

Executors

Съвместимост

Подводни камъни

Блокиращи операции

Подводни камъни

Блокиращи операции

Подводни камъни

Блокиращи операции

Подводни камъни

Блокиращи операции

Подводни камъни

Примитиви за синхронизация

Подводни камъни

Примитиви за синхронизация

Подводни камъни

Примитиви за синхронизация

Подводни камъни

Тежки операции

Подводни камъни

Тежки операции

Подводни камъни

Тежки операции

Подводни камъни

Тежки операции

Подводни камъни

Тежки операции

Подводни камъни

Задържане на променливи между await състояния

1 2 3 4 5 6 7 8 9 10 11 12 13 14
async fn some_function() -> i32 {
    let big_vec = vec![ /* ... */ ];

    // Променливата `big_vec` се използва за последно тук
    let val1 = some_async_op(&big_vec).await;

    let val2 = other_async_op().await;

    val1 + val2

    // Деструктора на `big_vec` се извиква тук, в края на scope-а.
    // Това означава, че променливата трябва да се пази жива през цялото време
    // докато future-а е жив - ненужно заемане на памет
}
async fn some_function() -> i32 {
    let big_vec = vec![ /* ... */ ];

    // Променливата `big_vec` се използва за последно тук
    let val1 = some_async_op(&big_vec).await;

    let val2 = other_async_op().await;

    val1 + val2

    // Деструктора на `big_vec` се извиква тук, в края на scope-а.
    // Това означава, че променливата трябва да се пази жива през цялото време
    // докато future-а е жив - ненужно заемане на памет
}
async fn some_async_op(_: &[u8]) -> i32 { 0 }
async fn other_async_op() -> i32 { 0 }
fn main() {}

Подводни камъни

Задържане на променливи между await състояния

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
async fn some_function() -> i32 {
    let big_vec = vec![ /* ... */ ];

    // Променливата `big_vec` се използва за последно тука
    let val1 = some_async_op(&big_vec).await;

    // Ръчно деструктираме променливата.
    // Така тя няма да се пази в състоянието на future-а при
    // следващите await точки.
    drop(big_vec);

    let val2 = other_async_op().await;

    val1 + val2
}
async fn some_function() -> i32 {
    let big_vec = vec![ /* ... */ ];

    // Променливата `big_vec` се използва за последно тука
    let val1 = some_async_op(&big_vec).await;

    // Ръчно деструктираме променливата.
    // Така тя няма да се пази в състоянието на future-а при
    // следващите await точки.
    drop(big_vec);

    let val2 = other_async_op().await;

    val1 + val2
}
async fn some_async_op(_: &[u8]) -> i32 { 0 }
async fn other_async_op() -> i32 { 0 }
fn main() {}

Допълнителни материали

Въпроси