# compatibile Windows 11 # compatibile Ubuntu 24.10 # compatibile python 3.12.7 import socket import select import logging import threading import sys import errno import time import os # Import per la generazione di ID messaggio unici from cryptography.hazmat.primitives.asymmetric import rsa, padding from cryptography.hazmat.primitives import hashes, serialization from cryptography.exceptions import InvalidSignature from cryptography.hazmat.backends import default_backend class UDPServer: """ Un server UDP con crittografia asimmetrica che gestisce la frammentazione robusta dei messaggi. """ def __init__(self, host, port, buffer_size=1024, timeout=5, fragment_timeout=10, max_fragment_size=490): """ Inizializza l'oggetto UDPServer. """ self.host = host self.port = port self.buffer_size = buffer_size self.socket_timeout = timeout self.fragment_timeout = fragment_timeout self.max_fragment_size = max_fragment_size self.udp_socket = None self.is_active = False self._stop_event = threading.Event() self.receiver_thread = None self.on_message = None # Callback per i messaggi ricevuti, inizialmente None self.logger = self._setup_logger() self._socket_lock = threading.Lock() # I buffer dei frammenti client sono ora organizzati per indirizzo client, poi per ID messaggio self.client_message_fragment_buffers = {} # Genera una coppia di chiavi RSA self.private_key = rsa.generate_private_key( public_exponent=65537, key_size=4096 ) self.public_key = self.private_key.public_key() # Serializza la chiave pubblica per poterla inviare self.public_key_pem = self.public_key.public_bytes( encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo ) # Dizionario per memorizzare le chiavi pubbliche dei client self.client_public_keys = {} def _setup_logger(self): """Configura il logger.""" logger = logging.getLogger('UDPServer') logger.setLevel(logging.DEBUG) console_handler = logging.StreamHandler(sys.stdout) console_handler.setLevel(logging.DEBUG) console_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') console_handler.setFormatter(console_formatter) logger.addHandler(console_handler) return logger def start(self): """Avvia il server UDP.""" if self.is_active: self.logger.warning("Il server è già attivo.") return try: with self._socket_lock: self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.udp_socket.settimeout(self.socket_timeout) self.udp_socket.bind((self.host, self.port)) self.is_active = True self._stop_event.clear() self.receiver_thread = threading.Thread(target=self._receive_loop) self.receiver_thread.start() self.logger.info(f"Server UDP avviato su {self.host}:{self.port}") except socket.error as e: self._handle_error(f"Errore socket durante l'avvio: {e}") self.close() except Exception as e: self._handle_error(f"Errore generico durante l'avvio: {e}") self.close() def _receive_loop(self): """Loop principale per la ricezione di messaggi UDP.""" try: while not self._stop_event.is_set(): with self._socket_lock: if self.udp_socket is None: break ready = select.select([self.udp_socket], [], [], self.socket_timeout) if ready[0]: try: with self._socket_lock: if self.udp_socket is None: break data, client_address = self.udp_socket.recvfrom(self.buffer_size) if not data: self.logger.warning("Ricevuto pacchetto UDP vuoto.") continue self._process_message(client_address, data) except socket.timeout: self.logger.debug("Timeout durante la ricezione dati.") continue except socket.error as e: if e.errno == errno.ECONNREFUSED: self.logger.warning(f"Nessun host in ascolto: {e}") else: self._handle_error(f"Errore socket durante la ricezione: {e}") else: self._cleanup_fragment_buffers() pass # Nessun dato ricevuto entro il timeout except Exception as ex: self._handle_error(f"Errore nel thread di ricezione: {ex}") finally: self.close() def _cleanup_fragment_buffers(self): """Pulisce i buffer di frammenti scaduti.""" clients_to_cleanup = [] for client_address, message_buffers in self.client_message_fragment_buffers.items(): message_ids_to_cleanup = [] for message_id, buffer_info in message_buffers.items(): if 'timer' in buffer_info and buffer_info['timer'] and not buffer_info['timer'].is_alive(): self.logger.debug(f"Timeout buffer frammenti per msg_id {message_id} da {client_address}, pulizia buffer.") message_ids_to_cleanup.append(message_id) for message_id in message_ids_to_cleanup: del message_buffers[message_id] if not message_buffers: clients_to_cleanup.append(client_address) for client_address in clients_to_cleanup: if client_address in self.client_message_fragment_buffers: del self.client_message_fragment_buffers[client_address] def _reset_fragment_timer(self, client_address, message_id): """Resetta o avvia il timer di frammentazione per un client e message ID.""" if client_address in self.client_message_fragment_buffers and message_id in self.client_message_fragment_buffers[client_address] and 'timer' in self.client_message_fragment_buffers[client_address][message_id] and self.client_message_fragment_buffers[client_address][message_id]['timer']: self.client_message_fragment_buffers[client_address][message_id]['timer'].cancel() # Cancella il timer esistente timer = threading.Timer(self.fragment_timeout, self._timeout_fragment_buffer, args=[client_address, message_id]) self.client_message_fragment_buffers[client_address][message_id]['timer'] = timer timer.start() def _timeout_fragment_buffer(self, client_address, message_id): """Gestisce il timeout del buffer di frammenti per un client e message ID.""" if client_address in self.client_message_fragment_buffers and message_id in self.client_message_fragment_buffers[client_address]: self.logger.warning(f"Timeout raggiunto per buffer frammenti msg_id {message_id} da {client_address}. Scarto il buffer.") del self.client_message_fragment_buffers[client_address][message_id] if not self.client_message_fragment_buffers[client_address]: del self.client_message_fragment_buffers[client_address] def _process_message(self, client_address, data): """ Processa i messaggi UDP in entrata, gestendo chiavi pubbliche, richieste e messaggi criptati frammentati. """ # Gestione della chiave pubblica (non frammentata) start_key_marker = b"-----BEGIN PUBLIC KEY-----" end_key_marker = b"-----END PUBLIC KEY-----" start_key_index = data.find(start_key_marker) end_key_index = data.find(end_key_marker) if start_key_index != -1 and end_key_index != -1: # Estrai la chiave pubblica public_key_bytes = data[start_key_index:end_key_index + len(end_key_marker)] try: public_key = serialization.load_pem_public_key(public_key_bytes) self.client_public_keys[client_address] = public_key self.logger.info(f"Chiave pubblica ricevuta da {client_address}") return # I messaggi di chiave pubblica non sono messaggi frammentati di per sé, ferma l'elaborazione qui. except ValueError as e: self._handle_error(f"Errore nella lettura della chiave pubblica da {client_address}: {e}") return # Ferma l'elaborazione in caso di errore della chiave pubblica # Gestione della richiesta di chiave pubblica (non frammentata) if data == b"REQ_PUB_KEY\n": self.logger.info(f"Richiesta di chiave pubblica ricevuta da {client_address}") self.send_to(self.public_key_pem, client_address) return # Le richieste di chiave pubblica non sono messaggi frammentati, ferma l'elaborazione qui. # Tentativo di processare come messaggio frammentato o non frammentato criptato if len(data) >= 6: # Controlla se i dati sono abbastanza lunghi da contenere l'header (4 msg_id + 1 frag_num + 1 total_frags) message_id_bytes = data[:4] # Byte ID messaggio estratti fragment_number = data[4] total_fragment_count = data[5] fragment_payload = data[6:] message_id = int.from_bytes(message_id_bytes, byteorder='big') # ID messaggio usato direttamente if message_id > 0 and total_fragment_count > 0 and fragment_number > 0: # Header di frammento valido self._process_fragment(client_address, message_id, fragment_number, total_fragment_count, fragment_payload) return # Frammento processato, ferma l'ulteriore elaborazione di questi dati in questa funzione # Se non è chiave pubblica, richiesta chiave pubblica, o frammento, prova a trattarlo come messaggio non frammentato (assumendo criptato) self._process_non_fragmented_message(client_address, data) def _process_fragment(self, client_address, message_id, fragment_number, total_fragment_count, fragment_payload): """Gestisce l'elaborazione di un frammento di messaggio.""" if client_address not in self.client_message_fragment_buffers: self.client_message_fragment_buffers[client_address] = {} # Inizializza i buffer dei messaggi del client if message_id not in self.client_message_fragment_buffers[client_address]: self.client_message_fragment_buffers[client_address][message_id] = { 'fragments': {}, 'total_fragments': total_fragment_count, 'timer': None } message_buffer = self.client_message_fragment_buffers[client_address][message_id] message_buffer['fragments'][fragment_number] = fragment_payload if len(message_buffer['fragments']) == total_fragment_count: self.logger.debug(f"Tutti i frammenti ricevuti per msg_id {message_id} da {client_address}. Assemblaggio...") # Riassembla i frammenti in ordine reassembled_message_bytes = bytearray() for i in range(1, total_fragment_count + 1): if i not in message_buffer['fragments']: self.logger.error(f"Frammento mancante #{i} per msg_id {message_id} da {client_address}!") del self.client_message_fragment_buffers[client_address][message_id] return # Ferma l'elaborazione, scarta il messaggio. In un sistema robusto reale, potresti richiedere la ritrasmissione. reassembled_message_bytes.extend(message_buffer['fragments'][i]) decrypted_message = self._decrypt_message(client_address, bytes(reassembled_message_bytes)) if decrypted_message: self.logger.info(f"Messaggio assemblato e decriptato da {client_address}, msg_id {message_id}: {decrypted_message}...") if self.on_message: # se c'è un callback per i messaggi self.on_message(decrypted_message, client_address) # chiama il callback con self, messaggio e indirizzo else: print("MessageProcessor: Nessuna callback on_message impostata per questo messaggio.") id_appg = str(message_id) str_appg = id_appg + ": ACK" self.send_to(str_appg, client_address) else: self.logger.error(f"Errore nella decrittazione del messaggio assemblato msg_id {message_id} da {client_address}.") del self.client_message_fragment_buffers[client_address][message_id] # Pulisci il buffer dopo l'elaborazione else: # Non tutti i frammenti ancora self._reset_fragment_timer(client_address, message_id) self.logger.debug(f"Frammento #{fragment_number}/{total_fragment_count} ricevuto per msg_id {message_id} da {client_address}. In attesa di altri frammenti...") def _process_non_fragmented_message(self, client_address, data): """Processa i messaggi non frammentati (assunti come criptati).""" decrypted_message = self._decrypt_message(client_address, data) if decrypted_message: self.logger.info(f"Messaggio non frammentato ricevuto e decriptato da {client_address}: {decrypted_message[:50]}...") self.send_to(b"ACK", client_address) else: self.logger.warning(f"Impossibile decriptare messaggio non frammentato da {client_address} o chiave pubblica non disponibile.") def _decrypt_message(self, client_address, message_bytes): """Decripta il messaggio se la chiave pubblica del client è disponibile.""" if client_address in self.client_public_keys and self.client_public_keys[client_address]: try: decrypted_message = self.private_key.decrypt( message_bytes, padding.OAEP( mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None ) ) return decrypted_message.decode('latin-1') # o utf-8, a seconda della codifica except ValueError as e: self.logger.error(f"Errore di decrittazione da {client_address}: {e}") return None else: self.logger.warning(f"Chiave pubblica non disponibile per {client_address}, impossibile decriptare.") return None def close(self): """Chiude la connessione del server e rilascia le risorse.""" if not self.is_active: return self.is_active = False try: self._stop_event.set() with self._socket_lock: if self.udp_socket: self.udp_socket.setblocking(0) self.udp_socket.close() self.udp_socket = None if self.receiver_thread and self.receiver_thread.is_alive(): self.receiver_thread.join(timeout=1) except socket.error as e: self._handle_error(f"Errore socket durante la chiusura: {e}") except Exception as e: self._handle_error(f"Errore generico durante la chiusura: {e}") finally: self.logger.info("Server UDP arrestato.") def send_to(self, message, client_address): """ Invia un messaggio UDP criptato a un indirizzo specificato. """ if not self.is_active: self._handle_error("Il server non è attivo, impossibile inviare il messaggio") return try: if isinstance(message, str): message = message.encode('utf-8') if message == b"REQ_PUB_KEY" or message == self.public_key_pem: pass # non deve criptare la richiesta e la chiave pubblica # Cripta il messaggio se l'indirizzo di destinazione è noto e abbiamo la sua chiave pubblica elif client_address in self.client_public_keys and self.client_public_keys[client_address]: encrypted_message = self.client_public_keys[client_address].encrypt( message, padding.OAEP( mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None ) ) message = encrypted_message else: self.logger.warning(f"Chiave pubblica di {client_address} non trovata. Invio non criptato.") with self._socket_lock: if self.udp_socket: self.udp_socket.sendto(message, client_address) self.logger.debug(f"Inviato messaggio a {client_address}: {message[:50]}...") # Mostra solo i primi 50 caratteri per brevità else: self._handle_error("Il socket non è inizializzato, impossibile inviare il messaggio.") except (socket.error, ValueError) as e: self._handle_error(f"Errore durante l'invio a {client_address}: {e}") except Exception as e: self._handle_error(f"Errore generico durante l'invio a {client_address}: {e}") def _handle_error(self, message): """Gestisce gli errori.""" self.logger.error(message) # Funzione di callback che assegniamo a on_message def print_message_callback(messaggio, indirizzo): """ Funzione di callback per gestire i messaggi ricevuti. Prende il messaggio e l'indirizzo come argomenti. """ print(f"Callback: Messaggio ricevuto: {messaggio}") print(f"Callback: Indirizzo mittente: {indirizzo}") def main(): # Configurazione del server HOST = '0.0.0.0' # Ascolta su tutte le interfacce di rete PORT = 5001 # Porta su cui il server ascolta FRAGMENT_TIMEOUT = 5 # Timeout per i frammenti, in secondi # Crea e avvia il server UDP server = UDPServer(HOST, PORT, fragment_timeout=FRAGMENT_TIMEOUT) # Passa fragment_timeout al costruttore server.on_message = print_message_callback # Assegna la funzione di callback server.start() try: # Mantieni il server in esecuzione while server.is_active: # Rinominato in is_active time.sleep(1) except KeyboardInterrupt: print("\nServer in arresto...") finally: server.close() if __name__ == "__main__": main()