|
Abbiamo visto nel paragrafo precedente la creazione di thread e la gestione di variabili esterne. In questo ci concetreremo sul passaggio di messaggi tra thread. L'idea è mutuata come specificato sul sito ufficiale, dal linguaggio Go di Google, la presento in lingua originale: “Do not communicate by sharing memory; instead, share memory by communicating.” In pratica abbiamo una comunicazione che avviene tra canali ("channels") similmente ad altri linguaggi, come Go, ovviamente oppure Clojure ma anche C#, per dirne un altro, ha una libreria (System.Threading.Channels ) che implementa una forma di comunicazione tramite i canali (non è l'unico sistema in quel linguaggio). In Rust abbiamo nell'ambito del crate std abbiamo il modulo sync ed in particolare la funzione mpsc::channel che ci permette di creare un nuovo canale. mpsc è un oggetto il cui nome sta per multi-producer, single-consumer ma all'interno di sync ve ne sono altri come mpmc (multi-producer, multi-consumer, attualmente sperimentale) o Mutex. Per il momento ci concentriamo sul primo, mpsc. Prendiamo in esame un esempio di creazione di un canale così come si trova anche sulla documentazione ufficiale, ottima base di partenza: let (tx, rx) = mpsc::channel(); Questa istruzione crea un canale in cui tx è il trasmettitore ed rx il ricevente. Vediamo come si può creare un programma che crea un thread secondario che invia un messaggio, una stringa al main.
Alla riga 5 creiamo dunque il nostro canale. Alla riga 6 creiamo il thread secondario che deve essere corredato ma move al fine di accedere a tx ed rx. Alla riga 7 viene creato un messaggio, una stringa che viene inviata tramite send. Quest'ultimo metodo, send, restituisce un Result il cui output può essere gestito con le solite metodiche., nel caso specifico, proprio come fanno anche nel testo ufficiasle, viene generato un panic s fornte di un eventuale criticità ad esempio potrebbe capitare (non è questo il caso) che il thread ricevente sia stato oggetto di un drop; comunque sia non avrete mai risultati strani ed imprevedibili. Il messaggio viene ricevuto alla riga 10 tramite il metodo recv (receive), anch'esso restituisce un result. Questo metodo, recv, blocca il main in attesa di un messaggio. Vediamo adesso un altro esempio, simile al precedente, dove il main riceve un intero dal thread secondario e lo elabora, usando le solite tecniche, niente di nuovo ma repetita juvant:
Bene, credo sia tutto abbastanza chiaro in questo esempio, non c'è nulla di nuovo o di non visto, abbiamo semplicemente aggiunto, a scopo pratico, un modo per lavorare con i messaggi che arrivano da un trasmettitore. Abbiamo detto che recv() blocca il thread in attesa fino a che non arriva un messaggio. Se per assurdo eliminassimo le righe dalla 5 alla 8, il nostro main rimarrebbe in attesa per sempre o almeno fino a che non interrompiamo manualmente il programma. Questa può essere una situazione sgradevole se non organizzate per bene i messaggi e il loro flusso. Per evitare questo possiamo usare try_recv() che controlla se vi è un messaggio disponibile, invece di bloccare il thread ricevente e restituisce un errore se non arriva nulla. In pratica:
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
let mut x1: i32 = 0; // Dichiarazione di
x1 al di fuori del match
match rx.recv() {
Ok(valore) => x1 = valore, // Assegna
il valore ricevuto a x1
Err(e) => println!("Errore nel
ricevere il messaggio: {}", e),
}
// Ora x1 è utilizzabile al di fuori del
match
println!("Valore di x1: {}", x1);
let risultato = x1 * 3;
println!("Tre volte x1: {}", risultato);
}
In pratica questo codice si blocca ma se al posto di rx.recv() mettiamo rx.try_recv() allora l'output sarà:
perchè main noin ha ricevuto nulla, con il programma che termina regolarmente. Problema risolto? Usiamo sempre try_recv()? Troppo facile... anche ripristinando il codice che troviamo nell'esempio 44.2 alle righe tra la 5 e la 8 potremmo avere lo stesso output, questo perchè non è detto che il segnale arrivi in tempo prima che il main sia chiuso. Quindi il codice realmente funzionante dovrebbe introdurre un ritardo, ad esempio come vedte qui di seguito, usando sleep: use std::sync::mpsc; use std::thread; use std::time::Duration; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let msg: i32 = 44; // Messaggio da inviare tx.send(msg).unwrap(); }); thread::sleep(Duration::from_millis(100)); let mut x1: i32 = 0; // Dichiarazione di x1 al di fuori del match match rx.try_recv() { Ok(valore) => x1 = valore, // Assegna il valore ricevuto a x1 Err(e) => println!("Errore nel ricevere il messaggio: {}", e), } // Ora x1 è utilizzabile al di fuori del match println!("Valore di x1: {}", x1); let risultato = x1 * 3; println!("Tre volte x1: {}", risultato); } Non è molto bello ne elegante ma soprattutto non è funzionale, in scenari complessi sarebbe impossibile gestire tutto. Si può rimediare ad esempio usando combinazioni di Arc e Mutex ma ne parleremo più avanti. Torniamo all'esempio 44.2 è interessante notare che se lo riscriviamo come segue:
aggiungendo l'istruzione di stampa alla riga 8, esso funziona senza problemi ma il messaggio fosse costituito da una stringa allora riceveremmo un errore: error[E0382]: borrow of moved value: `msg` questo a causa del consueto fatto che le stringhe non implementano Copy e quando effettuiamo l'invio di un messaggio la proprietà della stringa viene attribuita a send e quindi al thread ricevente. Una soluzione, come avrete intuito, può essere quella di passare il clone della stringa, ad esempio, nel caso di una stringa:
thread::spawn(move || {
let val = String::from("ciao"); tx.send(val.clone()).unwrap(); // Invia una copia della stringa println!("val is {val}"); // Continua a usare la stringa originale }); Insomma, una situazione da tenere presente. Un altro caso interessante è il passaggio di più messaggi ad un thread ricevente. Anche in questo caso riprendiamo l'esempio che trovate anche nella documentazione ufficiale e che ritengo davvero il più semplice per illustrare questa parte:
creiamo quindi un vec di stringhe ciascun elemento del quale viene passato come parametro da inviare via send tramite il ciclo che inizia alla riga 13. E' interessante notare che per il ricevente rx non è esplicitato il metodo recv(). Questo perchè rx implementa IntoIterator e quindi può essere usato come un iteratore. Dentro un for in questi casi non è possibile usare recv(), ovvero riscrivere la riga 18 come rx.recv in quanto, dal momento che iterando già viene richiamato recv() è next che richiama recv in quanto internamente abbiamo questa implementazione per l'iteratore: impl<T> Iterator for Receiver<T> { type Item = T; fn next(&mut self) -> Option<T> { self.recv().ok() } } e quindi esplicitarlo vorrebbe dire attendere un secondo messaggio che non c'è. Se si vuole proprio usare recv si può scrivere invece del for: while let Ok(received) = rx.recv() { println!("Got: {}", received); il ciclo evidentemente è alimentato da Ok(received). Ora, dopo aver visto come trattare più messaggi dobbiamo vedere come gestire più "produttori". Vediamo un esempio e poi lo commentiamo:
Questo esempio prende un array di caratteri e crea una stringa random, che viene stampata alla riga 29, attraverso una sequenza di thread che consumano i caratteri iniziali. Le righe fino alla 7 non dovrebbero proporre difficoltà di alcun genere. Alla riga 8 iniziamo a consumare l'array mentre alla 9 iniziamo a creare i thread. Ecco proprio la riga 9 ci mostra quel clone() che è necessario al fine di garantire il passaggio di tutti i thread al consumatore. Se non lo mettessimo il primo thread prenderebbe possesso esclusivo di tx. Dalla riga 10 alla 18 i vari thread creati prendono possesso delle lettere e vengono messi in un vec. Alla 19 fa la sua comparsa drop(tx). Questa istruzione ci assicura che il ciclo for alla riga 22 non resti in attesa perenne di ricevere ulteiori thread. In pratica drop chiude il canale aperto in quanto i cloni di tx escono dallo scope e per loro si applica il drop automatico (ricordiamoci le regole di scope) mentre il tx diciamo origionale è ancora attivo nel main. La stringa finale sarà, esecuzione dipo esecuzione, diversa, di solito, dalle precedenti. Un altro esempio lo prendo così com'è dalla documentazione ufficiale, in questo non è necessario usare drop:
In questo caso creaimo due thread ovvero due trasmettitori che vengono terminati (drop) al termine dei rispettivi ambiti. Quindi anche tx, che entra in gioco alla riga 27, viene terminato e pertanto non serve effettuare un drop specifico per esso. Se fosse stato clonato avremmo dovuto provvedere manualmente. Per il resto il funzionamento di questo esempio dovrebbe essere chiaro. |