Асинхронно програмиране

async/.await

10 декември 2024

Паралелизъм и concurrency

Паралелизъм и concurrency

Паралелизъм и concurrency

Паралелизъм и concurrency

Двете понятия са ортогонални:

Приложение

Приложение

Приложение

Защо?

Пример: TCP echo сървър

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
use std::io::{Read, Write};
use std::error::Error;

fn main() -> Result<(), Box<dyn Error>> {
    let addr = "127.0.0.1:8080".to_string();
    let listener = std::net::TcpListener::bind(&addr)?;
    println!("Listening on: {}", addr);

    loop {
        let (mut socket, _) = listener.accept()?;

        std::thread::spawn(move || {
            let mut buf = vec![0; 1024];


            loop {
                let n = socket
                    .read(&mut buf)
                    .expect("failed to read data from socket");

                if n == 0 {
                    return;
                }

                socket
                    .write_all(&buf[0..n])
                    .expect("failed to write data to socket");
            }
        });
    }
}
use std::io::{Read, Write};
use std::error::Error;

fn main() -> Result<(), Box> {
    let addr = "127.0.0.1:8080".to_string();
    let listener = std::net::TcpListener::bind(&addr)?;
    println!("Listening on: {}", addr);

    loop {
        let (mut socket, _) = listener.accept()?;

        std::thread::spawn(move || {
            let mut buf = vec![0; 1024];


            loop {
                let n = socket
                    .read(&mut buf)
                    .expect("failed to read data from socket");

                if n == 0 {
                    return;
                }

                socket
                    .write_all(&buf[0..n])
                    .expect("failed to write data to socket");
            }
        });
    }
}

Защо?

Защо?

Защо?

Защо?

Защо?

Защо?

Защо?

Защо?

Защо?

Защо?

Как?

Как?

Как?

Как?

Как?

Как?

Как?


Как?


Как?


Как?


Как?

Как?

Имплементации

Event driven (с callback функции)

Имплементации

Event driven (с callback функции)

Имплементации

Event driven (с callback функции)

Имплементации

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
// псевдокод
void process_request(shared_ptr<State> state) {
    async_connect(io_executor, state->socket, bind(on_connected, state));
}

void on_connected(shared_ptr<State> state) {
    state->request_buffer = prepare_request();
    async_write(socket, state->request_buffer, bind(on_send_complete, state));
}

void on_send_complete(shared_ptr<State> state) { 
    asyn_read(socket, state->response_buffer, bind(on_read_complete, state));
}

void on_read_complete(shared_ptr<State> state) {
    auto state->headers = parse_headers(state->response_buffer);
    auto state->body = parse_body(state->response_buffer);

    post(io_executor, bind(process, state));
}

void process(shared_ptr<State>) {
    /* ... */
}

Имплементации

Event driven (с callback функции)

Недостатъци:

Имплементации

Coroutines

Имплементации

Coroutines

Имплементации

Coroutines

Асинхронно програмиране в Rust

Пример

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let addr = "127.0.0.1:8080".to_string();
    let listener = tokio::net::TcpListener::bind(&addr).await?;
    println!("Listening on: {}", addr);

    loop {
        let (mut socket, _) = listener.accept().await?;

        tokio::spawn(async move {
            let mut buf = vec![0; 1024];

            loop {
                let n = socket
                    .read(&mut buf)
                    .await
                    .expect("failed to read data from socket");

                if n == 0 {
                    return;
                }

                socket
                    .write_all(&buf[0..n])
                    .await
                    .expect("failed to write data to socket");
            }
        });
    }
}
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box> {
    let addr = "127.0.0.1:8080".to_string();
    let listener = tokio::net::TcpListener::bind(&addr).await?;
    println!("Listening on: {}", addr);

    loop {
        let (mut socket, _) = listener.accept().await?;

        tokio::spawn(async move {
            let mut buf = vec![0; 1024];

            loop {
                let n = socket
                    .read(&mut buf)
                    .await
                    .expect("failed to read data from socket");

                if n == 0 {
                    return;
                }

                socket
                    .write_all(&buf[0..n])
                    .await
                    .expect("failed to write data to socket");
            }
        });
    }
}

Асинхронно програмиране в Rust

Асинхронно програмиране в Rust

Асинхронно програмиране в Rust

Асинхронно програмиране в Rust

Асинхронно програмиране в Rust

Асинхронно програмиране в Rust

Библиотечни функции:

Изпълними програми:

Async/.await в Rust

async функции

1 2 3 4
// връща анонимен тип, който имплементира trait-а `Future<Output = u8>`
async fn five() -> u8 {
    5
}
#![allow(dead_code)]
// връща анонимен тип, който имплементира trait-а `Future`
async fn five() -> u8 {
    5
}
fn main() {}

Async/.await в Rust

async блокове

1 2 3 4 5 6 7 8 9
use std::future::Future;

fn ten() -> impl Future<Output = u8> {
    // връща анонимен тип, който имплементира trait-а `Future<Output = u8>`
    async {
        let x: u8 = five().await;
        x + 5
    }
}
#![allow(dead_code)]
use std::future::Future;

fn ten() -> impl Future {
    // връща анонимен тип, който имплементира trait-а `Future`
    async {
        let x: u8 = five().await;
        x + 5
    }
}
async fn five() -> u8 { 5 }
fn main() {}

Async/.await в Rust

.await

1 2 3 4 5 6 7 8 9 10 11
async fn five() -> u8 {
    5
}

async fn ten() -> u8 {
    five().await + 5
}

fn main() {
    let x = ten().await;
}
error[E0728]: `await` is only allowed inside `async` functions and blocks --> src/bin/main_c247aff303b227bd4f1fa2405d3f7ad357520e11.rs:10:19 | 9 | fn main() { | --------- this is not `async` 10 | let x = ten().await; | ^^^^^ only allowed inside `async` functions and blocks For more information about this error, try `rustc --explain E0728`. error: could not compile `rust` (bin "main_c247aff303b227bd4f1fa2405d3f7ad357520e11") due to 1 previous error
async fn five() -> u8 {
    5
}

async fn ten() -> u8 {
    five().await + 5
}

fn main() {
    let x = ten().await;
}

Async/.await в Rust

.await

1 2 3 4 5 6 7 8 9 10 11 12 13
async fn five() -> u8 {
    5
}

async fn ten() -> u8 {
    five().await + 5
}

fn main() {
    let x = ::futures::executor::block_on(async {
        ten().await
    });
}
async fn five() -> u8 {
    5
}

async fn ten() -> u8 {
    five().await + 5
}

fn main() {
    let x = ::futures::executor::block_on(async {
        ten().await
    });
}

Trait Future

1 2 3 4 5 6 7 8 9 10
pub trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;
}

pub enum Poll<T> {
    Ready(T),
    Pending,
}
use std::task::Context;
use std::pin::Pin;

pub trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll;
}

pub enum Poll {
    Ready(T),
    Pending,
}

fn main() {}

Trait Future

1 2 3 4 5
async fn foo() {
    println!("foo");
}

let foo_future = foo();
#![allow(unused_variables)]
fn main() {
async fn foo() {
    println!("foo");
}

let foo_future = foo();
}

Изпълнение на future

Future може да се изпълни

Изпълнение на future

Async функцията връща анонимна структура, която имплементира трейта Future.
Тази структура е enum, който съдържа всички възможни междинни състояния.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
fn main() {
    let fut1 = async_func1();
    ::futures::executor::block_on(fut1);
}

async fn async_func1() -> i32 {
    let fut2 = another_async_func();
    let x = fut2.await;

    let y = regular_func();

    let fut3 = make_call_to_db();
    let z = fut3.await;

    x + y + z
}

fn main() {
    let fut1 = async_func1();
    ::futures::executor::block_on(fut1);
}

async fn async_func1() -> i32 {
    let fut2 = another_async_func();
    let x = fut2.await;

    let y = regular_func();

    let fut3 = make_call_to_db();
    let z = fut3.await;

    x + y + z
}
async fn another_async_func() -> i32 { 0 }
async fn make_call_to_db() -> i32 { 0 }
fn regular_func() -> i32 { 0 }

1 2 3 4 5 6
enum Fut1 {
    Init,
    AtAwait1 { fut2: Fut2 },
    AtAwait2 { x: i32, y: i32, fut3: Fut3 },
    Done,
}

Изпълнение на future

Futures извършват прогрес, когато някой executor им извика poll

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
enum Fut1 {
    Init,
    AtAwait1 { fut2: Fut2 },
    AtAwait2 { x: i32, y: i32, fut3: Fut3 },
    Done,
}

impl Future for Fut1 {
    type Output = i32;

    // Една възможна имплементация
    // Все още игнорираме `Pin` и `Context`
    fn poll(self: &mut Self) -> Poll<Self::Output> {
        loop {

            match std::mem::replace(self, Fut1::Done) {
                Fut1::Init => {
                    let fut2 = another_async_func();
                    *self = Fut1::AtAwait1 { fut2 };
                }
                Fut1::AtAwait1 { mut fut2 } => {
                    match fut2.poll() {
                        Poll::Ready(res) => {
                            let x = res;
                            let y = regular_function();
                            let fut3 = make_call_to_db();

                            *self = Fut1::AtAwait2 { x, y, fut3 };
                        },
                        Poll::Pending => {
                            *self = Fut1::AtAwait1 { fut2 };
                            return Poll::Pending;
                        }
                    }
                },
                Fut1::AtAwait2 { x, y, mut fut3 } => {
                    match fut3.poll() {
                        Poll::Ready(res) => {
                            let z = res;
                            return Poll::Ready(x + y + z);
                        },
                        Poll::Pending => {
                            *self = Fut1::AtAwait2 { x, y, fut3 };
                            return Poll::Pending;
                        }
                    }
                }
                Fut1::Done => {
                    panic!("`poll` called on a finished future");
                }
            }

        }
    }
}
trait Future {
type Output;
fn poll(self: &mut Self) -> Poll;
}
enum Poll { Ready(T), Pending }

enum Fut1 {
    Init,
    AtAwait1 { fut2: Fut2 },
    AtAwait2 { x: i32, y: i32, fut3: Fut3 },
    Done,
}
struct Fut2;
impl Future for Fut2 { type Output = i32; fn poll(&mut self) -> Poll { todo!(); } }
struct Fut3;
impl Future for Fut3 { type Output = i32; fn poll(&mut self) -> Poll { todo!(); } }
fn regular_function() -> i32 { 0 }
fn another_async_func() -> Fut2 { todo!() }
fn make_call_to_db() -> Fut3 { todo!() }

impl Future for Fut1 {
    type Output = i32;

    // Една възможна имплементация
    // Все още игнорираме `Pin` и `Context`
    fn poll(self: &mut Self) -> Poll {
        loop {

            match std::mem::replace(self, Fut1::Done) {
                Fut1::Init => {
                    let fut2 = another_async_func();
                    *self = Fut1::AtAwait1 { fut2 };
                }
                Fut1::AtAwait1 { mut fut2 } => {
                    match fut2.poll() {
                        Poll::Ready(res) => {
                            let x = res;
                            let y = regular_function();
                            let fut3 = make_call_to_db();

                            *self = Fut1::AtAwait2 { x, y, fut3 };
                        },
                        Poll::Pending => {
                            *self = Fut1::AtAwait1 { fut2 };
                            return Poll::Pending;
                        }
                    }
                },
                Fut1::AtAwait2 { x, y, mut fut3 } => {
                    match fut3.poll() {
                        Poll::Ready(res) => {
                            let z = res;
                            return Poll::Ready(x + y + z);
                        },
                        Poll::Pending => {
                            *self = Fut1::AtAwait2 { x, y, fut3 };
                            return Poll::Pending;
                        }
                    }
                }
                Fut1::Done => {
                    panic!("`poll` called on a finished future");
                }
            }

        }
    }
}
fn main() {}

Executors

Executors

Executors

Tokio

Tokio имплементира цялата машинария за изпълнение на future-и:

Tokio

Нормално главната функция на програмата не може да е async.
Tokio предоставя макрос, който скрива това

1 2 3
#[tokio::main]
async fn main() {
}
#[tokio::main]
async fn main() {
}

Tokio

Не е нужно да се използва макроса, същото може да се постигне и на ръка.
Този начин също така позволява настройване на библиотеката

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
use tokio::runtime::Runtime;

fn main() {
    let runtime = Runtime::new().unwrap();

    match runtime.block_on(run()) {
        Ok(()) => (),
        Err(e) => {
            eprintln!("An error occurred: {}", e);
            std::process::exit(1);
        }
    }
}

async fn run() -> Result<(), Box<dyn std::error::Error>> {
    /* ... */
}
use tokio::runtime::Runtime;

fn main() {
    let runtime = Runtime::new().unwrap();

    match runtime.block_on(run()) {
        Ok(()) => (),
        Err(e) => {
            eprintln!("An error occurred: {}", e);
            std::process::exit(1);
        }
    }
}

async fn run() -> Result<(), Box> {
    /* ... */
Ok(())
}

Tokio

По подразбиране tokio стартира многонишков runtime.
Това в много случаи не е необходимо - tokio ни дава concurrency, много често не ни е нужен и паралелизъм. В такива случаи можем да настроим tokio да използва еднонишков runtime.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
use tokio::runtime::Builder;

fn main() {
    let runtime = Builder::new_current_thread()
        .build()
        .unwrap();

    match runtime.block_on(run()) {
        Ok(()) => (),
        Err(e) => {
            eprintln!("An error occurred: {}", e);
            std::process::exit(1);
        }
    }
}

async fn run() -> Result<(), Box<dyn std::error::Error>> {
    /* ... */
}
use tokio::runtime::Builder;

fn main() {
    let runtime = Builder::new_current_thread()
        .build()
        .unwrap();

    match runtime.block_on(run()) {
        Ok(()) => (),
        Err(e) => {
            eprintln!("An error occurred: {}", e);
            std::process::exit(1);
        }
    }
}

async fn run() -> Result<(), Box> {
    /* ... */
Ok(())
}

Executors

Executors

Executors

Executors

Executors

Executors

Съвместимост

Executors

Съвместимост

Executors

Съвместимост

Executors

Съвместимост

Futures екосистемата

Futures екосистемата

Futures екосистемата

Подводни камъни

Блокиращи операции

Подводни камъни

Блокиращи операции

Подводни камъни

Блокиращи операции

Подводни камъни

Блокиращи операции

Подводни камъни

Примитиви за синхронизация

Подводни камъни

Примитиви за синхронизация

Подводни камъни

Примитиви за синхронизация

Подводни камъни

Тежки операции

Подводни камъни

Тежки операции

Подводни камъни

Тежки операции

Подводни камъни

Тежки операции

Подводни камъни

Тежки операции

Подводни камъни

Задържане на променливи между await състояния

1 2 3 4 5 6 7 8 9 10 11 12 13 14
async fn some_function() -> i32 {
    let big_vec = vec![ /* ... */ ];

    // Променливата `big_vec` се използва за последно тук
    let val1 = some_async_op(&big_vec).await;

    let val2 = other_async_op().await;

    val1 + val2

    // Деструктора на `big_vec` се извиква тук, в края на scope-а.
    // Това означава, че променливата трябва да се пази жива през цялото време
    // докато future-а е жив - ненужно заемане на памет
}
async fn some_function() -> i32 {
    let big_vec = vec![ /* ... */ ];

    // Променливата `big_vec` се използва за последно тук
    let val1 = some_async_op(&big_vec).await;

    let val2 = other_async_op().await;

    val1 + val2

    // Деструктора на `big_vec` се извиква тук, в края на scope-а.
    // Това означава, че променливата трябва да се пази жива през цялото време
    // докато future-а е жив - ненужно заемане на памет
}
async fn some_async_op(_: &[u8]) -> i32 { 0 }
async fn other_async_op() -> i32 { 0 }
fn main() {}

Подводни камъни

Задържане на променливи между await състояния

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
async fn some_function() -> i32 {
    let big_vec = vec![ /* ... */ ];

    // Променливата `big_vec` се използва за последно тука
    let val1 = some_async_op(&big_vec).await;

    // Ръчно деструктираме променливата.
    // Така тя няма да се пази в състоянието на future-а при
    // следващите await точки.
    drop(big_vec);

    let val2 = other_async_op().await;

    val1 + val2
}
async fn some_function() -> i32 {
    let big_vec = vec![ /* ... */ ];

    // Променливата `big_vec` се използва за последно тука
    let val1 = some_async_op(&big_vec).await;

    // Ръчно деструктираме променливата.
    // Така тя няма да се пази в състоянието на future-а при
    // следващите await точки.
    drop(big_vec);

    let val2 = other_async_op().await;

    val1 + val2
}
async fn some_async_op(_: &[u8]) -> i32 { 0 }
async fn other_async_op() -> i32 { 0 }
fn main() {}

Допълнителни материали


Въпроси