Rayon
26 ноември 2024
Някои предварителни бележки
Оптимизация на ниво таргетирана архитектура
Някои предварителни бележки
Оптимизация на ниво таргетирана архитектура
rustc --print target-cpus
Някои предварителни бележки
Оптимизация на ниво таргетирана архитектура
rustc --print target-cpus
Наличните групи инструкции могат да бъдат показани с
rustc --print=cfg -C target-cpu=native
Тук native може да бъде всяко target-cpu
Някои предварителни бележки
Оптимизация на ниво таргетирана архитектура
rustc --print target-cpus
Наличните групи инструкции могат да бъдат показани с
rustc --print=cfg -C target-cpu=native
Тук native може да бъде всяко target-cpu
Лесен начин за оптимизация е да компилирате/ползвате
RUST_FLAGS="-C target-cpu=native"
Някои предварителни бележки
Един пример с rust.godbolt.org
1
2
3
4
#[no_mangle]
fn ones(num: i32) -> u32 {
num.count_ones()
}
Някои предварителни бележки
Един пример с rust.godbolt.org
1
2
3
4
#[no_mangle]
fn ones(num: i32) -> u32 {
num.count_ones()
}
- Компилира се до 1 popcnt инструкция, ако SSE4 е в таргетираната архитектура
- Използвайте
-C opt-level=3 -C target-cpu=native
- Компилира се до 18 инструкции в архитектурата по подразбиране
- И в двата случая се ползва същия LLVM модел
Rayon
Въведение
Rayon
Въведение
- Библиотека за паралелна обработка на данни която лесно може да замени последователните операции с паралелни
Rayon
Въведение
- Библиотека за паралелна обработка на данни която лесно може да замени последователните операции с паралелни
- Rayon лесно и с минимални усилия въвежда паралелизма в кода
Rayon
Въведение
- Библиотека за паралелна обработка на данни която лесно може да замени последователните операции с паралелни
- Rayon лесно и с минимални усилия въвежда паралелизма в кода
- Гарантира свободно от
data race
конфликти изпълнение на кода и прилага паралелизма вrun-time
, когато има смисъл.
Rayon
Как да го ползваме?
Rayon
Как да го ползваме?
- Чрез паралелни итератори (
ParallelIterator
,IndexedParallelIterator
) trait-ове
Rayon
Как да го ползваме?
- Чрез паралелни итератори (
ParallelIterator
,IndexedParallelIterator
) trait-ове - Чрез
par_sort
за сортиране на&mut [T]
или вектори
Rayon
Как да го ползваме?
- Чрез паралелни итератори (
ParallelIterator
,IndexedParallelIterator
) trait-ове - Чрез
par_sort
за сортиране на&mut [T]
или вектори - Чрез
join
за да се раздели таск на две части
Rayon
Как да го ползваме?
- Чрез паралелни итератори (
ParallelIterator
,IndexedParallelIterator
) trait-ове - Чрез
par_sort
за сортиране на&mut [T]
или вектори - Чрез
join
за да се раздели таск на две части - Чрез
scope
за да се създаде scope където може да се създадат произволен брой нишки
Rayon
Как да го ползваме?
- Чрез паралелни итератори (
ParallelIterator
,IndexedParallelIterator
) trait-ове - Чрез
par_sort
за сортиране на&mut [T]
или вектори - Чрез
join
за да се раздели таск на две части - Чрез
scope
за да се създаде scope където може да се създадат произволен брой нишки ThreadPoolBuilder
може да бъде използван да създадете свой thread pool-ове или да промените глобалния.
Rayon
Rayon пример
1
2
3
4
5
6
7
8
use rayon::prelude::*;
fn main() {
(1..10).into_par_iter()
.for_each(|p| {
println!("Executing {:}", p);
});
}
Executing 1 Executing 6 Executing 9 Executing 5 Executing 7 Executing 3 Executing 2 Executing 8 Executing 4
extern crate rayon; use rayon::prelude::*; fn main() { (1..10).into_par_iter() .for_each(|p| { println!("Executing {:}", p); }); }
Rayon
Rayon пример с mutability
1
2
3
4
5
6
7
use rayon::prelude::*;
fn main() {
let mut arr = [0, 7, 9, 11];
arr.par_iter_mut().for_each(|p| *p -= 1);
println!("{:?}", arr);
}
[-1, 6, 8, 10]
extern crate rayon; use rayon::prelude::*; fn main() { let mut arr = [0, 7, 9, 11]; arr.par_iter_mut().for_each(|p| *p -= 1); println!("{:?}", arr); }
Rayon
Rayon пример с each и any
1
2
3
4
5
6
7
8
9
10
use rayon::prelude::*;
fn main() {
let vec = vec![2, 4, 6, 8];
assert!(!vec.par_iter().any(|n| (*n % 2) != 0));
assert!(vec.par_iter().all(|n| (*n % 2) == 0));
assert!(!vec.par_iter().any(|n| *n > 8 ));
assert!(vec.par_iter().all(|n| *n <= 8 ));
}
extern crate rayon; use rayon::prelude::*; fn main() { let vec = vec![2, 4, 6, 8]; assert!(!vec.par_iter().any(|n| (*n % 2) != 0)); assert!(vec.par_iter().all(|n| (*n % 2) == 0)); assert!(!vec.par_iter().any(|n| *n > 8 )); assert!(vec.par_iter().all(|n| *n <= 8 )); }
Rayon
За да ползвате паралелните итератори в Rayon
Обикновено всичко което трябва да направите е:
Rayon
За да ползвате паралелните итератори в Rayon
Обикновено всичко което трябва да направите е:
- Да замените
iter()
сpar_iter()
Rayon
За да ползвате паралелните итератори в Rayon
Обикновено всичко което трябва да направите е:
- Да замените
iter()
сpar_iter()
- Да замените
iter_mut()
сpar_iter_mut()
Rayon
За да ползвате паралелните итератори в Rayon
Обикновено всичко което трябва да направите е:
- Да замените
iter()
сpar_iter()
- Да замените
iter_mut()
сpar_iter_mut()
- Да замените
into_iter()
сinto_par_iter()
Rayon
За да ползвате паралелните итератори в Rayon
Обикновено всичко което трябва да направите е:
- Да замените
iter()
сpar_iter()
- Да замените
iter_mut()
сpar_iter_mut()
- Да замените
into_iter()
сinto_par_iter()
- но …
Rayon
За да ползвате паралелните итератори в Rayon
Обикновено всичко което трябва да направите е:
- Да замените
iter()
сpar_iter()
- Да замените
iter_mut()
сpar_iter_mut()
- Да замените
into_iter()
сinto_par_iter()
- но …
- не винаги
Rayon
Да разгледаме типичната грешка
1
2
3
4
5
6
7
8
9
use rayon::prelude::*;
fn main() {
let mut total = 0;
(1..10).into_par_iter()
.for_each(|p| {
total += p;
});
}
error[E0594]: cannot assign to `total`, as it is a captured variable in a `Fn` closure --> src/bin/main_530cd1344e9a9c93a61a57075d2aed005ad3578e.rs:8:13 | 8 | total += p; | ^^^^^^^^^^ cannot assign For more information about this error, try `rustc --explain E0594`. error: could not compile `rust` (bin "main_530cd1344e9a9c93a61a57075d2aed005ad3578e") due to 1 previous error
extern crate rayon; use rayon::prelude::*; fn main() { let mut total = 0; (1..10).into_par_iter() .for_each(|p| { total += p; }); }
Rayon
Как е дефиниран паралелния итератор
1
2
3
4
5
6
7
pub trait ParallelIterator: Sized + Send {
type Item: Send;
//...
fn for_each<OP>(self, op: OP)
where OP: Fn(Self::Item) + Sync + Send,
//...
}
1
Rayon
Да разгледаме типичната грешка - едно решение
1
2
3
4
5
6
7
8
9
10
use rayon::prelude::*;
use std::sync::atomic::{AtomicI32, Ordering};
fn main() {
let mut total = AtomicI32::new(0);
(1..10).into_par_iter()
.for_each(|p| {
total.fetch_add(p, Ordering::SeqCst);
});
}
extern crate rayon; use rayon::prelude::*; use std::sync::atomic::{AtomicI32, Ordering}; fn main() { let mut total = AtomicI32::new(0); (1..10).into_par_iter() .for_each(|p| { total.fetch_add(p, Ordering::SeqCst); }); }
Rayon
Решение чрез fold
1
2
3
4
5
6
7
8
9
10
use rayon::prelude::*;
fn main() {
let numbers = 0..=100;
let sum = numbers.into_par_iter()
.fold(|| 0, |a, b| a + b)
.sum::<i32>();
assert_eq!(sum, 5050);
}
extern crate rayon; use rayon::prelude::*; fn main() { let numbers = 0..=100; let sum = numbers.into_par_iter() .fold(|| 0, |a, b| a + b) .sum::(); assert_eq!(sum, 5050); }
Rayon
Решение чрез reduce
- Връща една стойност използвайки
op
Rayon
Решение чрез reduce
- Връща една стойност използвайки
op
1
2
3
4
5
6
7
8
9
use rayon::prelude::*;
fn main() {
let numbers = 0..=100;
let sum = numbers.into_par_iter()
.reduce(|| 0, |a, b| (a + b));
assert_eq!(sum, 5050);
}
extern crate rayon; use rayon::prelude::*; fn main() { let numbers = 0..=100; let sum = numbers.into_par_iter() .reduce(|| 0, |a, b| (a + b)); assert_eq!(sum, 5050); }
Rayon
Решение чрез try_reduce с проверка за overflow
1
2
3
4
5
6
7
8
9
10
// Compute the sum of squares, being careful about overflow.
fn sum_squares<I: IntoParallelIterator<Item = i32>>(iter: I) -> Option<i32> {
iter
.into_par_iter()
.map(|i| i.checked_mul(i)) // square each item,
.try_reduce(|| 0, i32::checked_add) // and add them up!
}
assert_eq!(sum_squares(0..5), Some(0 + 1 + 4 + 9 + 16));
assert_eq!(sum_squares(0..10_000), None);
assert_eq!(sum_squares(1_000_000..1_000_001), None);
Rayon
Паралелно сортиране
- Сложност:
O(n * log(n))
в най-лошия случай - Някои примери:
Rayon
Паралелно сортиране
- Сложност:
O(n * log(n))
в най-лошия случай - Някои примери:
1
2
3
4
5
6
7
8
9
10
let mut v:[i32; 5] = [-5, 4, 1, -3, 2];
v.par_sort();
assert_eq!(v, [-5, -3, 1, 2, 4]);
v.par_sort_by(|a, b| b.cmp(a));
assert_eq!(v, [4, 2, 1, -3, -5]);
v.par_sort_by_key(|k| k.abs());
assert_eq!(v, [1, 2, -3, 4, -5]);
Rayon
Паралелните операции трябва да се ползват разумно
- par_chunks групира елементите на групи с равна дължина
- par_rchuncks ги групира стартирайки от дясно на ляво
Rayon
Паралелните операции трябва да се ползват разумно
- par_chunks групира елементите на групи с равна дължина
- par_rchuncks ги групира стартирайки от дясно на ляво
1
2
let chunks: Vec<_> = [1, 2, 3, 4, 5, 6].par_chunks(3).collect();
assert_eq!(chunks, vec![&[1, 2, 3], &[4, 5, 6]]);
Rayon
Едно сравнение
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
fn calc_something(i: i32) -> i32 { i & (i >> 1) }
#[inline(always)]
fn method_rayon_no_chunks(contents: &[i32]) -> i32 {
contents.
par_iter().
map(|i| calc_something(*i)).
sum()
}
#[inline(always)]
fn sum_all(contents: &[i32]) -> i32 {
contents.
iter().
map(|i| calc_something(*i)).
sum()
}
#[inline(always)]
fn method_rayon_chunks(contents: &[i32]) -> i32 {
contents.
par_chunks(12).
map(|chunk| {sum_all(chunk)}).
sum()
}
Rayon
Други функционалности - try_for_each_with
1
2
3
4
5
6
7
8
9
10
11
12
use std::sync::mpsc::channel;
use rayon::prelude::*;
let (sender, receiver) = channel();
(0..5).into_par_iter()
.try_for_each_with(sender, |s, x| s.send(x))
.expect("expected no send errors");
let mut res: Vec<_> = receiver.iter().collect();
res.sort();
Rayon
Други функционалности - inspect
1
2
3
4
5
6
7
8
let a = [1, 4, 2, 3];
let sum = a.par_iter()
.cloned()
.inspect(|x| println!("about to filter: {}", x))
.filter(|&x| x % 2 == 0)
.inspect(|x| println!("made it through filter: {}", x))
.reduce(|| 0, |sum, i| sum + i);
Rayon
Създаване на паралелен итератор чрез par_bridge
- Паралелен итератор който обгръща последователен итератор
Rayon
Създаване на паралелен итератор чрез par_bridge
- Паралелен итератор който обгръща последователен итератор
- Когато не можем да създадем итератор чрез
into_par_iter
илиpar_iter
Rayon
Създаване на паралелен итератор чрез par_bridge
- Паралелен итератор който обгръща последователен итератор
- Когато не можем да създадем итератор чрез
into_par_iter
илиpar_iter
- По-бавен
Rayon
Създаване на паралелен итератор чрез par_bridge
- Паралелен итератор който обгръща последователен итератор
- Когато не можем да създадем итератор чрез
into_par_iter
илиpar_iter
- По-бавен
- Приложим за
networking
Rayon
Създаване на паралелен итератор чрез par_bridge
- Паралелен итератор който обгръща последователен итератор
- Когато не можем да създадем итератор чрез
into_par_iter
илиpar_iter
- По-бавен
- Приложим за
networking
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
use rayon::iter::ParallelBridge;
use rayon::prelude::ParallelIterator;
use std::sync::mpsc::channel;
fn main() {
let rx = {
let (tx, rx) = channel();
tx.send("one!").unwrap();
tx.send("two!").unwrap();
tx.send("three!").unwrap();
rx
};
let mut output: Vec<&'static str> = rx.into_iter().par_bridge().collect();
output.sort();
assert_eq!(&*output, &["one!", "three!", "two!"]);
}
extern crate rayon; use rayon::iter::ParallelBridge; use rayon::prelude::ParallelIterator; use std::sync::mpsc::channel; fn main() { let rx = { let (tx, rx) = channel(); tx.send("one!").unwrap(); tx.send("two!").unwrap(); tx.send("three!").unwrap(); rx }; let mut output: Vec<&'static str> = rx.into_iter().par_bridge().collect(); output.sort(); assert_eq!(&*output, &["one!", "three!", "two!"]); }
Rayon
rayon::join
Rayon
rayon::join
- Търси динамична оптимизация
Rayon
rayon::join
- Търси динамична оптимизация
- Работи с
workstealing algorithm
надThreadPool
-овете
Rayon
rayon::join
- Търси динамична оптимизация
- Работи с
workstealing algorithm
надThreadPool
-овете
1
2
3
4
5
6
7
8
9
10
11
fn quick_sort<T:PartialOrd+Send>(v: &mut [T]) {
if v.len() > 1 {
let mid = partition(v);
let (lo, hi) = v.split_at_mut(mid);
rayon::join(|| quick_sort(lo), || quick_sort(hi));
}
}
fn partition<T:PartialOrd+Send>(v: &mut [T]) -> usize {
// see https://en.wikipedia.org/wiki/Quicksort#Lomuto_partition_scheme
}
Rayon
rayon::join - Наивна имплементация на фибоначи
1
2
3
4
5
6
7
fn fib(n: u32) -> u32 {
if n < 2 {
return n;
}
let (a, b) = rayon::join(|| fib(n - 1), || fib(n - 2));
a + b
}
Rayon
ThreadPool-s
- Промяна на сетингите на глобалния thread pool
Rayon
ThreadPool-s
- Промяна на сетингите на глобалния thread pool
1
2
3
4
5
6
7
let size = 1024 * 1024;
let number_of_threads = 2;
rayon::ThreadPoolBuilder::new()
.stack_size(size)
.num_threads(number_of_threads)
.build_global()
.unwrap();
extern crate rayon; fn main() { let size = 1024 * 1024; let number_of_threads = 2; rayon::ThreadPoolBuilder::new() .stack_size(size) .num_threads(number_of_threads) .build_global() .unwrap(); }
Rayon
ThreadPool-s
- Можете също да дефинирате свои thread pool-ове, да стартирате нишки в тях и т.н.
Rayon
ThreadPool-s
- Можете също да дефинирате свои thread pool-ове, да стартирате нишки в тях и т.н.
1
2
3
4
5
6
7
8
use std::thread;
let y = 1;
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(4)
.build()
.unwrap();
pool.spawn(|| println!("Task executes on thread: {:?}", thread::current().id()));
pool.spawn(|| println!("Task executes on thread: {:?}", thread::current().id()));
use std::thread; fn main() { let y = 1; let pool = rayon::ThreadPoolBuilder::new() .num_threads(4) .build() .unwrap(); pool.spawn(|| println!("Task executes on thread: {:?}", thread::current().id())); pool.spawn(|| println!("Task executes on thread: {:?}", thread::current().id())); }
Rayon
ThreadPool-s
- Можете също да дефинирате свои thread pool-ове, да стартирате нишки в тях и т.н.
1
2
3
4
5
6
7
8
use std::thread;
let y = 1;
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(4)
.build()
.unwrap();
pool.spawn(|| println!("Task executes on thread: {:?}", thread::current().id()));
pool.spawn(|| println!("Task executes on thread: {:?}", thread::current().id()));
use std::thread; fn main() { let y = 1; let pool = rayon::ThreadPoolBuilder::new() .num_threads(4) .build() .unwrap(); pool.spawn(|| println!("Task executes on thread: {:?}", thread::current().id())); pool.spawn(|| println!("Task executes on thread: {:?}", thread::current().id())); }
- Как можем да ползваме
y
?
Rayon
scope и spawn
- Един
scope
в Rayon е асоцииран с един ThreadPool
Rayon
scope и spawn
- Един
scope
в Rayon е асоцииран с един ThreadPool scope
завършва след когато всички нишки в него са изпълнени
Rayon
scope и spawn
- Един
scope
в Rayon е асоцииран с един ThreadPool scope
завършва след когато всички нишки в него са изпълнениscope
-овете могат да бъдат вложени
Rayon
scope и spawn
- Един
scope
в Rayon е асоцииран с един ThreadPool scope
завършва след когато всички нишки в него са изпълнениscope
-овете могат да бъдат вложениscope
-овете са по "гъвкави" в сравнение сjoin
, но това си идва за сметка на бързодействието;scope
е алокиран наheap
-a, ajoin
(основно) на стека
Rayon
scope и spawn
- Един
scope
в Rayon е асоцииран с един ThreadPool scope
завършва след когато всички нишки в него са изпълнениscope
-овете могат да бъдат вложениscope
-овете са по "гъвкави" в сравнение сjoin
, но това си идва за сметка на бързодействието;scope
е алокиран наheap
-a, ajoin
(основно) на стека
1
2
3
4
5
pool.scope(move |s| {
s.spawn(move |s| {
println!("Task executes on thread: {} {:?}", y, thread::current().id())
})
});
Rayon
Ресурси
- Rayon
- Rust Cookbook
- Rayon Doc
- How Rust makes Rayon's data parallelism magical
- Rayon: Data Parallelism for Fun and Profit
- Multiple threadpools in Rust
- YouTube: Scientific Computing with Rust