упр.13 задача 1
- Краен срок:
- 21.01.2026 23:59
- Точки:
- 4
Срокът за предаване на решения е отминал
// Include the solution source in the same file, so we
// don't have to worry about item visibility.
// Please don't use `include!` in real code, this is a hack
// around the checking system.
include!{ "../src/lib.rs" }
fn job_from_fn<F, T, E>(f: F) -> impl Job<Output=T, Error=E>
where
F: FnOnce() -> Result<T, E> + Send + 'static,
T: Send + 'static,
E: Send + 'static,
{
struct FnJob<F> {
f: F,
}
impl<F, T, E> Job for FnJob<F> where
F: FnOnce() -> Result<T, E> + Send + 'static,
T: Send + 'static,
E: Send + 'static,
{
type Output = T;
type Error = E;
fn run(self) -> Result<Self::Output, Self::Error> {
(self.f)()
}
}
FnJob { f }
}
#[test]
fn test_jobs_run_in_parallel() {
with_timeout(
std::time::Duration::from_secs(3),
move || {
let pool = ThreadPool::new(4);
let start = std::time::Instant::now();
let recv1 = pool.submit(job_from_fn::<_, i32, ()>(|| {
std::thread::sleep(std::time::Duration::from_millis(200));
Ok(1)
}));
let recv2 = pool.submit(job_from_fn::<_, i32, &str>(|| {
std::thread::sleep(std::time::Duration::from_millis(200));
Err("err2")
}));
let recv3 = pool.submit(job_from_fn::<_, i32, ()>(|| {
std::thread::sleep(std::time::Duration::from_millis(200));
Ok(3)
}));
let recv4 = pool.submit(job_from_fn::<_, i32, &str>(|| {
std::thread::sleep(std::time::Duration::from_millis(200));
Err("err4")
}));
assert_eq!(recv1.recv().unwrap(), Ok(1));
assert_eq!(recv2.recv().unwrap(), Err("err2"));
assert_eq!(recv3.recv().unwrap(), Ok(3));
assert_eq!(recv4.recv().unwrap(), Err("err4"));
let elapsed = std::time::Instant::now().duration_since(start);
if elapsed.as_millis() > 800 {
panic!("assertion failed: expected elapsed < 800ms, found elapsed = {:?}", elapsed)
}
},
);
}
#[test]
fn test_submit_queue() {
with_timeout(
std::time::Duration::from_secs(3),
|| {
let pool = ThreadPool::new(2);
let wait_pair = Arc::new((Mutex::new(false), std::sync::Condvar::new()));
let (job_started_sender, job_started_receiver) = mpsc::channel();
let make_job = |name| {
let wait_pair = Arc::clone(&wait_pair);
let sender = job_started_sender.clone();
job_from_fn::<_, _, ()>(move || {
sender.send(format!("started {name}")).unwrap();
{
let (mutex, condvar) = &*wait_pair;
let lock = mutex.lock().unwrap();
let _lock = condvar.wait_while(lock, |lock| {*lock != true}).unwrap();
}
Ok(format!("done {name}"))
})
};
let job_result_1 = pool.submit(make_job(1));
let job_result_2 = pool.submit(make_job(2));
let job_result_3 = pool.submit(make_job(3));
let job_result_4 = pool.submit(make_job(4));
std::thread::sleep(std::time::Duration::from_millis(100));
assert_eq!(job_started_receiver.try_recv(), Ok("started 1".to_string()));
assert_eq!(job_started_receiver.try_recv(), Ok("started 2".to_string()));
assert_eq!(job_started_receiver.try_recv(), Err(mpsc::TryRecvError::Empty));
*wait_pair.0.lock().unwrap() = true;
wait_pair.1.notify_all();
std::thread::sleep(std::time::Duration::from_millis(100));
assert_eq!(job_started_receiver.try_recv(), Ok("started 3".to_string()));
assert_eq!(job_started_receiver.try_recv(), Ok("started 4".to_string()));
assert_eq!(job_started_receiver.try_recv(), Err(mpsc::TryRecvError::Empty));
assert_eq!(job_result_1.recv().unwrap(), Ok("done 1".to_string()));
assert_eq!(job_result_2.recv().unwrap(), Ok("done 2".to_string()));
assert_eq!(job_result_3.recv().unwrap(), Ok("done 3".to_string()));
assert_eq!(job_result_4.recv().unwrap(), Ok("done 4".to_string()));
},
);
}
#[test]
fn test_submit_recursive() {
with_timeout(
std::time::Duration::from_secs(3),
move || {
let pool = Arc::new(ThreadPool::new(1));
let pool_clone = Arc::clone(&pool);
let recv1 = pool.submit(job_from_fn::<_, _, ()>(move || {
let recv2 = pool_clone.submit(job_from_fn::<_, _, ()>(|| {
Ok(())
}));
Ok(recv2)
}));
let recv2 = recv1.recv().unwrap().unwrap();
let result = recv2.recv().unwrap();
assert_eq!(result, Ok(()));
},
)
}
#[test]
fn test_scheduling() {
// Проверява, че изпълняването на една дълга задача не блокира
// изпълняването на други задачи, ако има свободни нишки.
with_timeout(
std::time::Duration::from_secs(3),
move || {
let pool = ThreadPool::new(2);
let (wait_token_sender, wait_token_receiver) = mpsc::channel();
let long_job = pool.submit(job_from_fn::<_, &str, ()>(move || {
let _ = wait_token_receiver.recv();
Ok("done")
}));
let short_job_1 = pool.submit(job_from_fn::<_, i32, ()>(|| { Ok(1) }));
let short_job_2 = pool.submit(job_from_fn::<_, i32, ()>(|| { Ok(2) }));
assert_eq!(short_job_1.recv().unwrap(), Ok(1));
assert_eq!(short_job_2.recv().unwrap(), Ok(2));
wait_token_sender.send(()).unwrap();
assert_eq!(long_job.recv().unwrap(), Ok("done"));
},
);
}
fn with_timeout(timeout: std::time::Duration, f: fn()) {
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
let _ = std::thread::spawn(move || {
let catch_result = std::panic::catch_unwind(f);
sender.send(catch_result).unwrap();
});
match receiver.recv_timeout(timeout) {
Ok(Ok(())) => {},
Ok(Err(panic_payload)) => std::panic::resume_unwind(panic_payload),
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => panic!("test timeout"),
Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => unreachable!(),
};
}
Да се реализира система за паралелно изпълнение на задачи (ThreadPool), която използва фиксиран брой нишки (workers). Системата трябва да приема задачи, да ги изпълнява в отделна нишка и да връща резултата обратно към потребителя по тип-безопасен начин.
Изисквания:
- Job Trait: Всяка задача трябва да имплементира Job trait. Той дефинира изходния тип (Output) и типа на грешката (Error).
- ThreadPool: При създаване трябва да стартира точно определен брой работни нишки.
- Submit: Методът submit не трябва да блокира. Той трябва да връща Receiver, по който потребителят ще получи резултата от изпълнението, когато то приключи.
- Graceful Shutdown: Когато ThreadPool излезе от scope (drop), той трябва да изпрати сигнал за спиране на всички нишки и да изчака всяка от тях да приключи (join).
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
// =========================================================
// 1. JOB TRAIT (НЕ СЕ ПРОМЕНЯ)
// =========================================================
pub trait Job: Send + 'static {
type Output: Send + 'static;
type Error: Send + 'static;
fn run(self) -> Result<Self::Output, Self::Error>;
}
// =========================================================
// 2. ВЪТРЕШНИ СЪОБЩЕНИЯ (НЕ СЕ ПРОМЕНЯ)
// =========================================================
enum Message {
Run(Box<dyn FnOnce() + Send>),
Shutdown,
}
// =========================================================
// 3. THREAD POOL СТРУКТУРА (ПОПЪЛНЕТЕ ТУК)
// =========================================================
pub struct ThreadPool {
workers: Vec<thread::JoinHandle<()>>,
sender: mpsc::Sender<Message>,
}
impl ThreadPool {
pub fn new(worker_count: usize) -> Self {
let (sender, receiver) = mpsc::channel::<Message>();
// Използваме Arc<Mutex<...>>, за да споделим receiver между нишките
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(worker_count);
for _ in 0..worker_count {
let thread_receiver = Arc::clone(&receiver);
let handle = thread::spawn(move || {
loop {
// 1. Получаване на съобщение от канала (внимавайте с lock-а)
// 2. Ако съобщението е Run(f) -> изпълнете f()
// 3. Ако съобщението е Shutdown -> прекъснете цикъла
/* ТУК ПОПЪЛНЕТЕ ЛОГИКАТА НА РАБОТНИКА */
}
});
workers.push(handle);
}
Self { workers, sender }
}
pub fn submit<J>(&self, job: J) -> mpsc::Receiver<Result<J::Output, J::Error>>
where
J: Job,
{
let (result_sender, result_receiver) = mpsc::channel();
// Трябва да опаковаме изпълнението на job.run() в closure
// и да го изпратим към workers чрез self.sender
/* ТУК ПОПЪЛНЕТЕ ЛОГИКАТА ЗА ИЗПРАЩАНЕ */
result_receiver
}
}
// =========================================================
// 4. ГРАЦИОЗНО СПИРАНЕ (ПОПЪЛНЕТЕ ТУК)
// =========================================================
impl Drop for ThreadPool {
fn drop(&mut self) {
// 1. Изпратете Shutdown съобщение за всяка нишка
// 2. Направете join() на всяка нишка от self.workers
/* ТУК ПОПЪЛНЕТЕ ЛОГИКАТА ЗА DROP */
}
}
}
Задължително прочетете (или си припомнете): Указания за предаване на домашни
Погрижете се решението ви да се компилира с базовия тест:
// Include the solution source in the same file, so we
// don't have to worry about item visibility.
// Please don't use `include!` in real code, this is a hack
// around the checking system.
include!{ "../src/lib.rs" }
fn job_from_fn<F, T, E>(f: F) -> impl Job<Output=T, Error=E>
where
F: FnOnce() -> Result<T, E> + Send + 'static,
T: Send + 'static,
E: Send + 'static,
{
struct FnJob<F> {
f: F,
}
impl<F, T, E> Job for FnJob<F> where
F: FnOnce() -> Result<T, E> + Send + 'static,
T: Send + 'static,
E: Send + 'static,
{
type Output = T;
type Error = E;
fn run(self) -> Result<Self::Output, Self::Error> {
(self.f)()
}
}
FnJob { f }
}
#[test]
fn test_jobs_run_in_parallel() {
with_timeout(
std::time::Duration::from_secs(3),
move || {
let pool = ThreadPool::new(4);
let start = std::time::Instant::now();
let recv1 = pool.submit(job_from_fn::<_, i32, ()>(|| {
std::thread::sleep(std::time::Duration::from_millis(200));
Ok(1)
}));
let recv2 = pool.submit(job_from_fn::<_, i32, &str>(|| {
std::thread::sleep(std::time::Duration::from_millis(200));
Err("err2")
}));
let recv3 = pool.submit(job_from_fn::<_, i32, ()>(|| {
std::thread::sleep(std::time::Duration::from_millis(200));
Ok(3)
}));
let recv4 = pool.submit(job_from_fn::<_, i32, &str>(|| {
std::thread::sleep(std::time::Duration::from_millis(200));
Err("err4")
}));
assert_eq!(recv1.recv().unwrap(), Ok(1));
assert_eq!(recv2.recv().unwrap(), Err("err2"));
assert_eq!(recv3.recv().unwrap(), Ok(3));
assert_eq!(recv4.recv().unwrap(), Err("err4"));
let elapsed = std::time::Instant::now().duration_since(start);
if elapsed.as_millis() > 800 {
panic!("assertion failed: expected elapsed < 800ms, found elapsed = {:?}", elapsed)
}
},
);
}
#[test]
fn test_submit_queue() {
with_timeout(
std::time::Duration::from_secs(3),
|| {
let pool = ThreadPool::new(2);
let wait_pair = Arc::new((Mutex::new(false), std::sync::Condvar::new()));
let (job_started_sender, job_started_receiver) = mpsc::channel();
let make_job = |name| {
let wait_pair = Arc::clone(&wait_pair);
let sender = job_started_sender.clone();
job_from_fn::<_, _, ()>(move || {
sender.send(format!("started {name}")).unwrap();
{
let (mutex, condvar) = &*wait_pair;
let lock = mutex.lock().unwrap();
let _lock = condvar.wait_while(lock, |lock| {*lock != true}).unwrap();
}
Ok(format!("done {name}"))
})
};
let job_result_1 = pool.submit(make_job(1));
let job_result_2 = pool.submit(make_job(2));
let job_result_3 = pool.submit(make_job(3));
let job_result_4 = pool.submit(make_job(4));
std::thread::sleep(std::time::Duration::from_millis(100));
assert_eq!(job_started_receiver.try_recv(), Ok("started 1".to_string()));
assert_eq!(job_started_receiver.try_recv(), Ok("started 2".to_string()));
assert_eq!(job_started_receiver.try_recv(), Err(mpsc::TryRecvError::Empty));
*wait_pair.0.lock().unwrap() = true;
wait_pair.1.notify_all();
std::thread::sleep(std::time::Duration::from_millis(100));
assert_eq!(job_started_receiver.try_recv(), Ok("started 3".to_string()));
assert_eq!(job_started_receiver.try_recv(), Ok("started 4".to_string()));
assert_eq!(job_started_receiver.try_recv(), Err(mpsc::TryRecvError::Empty));
assert_eq!(job_result_1.recv().unwrap(), Ok("done 1".to_string()));
assert_eq!(job_result_2.recv().unwrap(), Ok("done 2".to_string()));
assert_eq!(job_result_3.recv().unwrap(), Ok("done 3".to_string()));
assert_eq!(job_result_4.recv().unwrap(), Ok("done 4".to_string()));
},
);
}
#[test]
fn test_submit_recursive() {
with_timeout(
std::time::Duration::from_secs(3),
move || {
let pool = Arc::new(ThreadPool::new(1));
let pool_clone = Arc::clone(&pool);
let recv1 = pool.submit(job_from_fn::<_, _, ()>(move || {
let recv2 = pool_clone.submit(job_from_fn::<_, _, ()>(|| {
Ok(())
}));
Ok(recv2)
}));
let recv2 = recv1.recv().unwrap().unwrap();
let result = recv2.recv().unwrap();
assert_eq!(result, Ok(()));
},
)
}
#[test]
fn test_scheduling() {
// Проверява, че изпълняването на една дълга задача не блокира
// изпълняването на други задачи, ако има свободни нишки.
with_timeout(
std::time::Duration::from_secs(3),
move || {
let pool = ThreadPool::new(2);
let (wait_token_sender, wait_token_receiver) = mpsc::channel();
let long_job = pool.submit(job_from_fn::<_, &str, ()>(move || {
let _ = wait_token_receiver.recv();
Ok("done")
}));
let short_job_1 = pool.submit(job_from_fn::<_, i32, ()>(|| { Ok(1) }));
let short_job_2 = pool.submit(job_from_fn::<_, i32, ()>(|| { Ok(2) }));
assert_eq!(short_job_1.recv().unwrap(), Ok(1));
assert_eq!(short_job_2.recv().unwrap(), Ok(2));
wait_token_sender.send(()).unwrap();
assert_eq!(long_job.recv().unwrap(), Ok("done"));
},
);
}
fn with_timeout(timeout: std::time::Duration, f: fn()) {
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
let _ = std::thread::spawn(move || {
let catch_result = std::panic::catch_unwind(f);
sender.send(catch_result).unwrap();
});
match receiver.recv_timeout(timeout) {
Ok(Ok(())) => {},
Ok(Err(panic_payload)) => std::panic::resume_unwind(panic_payload),
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => panic!("test timeout"),
Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => unreachable!(),
};
}
