Многонишково програмиране

21 ноември 2024

Fearless concurrency

Fearless concurrency

Fearless concurrency

Fearless concurrency

Fearless concurrency

Fearless concurrency

Fearless concurrency

Нишки

1 2 3 4 5 6 7
use std::thread;

fn main() {
    thread::spawn(|| println!("hi from spawned thread"));

    println!("hi from main thread");
}
use std::thread;

fn main() {
    thread::spawn(|| println!("hi from spawned thread"));

    println!("hi from main thread");
}

Нишки

1 2 3 4 5 6 7 8 9 10 11
use std::thread;

fn main() {
    thread::spawn(|| {
        // това няма да се принтира, защото програмата
        // ще завърши преди втората нишка да е започнала
        println!("hi from spawned thread");
    });

    println!("hi from main thread");
}
hi from main thread
use std::thread;

fn main() {
    thread::spawn(|| {
        // това няма да се принтира, защото програмата
        // ще завърши преди втората нишка да е започнала
        println!("hi from spawned thread");
    });

    println!("hi from main thread");
}

Нишки

1 2 3 4 5 6 7 8 9 10 11 12
use std::thread;

fn main() {
    let handle = thread::spawn(|| {
        // това ще се принтира, защото главната нишка
        // ни изчаква
        println!("hi from spawned thread")
    });

    println!("hi from main thread");
    let _ = handle.join();
}
hi from main thread hi from spawned thread
use std::thread;

fn main() {
    let handle = thread::spawn(|| {
        // това ще се принтира, защото главната нишка
        // ни изчаква
        println!("hi from spawned thread")
    });

    println!("hi from main thread");
    let _ = handle.join();
}

Нишки

Сигнатурата на std::thread::spawn

1 2 3
pub fn spawn<F, T>(f: F) -> JoinHandle<T> where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,

Нишки

1 2 3 4 5 6 7 8 9 10 11
use std::thread;

fn main() {
    let handle = thread::spawn(|| {
        // това ще се принтира, защото изчакваме
        println!("hi from spawned thread")
    });

    println!("hi from main thread");
    let _ = handle.join();
}
hi from main thread hi from spawned thread
use std::thread;

fn main() {
    let handle = thread::spawn(|| {
        // това ще се принтира, защото изчакваме
        println!("hi from spawned thread")
    });

    println!("hi from main thread");
    let _ = handle.join();
}

Нишки

1 2 3 4 5 6 7 8 9 10 11
use std::thread;

fn main() {
    let handle = thread::spawn(|| {
        // very hard computation ...
        42
    });

    let answ = handle.join();
    println!("The answer is {:?}", answ);
}
The answer is Ok(42)
use std::thread;

fn main() {
    let handle = thread::spawn(|| {
        // very hard computation ...
        42
    });

    let answ = handle.join();
    println!("The answer is {:?}", answ);
}

Нишки

1 2 3 4 5 6 7 8 9 10
use std::thread;

fn main() {
    let handle = thread::spawn(|| {
        panic!("too hard computation ...");
    });

    let answ = handle.join();
    println!("The answer is {:?}", answ);
}
thread '<unnamed>' panicked at src/bin/main_480a341d0afaddfcfc405e2dd9b455c8eb27e962.rs:5:9: too hard computation ... note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace The answer is Err(Any { .. })
use std::thread;

fn main() {
    let handle = thread::spawn(|| {
        panic!("too hard computation ...");
    });

    let answ = handle.join();
    println!("The answer is {:?}", answ);
}

Panic в нишка

Panic в нишка

Panic в нишка

Panic в нишка

Споделяне на стойности

Споделяне на стойности

Нека искаме да достъпим една и съща стойност от няколко нишки.
Тривиалният подход …

1 2 3 4 5 6 7 8 9 10 11 12 13
use std::thread;

fn main() {
    let nums = vec![0, 1, 2, 3];

    let handle = thread::spawn(|| {
        for i in &nums {
            println!("number {}", i);
        }
    });

    let _ = handle.join();
}

Споделяне на стойности

Нека искаме да достъпим една и съща стойност от няколко нишки.
Тривиалният подход - води до компилационна грешка.

1 2 3 4 5 6 7 8 9 10 11 12 13
use std::thread;

fn main() {
    let nums = vec![0, 1, 2, 3];

    let handle = thread::spawn(|| {
        for i in &nums {
            println!("number {}", i);
        }
    });

    let _ = handle.join();
}
error[E0373]: closure may outlive the current function, but it borrows `nums`, which is owned by the current function --> src/bin/main_44c598e1dd6669cdea1e15fcdbf808719a029646.rs:6:32 | 6 | let handle = thread::spawn(|| { | ^^ may outlive borrowed value `nums` 7 | for i in &nums { | ---- `nums` is borrowed here | note: function requires argument type to outlive `'static` --> src/bin/main_44c598e1dd6669cdea1e15fcdbf808719a029646.rs:6:18 | 6 | let handle = thread::spawn(|| { | __________________^ 7 | | for i in &nums { 8 | | println!("number {}", i); 9 | | } 10 | | }); | |______^ help: to force the closure to take ownership of `nums` (and any other referenced variables), use the `move` keyword | 6 | let handle = thread::spawn(move || { | ++++ For more information about this error, try `rustc --explain E0373`. error: could not compile `rust` (bin "main_44c598e1dd6669cdea1e15fcdbf808719a029646") due to 1 previous error
use std::thread;

fn main() {
    let nums = vec![0, 1, 2, 3];

    let handle = thread::spawn(|| {
        for i in &nums {
            println!("number {}", i);
        }
    });

    let _ = handle.join();
}

Споделяне на стойности

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
fn thread1() {
    println!("thread 1 started");

    thread::spawn(|| {
        println!("thread 2 started");
        thread::sleep(std::time::Duration::from_millis(1));
        println!("thread 2 will exit");
    });

    println!("thread 1 will exit");
}

fn main() {
    let _ = thread::spawn(thread1).join();
    println!("thread 1 exited");

    thread::sleep(std::time::Duration::from_millis(100));
}
thread 1 started thread 1 will exit thread 2 started thread 1 exited thread 2 will exit
use std::thread;
fn thread1() {
    println!("thread 1 started");

    thread::spawn(|| {
        println!("thread 2 started");
        thread::sleep(std::time::Duration::from_millis(1));
        println!("thread 2 will exit");
    });

    println!("thread 1 will exit");
}

fn main() {
    let _ = thread::spawn(thread1).join();
    println!("thread 1 exited");

    thread::sleep(std::time::Duration::from_millis(100));
}

Споделяне на стойности

1 2 3
pub fn spawn<F, T>(f: F) -> JoinHandle<T> where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static

Споделяне на стойности

Ако използваме стойността само от новата нишка, можем да я преместим с move closure

1 2 3 4 5 6 7 8 9 10 11 12 13
use std::thread;

fn main() {
    let nums = vec![0, 1, 2, 3];

    let handle = thread::spawn(move || {
        for i in &nums {
            println!("number {}", i);
        }
    });

    let _ = handle.join();
}
number 0 number 1 number 2 number 3
use std::thread;

fn main() {
    let nums = vec![0, 1, 2, 3];

    let handle = thread::spawn(move || {
        for i in &nums {
            println!("number {}", i);
        }
    });

    let _ = handle.join();
}

Споделяне на стойности

Но това не би работило ако имаме повече от една нишка

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
use std::thread;

fn main() {
    let nums = vec![0, 1, 2, 3];

    let mut handles = vec![];
    for _ in 0..2 {
        handles.push(thread::spawn(move || {
            for i in &nums {
                println!("number {}", i);
            }
        }));
    }

    for h in handles {
        let _ = h.join();
    }
}
error[E0382]: use of moved value: `nums` --> src/bin/main_fb665c10ea221cdd41e075bfd1468b11553fe02f.rs:8:36 | 4 | let nums = vec![0, 1, 2, 3]; | ---- move occurs because `nums` has type `Vec<i32>`, which does not implement the `Copy` trait ... 7 | for _ in 0..2 { | ------------- inside of this loop 8 | handles.push(thread::spawn(move || { | ^^^^^^^ value moved into closure here, in previous iteration of loop 9 | for i in &nums { | ---- use occurs due to use in closure For more information about this error, try `rustc --explain E0382`. error: could not compile `rust` (bin "main_fb665c10ea221cdd41e075bfd1468b11553fe02f") due to 1 previous error
use std::thread;

fn main() {
    let nums = vec![0, 1, 2, 3];

    let mut handles = vec![];
    for _ in 0..2 {
        handles.push(thread::spawn(move || {
            for i in &nums {
                println!("number {}", i);
            }
        }));
    }

    for h in handles {
        let _ = h.join();
    }
}

Scoped threads

Един вариант е да използваме scoped threads API-то

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
use std::thread;

fn main() {
    let nums = vec![0, 1, 2, 3];

    thread::scope(|s| {
        for _ in 0..2 {
            s.spawn(|| {
                for i in &nums {
                    println!("number {}", i);
                }
            });
        }
    });
}
number 0 number 1 number 2 number 3 number 0 number 1 number 2 number 3
use std::thread;

fn main() {
    let nums = vec![0, 1, 2, 3];

    thread::scope(|s| {
        for _ in 0..2 {
            s.spawn(|| {
                for i in &nums {
                    println!("number {}", i);
                }
            });
        }
    });
}

Scoped threads

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
thread::scope(|s /*: thread::Scope<'_, '_> */| {
    // тази функция се изпълнява в същата нишка

    for _ in 0..2 {
        // Scope::spawn създава нова нишка
        // Новата нишка може да държи референции към локални променливи
        s.spawn(|| {
            for i in &nums {
                println!("number {}", i);
            }
        });
    }

    // на края на функцията всички нишки създадени чрез Scope::spawn
    // се join-ват.
});
use std::thread;
fn main() {
let nums = vec![1, 2, 3];
thread::scope(|s /*: thread::Scope<'_, '_> */| {
    // тази функция се изпълнява в същата нишка

    for _ in 0..2 {
        // Scope::spawn създава нова нишка
        // Новата нишка може да държи референции към локални променливи
        s.spawn(|| {
            for i in &nums {
                println!("number {}", i);
            }
        });
    }

    // на края на функцията всички нишки създадени чрез Scope::spawn
    // се join-ват.
});
}

Scoped threads

В сигнатурата на Scope::spawn ограничението е F: 'scope, а не F: 'static

1 2 3 4 5 6 7
impl<'scope, 'env> Scope<'scope, 'env> {
    pub fn spawn<F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T>
    where
        F: FnOnce() -> T + Send + 'scope,
        T: Send + 'scope,
    { /* ... */ }
}

Споделяне на стойности

Друг вариант е да използваме нещо, което:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
fn main() {
    // TODO: какво да добавим тук?
    let nums = vec![0, 1, 2, 3];

    let mut handles = vec![];
    for _ in 0..2 {
        handles.push(thread::spawn(|| {
            for i in &nums {
                println!("number {}", i);
            }
        }));
    }

    for h in handles {
        let _ = h.join();
    }
}
error[E0373]: closure may outlive the current function, but it borrows `nums`, which is owned by the current function --> src/bin/main_9cb04f9a01531d4cd9e0ce9932649a0da8c1a35c.rs:9:36 | 9 | handles.push(thread::spawn(|| { | ^^ may outlive borrowed value `nums` 10 | for i in &nums { | ---- `nums` is borrowed here | note: function requires argument type to outlive `'static` --> src/bin/main_9cb04f9a01531d4cd9e0ce9932649a0da8c1a35c.rs:9:22 | 9 | handles.push(thread::spawn(|| { | ______________________^ 10 | | for i in &nums { 11 | | println!("number {}", i); 12 | | } 13 | | })); | |__________^ help: to force the closure to take ownership of `nums` (and any other referenced variables), use the `move` keyword | 9 | handles.push(thread::spawn(move || { | ++++ For more information about this error, try `rustc --explain E0373`. error: could not compile `rust` (bin "main_9cb04f9a01531d4cd9e0ce9932649a0da8c1a35c") due to 1 previous error
use std::thread;

fn main() {
    // TODO: какво да добавим тук?
    let nums = vec![0, 1, 2, 3];

    let mut handles = vec![];
    for _ in 0..2 {
        handles.push(thread::spawn(|| {
            for i in &nums {
                println!("number {}", i);
            }
        }));
    }

    for h in handles {
        let _ = h.join();
    }
}

Споделяне на стойности - Rc

Rc позволява "споделена собственост" (shared ownership).
Това дали ще проработи?

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
fn main() {
    let nums_vec = vec![0, 1, 2, 3];
    let nums_rc = Rc::new(nums_vec);

    let mut handles = vec![];
    for _ in 0..2 {
        let nums_rc = Rc::clone(&nums_rc);
        handles.push(thread::spawn(move || {
            for i in &*nums_rc {
                println!("number {}", i);
            }
        }));
    }

    for h in handles {
        let _ = h.join();
    }
}

Споделяне на стойности - Rc

Rc позволява "споделена собственост" (shared ownership).
Това дали ще проработи? - не

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
fn main() {
    let nums_vec = vec![0, 1, 2, 3];
    let nums_rc = Rc::new(nums_vec);

    let mut handles = vec![];
    for _ in 0..2 {
        let nums_rc = Rc::clone(&nums_rc);
        handles.push(thread::spawn(move || {
            for i in &*nums_rc {
                println!("number {}", i);
            }
        }));
    }

    for h in handles {
        let _ = h.join();
    }
}
error[E0277]: `Rc<Vec<i32>>` cannot be sent between threads safely --> src/bin/main_1c4b38ddc008640338e13791daaf3eccafcab0dc.rs:11:36 | 11 | handles.push(thread::spawn(move || { | ------------- ^------ | | | | ______________________|_____________within this `{closure@src/bin/main_1c4b38ddc008640338e13791daaf3eccafcab0dc.rs:11:36: 11:43}` | | | | | required by a bound introduced by this call 12 | | for i in &*nums_rc { 13 | | println!("number {}", i); 14 | | } 15 | | })); | |_________^ `Rc<Vec<i32>>` cannot be sent between threads safely | = help: within `{closure@src/bin/main_1c4b38ddc008640338e13791daaf3eccafcab0dc.rs:11:36: 11:43}`, the trait `Send` is not implemented for `Rc<Vec<i32>>`, which is required by `{closure@src/bin/main_1c4b38ddc008640338e13791daaf3eccafcab0dc.rs:11:36: 11:43}: Send` note: required because it's used within this closure --> src/bin/main_1c4b38ddc008640338e13791daaf3eccafcab0dc.rs:11:36 | 11 | handles.push(thread::spawn(move || { | ^^^^^^^ note: required by a bound in `spawn` --> /rustc/f6e511eec7342f59a25f7c0534f1dbea00d01b14/library/std/src/thread/mod.rs:672:1 For more information about this error, try `rustc --explain E0277`. error: could not compile `rust` (bin "main_1c4b38ddc008640338e13791daaf3eccafcab0dc") due to 1 previous error
use std::thread;
use std::rc::Rc;

fn main() {
    let nums_vec = vec![0, 1, 2, 3];
    let nums_rc = Rc::new(nums_vec);

    let mut handles = vec![];
    for _ in 0..2 {
        let nums_rc = Rc::clone(&nums_rc);
        handles.push(thread::spawn(move || {
            for i in &*nums_rc {
                println!("number {}", i);
            }
        }));
    }

    for h in handles {
        let _ = h.join();
    }
}

Споделяне на стойности - Arc

Трябва да използваме Arc.
.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
fn main() {
    let nums_vec = vec![0, 1, 2, 3];
    let nums_arc = Arc::new(nums_vec);

    let mut handles = vec![];
    for _ in 0..2 {
        let nums_arc = Arc::clone(&nums_arc);
        handles.push(thread::spawn(move || {
            for i in &*nums_arc {
                println!("number {}", i);
            }
        }));
    }

    for h in handles {
        let _ = h.join();
    }
}
number 0 number 1 number 2 number 3 number 0 number 1 number 2 number 3
use std::thread;
use std::sync::Arc;

fn main() {
    let nums_vec = vec![0, 1, 2, 3];
    let nums_arc = Arc::new(nums_vec);

    let mut handles = vec![];
    for _ in 0..2 {
        let nums_arc = Arc::clone(&nums_arc);
        handles.push(thread::spawn(move || {
            for i in &*nums_arc {
                println!("number {}", i);
            }
        }));
    }

    for h in handles {
        let _ = h.join();
    }
}

Споделяне на стойности - Arc

std::sync::Arc

Споделяне на стойности - Arc

std::sync::Arc

Споделяне на стойности - Arc

std::sync::Arc

Споделяне на стойности - Arc

std::sync::Arc

Споделяне на стойности - Arc

std::sync::Arc

Send и Sync

1 2 3
pub fn spawn<F, T>(f: F) -> JoinHandle<T> where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static

Send и Sync

Трейтовете std::marker::Send и std::marker::Sync показват дали един тип е thread safe.
Т.е. дали обекти от този тип могат да се използват безопасно в многонишков контекст.

1 2
pub unsafe auto trait Send { }
pub unsafe auto trait Sync { }

Send и Sync

Send

Send и Sync

Send

Send и Sync

Send

Send и Sync

Send

Send и Sync

Send

Send и Sync

Sync

Send и Sync

Sync

Send и Sync

Sync

Send и Sync

Sync

Send и Sync

Sync

Send и Sync

Sync

Send и Sync

Въпрос

Дали обикновен тип като Vec<T> имплементира Sync?

Send и Sync

Въпрос

Дали обикновен тип като Vec<T> имплементира Sync?

Send и Sync

Въпрос

Дали обикновен тип като Vec<T> имплементира Sync?

Send и Sync

Аuto traits

1
pub struct Token(u32);
pub struct Token(u32);
fn main() {}

Auto trait docs

Send и Sync

Unsafe traits

1 2 3 4
struct MyBox(*mut u8);

unsafe impl Send for MyBox {}
unsafe impl Sync for MyBox {}
fn main() {}
struct MyBox(*mut u8);

unsafe impl Send for MyBox {}
unsafe impl Sync for MyBox {}

Send и Sync

Деимплементация

1 2 3 4 5 6 7
// Само на nightly
#![feature(optin_builtin_traits)]

struct SpecialToken(u8);

impl !Send for SpecialToken {}
impl !Sync for SpecialToken {}

Send и Sync

Деимплементация

1 2 3 4 5 6 7
// Само на nightly
#![feature(optin_builtin_traits)]

struct SpecialToken(u8);

impl !Send for SpecialToken {}
impl !Sync for SpecialToken {}

Send и Sync

Деимплементация

1 2 3 4 5 6 7
// Само на nightly
#![feature(optin_builtin_traits)]

struct SpecialToken(u8);

impl !Send for SpecialToken {}
impl !Sync for SpecialToken {}

Send и Sync

Деимплементация

Хак за stable

1 2 3
use std::marker::PhantomData;

struct SpecialToken(u8, PhantomData<*const ()>);
fn main() {}
use std::marker::PhantomData;

struct SpecialToken(u8, PhantomData<*const ()>);

Примитиви за синхронизация

Примитиви за синхронизация

Стандартния пример за грешен многонишков алгоритъм не се компилира

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
let v = Arc::new((0..100).collect::<Vec<_>>());
let mut sum = 0;

let t1 = {
    let v = Arc::clone(&v);
    let sum = &mut sum;
    thread::spawn(move || for i in &v[0..50] { *sum += i; })
};

let t2 = {
    let v = Arc::clone(&v);
    let sum = &mut sum;
    thread::spawn(move || for i in &v[51..100] { *sum += i; })
};

let _ = t1.join();
let _ = t2.join();
println!("sum: {}", sum);
error[E0597]: `sum` does not live long enough --> src/bin/main_92ae06cbac3bb0b057de58f0494c471228f42c4d.rs:9:15 | 5 | let mut sum = 0; | ------- binding `sum` declared here ... 9 | let sum = &mut sum; | ^^^^^^^^ borrowed value does not live long enough 10 | thread::spawn(move || for i in &v[0..50] { *sum += i; }) | -------------------------------------------------------- argument requires that `sum` is borrowed for `'static` ... 22 | } | - `sum` dropped here while still borrowed error[E0499]: cannot borrow `sum` as mutable more than once at a time --> src/bin/main_92ae06cbac3bb0b057de58f0494c471228f42c4d.rs:15:15 | 9 | let sum = &mut sum; | -------- first mutable borrow occurs here 10 | thread::spawn(move || for i in &v[0..50] { *sum += i; }) | -------------------------------------------------------- argument requires that `sum` is borrowed for `'static` ... 15 | let sum = &mut sum; | ^^^^^^^^ second mutable borrow occurs here error[E0502]: cannot borrow `sum` as immutable because it is also borrowed as mutable --> src/bin/main_92ae06cbac3bb0b057de58f0494c471228f42c4d.rs:21:21 | 9 | let sum = &mut sum; | -------- mutable borrow occurs here 10 | thread::spawn(move || for i in &v[0..50] { *sum += i; }) | -------------------------------------------------------- argument requires that `sum` is borrowed for `'static` ... 21 | println!("sum: {}", sum); | ^^^ immutable borrow occurs here | = note: this error originates in the macro `$crate::format_args_nl` which comes from the expansion of the macro `println` (in Nightly builds, run with -Z macro-backtrace for more info) Some errors have detailed explanations: E0499, E0502, E0597. For more information about an error, try `rustc --explain E0499`. error: could not compile `rust` (bin "main_92ae06cbac3bb0b057de58f0494c471228f42c4d") due to 3 previous errors
use std::sync::Arc;
use std::thread;
fn main() {
let v = Arc::new((0..100).collect::>());
let mut sum = 0;

let t1 = {
    let v = Arc::clone(&v);
    let sum = &mut sum;
    thread::spawn(move || for i in &v[0..50] { *sum += i; })
};

let t2 = {
    let v = Arc::clone(&v);
    let sum = &mut sum;
    thread::spawn(move || for i in &v[51..100] { *sum += i; })
};

let _ = t1.join();
let _ = t2.join();
println!("sum: {}", sum);
}

Примитиви за синхронизация

Защо не се компилира? Какъв може да е типа на sum?

Примитиви за синхронизация

Защо не се компилира? Какъв може да е типа на sum?

Примитиви за синхронизация

Защо не се компилира? Какъв може да е типа на sum?

Примитиви за синхронизация

Защо не се компилира? Какъв може да е типа на sum?

Примитиви за синхронизация

Можем да го накараме да работи

Примитиви за синхронизация

Можем да го накараме да работи

Примитиви за синхронизация

Можем да го накараме да работи

Примитиви за синхронизация

Можем да го накараме да работи

Примитиви за синхронизация

Можем да го накараме да работи

Примитиви за синхронизация

Модула std::sync

Mutex

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
use std::sync::Mutex;

fn main() {
    // мутекса опакова стойността, която предпазва
    let mutex = Mutex::new(10);

    {
        // заключваме мутекса
        // `lock()` връща "умен указател" с deref до `&T` и `&mut T`
        let mut lock = mutex.lock().unwrap();
        *lock += 32;

        // мутекса се отключва когато `lock` се деалокира
    }
}
use std::sync::Mutex;

fn main() {
    // мутекса опакова стойността, която предпазва
    let mutex = Mutex::new(10);

    {
        // заключваме мутекса
        // `lock()` връща "умен указател" с deref до `&T` и `&mut T`
        let mut lock = mutex.lock().unwrap();
        *lock += 32;

        // мутекса се отключва когато `lock` се деалокира
    }
}

Mutex

Mutex

Mutex

Mutex

Mutex

Mutex

Mutex

Mutex

Мutex

Обикновенно мутекса се възприема като примитива която определя критична секция

1 2 3 4 5 6 7
lock(my_mutex);
// начало на критичната секция

do_stuff(shared_data);

// край на критичната секция
unlock(my_mutex);

В Ръст това не би било удобно, защото не дава достатъчна информация на компилатора как ползваме данните.
Затова Mutex е generic и опакова данните.

Mutex

Mutex

Mutex

Mutex

Mutex

Mutex

Mutex

Panic

Mutex

Panic

Mutex

Panic

Mutex

Panic

Mutex

Panic

RwLock

RwLock

RwLock

RwLock

Mutex или RwLock

Mutex или RwLock

Mutex или RwLock

Arc + Mutex

Подобно на Rc<RefCell<T>>, може често да виждате Arc<Mutex<T>> или Arc<RwLock<T>>

Arc + Mutex

Пример

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
let v = Arc::new((0..100).collect::<Vec<_>>());
let total_sum = Arc::new(Mutex::new(0));

let t1 = {
    let v = Arc::clone(&v);
    let total_sum = Arc::clone(&total_sum);
    thread::spawn(move || {
        let local_sum = v[..50].iter().sum::<i32>();
        *total_sum.lock().unwrap() += local_sum;
    })
};

let t2 = {
    let v = Arc::clone(&v);
    let total_sum = Arc::clone(&total_sum);
    thread::spawn(move || {
        let local_sum = v[50..].iter().sum::<i32>();
        *total_sum.lock().unwrap() += local_sum;
    })
};

let _ = t1.join();
let _ = t2.join();
println!("sum: {}", *total_sum.lock().unwrap());
sum: 4950
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let v = Arc::new((0..100).collect::>());
let total_sum = Arc::new(Mutex::new(0));

let t1 = {
    let v = Arc::clone(&v);
    let total_sum = Arc::clone(&total_sum);
    thread::spawn(move || {
        let local_sum = v[..50].iter().sum::();
        *total_sum.lock().unwrap() += local_sum;
    })
};

let t2 = {
    let v = Arc::clone(&v);
    let total_sum = Arc::clone(&total_sum);
    thread::spawn(move || {
        let local_sum = v[50..].iter().sum::();
        *total_sum.lock().unwrap() += local_sum;
    })
};

let _ = t1.join();
let _ = t2.join();
println!("sum: {}", *total_sum.lock().unwrap());
}

Arc + Mutex

Пример

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
let v = (0..100).collect::<Vec<_>>();
let mut total_sum = Mutex::new(0);

thread::scope(|s| {
    s.spawn(|| {
        let local_sum = v[..50].iter().sum::<i32>();
        *total_sum.lock().unwrap() += local_sum;
    });

    s.spawn(|| {
        let local_sum = v[50..].iter().sum::<i32>();
        *total_sum.lock().unwrap() += local_sum;
    });
});

println!("sum: {}", *total_sum.get_mut().unwrap());
sum: 4950
use std::thread;
use std::sync::Mutex;
fn main() {
let v = (0..100).collect::>();
let mut total_sum = Mutex::new(0);

thread::scope(|s| {
    s.spawn(|| {
        let local_sum = v[..50].iter().sum::();
        *total_sum.lock().unwrap() += local_sum;
    });

    s.spawn(|| {
        let local_sum = v[50..].iter().sum::();
        *total_sum.lock().unwrap() += local_sum;
    });
});

println!("sum: {}", *total_sum.get_mut().unwrap());
}

Други примитиви

Атомарни числа

1 2 3 4 5 6 7 8 9 10 11 12 13
let mut num = 10;

// thread 1                 // thread 2
num += 5;                   num += 5;

// =============================================

let reg = load(&num);
let reg = reg + 5;          let reg = load(&num);
                            let reg = reg + 5
                            store(&mut num, reg);
store(&mut num, reg);       /* num = 15 */
/* num = 15 */

Атомарни числа

1 2 3 4 5 6 7 8
// псевдокод
let num = 10;

// thread 1                 // thread 2
fetch_add(&num, 5);         fetch_add(&num, 5);

load(&num);                 load(&num);
/* num = 20 */              /* num = 20 */

Атомарни числа

Атомарни числа

Атомарни числа

Атомарни числа

Атомарни числа

Атомарни числа

Атомарни числа

Атомарните числа могат да се модифицират през споделена референция

1 2 3 4 5 6 7 8
use std::sync::atomic::{AtomicI32, Ordering};

let num = AtomicI32::new(10);   // няма `mut`

num.fetch_add(5, Ordering::SeqCst);
num.fetch_add(5, Ordering::SeqCst);

println!("{}", num.load(Ordering::SeqCst));
20
fn main() {
use std::sync::atomic::{AtomicI32, Ordering};

let num = AtomicI32::new(10);   // няма `mut`

num.fetch_add(5, Ordering::SeqCst);
num.fetch_add(5, Ordering::SeqCst);

println!("{}", num.load(Ordering::SeqCst));
}

Атомарни числа

Атомарни числа

Атомарни числа

Атомарни числа

Пример

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
use std::sync::Arc;
use std::sync::atomic::{AtomicI32, Ordering};

let v = Arc::new((0..100).collect::<Vec<_>>());
let total_sum = Arc::new(AtomicI32::new(0));

let t1 = {
    let v = Arc::clone(&v);
    let total_sum = Arc::clone(&total_sum);
    thread::spawn(move || {
        let local_sum = v[..50].iter().sum::<i32>();
        total_sum.fetch_add(local_sum, Ordering::SeqCst);
    })
};

let t2 = {
    let v = Arc::clone(&v);
    let total_sum = Arc::clone(&total_sum);
    thread::spawn(move || {
        let local_sum = v[50..].iter().sum::<i32>();
        total_sum.fetch_add(local_sum, Ordering::SeqCst);
    })
};

let _ = t1.join();
let _ = t2.join();
println!("sum: {}", total_sum.load(Ordering::SeqCst));
sum: 4950
use std::sync::Arc;
use std::sync::atomic::{AtomicI32, Ordering};
use std::thread;

fn main() {
let v = Arc::new((0..100).collect::>());
let total_sum = Arc::new(AtomicI32::new(0));

let t1 = {
    let v = Arc::clone(&v);
    let total_sum = Arc::clone(&total_sum);
    thread::spawn(move || {
        let local_sum = v[..50].iter().sum::();
        total_sum.fetch_add(local_sum, Ordering::SeqCst);
    })
};

let t2 = {
    let v = Arc::clone(&v);
    let total_sum = Arc::clone(&total_sum);
    thread::spawn(move || {
        let local_sum = v[50..].iter().sum::();
        total_sum.fetch_add(local_sum, Ordering::SeqCst);
    })
};

let _ = t1.join();
let _ = t2.join();
println!("sum: {}", total_sum.load(Ordering::SeqCst));
}

Атомарни числа

Пример

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
//
use std::sync::atomic::{AtomicI32, Ordering};

let v = (0..100).collect::<Vec<_>>();
let total_sum = AtomicI32::new(0);

thread::scope(|scope| {
    scope.spawn(|| {
        let local_sum = v[..50].iter().sum::<i32>();
        total_sum.fetch_add(local_sum, Ordering::SeqCst);
    });

    scope.spawn(|| {
        let local_sum = v[50..].iter().sum::<i32>();
        total_sum.fetch_add(local_sum, Ordering::SeqCst);
    });
});

println!("sum: {}", total_sum.load(Ordering::SeqCst));
sum: 4950
use std::sync::atomic::{AtomicI32, Ordering};
use std::thread;

fn main() {
let v = (0..100).collect::>();
let total_sum = AtomicI32::new(0);

thread::scope(|scope| {
    scope.spawn(|| {
        let local_sum = v[..50].iter().sum::();
        total_sum.fetch_add(local_sum, Ordering::SeqCst);
    });

    scope.spawn(|| {
        let local_sum = v[50..].iter().sum::();
        total_sum.fetch_add(local_sum, Ordering::SeqCst);
    });
});

println!("sum: {}", total_sum.load(Ordering::SeqCst));
}

Атомарни числа

Пример

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

let should_stop = Arc::new(AtomicBool::new(false));

let t1 = thread::spawn({
    let should_stop = Arc::clone(&should_stop);
    move || {
        while !should_stop.load(Ordering::SeqCst) {
            println!("running");
            thread::sleep(Duration::from_millis(100));
        }
    }
});

thread::sleep(Duration::from_millis(300));
should_stop.store(true, Ordering::SeqCst);

let _ = t1.join();
running running running
fn main() {
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use std::thread;

let should_stop = Arc::new(AtomicBool::new(false));

let t1 = thread::spawn({
    let should_stop = Arc::clone(&should_stop);
    move || {
        while !should_stop.load(Ordering::SeqCst) {
            println!("running");
            thread::sleep(Duration::from_millis(100));
        }
    }
});

thread::sleep(Duration::from_millis(300));
should_stop.store(true, Ordering::SeqCst);

let _ = t1.join();
}

Канали

MPSC

Канали

Don't communicate by sharing memory,
share memory by communicating

Канали в стандартната библиотека

1 2 3 4 5 6 7 8 9 10 11 12
use std::sync::mpsc;
use std::thread;

fn main() {
    let (sender, receiver) = mpsc::channel();

    thread::spawn(move || {
        sender.send(10).unwrap();
    });

    println!("received {}", receiver.recv().unwrap());
}
received 10
use std::sync::mpsc;
use std::thread;

fn main() {
    let (sender, receiver) = mpsc::channel();

    thread::spawn(move || {
        sender.send(10).unwrap();
    });

    println!("received {}", receiver.recv().unwrap());
}

Типове канали

Неограничен канал

Типове канали

Неограничен канал

Типове канали

Неограничен канал

Типове канали

Неограничен канал

Типове канали

Неограничен канал

Типове канали

Неограничен канал

Типове канали

Неограничен канал

тип метод резултат грешки
Sender send(T) Result<(), SendError<T>> disconnected
Receiver recv() Result<T, RecvError> disconnected
Receiver try_recv() Result<T, TryRecvError> Empty, Disconnected
Receiver recv_timeout(Duration) Result<T, RecvTimeoutError> Timeout, Disconnected

Типове канали

Неограничен канал

1 2 3 4 5 6 7 8 9 10 11
let (sender, receiver) = mpsc::channel();

thread::spawn(move || {
    sender.send(1).unwrap();
    sender.send(2).unwrap();
    sender.send(3).unwrap();
});

assert_eq!(receiver.recv().unwrap(), 1);
assert_eq!(receiver.recv().unwrap(), 2);
assert_eq!(receiver.recv().unwrap(), 3);
use std::sync::mpsc;
use std::thread;
fn main() {
let (sender, receiver) = mpsc::channel();

thread::spawn(move || {
    sender.send(1).unwrap();
    sender.send(2).unwrap();
    sender.send(3).unwrap();
});

assert_eq!(receiver.recv().unwrap(), 1);
assert_eq!(receiver.recv().unwrap(), 2);
assert_eq!(receiver.recv().unwrap(), 3);
}

Типове канали

Oграничен канал

Типове канали

Oграничен канал

Типове канали

Oграничен канал

Типове канали

Oграничен канал

Типове канали

Oграничен канал

Типове канали

Ограничен канал

тип метод резултат грешки
SyncSender send(T) Result<(), SendError<T>> disconnected
SyncSender try_send(T) Result<(), TrySendError<T>> Full, Disconnected
Receiver recv() Result<T, RecvError> disconnected
Receiver try_recv() Result<T, TryRecvError> Empty, Disconnected
Receiver recv_timeout(Duration) Result<T, RecvTimeoutError> Timeout, Disconnected

Типове канали

Ограничен канал

1 2 3 4 5 6 7 8 9 10 11 12
let (sender, receiver) = mpsc::sync_channel(1);

thread::spawn(move || {
    // записва съобщението и връща веднага
    sender.send(1).unwrap();

    // ще блокира докато главната нишка не извика `receiver.recv()`
    sender.send(2).unwrap();
});

assert_eq!(receiver.recv().unwrap(), 1);
assert_eq!(receiver.recv().unwrap(), 2);
use std::sync::mpsc;
use std::thread;
fn main() {
let (sender, receiver) = mpsc::sync_channel(1);

thread::spawn(move || {
    // записва съобщението и връща веднага
    sender.send(1).unwrap();

    // ще блокира докато главната нишка не извика `receiver.recv()`
    sender.send(2).unwrap();
});

assert_eq!(receiver.recv().unwrap(), 1);
assert_eq!(receiver.recv().unwrap(), 2);
}

Sender

Методи

1 2 3 4 5 6 7 8 9 10 11
let (sender, receiver) = mpsc::channel();

assert_eq!(sender.send(12), Ok(()));

// унищожаваме получателя
// съобщението `12` никога няма да бъде получено
mem::drop(receiver);

// грешка - получателя е унищожен
// можем да си върнем съобщението `23` от грешката
assert_eq!(sender.send(23), Err(SendError(23)));
use std::mem;
use std::sync::mpsc::{self, SendError};
fn main() {
let (sender, receiver) = mpsc::channel();

assert_eq!(sender.send(12), Ok(()));

// унищожаваме получателя
// съобщението `12` никога няма да бъде получено
mem::drop(receiver);

// грешка - получателя е унищожен
// можем да си върнем съобщението `23` от грешката
assert_eq!(sender.send(23), Err(SendError(23)));
}

SyncSender

Методи

1 2 3 4 5 6 7 8
let (sender, receiver) = mpsc::sync_channel(1);

assert_eq!(sender.try_send(12), Ok(()));
assert_eq!(sender.try_send(23), Err(TrySendError::Full(23)));

mem::drop(receiver);

assert_eq!(sender.try_send(23), Err(TrySendError::Disconnected(23)));
use std::mem;
use std::sync::mpsc::{self, TrySendError};
fn main() {
let (sender, receiver) = mpsc::sync_channel(1);

assert_eq!(sender.try_send(12), Ok(()));
assert_eq!(sender.try_send(23), Err(TrySendError::Full(23)));

mem::drop(receiver);

assert_eq!(sender.try_send(23), Err(TrySendError::Disconnected(23)));
}

Множество изпращачи

Изпращащата част може да се клонира

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
let (sender, receiver) = mpsc::channel();
let sender2 = sender.clone();

thread::spawn(move || {
    sender.send(1).unwrap();
    sender.send(2).unwrap();
});

thread::spawn(move || {
    sender2.send(3).unwrap();
    sender2.send(4).unwrap();
});

println!("{} {} {} {}",
    receiver.recv().unwrap(), receiver.recv().unwrap(),
    receiver.recv().unwrap(), receiver.recv().unwrap());
1 2 3 4
use std::sync::mpsc;
use std::thread;
fn main() {
let (sender, receiver) = mpsc::channel();
let sender2 = sender.clone();

thread::spawn(move || {
    sender.send(1).unwrap();
    sender.send(2).unwrap();
});

thread::spawn(move || {
    sender2.send(3).unwrap();
    sender2.send(4).unwrap();
});

println!("{} {} {} {}",
    receiver.recv().unwrap(), receiver.recv().unwrap(),
    receiver.recv().unwrap(), receiver.recv().unwrap());
}

Множество получатели

Множество получатели

Множество получатели

Множество получатели

Receiver

Методи

1 2 3 4 5 6 7 8 9 10 11
let (sender, receiver) = mpsc::channel();

thread::spawn(move || {
    for i in (0..50).rev() {
        sender.send(i).unwrap();
    }
});

while let Ok(msg) = receiver.recv() {
    println!("received {}", msg);
}
use std::sync::mpsc;
use std::thread;
fn main() {
let (sender, receiver) = mpsc::channel();

thread::spawn(move || {
    for i in (0..50).rev() {
        sender.send(i).unwrap();
    }
});

while let Ok(msg) = receiver.recv() {
    println!("received {}", msg);
}
}

Receiver

Итератори

1 2 3 4 5 6 7 8 9 10 11 12 13 14
let (sender, receiver) = mpsc::channel();

thread::spawn(move || {
    for i in (0..50).rev() {
        sender.send(i).unwrap();
    }
});

// обхожда всички съобщения в канала
// ако няма налично съобщение блокира
// излиза от цикъла когато всички изпращачи са унищожени
for msg in receiver.iter() {
    println!("received {}", msg);
}
use std::sync::mpsc;
use std::thread;
fn main() {
let (sender, receiver) = mpsc::channel();

thread::spawn(move || {
    for i in (0..50).rev() {
        sender.send(i).unwrap();
    }
});

// обхожда всички съобщения в канала
// ако няма налично съобщение блокира
// излиза от цикъла когато всички изпращачи са унищожени
for msg in receiver.iter() {
    println!("received {}", msg);
}
}

Receiver

Итератори

1 2 3 4 5 6 7 8 9 10 11 12 13
let (sender, receiver) = mpsc::channel();

thread::spawn(move || {
    for i in (0..50).rev() {
        sender.send(i).unwrap();
    }
});

// обхожда всички вече изпратени съобения в канала,
// след което излиза от цикъла
for msg in receiver.try_iter() {
    println!("received {}", msg);
}
use std::sync::mpsc;
use std::thread;
fn main() {
let (sender, receiver) = mpsc::channel();

thread::spawn(move || {
    for i in (0..50).rev() {
        sender.send(i).unwrap();
    }
});

// обхожда всички вече изпратени съобения в канала,
// след което излиза от цикъла
for msg in receiver.try_iter() {
    println!("received {}", msg);
}
}

Crossbeam channel

Crossbeam channel

Разлики в API-то

std::sync::mpsc crossbeam_channel
вид MPSC MPMC
неограничен channel() unbounded()
неограничен - типове (Sender, Receiver) (Sender, Receiver)
ограничен sync_channel(k) bounded(k)
ограничен - типове (SyncSender, Receiver) (Sender, Receiver)

Външни библиотеки

Crossbeam

Външни библиотеки

Parking lot

Въпроси