# compatibile Windows 11 # compatibile Ubuntu 24.10 # compatibile python 3.12.7 # server.py (versione relay con crittografia ibrida + timeout + disconnessione + keep-alive + sender ID) import socket import struct import os import threading import time from cryptography.hazmat.primitives.asymmetric import rsa, padding from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.backends import default_backend from cryptography.fernet import Fernet # Aggiunto per crittografia simmetrica # --- Configurazione --- MULTICAST_GROUP = '224.1.1.1' MULTICAST_PORT = 5007 MAX_PACKET_SIZE = 65507 FRAGMENT_SIZE = 1024 # Aumentato leggermente per efficienza Fernet CLIENT_TIMEOUT = 60 # Timeout in secondi # --- Costanti TLV --- TLV_FRAGMENT = 1 TLV_SERVER_PUBLIC_KEY_RESPONSE = 5 TLV_REQUEST_SERVER_PUBLIC_KEY = 7 TLV_DISCONNECT_REQUEST = 8 # Client invia questo (payload crittografato simmetricamente) TLV_KEEPALIVE = 9 TLV_SENDER_ID = 10 TLV_NOTIFY_CONNECT = 11 # Server notifica gli altri client TLV_NOTIFY_DISCONNECT = 12 # Server notifica gli altri client TLV_ENCRYPTED_SYMMETRIC_KEY = 13 # Client -> Server (chiave simmetrica crittografata asimmetricamente) TLV_SYMMETRICALLY_ENCRYPTED_MESSAGE = 14 # Messaggio crittografato simmetricamente # --- Funzioni di utilità --- def create_tlv(type, value): type_bytes = struct.pack('!H', type) length_bytes = struct.pack('!H', len(value)) return type_bytes + length_bytes + value def parse_tlv(data): if len(data) < 4: raise ValueError("Dati TLV insufficienti.") type, length = struct.unpack('!HH', data[:4]) if len(data) < 4 + length: raise ValueError("Lunghezza TLV non corrisponde ai dati.") value = data[4:4 + length] remaining_data = data[4 + length:] # Corretto l'indice slicing return type, value, remaining_data def encrypt_data_asymmetric(data, public_key): # Usato solo dal client per inviare la chiave simmetrica return public_key.encrypt(data, padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None)) def decrypt_data_asymmetric(ciphertext, private_key): # Usato solo dal server per ricevere la chiave simmetrica try: return private_key.decrypt(ciphertext, padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None)) except Exception as e: print(f"Errore durante la decrittazione asimmetrica: {e}"); return None def encrypt_data_symmetric(data, fernet_key): f = Fernet(fernet_key) return f.encrypt(data) def decrypt_data_symmetric(token, fernet_key): f = Fernet(fernet_key) try: return f.decrypt(token) except Exception as e: print(f"Errore durante la decrittazione simmetrica: {e}") return None def fragment_data(data, fragment_size): return [data[i:i+fragment_size] for i in range(0, len(data), fragment_size)] def defragment_data(fragments): return b''.join(fragments) def create_relay_message_symmetric(sender_address, message_plaintext, recipient_fernet_key): """Crea il messaggio da relayare, crittografato simmetricamente.""" sender_address_bytes = sender_address[0].encode() + struct.pack("!H", sender_address[1]) tlv_sender = create_tlv(TLV_SENDER_ID, sender_address_bytes) encrypted_message = encrypt_data_symmetric(message_plaintext, recipient_fernet_key) tlv_encrypted_message = create_tlv(TLV_SYMMETRICALLY_ENCRYPTED_MESSAGE, encrypted_message) return tlv_sender + tlv_encrypted_message # --- Classe ServerThread --- class ServerThread(threading.Thread): def __init__(self, sock, private_key, public_key): super().__init__() self.sock = sock self.private_key = private_key self.public_key = public_key self.running = True self.received_fragments = {} self.client_keys = {} # address -> (fernet_key, last_activity) self.lock = threading.Lock() # Lock per proteggere client_keys def run(self): cleanup_thread = threading.Thread(target=self.cleanup_inactive_clients, daemon=True) cleanup_thread.start() print("Server thread avviato.") while self.running: try: data, address = self.sock.recvfrom(MAX_PACKET_SIZE) # print(f"Ricevuto pacchetto da {address}, len={len(data)}") # Debug self.process_data(data, address) except socket.timeout: continue # Normale se non ci sono dati except (ValueError, struct.error, OSError) as e: if self.running: print(f"Errore socket/parsing (client {address}): {e}") except Exception as ex: if self.running: print(f"Errore non gestito nel loop principale (client {address}): {ex}") def process_data(self, data, address): # Aggiorna l'attività *prima* di elaborare, tranne per la registrazione iniziale is_registered = False with self.lock: if address in self.client_keys: is_registered = True self.client_keys[address] = (self.client_keys[address][0], time.time()) try: offset = 0 while offset < len(data): type, value, _ = parse_tlv(data[offset:]) # Non ci serve remaining_data qui offset += 4 + len(value) # Avanza correttamente #print(f"Processing TLV type {type} from {address}") # Debug if type == TLV_FRAGMENT: # 1: Frammento UDP # L'attività è già stata aggiornata se il client è registrato self.handle_fragment(value, address) elif type == TLV_REQUEST_SERVER_PUBLIC_KEY: # 7: Richiesta chiave pubblica server self.send_server_public_key(address) elif type == TLV_ENCRYPTED_SYMMETRIC_KEY: # 13: Ricezione chiave simmetrica client self.register_client_symmetric_key(value, address) elif type == TLV_KEEPALIVE: # 9: Keep-alive # L'attività è già stata aggiornata pass # print(f"Keep-alive ricevuto da {address}") else: # Altri tipi (come DISCONNECT) potrebbero essere dentro un frammento crittografato print(f"Tipo TLV {type} ricevuto al livello esterno da {address}. Ignorato o gestito dentro handle_fragment.") except ValueError as e: print(f"Errore parsing TLV da {address}: {e}") except Exception as e: print(f"Errore durante l'elaborazione dei dati da {address}: {e}") def register_client_symmetric_key(self, encrypted_key_value, address): """ Registra la chiave simmetrica Fernet inviata dal client.""" decrypted_fernet_key = decrypt_data_asymmetric(encrypted_key_value, self.private_key) if decrypted_fernet_key: try: # Verifica che sia una chiave Fernet valida (opzionale ma buono) Fernet(decrypted_fernet_key) with self.lock: # Notifica gli altri client *prima* di aggiungere la nuova chiave for other_address, (other_key, _) in self.client_keys.items(): if other_address != address: try: address_bytes = address[0].encode() + struct.pack("!H", address[1]) tlv_connect_notify = create_tlv(TLV_NOTIFY_CONNECT, address_bytes) # Tipo 11 # Non crittografiamo le notifiche di connessione/disconnessione self.sock.sendto(tlv_connect_notify, other_address) except Exception as e: print(f"Errore invio notifica connessione a {other_address}: {e}") # Aggiungi il nuovo client self.client_keys[address] = (decrypted_fernet_key, time.time()) print(f"Chiave simmetrica del client {address} registrata con successo.") # Invia notifica di connessione anche ai client già connessi (incluso il nuovo) # Questo è ridondante se il client si considera già connesso, ma serve per fargli sapere chi c'è già with self.lock: current_clients = list(self.client_keys.keys()) for existing_addr in current_clients: if existing_addr != address: # Non notificare a se stesso chi è già connesso try: addr_bytes = existing_addr[0].encode() + struct.pack("!H", existing_addr[1]) tlv_notify = create_tlv(TLV_NOTIFY_CONNECT, addr_bytes) self.sock.sendto(tlv_notify, address) # Invia al *nuovo* client chi c'era già except Exception as e: print(f"Errore invio notifica client esistente {existing_addr} al nuovo client {address}: {e}") except (ValueError, TypeError) as e: print(f"Errore: chiave simmetrica ricevuta da {address} non valida: {e}") else: print(f"Errore: impossibile decrittare la chiave simmetrica da {address}.") def send_server_public_key(self, address): public_pem = self.public_key.public_bytes(encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo) tlv_response = create_tlv(TLV_SERVER_PUBLIC_KEY_RESPONSE, public_pem) # Tipo 5 try: self.sock.sendto(tlv_response, address) print(f"Chiave pubblica del server inviata a {address}") except Exception as e: print(f"Errore invio chiave pubblica a {address}: {e}") def handle_fragment(self, value, address): try: msg_id = struct.unpack('!I', value[:4])[0] seq_num, total_fragments = struct.unpack('!HH', value[4:8]) fragment_data_content = value[8:] #print(f"Ricevuto frammento {seq_num + 1}/{total_fragments} per msg_id {msg_id} da {address}") # Debug if msg_id not in self.received_fragments: self.received_fragments[msg_id] = {'fragments': {}, 'address': address, 'timestamp': time.time()} # Controllo anti-spoofing/errore: tutti i frammenti per un msg_id devono provenire dallo stesso indirizzo if self.received_fragments[msg_id]['address'] != address: print(f"WARN: Indirizzo mittente cambiato per msg_id {msg_id}. Atteso {self.received_fragments[msg_id]['address']}, ricevuto da {address}. Frammento scartato.") return self.received_fragments[msg_id]['fragments'][seq_num] = fragment_data_content self.received_fragments[msg_id]['timestamp'] = time.time() # Aggiorna timestamp all'ultimo frammento ricevuto # Controllo se il messaggio è completo if len(self.received_fragments[msg_id]['fragments']) == total_fragments: self.reassemble_decrypt_and_process(msg_id) except struct.error as e: print(f"Errore unpacking frammento da {address}: {e}") except Exception as e: print(f"Errore gestione frammento da {address}: {e}") def reassemble_decrypt_and_process(self, msg_id): """ Riunisce i frammenti, decritta il messaggio simmetrico e lo processa (relay o disconnessione). """ fragment_info = self.received_fragments.pop(msg_id, None) if not fragment_info: print(f"Tentativo di riassemblare msg_id {msg_id} già processato o scaduto.") return sender_address = fragment_info['address'] fragments_dict = fragment_info['fragments'] total_fragments = len(fragments_dict) # Dovrebbe corrispondere al valore nel pacchetto, ma lo ricalcoliamo per sicurezza print(f"Riassemblando {total_fragments} frammenti per msg_id {msg_id} da {sender_address}") # Debug # Assicurati che tutti i frammenti siano presenti (da 0 a total_fragments - 1) if set(fragments_dict.keys()) != set(range(total_fragments)): print(f"Errore: Frammenti mancanti per msg_id {msg_id} da {sender_address}. Scartato.") # Non eliminare da self.received_fragments qui, è già stato fatto con pop return # Ordina e riunisci i frammenti sorted_fragments = [fragments_dict[i] for i in range(total_fragments)] reassembled_data = defragment_data(sorted_fragments) # Il payload riassemblato DOVREBBE essere un TLV TLV_SYMMETRICALLY_ENCRYPTED_MESSAGE (14) try: inner_type, inner_value, _ = parse_tlv(reassembled_data) if inner_type == TLV_SYMMETRICALLY_ENCRYPTED_MESSAGE: # Tipo 14 with self.lock: if sender_address not in self.client_keys: print(f"Messaggio ricevuto da client non registrato o disconnesso ({sender_address}). Ignorato.") return sender_fernet_key = self.client_keys[sender_address][0] # Decritta il messaggio con la chiave simmetrica del mittente decrypted_message = decrypt_data_symmetric(inner_value, sender_fernet_key) if decrypted_message: # Controlla se è un messaggio speciale (es. DISCONNECT) o un messaggio da relayare try: # Tentiamo di decodificare come testo per comandi speciali message_text = decrypted_message.decode('utf-8') print(f"Messaggio decrittato da {sender_address}: {message_text}") if message_text == "DISCONNECT": self.handle_disconnect_request(sender_address) # elif message_text == "CONNECT": # Il client non dovrebbe inviare "CONNECT" crittografato # print(f"Messaggio 'CONNECT' ricevuto da {sender_address}, ignorato (registrazione già avvenuta).") else: # Messaggio normale da relayare self.relay_message_to_others(decrypted_message, sender_address) except UnicodeDecodeError: # Se non è UTF-8, consideralo dati binari da relayare print(f"Dati binari decrittati ricevuti da {sender_address}, relay in corso...") self.relay_message_to_others(decrypted_message, sender_address) else: print(f"Errore nella decrittazione simmetrica del messaggio da {sender_address}.") else: print(f"Tipo TLV interno ({inner_type}) non previsto dopo riassemblaggio da {sender_address}. Atteso {TLV_SYMMETRICALLY_ENCRYPTED_MESSAGE}.") except ValueError as e: print(f"Errore parsing TLV interno dopo riassemblaggio da {sender_address}: {e}") except Exception as e: print(f"Errore generico durante processamento messaggio riassemblato da {sender_address}: {e}") def relay_message_to_others(self, message_plaintext, sender_address): """ Invia un messaggio decrittato a tutti gli altri client connessi, crittografandolo per ciascuno.""" with self.lock: clients_to_relay = list(self.client_keys.items()) # Crea una copia per iterare in sicurezza for recipient_address, (recipient_fernet_key, _) in clients_to_relay: if recipient_address != sender_address: try: # Crea il messaggio relay: TLV SENDER_ID + TLV SYMMETRICALLY_ENCRYPTED_MESSAGE (crittografato per il destinatario) relay_msg_payload = create_relay_message_symmetric(sender_address, message_plaintext, recipient_fernet_key) # Non è necessaria frammentazione qui, UDP gestisce pacchetti grandi (fino a MAX_PACKET_SIZE) # Se il messaggio relayato è troppo grande, fallirà l'invio UDP sottostante. if len(relay_msg_payload) > MAX_PACKET_SIZE: print(f"WARN: Messaggio relayato da {sender_address} a {recipient_address} troppo grande ({len(relay_msg_payload)} bytes). Non inviato.") continue self.sock.sendto(relay_msg_payload, recipient_address) #print(f"Messaggio da {sender_address} relayato a {recipient_address}") # Debug - Verboso except Exception as e: print(f"Errore nel relay del messaggio da {sender_address} a {recipient_address}: {e}") def handle_disconnect_request(self, address): """Gestisce la richiesta di disconnessione esplicita (ricevuta tramite messaggio crittografato).""" print(f"Ricevuta richiesta di disconnessione da {address}") self.remove_client(address, notify_others=True, reason="richiesta esplicita") def cleanup_inactive_clients(self): """ Thread periodico per rimuovere client inattivi e frammenti vecchi. """ while self.running: time.sleep(CLIENT_TIMEOUT / 2) now = time.time() clients_to_remove = [] fragments_to_remove = [] # Pulizia Frammenti Incompleti # Copia delle chiavi per evitare problemi di concorrenza durante l'iterazione current_fragment_keys = list(self.received_fragments.keys()) for msg_id in current_fragment_keys: # Verifica se la chiave esiste ancora (potrebbe essere stata rimossa da reassemble) if msg_id in self.received_fragments: if now - self.received_fragments[msg_id]['timestamp'] > CLIENT_TIMEOUT * 2: # Timeout più lungo per frammenti fragments_to_remove.append(msg_id) for msg_id in fragments_to_remove: if msg_id in self.received_fragments: removed_info = self.received_fragments.pop(msg_id) print(f"Rimosso messaggio frammentato incompleto/vecchio (msg_id {msg_id} da {removed_info['address']})") # Pulizia Client Inattivi with self.lock: # Copia per iterare in sicurezza current_clients = list(self.client_keys.items()) for address, (_, last_activity) in current_clients: if now - last_activity > CLIENT_TIMEOUT: clients_to_remove.append(address) # Rimuovi i client fuori dal lock principale per evitare deadlock durante la notifica for address in clients_to_remove: self.remove_client(address, notify_others=True, reason="timeout") def remove_client(self, address, notify_others=True, reason="sconosciuta"): """ Rimuove un client e opzionalmente notifica gli altri. """ removed_key = None with self.lock: if address in self.client_keys: removed_key = self.client_keys.pop(address) # Rimuove e ottiene la chiave print(f"Client {address} rimosso ({reason}).") else: # Il client potrebbe essere stato già rimosso (es. timeout e disconnessione quasi simultanei) # print(f"Tentativo di rimuovere client {address} non trovato ({reason}).") return # Non c'è nulla da fare o notificare se non esiste più if notify_others and removed_key: # Notifica gli altri client *dopo* aver rimosso la chiave with self.lock: # Copia la lista attuale dei client remaining_clients = list(self.client_keys.keys()) for other_address in remaining_clients: # Non è necessario controllare if other_address != address, perchè 'address' è già stato rimosso try: address_bytes = address[0].encode() + struct.pack("!H", address[1]) tlv_disconnect_notify = create_tlv(TLV_NOTIFY_DISCONNECT, address_bytes) # Tipo 12 # Le notifiche non sono crittografate self.sock.sendto(tlv_disconnect_notify, other_address) except Exception as e: print(f"Errore invio notifica disconnessione ({reason}) di {address} a {other_address}: {e}") def stop(self): print("Arresto server thread...") self.running = False # Non chiudere il socket qui, fallo nella funzione principale run_server # self.sock.close() # Causa problemi se il loop principale sta ancora usando recvfrom # --- Funzione principale del server --- def run_server(): sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) # Esplicito UDP sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # Binding specifico per OS try: if os.name == 'nt': # Su Windows, bind all'indirizzo specifico dell'interfaccia o 0.0.0.0 # Binding a '' potrebbe non funzionare correttamente per multicast receive # Prova con '0.0.0.0' che spesso funziona meglio sock.bind(('0.0.0.0', MULTICAST_PORT)) print(f"Server binding su 0.0.0.0:{MULTICAST_PORT} (Windows)") else: # Su Linux/macOS, bind all'indirizzo multicast è preferibile per ricevere solo da quel gruppo sock.bind((MULTICAST_GROUP, MULTICAST_PORT)) print(f"Server binding su {MULTICAST_GROUP}:{MULTICAST_PORT} (Linux/macOS)") except OSError as e: print(f"Errore durante il bind del socket: {e}") print("Potrebbe essere necessario eseguire come amministratore/root o la porta è già in uso.") return # Configurazione membership multicast group = socket.inet_aton(MULTICAST_GROUP) # Ascolta su tutte le interfacce ('0.0.0.0') mreq = group + socket.inet_aton('0.0.0.0') # Alternativa per specificare interfaccia (potrebbe essere necessario su alcuni sistemi) # try: # interface_ip = socket.gethostbyname(socket.gethostname()) # Trova IP locale # mreq = struct.pack('4s4s', group, socket.inet_aton(interface_ip)) # print(f"Utilizzo interfaccia {interface_ip} per multicast") # except socket.gaierror: # print("Impossibile determinare l'IP locale, uso 0.0.0.0") # mreq = group + socket.inet_aton('0.0.0.0') try: sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) print(f"Aggiunta membership al gruppo multicast {MULTICAST_GROUP}") except OSError as e: print(f"Errore nell'aggiungere la membership multicast: {e}") print("Verifica che l'indirizzo multicast sia valido e che le impostazioni di rete lo permettano.") sock.close() return sock.settimeout(1.0) # Timeout per permettere l'uscita controllata print(f"Server in ascolto su {MULTICAST_GROUP}:{MULTICAST_PORT}") # Genera chiavi RSA del server private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048, backend=default_backend()) public_key = private_key.public_key() print("Chiavi RSA del server generate.") server_thread = ServerThread(sock, private_key, public_key) server_thread.start() try: while server_thread.is_alive(): time.sleep(1) except KeyboardInterrupt: print("\nServer terminato (Ctrl+C). Inizio shutdown...") server_thread.stop() # Aspetta che il thread termini (potrebbe richiedere il timeout del socket) server_thread.join(timeout=2.0) if server_thread.is_alive(): print("WARN: Server thread non terminato entro il timeout.") finally: # Assicurati che il thread sia fermo prima di chiudere il socket if server_thread.is_alive(): server_thread.stop() server_thread.join(timeout=1.0) # Ultimo tentativo # Rimuovi membership multicast (best effort) try: sock.setsockopt(socket.IPPROTO_IP, socket.IP_DROP_MEMBERSHIP, mreq) print("Rimossa membership multicast.") except OSError as e: print(f"Errore durante la rimozione della membership multicast: {e}") except Exception as ex: # Cattura altre possibili eccezioni come socket già chiuso print(f"Errore generico durante la rimozione membership: {ex}") sock.close() print("Socket del server chiuso.") print("Server Terminato Correttamente") if __name__ == "__main__": run_server()