Home Rhyylen
Contatto
 
 
 
Rust Language
Capitolo 44
Invio di messaggi

Abbiamo visto nel paragrafo precedente la creazione di thread e la gestione di variabili esterne. In questo ci concetreremo sul passaggio di messaggi tra thread. L'idea è mutuata come specificato sul sito ufficiale, dal linguaggio Go di Google, la presento in lingua originale: “Do not communicate by sharing memory; instead, share memory by communicating.” In pratica abbiamo una comunicazione che avviene tra canali ("channels") similmente ad altri linguaggi, come Go, ovviamente oppure Clojure ma anche C#, per dirne un altro, ha una libreria (System.Threading.Channels ) che  implementa una forma di comunicazione tramite i canali (non è l'unico sistema in quel linguaggio).
In Rust abbiamo nell'ambito del crate std abbiamo il modulo sync ed in particolare la funzione
mpsc::channel  che ci permette di creare un nuovo canale. mpsc è un oggetto il cui nome sta per multi-producer, single-consumer  ma all'interno di sync ve ne sono altri come  mpmc (multi-producer, multi-consumer, attualmente sperimentale) o Mutex. Per il momento ci concentriamo sul primo, mpsc
Prendiamo in esame un esempio di creazione di un canale così come si trova anche sulla documentazione ufficiale, ottima base di partenza:

let (tx, rx) = mpsc::channel();

Questa istruzione crea un canale in cui tx è il trasmettitore ed rx il ricevente. 
Vediamo come si può creare un programma che crea un thread secondario che invia un messaggio, una stringa al main.

  Esempio 44.1
1
2
3
4
5
6
7
8
9
10
11
12
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
    let (tx, rx) = mpsc::channel(); // Creiamo un canale (tx = sender, rx = receiver)
    thread::spawn(move || {
        let msg = String::from("Messaggio dal thread!");
        tx.send(msg).unwrap(); // Invia il messaggio
    });
    let ricevuto = rx.recv().unwrap(); // Attende e riceve il messaggio
    println!("Main thread ha ricevuto: {}", ricevuto);
}

Alla riga 5 creiamo dunque il nostro canale.
Alla riga 6 creiamo il thread secondario che deve essere corredato ma move al fine di accedere a tx ed rx.
Alla riga 7 viene creato un messaggio, una stringa che viene inviata tramite send. Quest'ultimo metodo, send, restituisce un Result il cui output può essere gestito con le solite metodiche., nel caso specifico, proprio come fanno anche nel testo ufficiasle, viene generato un panic s fornte di un eventuale criticità ad esempio potrebbe capitare (non è questo il caso) che il thread ricevente sia stato oggetto di un drop; comunque sia non avrete mai risultati strani ed imprevedibili.
Il messaggio viene ricevuto alla riga 10 tramite il metodo recv (receive), anch'esso restituisce un result. Questo metodo, recv, blocca il main in attesa di un messaggio.
Vediamo adesso un altro esempio, simile al precedente, dove il main riceve un intero dal thread secondario e lo elabora, usando le solite tecniche, niente di nuovo ma repetita juvant:

  Esempio 44.2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
use std::sync::mpsc;
use std::thread;
fn main() {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let msg: i32 = 44; // Messaggio da inviare
        tx.send(msg).unwrap();
    });
    let mut x1: i32 = 0; // Dichiarazione di x1 al di fuori del match
    match rx.recv() {
        Ok(valore) => x1 = valore, // Assegna il valore ricevuto a x1
        Err(e) => println!("Errore nel ricevere il messaggio: {}", e),
    }
    // Ora x1 è utilizzabile al di fuori del match
    println!("Valore di x1: {}", x1);
    let risultato = x1 * 3;
    println!("Tre volte x1: {}", risultato);
}

Bene, credo sia tutto abbastanza chiaro in questo esempio, non c'è nulla di nuovo o di non visto, abbiamo semplicemente aggiunto, a scopo pratico, un modo per lavorare con i messaggi che arrivano da un trasmettitore.
Abbiamo detto che
recv() blocca il thread in attesa fino a che non arriva un messaggio. Se per assurdo eliminassimo le righe dalla 5 alla 8, il nostro main rimarrebbe in attesa per sempre o almeno fino a che non interrompiamo manualmente il programma. Questa può essere una situazione sgradevole se non organizzate per bene i messaggi e il loro flusso. Per evitare questo possiamo usare try_recv() che controlla se vi è un messaggio disponibile, invece di bloccare il thread ricevente e restituisce un errore se non arriva nulla. In pratica: 

use std::sync::mpsc;
use std::thread;
fn main() {
    let (tx, rx) = mpsc::channel();
    let mut x1: i32 = 0; // Dichiarazione di x1 al di fuori del match
    match rx.recv() {
        Ok(valore) => x1 = valore, // Assegna il valore ricevuto a x1
        Err(e) => println!("Errore nel ricevere il messaggio: {}", e),
    }
    // Ora x1 è utilizzabile al di fuori del match
    println!("Valore di x1: {}", x1);
    let risultato = x1 * 3;
    println!("Tre volte x1: {}", risultato);
}

In pratica questo codice si blocca ma se al posto di rx.recv() mettiamo rx.try_recv() allora l'output sarà:

Errore nel ricevere il messaggio: receiving on an empty channel
Valore di x1: 0
Tre volte x1: 0

perchè main noin ha ricevuto nulla, con il programma che termina regolarmente. Problema risolto? Usiamo sempre try_recv()? Troppo facile... anche ripristinando il codice che troviamo nell'esempio 44.2 alle righe tra la 5 e la 8 potremmo avere lo stesso output, questo perchè non è detto che il segnale arrivi in tempo prima che il main sia chiuso. Quindi il codice realmente funzionante dovrebbe introdurre un ritardo, ad esempio come vedte qui di seguito, usando sleep:

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
  let (tx, rx) = mpsc::channel();

  thread::spawn(move || {
    let msg: i32 = 44; // Messaggio da inviare
    tx.send(msg).unwrap();
  });
  thread::sleep(Duration::from_millis(100));

  let mut x1: i32 = 0; // Dichiarazione di x1 al di fuori del match
  match rx.try_recv() {
  Ok(valore) => x1 = valore, // Assegna il valore ricevuto a x1
  Err(e) => println!("Errore nel ricevere il messaggio: {}", e),
  }

  // Ora x1 è utilizzabile al di fuori del match
  println!("Valore di x1: {}", x1);
  let risultato = x1 * 3;
  println!("Tre volte x1: {}", risultato);
}

Non è molto bello ne elegante ma soprattutto non è funzionale, in scenari complessi sarebbe impossibile gestire tutto. Si può rimediare ad esempio usando combinazioni di Arc e Mutex ma ne parleremo più avanti.

Torniamo all'esempio 44.2 è interessante notare che se lo riscriviamo come segue:

  Esempio 44.3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
use std::sync::mpsc;
use std::thread;
fn main() {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let msg: i32 = 44; // Messaggio da inviare
        tx.send(msg).unwrap();
        println!("{}", msg);
    });
    let mut x1: i32 = 0; // Dichiarazione di x1 al di fuori del match
    match rx.recv() {
        Ok(valore) => x1 = valore, // Assegna il valore ricevuto a x1
        Err(e) => println!("Errore nel ricevere il messaggio: {}", e),
    }
    // Ora x1 è utilizzabile al di fuori del match
    println!("Valore di x1: {}", x1);
    let risultato = x1 * 3;
    println!("Tre volte x1: {}", risultato);
}

aggiungendo l'istruzione di stampa alla riga 8, esso funziona senza problemi ma il messaggio fosse costituito da una stringa allora riceveremmo un errore:

error[E0382]: borrow of moved value: `msg`

questo a causa del consueto fatto che le stringhe non implementano Copy e quando effettuiamo l'invio di un messaggio la proprietà della stringa viene attribuita a send e quindi al thread ricevente. Una soluzione, come avrete intuito, può essere quella di passare il clone della stringa, ad esempio, nel caso di una stringa:

thread::spawn(move || {
    let val = String::from("ciao");
    tx.send(val.clone()).unwrap(); // Invia una copia della stringa
println!("val is {val}"); // Continua a usare la stringa originale });

Insomma, una situazione da tenere presente.

Un altro caso interessante è il passaggio di più messaggi ad un thread ricevente. Anche in questo caso riprendiamo l'esempio che trovate anche nella documentazione ufficiale e che ritengo davvero il più semplice per illustrare questa parte:

  Esempio 44.4
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];
        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });
    for received in rx {
        println!("Got: {received}");
    }
}

creiamo quindi un vec di stringhe ciascun elemento del quale viene passato come parametro da inviare via send tramite il ciclo che inizia alla riga 13. E' interessante notare che per il ricevente rx non è esplicitato il metodo recv(). Questo perchè rx implementa IntoIterator e quindi può essere usato come un iteratore. Dentro un for in questi casi non è possibile usare
recv(), ovvero riscrivere la riga 18 come
rx.recv
in quanto, dal momento che iterando già viene richiamato
recv() è next che richiama recv in quanto internamente abbiamo questa implementazione per l'iteratore:

impl<T> Iterator for Receiver<T> {
  type Item = T;
fn next(&mut self) -> Option<T> {
  self.recv().ok()
  }
}


e quindi esplicitarlo vorrebbe dire attendere un secondo messaggio che non c'è. Se si vuole proprio usare recv si può scrivere invece del for:

while let Ok(received) = rx.recv() {
println!("Got: {}", received);


il ciclo evidentemente è alimentato da Ok(received).

Ora, dopo aver visto come trattare più messaggi dobbiamo vedere come gestire più "produttori".
Vediamo un esempio e poi lo commentiamo:

  Esempio 44.5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
    let (tx, rx) = mpsc::channel(); // Canale per la comunicazione
    let letters = ['a', 'b', 'c', 'd', 'e']; // Caratteri fissi
    let mut handles = vec![];
    for letter in letters {
        let tx_clone = tx.clone();
        let handle = thread::spawn(move || {
            for _ in 0..5 {
                tx_clone.send(letter).unwrap();
                println!("Produttore ha inviato '{}'", letter);
                thread::sleep(Duration::from_millis(100)); // Simula lavoro
            }
        });
        handles.push(handle);
    }
    drop(tx); // Chiude il canale quando tutti i produttori hanno finito
    // Il consumatore raccoglie i caratteri e li assembla in una stringa
    let mut result = String::new();
    for received in rx {
        result.push(received);
    }
    // Attende la fine di tutti i produttori
    for handle in handles {
        handle.join().unwrap();
    }
    println!("Stringa finale assemblata: {}", result);
}

Questo esempio prende un array di caratteri e crea una stringa random, che viene stampata alla riga 29, attraverso una sequenza di thread che consumano i caratteri iniziali.
Le righe fino alla 7 non dovrebbero proporre difficoltà di alcun genere.
Alla riga 8 iniziamo a consumare l'array mentre alla 9 iniziamo a creare i thread. Ecco proprio la riga 9 ci mostra quel clone() che è necessario al fine di garantire il passaggio di tutti i thread al consumatore. Se non lo mettessimo il primo thread prenderebbe possesso esclusivo di tx. Dalla riga 10 alla 18 i vari thread creati prendono possesso delle lettere e vengono messi in un vec.
Alla 19 fa la sua comparsa drop(tx). Questa istruzione ci assicura che il ciclo for alla riga 22 non resti in attesa perenne di ricevere ulteiori thread. In pratica drop chiude il canale aperto in quanto i cloni di tx escono dallo scope e per loro si applica il drop automatico (ricordiamoci le regole di scope) mentre il tx diciamo origionale è ancora attivo nel main.
La stringa finale sarà, esecuzione dipo esecuzione, diversa, di solito, dalle precedenti. 
Un altro esempio lo prendo così com'è dalla documentazione ufficiale, in questo non è necessario usare drop:

  Esempio 44.6
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
    let (tx, rx) = mpsc::channel();
    let tx1 = tx.clone();
    thread::spawn(move || {
        let vals = vec![
        String::from("hi"),
        String::from("from"),
        String::from("the"),
        String::from("thread"),
        ];
        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
     });
     thread::spawn(move || {
     let vals = vec![
     String::from("more"),
     String::from("messages"),
     String::from("for"),
     String::from("you"),
     ];
     for val in vals {
        tx.send(val).unwrap();
        thread::sleep(Duration::from_secs(1));
        }
    });
    for received in rx {
    println!("Got: {received}");
    }
}

In questo caso creaimo due thread ovvero due trasmettitori che vengono terminati (drop) al termine dei rispettivi ambiti. Quindi anche tx, che entra in gioco alla riga 27,  viene terminato e pertanto non serve effettuare un drop specifico per esso. Se fosse stato clonato avremmo dovuto provvedere manualmente. Per il resto il funzionamento di questo esempio dovrebbe essere chiaro.