# compatibile Windows 11 # compatibile Ubuntu 24.10 # compatibile python 3.12.7 # server.py (versione relay con timeout + disconnessione + keep-alive + sender ID) import socket import struct import os import threading import time from cryptography.hazmat.primitives.asymmetric import rsa, padding from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.backends import default_backend # --- Configurazione --- MULTICAST_GROUP = '224.1.1.1' MULTICAST_PORT = 5007 MAX_PACKET_SIZE = 65507 FRAGMENT_SIZE = 256 CLIENT_TIMEOUT = 60 # Timeout in secondi # --- Funzioni di utilità --- def create_tlv(type, value): type_bytes = struct.pack('!H', type) length_bytes = struct.pack('!H', len(value)) return type_bytes + length_bytes + value def parse_tlv(data): if len(data) < 4: raise ValueError("Dati TLV insufficienti.") type, length = struct.unpack('!HH', data[:4]) if len(data) < 4 + length: raise ValueError("Lunghezza TLV non corrisponde ai dati.") value = data[4:4 + length] remaining_data = data[4:4 + length:] return type, value, remaining_data def encrypt_data(data, public_key): return public_key.encrypt(data, padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None)) def decrypt_data(ciphertext, private_key): try: return private_key.decrypt(ciphertext, padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None)) except Exception as e: print(f"Errore durante la decrittazione: {e}"); return None def fragment_data(data, fragment_size): return [data[i:i+fragment_size] for i in range(0, len(data), fragment_size)] def defragment_data(fragments): return b''.join(fragments) def create_relay_message(sender_address, message, recipient_public_key): """Crea il messaggio da relayare, includendo l'indirizzo del mittente.""" sender_address_bytes = sender_address[0].encode() + struct.pack("!H", sender_address[1]) # Indirizzo IP e porta tlv_sender = create_tlv(10, sender_address_bytes) # Tipo 10 per l'indirizzo del mittente encrypted_message = encrypt_data(message, recipient_public_key) tlv_encrypted_message = create_tlv(6, encrypted_message) return tlv_sender + tlv_encrypted_message # --- Classe ServerThread --- class ServerThread(threading.Thread): def __init__(self, sock, private_key, public_key): super().__init__() self.sock = sock self.private_key = private_key self.public_key = public_key self.running = True self.received_fragments = {} self.client_keys = {} # address -> (public_key, last_activity) self.lock = threading.Lock() # Lock per proteggere client_keys def run(self): cleanup_thread = threading.Thread(target=self.cleanup_inactive_clients, daemon=True) cleanup_thread.start() while self.running: try: data, address = self.sock.recvfrom(MAX_PACKET_SIZE) #print(f"Ricevuto da {address}") self.process_data(data, address) except (socket.timeout, ValueError, struct.error, OSError) as e: if self.running: pass except Exception as ex: if self.running: print("Errore non gestito", ex) def process_data(self, data, address): try: offset = 0 while offset < len(data): type, value, _ = parse_tlv(data[offset:]) offset += 4 + len(value) if type == 1: self.handle_fragment(value, address) elif type == 4: self.register_client_key(value, address) elif type == 7: self.send_server_public_key(address) elif type == 8: self.handle_disconnect(address) # Gestione disconnessione elif type == 9: self.handle_keepalive(address) # Gestione keep-alive else: print(f"Tipo TLV sconosciuto: {type}") with self.lock: if address in self.client_keys: self.client_keys[address] = (self.client_keys[address][0], time.time()) except Exception as e: print("Errore elaborazione", e) def register_client_key(self, value, address): try: public_key = serialization.load_pem_public_key(value, backend=default_backend()) with self.lock: self.client_keys[address] = (public_key, time.time()) print(f"Chiave pubblica del client {address} registrata.") except Exception as e: print(f"Errore nella registrazione della chiave pubblica: {e}") def send_server_public_key(self, address): public_pem = self.public_key.public_bytes(encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo) tlv_response = create_tlv(5, public_pem) self.sock.sendto(tlv_response, address) print(f"Chiave pubblica del server inviata a {address}") def handle_fragment(self, value, address): msg_id = struct.unpack('!I', value[:4])[0] seq_num, total_fragments = struct.unpack('!HH', value[4:8]) if msg_id not in self.received_fragments: self.received_fragments[msg_id] = {} self.received_fragments[msg_id][seq_num] = value[8:] if len(self.received_fragments[msg_id]) == total_fragments: self.reassemble_decrypt_and_relay(msg_id, address) def reassemble_decrypt_and_relay(self, msg_id, sender_address): sorted_fragments = [self.received_fragments[msg_id][i] for i in range(len(self.received_fragments[msg_id]))] defragmented_message = defragment_data(sorted_fragments) decrypted_blocks = [] offset = 0 while offset < len(defragmented_message): msg_type, msg_value, _ = parse_tlv(defragmented_message[offset:]) offset += 4 + len(msg_value) if msg_type == 3: decrypted_block = decrypt_data(msg_value, self.private_key) if decrypted_block: decrypted_blocks.append(decrypted_block) if not decrypted_blocks: print("Nessun blocco decifrato."); del self.received_fragments[msg_id]; return final_message = b"".join(decrypted_blocks) print(f"Messaggio ricevuto da {sender_address}: {final_message.decode('utf-8', errors='replace')}") with self.lock: for client_address, (client_pk, _) in self.client_keys.items(): if client_address != sender_address: self.relay_message(final_message, client_pk, client_address, sender_address) # Passa sender_address del self.received_fragments[msg_id] def relay_message(self, message, recipient_public_key, recipient_address, sender_address): """Invia un messaggio relayato, includendo l'indirizzo del mittente.""" try: relay_msg = create_relay_message(sender_address, message, recipient_public_key) self.sock.sendto(relay_msg, recipient_address) print(f"Messaggio da {sender_address} relayato a {recipient_address}") except Exception as e: print(f"Errore nel relay del messaggio a {recipient_address}: {e}") def handle_disconnect(self, address): """Gestisce la disconnessione esplicita di un client.""" with self.lock: if address in self.client_keys: # Notifica gli altri client *prima* di rimuovere la chiave for other_address, (other_pk, _) in self.client_keys.items(): if other_address != address: try: address_bytes = address[0].encode() + struct.pack("!H", address[1]) tlv_disconnect = create_tlv(12, address_bytes) self.sock.sendto(tlv_disconnect, other_address) except Exception as e: print(f"Errore durante la notifica di disconnessione a {other_address}: {e}") # Rimuovi il client *immediatamente* del self.client_keys[address] print(f"Client {address} disconnesso (messaggio di disconnessione).") else: print(f"Richiesta di disconnessione da un client non registrato: {address}") def handle_keepalive(self, address): """Gestisce i messaggi di keep-alive.""" with self.lock: if address in self.client_keys: self.client_keys[address] = (self.client_keys[address][0], time.time()) #print(f"Keep-alive ricevuto da {address}") # Debug - Rimosso per brevità # else: # Rimosso per evitare troppi messaggi nel log # print(f"Keep-alive ricevuto da un client non registrato: {address}") def cleanup_inactive_clients(self): while self.running: time.sleep(CLIENT_TIMEOUT / 2) now = time.time() clients_to_remove = [] with self.lock: # Gestisci *solo* i timeout, le disconnessioni sono gestite immediatamente for address, (_, last_activity) in self.client_keys.items(): if now - last_activity > CLIENT_TIMEOUT: clients_to_remove.append(address) for address in clients_to_remove: # Notifica gli altri client for other_address, (other_pk, _) in self.client_keys.items(): if other_address != address: try: address_bytes = address[0].encode() + struct.pack("!H", address[1]) tlv_disconnect = create_tlv(12, address_bytes) # Tipo 12 per notifica self.sock.sendto(tlv_disconnect, other_address) except Exception as e: print(f"Errore invio notifica disconnessione (timeout) a {other_address}: {e}") del self.client_keys[address] # Rimozione per inattività print(f"Client {address} rimosso per inattività.") def stop(self): self.running = False self.sock.close() # --- Funzione principale del server --- def run_server(): sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.settimeout(1.0) if os.name == 'nt': sock.bind(('', MULTICAST_PORT)) else: sock.bind((MULTICAST_GROUP, MULTICAST_PORT)) group = socket.inet_aton(MULTICAST_GROUP) mreq = group + socket.inet_aton('0.0.0.0') if os.name == 'nt' else struct.pack('4sL', group, socket.INADDR_ANY) sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) print(f"Server in ascolto su {MULTICAST_GROUP}:{MULTICAST_PORT}") private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048, backend=default_backend()) public_key = private_key.public_key() server_thread = ServerThread(sock, private_key, public_key) server_thread.start() try: while server_thread.is_alive(): time.sleep(1) except KeyboardInterrupt: print("Server terminato (Ctrl+C)."); server_thread.stop(); server_thread.join() finally: if server_thread.is_alive(): server_thread.stop(); server_thread.join() print("Server Terminato Correttamente") if __name__ == "__main__": run_server()