Запазване на анонимни функции, Read, Write, Networking

2 декември 2020

Административни неща

Разглеждане на второто домашно с примерна имплементация: https://youtu.be/HtyMM_RGexU

Saving closures

Да си направим адаптер за итератор, който работи подобно на
адаптера връщан от Iterator::map()

1 2 3 4 5 6 7
struct Map<I, F, B> where
    I: Iterator,
    F: FnMut(I::Item) -> B
{
    iter: I,
    f: F,
}
fn main() {}
struct Map where
    I: Iterator,
    F: FnMut(I::Item) -> B
{
    iter: I,
    f: F,
}

Saving closures

Имплементираме Iterator

1 2 3 4 5 6 7 8 9 10 11 12 13
impl<I, F, B> Iterator for Map<I, F, B> where
    I: Iterator,
    F: FnMut(I::Item) -> B
{
    type Item = B;

    fn next(&mut self) -> Option<Self::Item> {
        match self.iter.next() {
            Some(item) => Some((self.f)(item)),
            None => None,
        }
    }
}
struct Map where I: Iterator, F: FnMut(I::Item) -> B {
iter: I,
f: F,
}
fn main() {}
impl Iterator for Map where
    I: Iterator,
    F: FnMut(I::Item) -> B
{
    type Item = B;

    fn next(&mut self) -> Option {
        match self.iter.next() {
            Some(item) => Some((self.f)(item)),
            None => None,
        }
    }
}

Saving closures

Имплементираме Iterator

1 2 3 4 5 6 7 8 9 10 11 12 13
impl<I, F, B> Iterator for Map<I, F, B> where
    I: Iterator,
    F: FnMut(I::Item) -> B
{
    type Item = B;

    fn next(&mut self) -> Option<Self::Item> {
        match self.iter.next() {
            Some(item) => Some((self.f)(item)),
            None => None,
        }
    }
}
struct Map where I: Iterator, F: FnMut(I::Item) -> B {
iter: I,
f: F,
}
fn main() {}
impl Iterator for Map where
    I: Iterator,
    F: FnMut(I::Item) -> B
{
    type Item = B;

    fn next(&mut self) -> Option {
        match self.iter.next() {
            Some(item) => Some((self.f)(item)),
            None => None,
        }
    }
}

Забележете скобите около self.f

Saving closures

Малко улеснение

1 2 3 4 5 6 7 8 9 10
impl<I, F, B> Iterator for Map<I, F, B> where
    I: Iterator,
    F: FnMut(I::Item) -> B
{
    type Item = B;

    fn next(&mut self) -> Option<Self::Item> {
        self.iter.next().map(|x| (self.f)(x))
    }
}
struct Map where I: Iterator, F: FnMut(I::Item) -> B {
iter: I,
f: F,
}
fn main() {}
impl Iterator for Map where
    I: Iterator,
    F: FnMut(I::Item) -> B
{
    type Item = B;

    fn next(&mut self) -> Option {
        self.iter.next().map(|x| (self.f)(x))
    }
}

Saving closures

1 2 3 4 5 6 7 8 9
// vec![1, 2, 3].into_iter().map(|x| x * 3)

let map = Map {
    iter: vec![1, 2, 3].into_iter(),
    f: |x| x * 3,
};

let v = map.collect::<Vec<_>>();
println!("{:?}", v);
[3, 6, 9]
struct Map where I: Iterator, F: FnMut(I::Item) -> B {
iter: I,
f: F,
}
impl Iterator for Map where
I: Iterator,
F: FnMut(I::Item) -> B
{
type Item = B;
fn next(&mut self) -> Option {
self.iter.next().map(|x| (self.f)(x))
}
}
fn main() {
// vec![1, 2, 3].into_iter().map(|x| x * 3)

let map = Map {
    iter: vec![1, 2, 3].into_iter(),
    f: |x| x * 3,
};

let v = map.collect::>();
println!("{:?}", v);
}

Returning closures

1 2 3
fn get_incrementer() -> ??? {
    |x| x + 1
}

Returning closures

Да проверим какъв е типът на closure-а

1
let _: () = |x| x + 1;

Returning closures

Да проверим какъв е типът на closure-а

1
let _: () = |x| x + 1;
error[E0308]: mismatched types --> src/bin/main_f4ff4185076e43830fdfe11463e56a301ab6f9e8.rs:2:13 | 2 | let _: () = |x| x + 1; | -- ^^^^^^^^^ expected `()`, found closure | | | expected due to this | = note: expected unit type `()` found closure `[closure@src/bin/main_f4ff4185076e43830fdfe11463e56a301ab6f9e8.rs:2:13: 2:22]`
fn main() {
let _: () = |x| x + 1;
}

Тип генериран от компилатора, това не ни е полезно

Returning closures

Вариант 1

Ако closure не прихваща променливи, той може автоматично да се сведе до указател към функция

1 2 3
fn get_incrementer() -> fn(i32) -> i32 {
    |x| x + 1
}
fn main() {}
fn get_incrementer() -> fn(i32) -> i32 {
    |x| x + 1
}

Returning closures

Вариант 2

Често се налага да прихванем променливи

1 2 3
fn curry(a: u32) -> ??? {
    |b| a + b
}

Returning closures

Вариант 2

Можем да използваме Trait objects

1 2 3
struct F<'a> {
    closure: &'a dyn Fn()
}

fn main() {}
struct F<'a> {
    closure: &'a dyn Fn()
}

1 2 3
struct F {
    closure: Box<dyn Fn()>
}

fn main() {}
struct F {
    closure: Box
}

Returning closures

Вариант 2

Така дали ще е добре?

1 2 3
fn curry(a: u32) -> Box<dyn Fn(u32) -> u32> {
    Box::new(|b| a + b)
}

Returning closures

Вариант 2

Така дали ще е добре?

1 2 3
fn curry(a: u32) -> Box<dyn Fn(u32) -> u32> {
    Box::new(|b| a + b)
}
error[E0373]: closure may outlive the current function, but it borrows `a`, which is owned by the current function --> src/bin/main_c3074d2dcf287ba7bd8e52e50983e379c8e9c09b.rs:4:14 | 4 | Box::new(|b| a + b) | ^^^ - `a` is borrowed here | | | may outlive borrowed value `a` | note: closure is returned here --> src/bin/main_c3074d2dcf287ba7bd8e52e50983e379c8e9c09b.rs:4:5 | 4 | Box::new(|b| a + b) | ^^^^^^^^^^^^^^^^^^^ help: to force the closure to take ownership of `a` (and any other referenced variables), use the `move` keyword | 4 | Box::new(move |b| a + b) | ^^^^^^^^
fn main() {}
fn curry(a: u32) -> Box u32> {
    Box::new(|b| a + b)
}

Returning closures

Вариант 2

move

1 2 3 4 5
fn curry(a: u32) -> Box<dyn Fn(u32) -> u32> {
    Box::new(move |b| a + b)
}

println!("{}", curry(1)(2));
3
fn curry(a: u32) -> Box u32> {
    Box::new(move |b| a + b)
}

fn main() {
println!("{}", curry(1)(2));
}

Closures & lifetimes

А какво става, ако искаме да прихванем референция?

1 2 3
fn curry<'a>(a: &'a u32) -> Box<dyn Fn(&u32) -> u32> {
    Box::new(move |b| a + b)
}
error[E0759]: `a` has lifetime `'a` but it needs to satisfy a `'static` lifetime requirement --> src/bin/main_1fc5deed1c334e9a9574d1cbc23849d5468c176e.rs:3:14 | 2 | fn curry<'a>(a: &'a u32) -> Box<dyn Fn(&u32) -> u32> { | ------- this data with lifetime `'a`... 3 | Box::new(move |b| a + b) | ^^^^^^^^^^^^^^ ...is captured here, requiring it to live as long as `'static` | help: to declare that the trait object captures data from argument `a`, you can add an explicit `'a` lifetime bound | 2 | fn curry<'a>(a: &'a u32) -> Box<dyn Fn(&u32) -> u32 + 'a> { | ^^^^
fn main() {}
fn curry<'a>(a: &'a u32) -> Box u32> {
    Box::new(move |b| a + b)
}

Closures & lifetimes

1 2 3 4 5 6 7 8 9 10
struct State<'b> {
    a: &'b u32
}

// impl Fn, FnMut, FnOnce for State

fn curry<'a>(a: &'a u32) -> Box<State<'a>> {
    let state = State { a };    // State<'a>
    Box::new(state)             // очаква 'static
}

Closures & lifetimes

Lifetime на структура

Какво означава обект (който не е референция) да има 'static lifetime?

Lifetime-а показва максимално ограничение до което може да живее някаква стойност

1 2 3 4 5 6 7 8
struct Foo<'a> { a: &'a i32 }

{
    let a = 10;                     // ---+- 'a
                                    //    |
    let foo = Foo { a: &a };        // ---+- foo: 'a
                                    //    |
}                                   // <--+
fn main() {
struct Foo<'a> { a: &'a i32 }

{
    let a = 10;                     // ---+- 'a
                                    //    |
    let foo = Foo { a: &a };        // ---+- foo: 'a
                                    //    |
}                                   // <--+
}

Closures & lifetimes

Lifetime на структура

Когато обект не държи референции няма такова ограничение

Затова се приема че обекта има 'static lifetime

1 2 3 4 5 6 7
struct Foo { a: i32 }

{
    let a = 10;

    let foo = Foo { a: a };         // foo: 'static
}
fn main() {
struct Foo { a: i32 }

{
    let a = 10;

    let foo = Foo { a: a };         // foo: 'static
}
}

Closures & lifetimes

По подразбиране се очаква trait object-а да няма lifetime ограничение, т.е да е 'static

Box<dyn Fn(&u32) -> u32> <-> Box<dyn Fn(&u32) -> u32 + 'static>;

Ако имаме ограничение трябва да го укажем изрично

1 2 3 4 5
fn curry<'a>(a: &'a u32) -> Box<dyn Fn(&u32) -> u32 + 'a> {
    Box::new(move |b| a + b)
}

println!("{}", curry(&1)(&2));
3
fn curry<'a>(a: &'a u32) -> Box u32 + 'a> {
    Box::new(move |b| a + b)
}

fn main() {
println!("{}", curry(&1)(&2));
}

Promise Demo

Ще опитаме да си имплементираме Promise в Rust

Promise Demo

Promise Demo

JavaScript API

1 2 3 4 5 6 7 8
const promise = new Promise((resolve, reject) => {
    setTimeout(() => {
        resolve('foo');
    }, 300);
});

promise.then(value => console.log(value));
promise.catch(error => console.log(error));

Promise Demo

JavaScript API

1 2 3 4 5 6 7 8
const promise = new Promise((resolve, reject) => {
    setTimeout(() => {
        resolve('foo');
    }, 300);
});

promise.then(value => console.log(value));
promise.catch(error => console.log(error));

Promise Demo

Ако искате да си разгледате сорса: https://github.com/d3lio/simple-promise

Read & Write

Има стандартни типажи, които ни помагат за четене и писане

Read & Write

std::io::Read

Един от тях е Read

1 2 3 4 5 6 7 8 9 10 11 12 13
pub trait Read {
    // Required:
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize>;

    // Provided:
    fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> { ... }
    fn read_to_string(&mut self, buf: &mut String) -> io::Result<usize> { ... }
    fn read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> { ... }
    fn by_ref(&mut self) -> &mut Self where Self: Sized { ... }
    fn bytes(self) -> Bytes<Self> where Self: Sized { ... }
    fn chain<R: Read>(self, next: R) -> Chain<Self, R> where Self: Sized { ... }
    fn take(self, limit: u64) -> Take<Self> where Self: Sized { ... }
}

Read & Write

Бележка:

В модула std::io има следната дефиниция:

1
type Result<T> = Result<T, Error>;

Този синтаксис дефинира type alias ("тип-синоним"). Типа std::io::Result<T> е еквивалентен на Result<T, std::io::Error>.

Това се използва за улеснение, и спокойно може да use-нем std::io и да адресираме io::Result<usize> без да указваме типа за грешка -- той вече е конкретизиран в alias-а.

Read

Имплементира се за някои очаквани структури и слайсове от байтове

1 2 3 4
impl Read for File
impl Read for Stdin
impl Read for TcpStream
impl<'_> Read for &'_ [u8]

Read

Четене от File

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
use std::io;
use std::io::prelude::*;
use std::fs::File;

fn main() -> io::Result<()> {
    let mut f = File::open("foo.txt")?;
    let mut buffer = [0; 10];

    // Може да прочетем само 10 байта
    f.read(&mut buffer)?;

    let mut buffer = Vec::new();
    // Може да прочетем целия файл
    f.read_to_end(&mut buffer)?;

    // Или директно да четем в String
    let mut buffer = String::new();
    f.read_to_string(&mut buffer)?;

    Ok(())
}

Read

Четене от Stdin

1 2 3 4
use std::io::{self, Read};

let mut buffer = String::new();
io::stdin().read_to_string(&mut buffer)?;

Read & Write

std::io::Write

За писане се използва Write

1 2 3 4 5 6 7 8 9 10
pub trait Write {
    // Required:
    fn write(&mut self, buf: &[u8]) -> io::Result<usize>;
    fn flush(&mut self) -> io::Result<()>;

    // Provided:
    fn write_all(&mut self, buf: &[u8]) -> io::Result<()> { ... }
    fn write_fmt(&mut self, fmt: Arguments) -> io::Result<()> { ... }
    fn by_ref(&mut self) -> &mut Self where Self: Sized { ... }
}

Write

Както Read, се имплементира за очаквани структури, но и за вектор

1 2 3 4 5
impl Write for File
impl Write for Stdout
impl Write for Stderr
impl Write for TcpStream
impl Write for Vec<u8>

Write

1 2 3 4 5
use std::fs::File;
use std::io::Write;

let mut f = File::create("foo.txt")?;
f.write_all(b"Hello, world!")?;

Read & Write

Като цяло са интуитивни, но не винаги ефективни когато правим много, но малки операции

BufReader & BufWriter

BufReader & BufWriter

BufReader & BufWriter

BufReader & BufWriter

std::io::BufReader

BufReader е wrapper за структури, които имплементират Read

1 2 3 4 5 6 7 8 9 10 11 12 13
use std::io::prelude::*;
use std::io::BufReader;
use std::fs::File;

fn main() -> Result<(), std::io::Error> {
    let f = File::open("log.txt")?;
    let mut reader = BufReader::new(f);

    let mut line = String::new();
    let len = reader.read_line(&mut line)?;
    println!("First line is {} bytes long", len);
    Ok(())
}
First line is 10 bytes long
use std::io::prelude::*;
use std::io::BufReader;
use std::fs::File;

fn main() -> Result<(), std::io::Error> {
std::fs::write("log.txt", b"Some stuff").unwrap();
    let f = File::open("log.txt")?;
    let mut reader = BufReader::new(f);

    let mut line = String::new();
    let len = reader.read_line(&mut line)?;
    println!("First line is {} bytes long", len);
    Ok(())
}

BufReader & BufWriter

std::io::BufRead

Тук се появява нов метод read_line:

1 2 3 4 5 6 7 8 9 10 11
pub trait BufRead: Read {
    // Required:
    fn fill_buf(&mut self) -> io::Result<&[u8]>;
    fn consume(&mut self, amt: usize);

    // Provided:
    fn read_until(&mut self, byte: u8, buf: &mut Vec<u8>) -> io::Result<usize> { ... }
    fn read_line(&mut self, buf: &mut String) -> io::Result<usize> { ... }
    fn split(self, byte: u8) -> Split<Self> where Self: Sized { ... }
    fn lines(self) -> Lines<Self> where Self: Sized { ... }
}

BufReader & BufWriter

BufWriter

Подобно, BufWriter е wrapper за структури, които имплементират Write

1 2 3 4 5 6 7 8 9
use std::io::prelude::*;
use std::io::BufWriter;
use std::net::TcpStream;

let mut stream = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap());

for i in 1..10 {
    stream.write(&[i]).unwrap();
}

В този пример чрез BufWriter превръщаме 10 system calls в 1

BufReader & BufWriter

BufWrite

Няма BufWrite :(

BufReader & BufWriter

BufReader & BufWriter

Read & Write

Write може да се използва и за тестване чрез mock

1 2 3 4 5 6 7 8 9 10 11 12 13 14
fn write_u8<W>(writer: &mut W, data: u8) -> io::Result<usize>
where W: Write {
    // Do cool stuff with `writer`
}

#[test]
fn test_write_u8() {
    let mut mock: Vec<u8> = Vec::new();

    write_u8(&mut mock, 42).unwrap();

    assert_eq!(mock.len(), 1);
    assert_eq!(mock[0], 42);
}

Networking

Стандартната библиотека имплементира networking примитиви в модула std::net

UDP

UdpSocket

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
use std::net::UdpSocket;

// сокета се затваря на края на scope-a
fn main() {
    let mut socket = UdpSocket::bind("127.0.0.1:34254").unwrap();

    // Получава една дейтаграма от сокета. Ако буфера е прекалено малък за съобщението,
    // то ще бъде орязано.
    let mut buf = [0; 10];
    let (amt, src) = socket.recv_from(&mut buf).unwrap();

    // Редекларира `buf` като слайс от получените данни и ги праща в обратен ред.
    let buf = &mut buf[..amt];
    buf.reverse();
    socket.send_to(buf, &src).unwrap();
}
use std::net::UdpSocket;

// сокета се затваря на края на scope-a
fn main() {
    let mut socket = UdpSocket::bind("127.0.0.1:34254").unwrap();

    // Получава една дейтаграма от сокета. Ако буфера е прекалено малък за съобщението,
    // то ще бъде орязано.
    let mut buf = [0; 10];
    let (amt, src) = socket.recv_from(&mut buf).unwrap();

    // Редекларира `buf` като слайс от получените данни и ги праща в обратен ред.
    let buf = &mut buf[..amt];
    buf.reverse();
    socket.send_to(buf, &src).unwrap();
}

TCP

TcpStream

1 2 3 4 5 6 7 8 9 10
use std::io::prelude::*;
use std::net::TcpStream;

// стриймът се затваря на края на scope-a
fn main() {
    let mut stream = TcpStream::connect("127.0.0.1:34254").unwrap();

    let _ = stream.write(&[1]);
    let _ = stream.read(&mut [0; 128]);
}
use std::io::prelude::*;
use std::net::TcpStream;

// стриймът се затваря на края на scope-a
fn main() {
    let mut stream = TcpStream::connect("127.0.0.1:34254").unwrap();

    let _ = stream.write(&[1]);
    let _ = stream.read(&mut [0; 128]);
}

TCP

TcpListener

1 2 3 4 5 6 7 8 9 10 11 12 13 14
use std::net::{TcpListener, TcpStream};

fn handle_client(stream: TcpStream) {
    // ...
}

fn main() {
    let listener = TcpListener::bind("127.0.0.1:80").unwrap();

    // приема конекции и ги обработва
    for stream in listener.incoming() {
        handle_client(stream.unwrap());
    }
}
use std::net::{TcpListener, TcpStream};

fn handle_client(stream: TcpStream) {
    // ...
}

fn main() {
    let listener = TcpListener::bind("127.0.0.1:80").unwrap();

    // приема конекции и ги обработва
    for stream in listener.incoming() {
        handle_client(stream.unwrap());
    }
}

TCP

Simple chat

Ще разгледаме проста чат система за демонстрация на нишки, канали и TCP

Пълният код може да се разгледа в Github - https://github.com/d3lio/simple-chat

TCP

Simple chat

Какво няма да обхванем:

TCP

Simple chat

Какво няма да обхванем:

TCP

Simple chat

Какво няма да обхванем:

Simple chat

Server

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
use std::net::{TcpListener, TcpStream};
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

const LOCALHOST: &str = "127.0.0.1:1234";
const MESSAGE_SIZE: usize = 32;

fn main() {
    let server = TcpListener::bind(LOCALHOST).expect("Listener failed to bind");
    server.set_nonblocking(true).expect("Failed to initialize nonblocking");

    // Stores client sockets
    let mut clients = Vec::<TcpStream>::new();
    let (sx, rx) = mpsc::channel::<String>();

    loop {
        /* accept */
        /* broadcast */
        thread::sleep(Duration::from_millis(100));
    }
}
use std::net::{TcpListener, TcpStream};
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

const LOCALHOST: &str = "127.0.0.1:1234";
const MESSAGE_SIZE: usize = 32;

fn main() {
    let server = TcpListener::bind(LOCALHOST).expect("Listener failed to bind");
    server.set_nonblocking(true).expect("Failed to initialize nonblocking");

    // Stores client sockets
    let mut clients = Vec::::new();
    let (sx, rx) = mpsc::channel::();

    loop {
        /* accept */
        /* broadcast */
        thread::sleep(Duration::from_millis(100));
    }
}

Server

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
use std::io::{ErrorKind, Read, Write};
use std::net::TcpListener;
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn sleep() {
    thread::sleep(Duration::from_millis(100));
}

const LOCALHOST: &str = "127.0.0.1:1234";
const MESSAGE_SIZE: usize = 32;

fn main() {
    let server = TcpListener::bind(LOCALHOST).expect("Listener failed to bind");
    server.set_nonblocking(true).expect("Failed to initialize nonblocking");

    let mut clients = Vec::new();
    let (sx, rx) = mpsc::channel::<String>();

    loop {
        // Try to accept a client
        if let Ok((mut socket, addr)) = server.accept() {
            println!("Client {} connected", addr);

            let sx = sx.clone();

            clients.push(socket.try_clone().expect("Failed to clone client"));

            thread::spawn(move || loop {
                let mut buf = vec![0; MESSAGE_SIZE];

                // Try to receive message from client
                match socket.read_exact(&mut buf) {
                    Ok(_) => {
                        let msg = buf.into_iter().take_while(|&x| x != 0).collect::<Vec<_>>();
                        let msg = String::from_utf8(msg).expect("Invalid utf8 message");

                        println!("{}: {:?}", addr, msg);
                        sx.send(msg).expect("Send to master channel failed");
                    },
                    Err(ref err) if err.kind() == ErrorKind::WouldBlock => (),
                    Err(_) => {
                        println!("Closing connection with: {}", addr);
                        break;
                    }
                }

                sleep();
            });
        }

        if let Ok(msg) = rx.try_recv() {
            // Try to send message from master channel
            clients = clients.into_iter().filter_map(|mut client| {
                let mut buf = msg.clone().into_bytes();
                buf.resize(MESSAGE_SIZE, 0);

                match client.write_all(&buf) {
                    Ok(_) => Some(client),
                    _ => None,
                }
            }).collect::<Vec<_>>();
        }

        sleep();
    }
}
use std::io::{ErrorKind, Read, Write};
use std::net::TcpListener;
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn sleep() {
    thread::sleep(Duration::from_millis(100));
}

const LOCALHOST: &str = "127.0.0.1:1234";
const MESSAGE_SIZE: usize = 32;

fn main() {
    let server = TcpListener::bind(LOCALHOST).expect("Listener failed to bind");
    server.set_nonblocking(true).expect("Failed to initialize nonblocking");

    let mut clients = Vec::new();
    let (sx, rx) = mpsc::channel::();

    loop {
        // Try to accept a client
        if let Ok((mut socket, addr)) = server.accept() {
            println!("Client {} connected", addr);

            let sx = sx.clone();

            clients.push(socket.try_clone().expect("Failed to clone client"));

            thread::spawn(move || loop {
                let mut buf = vec![0; MESSAGE_SIZE];

                // Try to receive message from client
                match socket.read_exact(&mut buf) {
                    Ok(_) => {
                        let msg = buf.into_iter().take_while(|&x| x != 0).collect::>();
                        let msg = String::from_utf8(msg).expect("Invalid utf8 message");

                        println!("{}: {:?}", addr, msg);
                        sx.send(msg).expect("Send to master channel failed");
                    },
                    Err(ref err) if err.kind() == ErrorKind::WouldBlock => (),
                    Err(_) => {
                        println!("Closing connection with: {}", addr);
                        break;
                    }
                }

                sleep();
            });
        }

        if let Ok(msg) = rx.try_recv() {
            // Try to send message from master channel
            clients = clients.into_iter().filter_map(|mut client| {
                let mut buf = msg.clone().into_bytes();
                buf.resize(MESSAGE_SIZE, 0);

                match client.write_all(&buf) {
                    Ok(_) => Some(client),
                    _ => None,
                }
            }).collect::>();
        }

        sleep();
    }
}

Simple chat

Client

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
use std::net::TcpStream;
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

const LOCALHOST: &str = "127.0.0.1:1234";
const MESSAGE_SIZE: usize = 32;

fn main() {
    let mut client = TcpStream::connect(LOCALHOST).expect("Stream failed to connect");
    client.set_nonblocking(true).expect("Failed to initialize nonblocking");

    let (sx, rx) = mpsc::channel::<String>();

    thread::spawn(move || loop {
        /* try recv */
        /* try send */
        thread::sleep(Duration::from_millis(100));
    });

    /* repl */
}
use std::net::TcpStream;
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

const LOCALHOST: &str = "127.0.0.1:1234";
const MESSAGE_SIZE: usize = 32;

fn main() {
    let mut client = TcpStream::connect(LOCALHOST).expect("Stream failed to connect");
    client.set_nonblocking(true).expect("Failed to initialize nonblocking");

    let (sx, rx) = mpsc::channel::();

    thread::spawn(move || loop {
        /* try recv */
        /* try send */
        thread::sleep(Duration::from_millis(100));
    });

    /* repl */
}

Client

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
use std::io::{self, ErrorKind, Read, Write};
use std::net::TcpStream;
use std::sync::mpsc::{self, TryRecvError};
use std::thread;
use std::time::Duration;

const LOCALHOST: &str = "127.0.0.1:1234";
const MESSAGE_SIZE: usize = 32;

fn main() {
    let mut client = TcpStream::connect(LOCALHOST).expect("Stream failed to connect");
    client.set_nonblocking(true).expect("Failed to initialize nonblocking");

    let (sx, rx) = mpsc::channel::<String>();

    thread::spawn(move || loop {
        let mut buf = vec![0; MESSAGE_SIZE];

        // Try to receive message from server
        match client.read_exact(&mut buf) {
            Ok(_) => {
                let msg = buf.into_iter().take_while(|&x| x != 0).collect::<Vec<_>>();
                let msg = String::from_utf8(msg).expect("Invalid utf8 message");
                println!("message recv {:?}", msg);
            },
            Err(ref err) if err.kind() == ErrorKind::WouldBlock => (),
            Err(_) => {
                println!("Connection with the server closed");
                break;
            }
        }

        // Try to send message from repl
        match rx.try_recv() {
            Ok(msg) => {
                let mut buf = msg.clone().into_bytes();
                buf.resize(MESSAGE_SIZE, 0);
                client.write_all(&buf).expect("Writing to socket failed");
                println!("message sent {:?}", msg);
            },
            Err(TryRecvError::Empty) => (),
            Err(TryRecvError::Disconnected) => break
        }

        thread::sleep(Duration::from_millis(100));
    });

    println!("repl");
    loop {
        let mut buf = String::new();
        io::stdin().read_line(&mut buf).expect("Reading from stdin failed");
        let msg = buf.trim().to_string();

        if msg == ":q" || sx.send(msg).is_err() { break }
    }
    println!("bye!");
}
use std::io::{self, ErrorKind, Read, Write};
use std::net::TcpStream;
use std::sync::mpsc::{self, TryRecvError};
use std::thread;
use std::time::Duration;

const LOCALHOST: &str = "127.0.0.1:1234";
const MESSAGE_SIZE: usize = 32;

fn main() {
    let mut client = TcpStream::connect(LOCALHOST).expect("Stream failed to connect");
    client.set_nonblocking(true).expect("Failed to initialize nonblocking");

    let (sx, rx) = mpsc::channel::();

    thread::spawn(move || loop {
        let mut buf = vec![0; MESSAGE_SIZE];

        // Try to receive message from server
        match client.read_exact(&mut buf) {
            Ok(_) => {
                let msg = buf.into_iter().take_while(|&x| x != 0).collect::>();
                let msg = String::from_utf8(msg).expect("Invalid utf8 message");
                println!("message recv {:?}", msg);
            },
            Err(ref err) if err.kind() == ErrorKind::WouldBlock => (),
            Err(_) => {
                println!("Connection with the server closed");
                break;
            }
        }

        // Try to send message from repl
        match rx.try_recv() {
            Ok(msg) => {
                let mut buf = msg.clone().into_bytes();
                buf.resize(MESSAGE_SIZE, 0);
                client.write_all(&buf).expect("Writing to socket failed");
                println!("message sent {:?}", msg);
            },
            Err(TryRecvError::Empty) => (),
            Err(TryRecvError::Disconnected) => break
        }

        thread::sleep(Duration::from_millis(100));
    });

    println!("repl");
    loop {
        let mut buf = String::new();
        io::stdin().read_line(&mut buf).expect("Reading from stdin failed");
        let msg = buf.trim().to_string();

        if msg == ":q" || sx.send(msg).is_err() { break }
    }
    println!("bye!");
}

Въпроси