# L'eseguibile si ottiene con pyinstaller --onefile --clean --noconsole server_UDP_crypto_protocollo_TLV_csharp.py # compatibile Windows 11 # compatibile Ubuntu 24.10 # compatibile python 3.12.7 import argparse import socket import select import logging import threading import sys import errno import time import os import queue # Added for inter-thread communication from cryptography.hazmat.primitives.asymmetric import rsa, padding from cryptography.hazmat.primitives import hashes, serialization from cryptography.exceptions import InvalidSignature from cryptography.hazmat.backends import default_backend import signal # --- I/O Redirection and TCP Server Components --- class OutputQueueHandler(logging.Handler): """A logging handler that puts formatted records onto a queue.""" def __init__(self, output_queue): super().__init__() self.output_queue = output_queue def emit(self, record): try: msg = self.format(record) self.output_queue.put(msg) except Exception: self.handleError(record) class StdoutQueueRedirector: """A file-like object that redirects stdout writes to a queue.""" def __init__(self, output_queue): self.output_queue = output_queue self.original_stdout = sys.stdout # Keep a reference if needed def write(self, text): # Avoid infinite loops if queue.put causes output if text and not text.isspace(): # Don't queue empty lines or just whitespace self.output_queue.put(text.rstrip()) # Remove trailing newline often added by print def flush(self): # No-op needed for file-like interface pass def isatty(self): # Pretend it's not a terminal return False class TCPServer(threading.Thread): """Non-blocking TCP server to handle control and I/O.""" def __init__(self, host, port, output_queue, shutdown_event, command_callback): super().__init__(daemon=True, name="TCPServerThread") self.host = host self.port = port self.output_queue = output_queue self.shutdown_event = shutdown_event self.command_callback = command_callback # Callback for received commands 
self.server_socket = None self.client_socket = None self.client_address = None self._client_lock = threading.Lock() self._running = threading.Event() # Internal running state for TCP server self.logger = logging.getLogger('TCPServer') # Use the main logger setup self.on_init = None # Callback per connessione con server TCP self.connected = False def _log(self, level, message): # Helper to ensure TCP server logs go through the queue system self.logger.log(level, message) def run(self): """Main loop to listen for connections and handle I/O.""" self._running.set() try: self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # Allow reusing address quickly after server restart self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.server_socket.bind((self.host, self.port)) self.server_socket.listen(1) self.server_socket.settimeout(1.0) # Timeout for accept self._log(logging.INFO, f"TCP Control Server listening on {self.host}:{self.port}") # Start the output sender thread output_thread = threading.Thread(target=self._send_output_loop, daemon=True, name="TCPOutputSender") output_thread.start() while self._running.is_set() and not self.shutdown_event.is_set(): try: conn, addr = self.server_socket.accept() with self._client_lock: if self.client_socket: self._log(logging.WARNING, f"Closing existing TCP connection from {self.client_address} to accept new one from {addr}") try: self.client_socket.sendall(b"Server: Closing connection due to new client.\r\n") self.client_socket.close() except socket.error: pass # Ignore errors on closing old socket self.client_socket = conn self.client_address = addr self._log(logging.INFO, f"TCP Client connected from {addr}") self.client_socket.sendall(b"Server: Connection established. 
Send 'exit' to shutdown.\r\n") # Start a thread to handle receiving commands from this client recv_thread = threading.Thread(target=self._handle_client_recv, args=(conn, addr), daemon=True, name=f"TCPClientRecv-{addr}") recv_thread.start() self.on_init("connesso") except socket.timeout: continue # Just loop again to check running/shutdown flags except socket.error as e: if self._running.is_set(): # Avoid logging errors during shutdown self._log(logging.ERROR, f"TCP Socket accept error: {e}") break # Exit loop on major socket error except Exception as e: if self._running.is_set(): self._log(logging.ERROR, f"Unexpected error in TCP accept loop: {e}") break except socket.error as e: self._log(logging.ERROR, f"TCP Server socket error on setup: {e}") except Exception as e: self._log(logging.ERROR, f"Unexpected error starting TCP Server: {e}") finally: self._log(logging.INFO, "TCP Server shutting down.") self._running.clear() # Signal internal loops to stop with self._client_lock: if self.client_socket: try: self.client_socket.close() except socket.error: pass self.client_socket = None if self.server_socket: self.server_socket.close() self.output_queue.put(None) # Signal output thread to terminate output_thread.join(timeout=2) # Wait briefly for output thread def _handle_client_recv(self, client_socket, client_address): """Handles receiving data from a connected TCP client.""" try: client_socket.settimeout(2.0) # Check periodically buffer = bytearray() while self._running.is_set() and not self.shutdown_event.is_set(): # Check if this is still the active client with self._client_lock: if self.client_socket != client_socket: self._log(logging.INFO, f"TCP recv thread for {client_address} exiting (client replaced).") break # Exit if a new client has connected # Check if socket is still readable ready_to_read, _, _ = select.select([client_socket], [], [], 0.5) # Short poll if ready_to_read: try: data = client_socket.recv(1024) if not data: self._log(logging.INFO, f"TCP Client 
{client_address} disconnected.") break # Connection closed by client buffer.extend(data) # Process lines (commands are line-based) while b'\r\n' in buffer: line, buffer = buffer.split(b'\r\n', 1) command = line.decode('utf-8', errors='ignore').strip() if command: self._log(logging.DEBUG, f"TCP Received command: '{command}' from {client_address}") self.command_callback(command) # Execute command callback except socket.timeout: continue # No data received, loop again except (socket.error, ConnectionResetError) as e: self._log(logging.WARNING, f"TCP Socket error receiving from {client_address}: {e}") break except Exception as e: self._log(logging.ERROR, f"Unexpected error receiving from TCP client {client_address}: {e}") break elif self.shutdown_event.is_set(): break # Exit if main shutdown is signaled finally: with self._client_lock: # Only close if this is still the current client if self.client_socket == client_socket: self._log(logging.INFO, f"Closing TCP connection for {client_address}") try: client_socket.close() except socket.error: pass self.client_socket = None self.client_address = None def _send_output_loop(self): """Continuously sends messages from the output queue to the client.""" while True: try: message = self.output_queue.get() # Blocks until item is available if message is None: # Shutdown signal self._log(logging.DEBUG, "TCP Output sender received shutdown signal.") break with self._client_lock: if self.client_socket: try: # Ensure message ends with newline for client display if isinstance(message, str): message = message.encode('utf-8') if not message.endswith(b'\r\n'): message += b'\r\n' self.client_socket.sendall(message) except (socket.error, BrokenPipeError, ConnectionResetError) as e: self._log(logging.WARNING, f"TCP Failed to send to {self.client_address}: {e}. 
Client likely disconnected.") # Don't close socket here, let recv thread handle it except Exception as e: self._log(logging.ERROR, f"Unexpected error sending TCP message: {e}") # If no client connected, message is discarded (or could be buffered) self.output_queue.task_done() # Mark task as done except Exception as e: # Log errors in the queue processing itself self._log(logging.ERROR, f"Error in TCP output loop: {e}") # Avoid spinning on error time.sleep(0.1) self._log(logging.INFO,"TCP Output sender thread finished.") def stop(self): """Stops the TCP server.""" self._log(logging.INFO, "Stopping TCP Server...") self._running.clear() # Signal loops to stop # No need to explicitly close sockets here, finally block in run() handles it # --- Modified UDPServer Class --- class UDPServer: # (Keep most of the existing __init__ and methods) def __init__(self, host, port, output_queue, shutdown_event, buffer_size=1024, timeout=5, fragment_timeout=10, max_fragment_size=486): """Inizializza.""" self.host = host self.port = port self.buffer_size = buffer_size self.socket_timeout = timeout self.fragment_timeout = fragment_timeout self.max_fragment_size = max_fragment_size self.udp_socket = None self.is_active = False # --- MODIFIED --- # self._stop_event = threading.Event() # Use the shared shutdown_event self.main_shutdown_event = shutdown_event # Reference the main shutdown event self.output_queue = output_queue # Store the output queue # --- END MODIFIED --- self.receiver_thread = None self.on_message = None self.logger = self._setup_logger() # Logger setup now uses the queue self._socket_lock = threading.Lock() self.client_message_fragment_buffers = {} self.private_key = rsa.generate_private_key(public_exponent=65537, key_size=4096) self.public_key = self.private_key.public_key() self.public_key_pem = self.public_key.public_bytes(encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo) self.client_public_keys = {} def 
_setup_logger(self): """Configura il logger per inviare output alla coda.""" logger = logging.getLogger() # Get root logger or specific one # Prevent adding handlers multiple times if called again if not hasattr(logger, '_has_udp_tcp_handlers'): logger.setLevel(logging.DEBUG) # Remove existing handlers (like default StreamHandler) if necessary for handler in logger.handlers[:]: logger.removeHandler(handler) # Add the queue handler queue_handler = OutputQueueHandler(self.output_queue) queue_handler.setLevel(logging.DEBUG) formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(threadName)s - %(message)s') queue_handler.setFormatter(formatter) logger.addHandler(queue_handler) logger._has_udp_tcp_handlers = True # Mark logger as configured return logger def start(self): """Avvia il server UDP.""" if self.is_active: self.logger.warning("Server UDP già attivo.") return try: with self._socket_lock: self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # self.udp_socket.settimeout(self.socket_timeout) # Use select timeout instead self.udp_socket.bind((self.host, self.port)) self.udp_socket.setblocking(False) # Make socket non-blocking for select self.is_active = True # self._stop_event.clear() # Not needed, use main_shutdown_event self.receiver_thread = threading.Thread(target=self._receive_loop, daemon=True, name="UDPReceiverThread") self.receiver_thread.start() self.logger.info(f"Server UDP avviato su {self.host}:{self.port}") except socket.error as e: self._handle_error(f"Errore socket durante l'avvio UDP: {e}") self.close() except Exception as e: self._handle_error(f"Errore generico durante l'avvio UDP: {e}") self.close() def _receive_loop(self): """Loop principale di ricezione UDP usando select.""" try: # while not self._stop_event.is_set(): # Check main shutdown event while not self.main_shutdown_event.is_set(): with self._socket_lock: if not self.udp_socket: break # Use select for non-blocking check with timeout ready_to_read, _, _ = 
select.select([self.udp_socket], [], [], self.socket_timeout) if ready_to_read: try: with self._socket_lock: if self.udp_socket is None: break # Ready to read, so recvfrom should not block significantly data, client_address = self.udp_socket.recvfrom(self.buffer_size) if not data: self.logger.warning("Ricevuto pacchetto UDP vuoto.") continue # --- Process data (no change needed here) --- self._process_received_data(client_address, data) # --- End process data --- except socket.error as e: # EAGAIN/EWOULDBLOCK are expected on non-blocking sockets when no data if e.errno == errno.EAGAIN or e.errno == errno.EWOULDBLOCK: continue # Normal case, just loop again elif e.errno == errno.ECONNREFUSED: self.logger.warning(f"UDP Connection refused (ICMP response): {e}") else: # Check if stopping before logging error if not self.main_shutdown_event.is_set(): self._handle_error(f"Errore socket UDP: {e}") except Exception as e: if not self.main_shutdown_event.is_set(): self._handle_error(f"Errore inatteso in _receive_loop UDP: {e}") break # Exit loop on unexpected error else: # Timeout in select, check fragment buffers self._cleanup_fragment_buffers() except Exception as ex: if not self.main_shutdown_event.is_set(): self._handle_error(f"Errore critico nel thread di ricezione UDP: {ex}") finally: self.logger.info("Thread ricezione UDP terminato.") self.is_active = False # Mark as inactive when thread ends # --- NO CHANGES needed in the following methods (fragment handling, TLV processing, crypto) --- # _cleanup_fragment_buffers, _reset_fragment_timer, _timeout_fragment_buffer, # _process_received_data, _process_fragment, _process_complete_message, # _decrypt_message, send_tlv # --- Modified methods below --- def _process_complete_message(self, client_address, data): """Processa un messaggio completo (dopo eventuale decrittazione e riassemblaggio).""" try: type = data[0:1] length = int.from_bytes(data[1:3], byteorder='big') value = data[3:3 + length] if type == b'M': message = 
value.decode('utf-8', errors='replace') self.logger.info(f"Messaggio UDP da {client_address}: {message[:50]}...") # Use logger instead of print, it will go to the queue self.logger.info(f"Messaggio ASCII ricevuto da {client_address}: {message}") if self.on_message: # Callback might still print, which is fine (will be redirected) self.on_message(message, client_address) self.send_tlv(client_address, b'A', b"ACK") # Invia ACK elif type == b'A': self.logger.debug(f"Ricevuto ACK UDP da {client_address}") pass else: self.logger.warning(f"Tipo TLV UDP sconosciuto da {client_address}: {type}") except (ValueError, IndexError, UnicodeDecodeError) as e: self.logger.error(f"Errore nell'estrazione o decodifica TLV UDP da {client_address}: {e}") except Exception as e: self.logger.error(f"Errore imprevisto in _process_complete_message UDP da {client_address}: {e}") def close(self): """Chiude il server UDP (called by main shutdown).""" if not self.is_active: return self.logger.info("Arresto Server UDP...") self.is_active = False # Set inactive flag early # self._stop_event.set() # Now handled by main_shutdown_event setting try: with self._socket_lock: if self.udp_socket: # Make it non-blocking to avoid close issues? Usually not needed. 
self.udp_socket.close() self.udp_socket = None # Join the thread after closing the socket if self.receiver_thread and self.receiver_thread.is_alive(): self.logger.debug("Attesa terminazione thread ricezione UDP...") self.receiver_thread.join(timeout=self.socket_timeout + 1) # Wait slightly longer than timeout if self.receiver_thread.is_alive(): self.logger.warning("Thread ricezione UDP non terminato in tempo.") except socket.error as e: self._handle_error(f"Errore socket in chiusura UDP: {e}") except Exception as e: self._handle_error(f"Errore in chiusura UDP: {e}") finally: self.logger.info("Server UDP arrestato.") def send_tlv(self, client_address, type, value): """Invia un messaggio TLV al client UDP.""" # --- Minor modification: Check is_active first --- if not self.is_active or self.main_shutdown_event.is_set(): # self._handle_error("Server UDP non attivo o in arresto.") # Reduce noise during shutdown return # --- Rest of the method is unchanged --- if isinstance(value, str): value = value.encode('utf-8') length = len(value) length_bytes = length.to_bytes(2, byteorder='big') tlv_message = type + length_bytes + value # --- Crypto logic unchanged --- if type != b'K' and type != b'R': if client_address in self.client_public_keys: try: encrypted_message = self.client_public_keys[client_address].encrypt( tlv_message, padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None) ) message_to_send = encrypted_message except ValueError as e: self.logger.error(f"Errore durante la crittografia UDP per {client_address}: {e}") return else: self.logger.warning(f"Chiave pubblica non trovata per {client_address}. 
Invio UDP non criptato.") message_to_send = tlv_message else: message_to_send = tlv_message # --- End Crypto logic --- try: with self._socket_lock: if self.udp_socket: self.udp_socket.sendto(message_to_send, client_address) # Reduce log noise for ACKs, increase for Keys/Messages if type == b'K' or type == b'M': self.logger.debug(f"UDP Inviato (Tipo: {type.decode()}) a {client_address}: {message_to_send[:50]}...") elif type == b'A': self.logger.debug(f"UDP Inviato ACK a {client_address}") else: self.logger.debug(f"UDP Inviato (Tipo: {type.decode()}) a {client_address}") # else: # Removed redundant check, handled by is_active # self._handle_error("Socket UDP non inizializzato.") except (socket.error, ValueError) as e: # Don't log errors excessively if shutting down if not self.main_shutdown_event.is_set(): self._handle_error(f"Errore invio UDP a {client_address}: {e}") except Exception as e: if not self.main_shutdown_event.is_set(): self._handle_error(f"Errore generico invio UDP a {client_address}: {e}") def _handle_error(self, message): """Gestisce gli errori (ora logga tramite il sistema logger->queue).""" self.logger.error(message) def stop(self): """Ferma il server UDP (metodo pubblico chiamato da main).""" # This method might not be strictly needed if close() is called during shutdown # but it's good practice to have it. 
self.logger.info("Chiamata a stop() per UDPServer") # Signal shutdown via the main event if not already set if not self.main_shutdown_event.is_set(): self.main_shutdown_event.set() # Call close to handle socket closure and thread joining self.close() # --- Unchanged fragment/crypto methods --- def _cleanup_fragment_buffers(self): """Pulisce i buffer scaduti.""" clients_to_cleanup = [] current_time = time.time() # Need to iterate safely if timers modify the dict # A simple way is to check time explicitly if timers aren't reliable across threads/shutdown frag_copy = dict(self.client_message_fragment_buffers) for client_address, message_buffers in frag_copy.items(): message_ids_to_cleanup = [] msg_buf_copy = dict(message_buffers) for message_id, buffer_info in msg_buf_copy.items(): # Check if timer exists and has finished OR explicitly check timestamp timer_finished = ('timer' in buffer_info and buffer_info['timer'] and not buffer_info['timer'].is_alive()) # Add a timestamp check as fallback last_update_time = buffer_info.get('last_update', 0) timeout_occurred = (current_time - last_update_time > self.fragment_timeout) if timer_finished or timeout_occurred: if client_address in self.client_message_fragment_buffers and message_id in self.client_message_fragment_buffers[client_address]: self.logger.debug(f"Pulizia buffer frammenti scaduto msg_id {message_id} da {client_address}.") message_ids_to_cleanup.append(message_id) # Cancel timer just in case if 'timer' in buffer_info and buffer_info['timer']: buffer_info['timer'].cancel() # Safely remove cleaned up message IDs if message_ids_to_cleanup and client_address in self.client_message_fragment_buffers: for message_id in message_ids_to_cleanup: if message_id in self.client_message_fragment_buffers[client_address]: del self.client_message_fragment_buffers[client_address][message_id] # If client buffer is now empty, mark client for cleanup if not self.client_message_fragment_buffers[client_address]: 
clients_to_cleanup.append(client_address) # Safely remove cleaned up clients for client_address in clients_to_cleanup: if client_address in self.client_message_fragment_buffers: del self.client_message_fragment_buffers[client_address] def _reset_fragment_timer(self, client_address, message_id): """Resetta il timer e aggiorna timestamp.""" if client_address in self.client_message_fragment_buffers and message_id in self.client_message_fragment_buffers[client_address]: buffer_info = self.client_message_fragment_buffers[client_address][message_id] # Cancel existing timer if any if 'timer' in buffer_info and buffer_info['timer'] and buffer_info['timer'].is_alive(): buffer_info['timer'].cancel() # Update timestamp buffer_info['last_update'] = time.time() # Start new timer timer = threading.Timer(self.fragment_timeout, self._timeout_fragment_buffer_callback, args=[client_address, message_id]) buffer_info['timer'] = timer timer.start() else: self.logger.warning(f"Tentativo di resettare timer per buffer inesistente: {client_address} / {message_id}") def _timeout_fragment_buffer_callback(self, client_address, message_id): """Callback eseguito dal timer quando scade.""" # This runs in the Timer's thread. It's safer to just log and let the main loop cleanup. self.logger.warning(f"Timeout frammenti (callback timer) per msg_id {message_id} da {client_address}.") # The main loop's _cleanup_fragment_buffers will handle the actual removal based on timer state or timestamp. 
def _process_received_data(self, client_address, data): """Processa i dati grezzi ricevuti, gestendo correttamente l'header.""" self.logger.debug(f"Ricevuto dati UDP da {client_address}: {len(data)} bytes") # --- PASSO 1: Controllo Header e Estrazione Payload --- if len(data) < 7: self.logger.warning(f"Dati UDP troppo corti da {client_address} ({len(data)} bytes), attesi almeno 7 per header.") return # Estrai sempre header e payload msg_id = int.from_bytes(data[:4], byteorder='big') frag_num = data[4] # Nota: frag_num è 1-based inviato dal client total_frags = int.from_bytes(data[5:7], byteorder='big') payload = data[7:] # Estrai SEMPRE il payload dopo l'header self.logger.debug(f"Header UDP: msg_id={msg_id}, frag={frag_num}, total={total_frags}. Payload len={len(payload)}") # --- PASSO 2: Gestione Frammentazione --- if total_frags > 1: # È un frammento di un messaggio più grande # Controlli di sanità per frammenti if msg_id == 0: self.logger.warning(f"Ricevuto frammento (total={total_frags}) ma con msg_id=0 da {client_address}. Scartato.") return if frag_num == 0 or frag_num > total_frags: self.logger.warning(f"Numero frammento non valido ({frag_num}/{total_frags}) per msg_id={msg_id} da {client_address}. Scartato.") return # Processa il payload del frammento (che è già stato estratto) self._process_fragment(client_address, msg_id, frag_num, total_frags, payload) return # Gestito come frammento, finito qui per questo pacchetto # --- PASSO 3: Gestione Messaggio Completo (non frammentato o total_frags=1) --- # A questo punto, 'payload' contiene l'intero messaggio TLV ricevuto # (potenzialmente criptato), senza l'header di 7 byte. if not payload: self.logger.warning(f"Ricevuto header valido ma payload vuoto da {client_address}. msg_id={msg_id}") return # --- PASSO 3a: Gestione Tipi Speciali K/R (PRIMA della decifratura) --- # Guarda il tipo all'inizio del payload. K e R non dovrebbero essere criptati. 
try: type_peek = payload[0:1] except IndexError: self.logger.warning(f"Payload ricevuto da {client_address} troppo corto per contenere un tipo TLV ({len(payload)} bytes).") return if type_peek == b'K': # Gestisci chiave pubblica ricevuta (presumibilmente non criptata) try: # Estrai TLV completo dal payload if len(payload) < 3: raise ValueError("Payload troppo corto per TLV K") key_length = int.from_bytes(payload[1:3], byteorder='big') expected_payload_len = 3 + key_length if expected_payload_len > len(payload): self.logger.warning(f"Lunghezza chiave pubblica TLV K ({key_length}) non valida per payload ({len(payload)}) da {client_address}") return key_value = payload[3:expected_payload_len] # Verifica se ci sono dati extra nel payload if len(payload) > expected_payload_len: self.logger.warning(f"Dati extra ({len(payload) - expected_payload_len} bytes) dopo TLV K nel payload da {client_address}") public_key = serialization.load_pem_public_key(key_value, backend=default_backend()) self.client_public_keys[client_address] = public_key self.logger.info(f"Chiave pubblica UDP tipo 'K' ricevuta e caricata da {client_address}.") # Potresti inviare un ACK per la chiave se necessario: # self.send_tlv(client_address, b'A', b"ACK_KEY") return # Chiave processata, finito. 
except ValueError as e: self._handle_error(f"Errore nella lettura/caricamento della chiave pubblica K da {client_address}: {e}") return except Exception as e: self._handle_error(f"Errore generico processamento chiave K da {client_address}: {e}") return elif type_peek == b'R': # Gestisci richiesta chiave (presumibilmente non criptata) try: # Estrai TLV completo dal payload if len(payload) < 3: raise ValueError("Payload troppo corto per TLV R") req_length = int.from_bytes(payload[1:3], byteorder='big') expected_payload_len = 3 + req_length if expected_payload_len > len(payload): self.logger.warning(f"Lunghezza richiesta R TLV ({req_length}) non valida per payload ({len(payload)}) da {client_address}") return req_value = payload[3:expected_payload_len] # Verifica dati extra if len(payload) > expected_payload_len: self.logger.warning(f"Dati extra ({len(payload) - expected_payload_len} bytes) dopo TLV R nel payload da {client_address}") if req_value == b"REQ_PUB_KEY": self.logger.info(f"Richiesta chiave pubblica UDP tipo 'R' ricevuta da {client_address}, invio...") # Invia la nostra chiave pubblica self.send_tlv(client_address, b'K', self.public_key_pem) return # Richiesta processata, finito. else: self.logger.warning(f"Richiesta UDP tipo 'R' non riconosciuta da {client_address}: {req_value}") return except Exception as e: self._handle_error(f"Errore processamento richiesta R da {client_address}: {e}") return # --- PASSO 3b: Decifratura (se non K/R) --- # Se non era K o R, il payload dovrebbe contenere il messaggio criptato (es. tipo 'M') # Prova a decifrare l'INTERO payload. decrypted_payload = self._decrypt_message(client_address, payload) if decrypted_payload is None: # Errore durante decifratura o dati scartati perché non criptati quando atteso. # _decrypt_message ha già loggato l'errore specifico. self.logger.warning(f"Decifratura fallita o dati payload scartati per {client_address}. 
Impossibile processare.") return # --- PASSO 3c: Processa Messaggio Completo Decifrato --- # Ora 'decrypted_payload' contiene il TLV in chiaro (es. tipo 'M') self._process_complete_message(client_address, decrypted_payload) # NEL FILE: server_UDP_TCP_crypto_protocollo_TLV.py # SOSTITUISCI il metodo _process_fragment esistente con questo: def _process_fragment(self, client_address, message_id, fragment_number, total_fragments, payload_criptato): """Gestisce un frammento UDP (criptato).""" if client_address not in self.client_message_fragment_buffers: self.client_message_fragment_buffers[client_address] = {} # Inizializza buffer per questo message_id se è il primo frammento if message_id not in self.client_message_fragment_buffers[client_address]: if len(self.client_message_fragment_buffers[client_address]) > 50: # Limite messaggi concorrenti self.logger.warning(f"Troppi messaggi frammentati pendenti da {client_address}. Scartando msg_id {message_id}.") return self.client_message_fragment_buffers[client_address][message_id] = { 'fragments': {}, # Qui metteremo i frammenti DECRITTATI 'total_fragments': total_fragments, 'timer': None, 'last_update': time.time() } self.logger.debug(f"Inizio ricezione frammenti per msg_id {message_id} ({total_fragments} total) da {client_address}") message_buffer = self.client_message_fragment_buffers[client_address][message_id] # --- Decifra il payload del frammento ricevuto --- decrypted_fragment_payload = self._decrypt_message(client_address, payload_criptato) if decrypted_fragment_payload is None: # Errore di decifratura per questo frammento self.logger.error(f"Errore decrittazione frammento UDP {fragment_number}/{total_fragments} di {message_id} da {client_address}. Frammento scartato.") # Potremmo voler invalidare l'intero messaggio qui? Per ora no. 
# Aggiorniamo comunque il timestamp per evitare timeout prematuro se altri arrivano message_buffer['last_update'] = time.time() self._reset_fragment_timer(client_address, message_id) return # Non aggiungere il frammento fallito al buffer # --- Memorizza il frammento *decrittato* --- if fragment_number in message_buffer['fragments']: self.logger.debug(f"Ricevuto frammento duplicato UDP #{fragment_number} per msg_id {message_id} da {client_address}. Sovrascrivo.") message_buffer['fragments'][fragment_number] = decrypted_fragment_payload # Salva i dati decifrati message_buffer['last_update'] = time.time() # Aggiorna timestamp received_count = len(message_buffer['fragments']) self.logger.debug(f"Ricevuto e decifrato frammento UDP #{fragment_number}/{total_fragments} per msg_id {message_id} da {client_address} ({received_count}/{total_fragments} ricevuti)") # --- Controlla se il messaggio è completo --- if received_count == total_fragments: self.logger.info(f"Tutti {total_fragments} frammenti ricevuti e decifrati per msg_id {message_id} da {client_address}. Riassemblo...") # Cancella il timer associato if 'timer' in message_buffer and message_buffer['timer'] and message_buffer['timer'].is_alive(): message_buffer['timer'].cancel() # Riassembla i frammenti DECRITTATI in ordine reassembled_plaintext = bytearray() reassembly_ok = True for i in range(1, total_fragments + 1): if i not in message_buffer['fragments']: self.logger.error(f"ERRORE RIASSEMBLAGGIO: Frammento decifrato #{i} mancante per msg_id {message_id} da {client_address}! 
Messaggio scartato.") reassembly_ok = False break reassembled_plaintext.extend(message_buffer['fragments'][i]) # Pulisci il buffer per questo message_id ANCHE SE fallito del self.client_message_fragment_buffers[client_address][message_id] if not self.client_message_fragment_buffers[client_address]: # Rimuovi client se non ha più messaggi del self.client_message_fragment_buffers[client_address] # Processa il messaggio completo riassemblato se OK if reassembly_ok: self.logger.debug(f"Riassemblaggio msg_id {message_id} completato ({len(reassembled_plaintext)} bytes plaintext). Processo TLV...") # Ora passa il TLV completo riassemblato (in chiaro) alla funzione di processamento self._process_complete_message(client_address, bytes(reassembled_plaintext)) # else: Fallimento riassemblaggio già loggato else: # Non ancora completo, resetta il timer di inattività self._reset_fragment_timer(client_address, message_id) def _decrypt_message(self, client_address, message_bytes): """Decripta il messaggio/frammento UDP. Restituisce bytes decriptati o originali se non criptato/chiave mancante, None su errore grave.""" # Check if we have the key for this client client_pub_key = self.client_public_keys.get(client_address) # Try decryption ONLY if key exists. Assume messages are encrypted if key exists. if client_pub_key: try: # Use OUR private key to decrypt messages encrypted with client's public key? NO! # Client encrypts with *SERVER's* public key. Server decrypts with *SERVER's* private key. decrypted = self.private_key.decrypt( message_bytes, padding.OAEP( mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None ) ) self.logger.debug(f"Dati decriptati con successo da {client_address} ({len(message_bytes)} -> {len(decrypted)} bytes)") return decrypted except ValueError: # This is the most common error if data is not encrypted or wrong key used self.logger.warning(f"Decriptazione fallita (ValueError) da {client_address}. 
Dati potrebbero non essere criptati o corrotti.") # What to do? Option 1: Return original bytes (assume not encrypted) # Option 2: Return None (strict - discard if decryption fails) # Let's be strict for encrypted comms: return None. return None except Exception as e: self.logger.error(f"Errore imprevisto durante decriptazione UDP da {client_address}: {e}") return None # Return None on unexpected decryption errors else: # No public key known for this client, assume message is not encrypted self.logger.debug(f"Nessuna chiave pubblica per {client_address}. Processo dati come non criptati.") return message_bytes # Return original bytes # --- Main Application Logic --- # Global variables (use with care, better to pass as args) udp_server = None tcp_server = None original_stdout = sys.stdout main_shutdown = threading.Event() # Single event for coordinating shutdown output_queue = queue.Queue() # Queue for all output def process_tcp_command(command): """Callback function to handle commands received via TCP.""" global main_shutdown, udp_server # Allow modification logger = logging.getLogger('MainApp') # Get logger instance logger.info(f"Comando TCP ricevuto: '{command}'") if command.lower() == "exit": logger.info("Comando 'exit' ricevuto. 
# --- Main Application Logic ---

# Module-level state shared by the callbacks below and the __main__ section.
udp_server = None
tcp_server = None
original_stdout = sys.stdout  # Saved so it can be restored (and used directly) at shutdown
main_shutdown = threading.Event()  # Single event coordinating shutdown of all threads
output_queue = queue.Queue()  # Queue receiving all redirected output


def process_tcp_command(command):
    """Handle one command line received over the TCP control channel.

    Recognised commands (case-insensitive, surrounding whitespace ignored):
    ``exit`` sets the shared shutdown event; ``status`` logs a summary of
    the UDP and TCP servers. Anything else is logged as unrecognised.

    Args:
        command: The command text as received from the TCP client.
    """
    logger = logging.getLogger('MainApp')
    logger.info(f"Comando TCP ricevuto: '{command}'")

    # Normalise once so trailing CR/LF or spaces do not defeat the match.
    action = command.strip().lower()

    if action == "exit":
        logger.info("Comando 'exit' ricevuto. Avvio arresto...")
        main_shutdown.set()  # Signal all threads to stop
    elif action == "status":
        logger.info("--- STATUS RICHIESTO ---")
        if udp_server:
            logger.info(f"UDP Server Active: {udp_server.is_active}")
            logger.info(f"Client connessi (chiavi note): {list(udp_server.client_public_keys.keys())}")
            logger.info(f"Buffer frammenti attivi: {len(udp_server.client_message_fragment_buffers)}")
        if tcp_server:
            logger.info(f"TCP Server Active: {tcp_server._running.is_set()}")
            # Hold the client lock so socket and address are read consistently.
            with tcp_server._client_lock:
                if tcp_server.client_socket:
                    logger.info(f"Client TCP connesso: {tcp_server.client_address}")
                else:
                    logger.info("Nessun client TCP connesso.")
        logger.info("-----------------------")
    else:
        logger.warning(f"Comando TCP '{command}' non riconosciuto.")


def udp_message_callback(messaggio, indirizzo):
    """Log a message delivered by the UDP server.

    Only the first 3000 items of the message are logged. Delivery to the
    TCP client happens through the logging handlers, so nothing is queued
    explicitly here.

    Args:
        messaggio: The received message (sliceable).
        indirizzo: Address of the sender.
    """
    logging.info(f"CALLBACK UDP: Messaggio '{messaggio[:3000]}...' da {indirizzo}")


def signal_handler(sig, frame):
    """Handle SIGINT/SIGTERM by requesting a clean shutdown.

    Args:
        sig: Signal number (unused).
        frame: Current stack frame (unused).
    """
    # Write to the saved stdout: sys.stdout may be redirected to the queue.
    print("\r\nRicevuto SIGINT (Ctrl+C). Arresto in corso...", file=original_stdout)
    logging.warning("SIGINT Ricevuto! Avvio arresto...")
    main_shutdown.set()


def tcp_init_callback(message):
    """Mark the TCP control server as having a connected client.

    Args:
        message: Payload supplied by the TCP server's ``on_init`` hook (unused).
    """
    tcp_server.connected = True


def wait_for_tcp_client(server, shutdown_event, poll_interval=1.0):
    """Wait until a TCP control client connects or startup must be abandoned.

    Args:
        server: Object exposing a ``connected`` flag and ``is_alive()``.
        shutdown_event: ``threading.Event`` set when shutdown is requested.
        poll_interval: Seconds between checks of the ``connected`` flag.

    Returns:
        True once ``server.connected`` is truthy; False if shutdown was
        requested or the server thread terminated before a client connected.
    """
    while not server.connected:
        if shutdown_event.is_set():
            return False
        if not server.is_alive():
            # The thread ended (e.g. bind failure): no client can ever connect.
            return False
        # Event.wait doubles as an interruptible sleep.
        shutdown_event.wait(poll_interval)
    return True


# --- MAIN Execution ---
if __name__ == "__main__":
    udp_host = "0.0.0.0"  # Listen on all interfaces
    tcp_host = "localhost"  # TCP is for local control only
    fragment_timeout_sec = 10

    parser = argparse.ArgumentParser(
        description="Server UDP con protocollo TLV, crittografia RSA e controllo TCP."
    )
    parser.add_argument(
        '--udp-server-port',
        type=int,
        default=5001,
        help="Porta locale su cui il server UDP resta in ascolto (default: 5001)",
    )
    parser.add_argument(
        '--tcp-control-port',
        type=int,
        default=5002,
        help="Porta locale su cui ascoltare per le connessioni TCP di controllo (default: 5002)",
    )
    args = parser.parse_args()
    udp_port = args.udp_server_port
    tcp_port = args.tcp_control_port

    # The shared shutdown event and output queue were already created at
    # module level; they are reused here rather than re-created.

    # Redirect stdout so print() output reaches the TCP client via the queue.
    stdout_redirector = StdoutQueueRedirector(output_queue)
    sys.stdout = stdout_redirector

    # UDP server (logger setup happens inside its constructor).
    udp_server = UDPServer(udp_host, udp_port, output_queue, main_shutdown,
                           fragment_timeout=fragment_timeout_sec)
    udp_server.on_message = udp_message_callback

    # TCP control server.
    tcp_server = TCPServer(tcp_host, tcp_port, output_queue, main_shutdown, process_tcp_command)
    tcp_server.on_init = tcp_init_callback

    # Clean shutdown on Ctrl+C and on termination requests.
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    # Fetch the main logger only after the servers have configured logging.
    logger = logging.getLogger('MainApp')
    logger.info("--- Avvio Applicazione Server ---")

    try:
        tcp_server.start()

        # The UDP server starts only after a control client is attached.
        # Unlike a bare sleep loop, this wait ends on shutdown or TCP failure.
        if wait_for_tcp_client(tcp_server, main_shutdown):
            udp_server.start()

            logger.info(f"Server pronto. In attesa di connessioni UDP e TCP (su {tcp_host}:{tcp_port})...")
            logger.info("Invia 'exit' tramite TCP per terminare.")

            # Keep the main thread alive; wake periodically for health checks.
            while not main_shutdown.is_set():
                if not udp_server.receiver_thread.is_alive() and udp_server.is_active:
                    logger.error("!!! Thread ricezione UDP non è più attivo ma il server dovrebbe esserlo!")
                if not tcp_server.is_alive() and tcp_server._running.is_set():
                    logger.error("!!! Thread principale TCP Server non è più attivo ma dovrebbe esserlo!")
                main_shutdown.wait(timeout=5.0)
        else:
            logger.warning("Avvio interrotto: nessun client TCP connesso prima dell'arresto.")

    except Exception as e:
        logger.critical(f"Errore critico nel main loop: {e}", exc_info=True)
        main_shutdown.set()  # Ensure shutdown on critical error

    finally:
        logger.info("--- Inizio procedura di arresto ---")
        main_shutdown.set()  # Idempotent: guarantees every thread sees the request

        logger.info("Arresto Server TCP...")
        if tcp_server and tcp_server.is_alive():
            tcp_server.stop()
            tcp_server.join(timeout=5)
            if tcp_server.is_alive():
                logger.warning("Thread TCP Server non terminato in tempo.")

        logger.info("Arresto Server UDP...")
        if udp_server and udp_server.is_active:
            # close() suffices here because the shutdown event is already set.
            udp_server.close()

        logger.info("Attesa svuotamento coda output...")
        # NOTE: output_queue.join() is deliberately not called; it could block
        # forever if the TCP output thread died unexpectedly.

        sys.stdout = original_stdout
        print("--- Arresto Completato ---")