# compatibile Windows 11 # compatibile Ubuntu 24.10 # compatibile python 3.12.7 # Assicurati che questo codice NON sia salvato in un file chiamato serial.py import serial import threading import queue import time import sys import select # Usato per rendere l'input leggermente meno bloccante (opzionale) class SerialManager: """ Gestisce la comunicazione seriale con ricezione in un thread separato. (Codice della classe SerialManager invariato rispetto alla versione precedente) """ def __init__(self, port, baudrate=9600, timeout=1, encoding='utf-8', errors='ignore'): self.port = port self.baudrate = baudrate self.timeout = timeout self.encoding = encoding self.errors = errors self.ser = None self.receive_thread = None self.data_queue = queue.Queue() self._running = False # Flag per il thread di ricezione self._thread_stop_event = threading.Event() # Evento per fermare il thread interno def _receiver_loop(self): self.ser.timeout = self.timeout # Usa il timeout per non bloccare indefinitamente readline print(f"[Thread Ricezione] Avviato per la porta {self.port}") while not self._thread_stop_event.is_set(): # Controlla l'evento di stop try: # Controlla se ci sono dati senza bloccare troppo a lungo if self.ser.in_waiting > 0: line_bytes = self.ser.readline() if line_bytes: try: line_str = line_bytes.decode(self.encoding, errors=self.errors).strip() if line_str: self.data_queue.put(line_str) # print(f"[DEBUG RX] Messo in coda: {line_str}") # Debug except UnicodeDecodeError as e: print(f"[Thread Ricezione] Errore decodifica: {e} - Dati grezzi: {line_bytes}") else: # Breve pausa se non ci sono dati, per evitare busy-waiting # e per permettere al loop di controllare _thread_stop_event time.sleep(0.05) except serial.SerialException as e: # Errore grave sulla porta (es. disconnessione) print(f"[Thread Ricezione] Errore seriale: {e}") self.data_queue.put(None) # Segnala errore/fine self._thread_stop_event.set() # Ferma il loop break except OSError: # Può capitare su Linux se la porta scompare improvvisamente print(f"[Thread Ricezione] Errore OSError (probabile disconnessione).") self.data_queue.put(None) # Segnala errore/fine self._thread_stop_event.set() # Ferma il loop break except Exception as e: print(f"[Thread Ricezione] Errore inaspettato: {e}") # Decidi se fermare il thread o solo loggare time.sleep(0.1) # Pausa prima di riprovare print(f"[Thread Ricezione] Terminato.") # Assicura che il segnale None sia in coda se non già presente self.data_queue.put(None) def start(self): if self.ser and self.ser.is_open: print("La porta seriale è già aperta.") return True try: print(f"Tentativo di aprire la porta {self.port} a {self.baudrate} baud...") self.ser = serial.Serial( port=self.port, baudrate=self.baudrate, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=self.timeout ) print(f"Porta {self.port} aperta con successo.") self._thread_stop_event.clear() # Resetta l'evento di stop self._running = True # Indica che il manager è logicamente attivo self.receive_thread = threading.Thread(target=self._receiver_loop, daemon=True) self.receive_thread.start() return True except serial.SerialException as e: print(f"Errore nell'aprire la porta {self.port}: {e}") self.ser = None self._running = False return False except Exception as e: print(f"Errore generico durante l'avvio: {e}") self.ser = None self._running = False return False def stop(self): print("Tentativo di fermare il gestore seriale...") self._running = False # Indica che non siamo più logicamente attivi self._thread_stop_event.set() # Segnala al thread di ricezione di fermarsi if self.receive_thread and self.receive_thread.is_alive(): print("In attesa della terminazione del thread di ricezione...") self.receive_thread.join(timeout=self.timeout + 1) if self.receive_thread.is_alive(): print("Attenzione: Il thread di ricezione non si è fermato correttamente.") if self.ser and self.ser.is_open: try: print(f"Chiusura della porta {self.port}...") self.ser.close() print(f"Porta {self.port} chiusa.") except serial.SerialException as e: print(f"Errore durante la chiusura della porta {self.port}: {e}") else: pass # Già chiusa o mai aperta self.ser = None self.receive_thread = None # Svuota la coda da eventuali residui (incluso il None di terminazione) while not self.data_queue.empty(): try: self.data_queue.get_nowait() except queue.Empty: break print("Gestore seriale fermato.") def send(self, data): if not self.is_running() or not self.ser or not self.ser.is_open: print("Errore: La porta seriale non è attiva o aperta per l'invio.") return False try: # Assicurati che data sia una stringa, poi codifica if isinstance(data, str): # Aggiungi newline se non presente? Dipende dall'applicazione. # Lo facciamo nel loop di input per chiarezza. encoded_data = data.encode(self.encoding, errors=self.errors) elif isinstance(data, bytes): encoded_data = data # Già bytes else: print(f"Errore: Tipo di dato non supportato per l'invio: {type(data)}") return False self.ser.write(encoded_data) # print(f"[DEBUG TX] Inviato: {encoded_data}") # Debug return True except serial.SerialException as e: print(f"Errore durante l'invio sulla porta {self.port}: {e}") # Errore grave, probabilmente la porta non è più valida self._thread_stop_event.set() # Ferma anche il thread di ricezione self.data_queue.put(None) # Segnala return False except Exception as e: print(f"Errore generico durante l'invio: {e}") return False def get_data(self, block=False, timeout=None): try: data = self.data_queue.get(block=block, timeout=timeout) # Se riceviamo None, significa che il thread si sta fermando o c'è stato un errore if data is None: # Potremmo voler propagare il None per segnalare la fine # Oppure semplicemente non restituire nulla se il thread non è più in esecuzione if not self.is_thread_alive(): return None # Il thread è morto, non ci saranno più dati else: return None # Il thread ha segnalato un problema, ma potrebbe riprovare? Restituiamo None. return data except queue.Empty: return None def is_running(self): """ Controlla se il gestore è stato avviato e non fermato esplicitamente. """ return self._running def is_thread_alive(self): """ Controlla specificamente se il thread di ricezione è attivo. """ return self.receive_thread and self.receive_thread.is_alive() def __enter__(self): if not self.start(): raise RuntimeError(f"Impossibile avviare il gestore seriale sulla porta {self.port}") return self def __exit__(self, exc_type, exc_val, exc_tb): self.stop() return False # Non sopprimere eccezioni # --- Funzione per il Thread di Input Console --- def console_input_loop(serial_manager, stop_event_main): """ Legge l'input dalla console in un thread separato. Args: serial_manager (SerialManager): L'istanza del gestore seriale per inviare i dati. stop_event_main (threading.Event): Evento globale per segnalare la terminazione. """ print("\nInserisci i dati da inviare sulla seriale.") print("Digita 'quit' per uscire.") while not stop_event_main.is_set(): try: # Usare select per un timeout leggero su input() - funziona bene su Linux/macOS # Su Windows, select funziona solo sui socket, quindi input() rimarrà bloccante. # Questo è un compromesso; l'alternativa Windows è msvcrt (più complesso). # Impostiamo un timeout per controllare stop_event periodicamente anche su Win. if sys.platform != 'win32': # Attende input su stdin per max 0.2 secondi rlist, _, _ = select.select([sys.stdin], [], [], 0.2) if not rlist: # Nessun input, il timeout è scaduto, controlla stop_event e continua continue # C'è input, leggilo user_input = sys.stdin.readline().strip() else: # Su Windows, input() è bloccante. Non c'è modo semplice cross-platform # di interromperlo a metà. Il thread verrà fermato forzatamente # se l'evento viene settato mentre è bloccato su input(). # Potremmo aggiungere un controllo *prima* di chiamare input. if stop_event_main.is_set(): break # TODO: Per Windows, considerare libreria msvcrt per kbhit() se serve più reattività user_input = input("prompt> ") # Mostra il prompt su Windows # --- Processa l'input ricevuto --- if not stop_event_main.is_set(): # Ricontrolla dopo l'eventuale blocco/attesa if user_input.strip().lower() == 'quit': print("[Thread Input] Comando 'quit' ricevuto. Segnalazione di stop...") stop_event_main.set() # Segnala al loop principale e al thread RX di fermarsi break # Esce dal loop di input elif user_input: # Invia solo se non è una stringa vuota # Aggiungi newline (modifica qui se serve \r\n) data_to_send = user_input + '\r\n' #print(f"[Thread Input] Invio: {data_to_send.strip()}") # Debug if not serial_manager.send(data_to_send): print("[Thread Input] Invio fallito. La connessione potrebbe essere chiusa.") stop_event_main.set() # Segnala errore/terminazione break # Su Linux/macOS, dopo readline() non serve un nuovo prompt. Su Win sì. if sys.platform == 'win32': pass # Il prompt viene mostrato da input() stesso else: print("> ", end='', flush=True) # Ristampa il prompt per Linux/macOS except EOFError: # Gestisce Ctrl+D (Linux/macOS) o Ctrl+Z+Invio (Windows) print("\n[Thread Input] EOF ricevuto. Terminazione...") stop_event_main.set() break except KeyboardInterrupt: # Non dovrebbe accadere se il main gestisce Ctrl+C, ma per sicurezza: print("\n[Thread Input] Interruzione nel thread di input. Terminazione...") stop_event_main.set() break except Exception as e: print(f"\n[Thread Input] Errore inaspettato: {e}") stop_event_main.set() break print("[Thread Input] Terminato.") # --- Esempio di Utilizzo Principale --- if __name__ == "__main__": # --- Scelta Porta Seriale --- if sys.platform.startswith('win'): #default_port = 'COM2' # Cambia con la tua porta COM default_port = None # Cambia con la tua porta COM elif sys.platform.startswith('linux') or sys.platform.startswith('cygwin'): #default_port = '/dev/ttyACM0' # O /dev/ttyUSB0, ecc. default_port = None else: default_port = None serial_port_name = default_port # Permetti override da riga di comando (opzionale) if len(sys.argv) > 1: serial_port_name = sys.argv[1] print(f"Utilizzo porta da argomento: {serial_port_name}") elif serial_port_name is None: serial_port_name = input("Inserisci il nome della porta seriale (es. COM3 o /dev/ttyUSB0): ") if not serial_port_name: print("Nessuna porta specificata. Uscita.") sys.exit(1) else: print(f"Utilizzo porta di default per {sys.platform}: {serial_port_name}") # --- Inizializzazione --- print("\n--- Avvio Serial Manager con Input Utente ---") # Evento globale per coordinare la terminazione main_stop_event = threading.Event() input_thread = None try: # Il blocco 'with' gestisce start() e stop() di SerialManager with SerialManager(port=serial_port_name, baudrate=9600, timeout=0.1) as sm: print(f"SerialManager avviato su {sm.port}. Ricezione attiva.") # Avvia il thread per l'input da console input_thread = threading.Thread(target=console_input_loop, args=(sm, main_stop_event), daemon=True) # Daemon così non blocca l'uscita input_thread.start() # Stampa il prompt iniziale (solo su piattaforme non Windows dove input() non lo fa) if sys.platform != 'win32': print("> ", end='', flush=True) # --- Loop Principale: Controlla Ricezione e Segnale di Stop --- while not main_stop_event.is_set(): # 1. Controlla dati ricevuti (non bloccante) received_data = sm.get_data(block=False) if received_data is not None: # Stampa i dati ricevuti, andando a capo e ristampando il prompt # \r porta il cursore all'inizio della riga, K cancella la riga prompt = "prompt> " clear_line = '\r\033[K' if sys.platform != 'win32' else '\r'# Su Win \r è sufficiente print(f"{clear_line}RX: {received_data}") print(prompt, end='', flush=True) # Ristampa il prompt elif received_data is None and not sm.is_thread_alive() and sm.is_running(): # Thread RX terminato inaspettatamente (es. errore seriale grave) if not main_stop_event.is_set(): # Controlla se non stiamo già uscendo print("\nAttenzione: Il thread di ricezione seriale si è fermato inaspettatamente.") main_stop_event.set() # Segnala terminazione generale break # 2. Piccola pausa per ridurre uso CPU time.sleep(0.1) # Il loop è terminato (stop_event è stato settato) print("\nUscita dal loop principale richiesta...") except serial.SerialException as e: # Errore nell'apertura iniziale print(f"Errore Seriale Critico (probabilmente all'apertura): {e}") except RuntimeError as e: # Errore sollevato da __enter__ se start() fallisce print(f"Errore durante l'avvio del gestore: {e}") except KeyboardInterrupt: print("\nInterruzione da tastiera (Ctrl+C) ricevuta. Chiusura forzata...") main_stop_event.set() # Segnala ai thread di terminare (se possibile) except Exception as e: print(f"\nErrore inaspettato nel blocco principale: {e}") import traceback traceback.print_exc() main_stop_event.set() # Segnala terminazione in caso di errore grave finally: # --- Pulizia --- print("Avvio pulizia finale...") # Assicura che l'evento sia settato per tutti i thread main_stop_event.set() # Il blocco 'with' chiama sm.stop() automaticamente, che attende il thread RX. # Attendi (brevemente) il thread di input. Essendo daemon, non bloccherà # l'uscita del programma se è bloccato su input(), ma proviamo a dargli # un attimo per uscire se ha ricevuto l'evento. if input_thread and input_thread.is_alive(): # print("In attesa (breve) della terminazione del thread di input...") input_thread.join(timeout=0.5) #if input_thread.is_alive(): # print("Nota: Il thread di input potrebbe essere ancora bloccato su input(). Terminerà con il programma.") print("Programma terminato.")