# Compatible with Windows 11
# Compatible with Ubuntu 24.10
# Compatible with Python 3.12.7
import socket
import threading
import time
import select
import struct  # Needed to pack/unpack the binary TLV header


class ServerTCP:
    """Multi-client TCP server speaking a Type-Length-Value (TLV) protocol.

    Every message on the wire is a 5-byte header (1 byte type, 4 bytes
    big-endian payload length) followed by the payload. Incoming bytes are
    buffered per client until a whole message is available; outgoing
    messages are written in chunks of at most ``MAX_FRAME_SIZE`` bytes.

    Callbacks (all optional, all invoked with the server as first argument):
        on_msg_arrived(server, text): a complete type-0x01 message decoded
            as UTF-8.
        on_msg_error(server, text): a decoding/size problem description.
        on_msg_state(server, text): connection and listener state changes.
    """

    # Previously tried larger limits: MAX_MESSAGE_SIZE = 10240, MAX_FRAME_SIZE = 4096
    MAX_MESSAGE_SIZE = 1024  # Maximum payload size of one message (before fragmentation)
    MAX_FRAME_SIZE = 256  # Maximum size of a single fragment / recv() call
    ACCEPT_TIMEOUT = 0.5
    TIMEOUT_MS = 100

    HEADER_FORMAT = '!BI'  # Network byte order: 1-byte type + 4-byte unsigned length
    HEADER_SIZE = struct.calcsize(HEADER_FORMAT)  # 5 bytes

    def __init__(self, port=5000):
        """Create a stopped server that will listen on *port* once started."""
        self.active_control = False
        self.listen_port = port
        self.listener = None
        self.clients = []
        self.listening_thread = None
        self.on_msg_arrived = None
        self.on_msg_error = None
        self.on_msg_state = None
        # Guards self.clients and self.client_buffers. It is a plain
        # (non-reentrant) Lock, so it must never be held across a call that
        # may acquire it again.
        self.lock = threading.Lock()
        # Serialises broadcasts so fragments of two messages never interleave
        # on one socket; re-entrant so a callback fired during a send may
        # itself call send().
        self._send_lock = threading.RLock()
        self.close_event = threading.Event()
        self.is_closing = False
        # Per-client reassembly state, keyed by Client object.
        self.client_buffers = {}

    @staticmethod
    def _new_buffer_state():
        """Return a fresh, empty reassembly state for one client."""
        return {
            "fragments": [],
            "total_length": 0,
            "message_type": 0,
            "header_bytes_received": 0,
            "expected_message_length": -1,  # -1 means "header not parsed yet"
        }

    @staticmethod
    def _join_thread(thread, timeout=1):
        """Join *thread* for up to *timeout* seconds unless it is missing, dead or the caller."""
        if thread is None or thread is threading.current_thread():
            return  # joining the current thread would raise RuntimeError
        if thread.is_alive():
            thread.join(timeout=timeout)

    def start(self):
        """Reset the server state and start the listening thread."""
        self.active_control = True
        self.clients = []
        self.client_buffers = {}
        # Clear the shutdown flags BEFORE the thread starts: the listener loop
        # tests is_closing, so resetting it afterwards could make a restarted
        # server exit immediately.
        self.close_event.clear()
        self.is_closing = False
        self.listening_thread = threading.Thread(target=self._start_listening)
        self.listening_thread.start()

    def close(self):
        """Stop accepting connections, disconnect every client and join the threads."""
        self.is_closing = True
        self.active_control = False
        self.close_event.set()
        try:
            with self.lock:
                clients_to_close = self.clients.copy()
                self.clients.clear()

            # Done outside the lock: each client thread takes the lock in its
            # own cleanup, so joining while holding it would only time out.
            for client in clients_to_close:
                client.keep_alive = False
                try:
                    client.sock.shutdown(socket.SHUT_RDWR)
                except Exception:
                    pass  # best effort: the peer may already be gone
                try:
                    # Separate from shutdown() so a failed shutdown cannot
                    # skip the close and leak the descriptor.
                    client.sock.close()
                except Exception:
                    pass
                self._join_thread(client.thread)

            if self.listener:
                try:
                    self.listener.shutdown(socket.SHUT_RDWR)
                except OSError:
                    # Expected for a listening socket that is not connected
                    # (e.g. errno 107) or that is already closed.
                    pass
                except Exception as e:
                    print(f"Exception during listener shutdown: {e}")
                finally:
                    try:
                        self.listener.close()
                    except Exception as e:
                        print(f"Exception during listener close: {e}")

            self._join_thread(self.listening_thread)
        except Exception as ex:
            print(f"Errore durante la chiusura del server: {ex}")

    def _start_listening(self):
        """Listener thread body: accept connections and spawn one thread per client."""
        try:
            self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.listener.bind(("", self.listen_port))
            self.listener.listen()
            # A short accept timeout lets the loop notice is_closing promptly.
            self.listener.settimeout(self.ACCEPT_TIMEOUT)
            if self.on_msg_state:
                self.on_msg_state(self, f"LISTENING on port {self.listen_port}")

            while not self.is_closing:
                try:
                    s, addr = self.listener.accept()
                    client_thread = threading.Thread(target=self._service_client, args=(s, addr,))
                    client_thread.start()
                except socket.timeout:
                    continue
                except OSError as e:
                    # Expected when close() shuts the listening socket down.
                    if not self.is_closing:
                        print(f"Errore nel thread di ascolto (OSError): {e}")
                    break
                except Exception as ex:
                    if not self.is_closing:
                        print(f"Errore nel thread di ascolto (Exception): {ex}")
                    break
        except Exception as ex:
            if not self.is_closing:
                print(f"Errore nell'avvio del listener: {ex}")
                if self.on_msg_state:
                    self.on_msg_state(self, f"ERROR starting listener: {ex}")
        finally:
            if self.listener:
                try:
                    self.listener.close()  # harmless if close() already did it
                except OSError:
                    pass
            if not self.is_closing and self.on_msg_state:
                self.on_msg_state(self, "LISTENING STOPPED")

    def _service_client(self, client_socket, address):
        """Client thread body: register the client, pump recv() into the TLV parser, clean up.

        Args:
            client_socket: the connected socket returned by accept().
            address: the (host, port) tuple returned by accept().
        """
        client = client_socket
        c = None  # Client object; stays None if setup fails before creation
        client_addr_str = f"{address[0]}:{address[1]}"
        try:
            # The accept() address is used instead of getpeername(), which can
            # fail if the connection drops early.
            c = Client(f"cl_{client_addr_str}", address, threading.current_thread(), client)
            with self.lock:
                self.clients.append(c)
                self.client_buffers[c] = self._new_buffer_state()
            if self.on_msg_state:
                self.on_msg_state(self, f"CONNECT from {client_addr_str}")

            # recv() timeout, so the loop can re-check the stop flags.
            client.settimeout(self.TIMEOUT_MS / 1000.0 * 20)

            while not self.is_closing:
                if not c.keep_alive:
                    # Set by close(), by a send failure or by the parser.
                    if self.on_msg_state:
                        self.on_msg_state(self, f"DISCONNECT requested by server for {client_addr_str}")
                    break
                try:
                    data = client.recv(self.MAX_FRAME_SIZE)
                    if not data:
                        # Empty read: the peer closed the connection.
                        if self.on_msg_state:
                            self.on_msg_state(self, f"DISCONNECT by client {client_addr_str}")
                        break
                    self._process_data(c, data)
                except socket.timeout:
                    continue  # idle client, nothing to do
                except ConnectionResetError:
                    if self.on_msg_state:
                        self.on_msg_state(self, f"DISCONNECT (Connection Reset) {client_addr_str}")
                    break
                except OSError as e:
                    if not self.is_closing:  # avoid noise during shutdown
                        print(f"Errore di rete nel thread client {client_addr_str}: {e}")
                        if self.on_msg_state:
                            self.on_msg_state(self, f"DISCONNECT (Network Error) {client_addr_str}: {e}")
                    break
                except Exception as ex:
                    if not self.is_closing:
                        print(f"Errore nel thread di servizio client {client_addr_str}: {ex}")
                        if self.on_msg_state:
                            self.on_msg_state(self, f"DISCONNECT (Error) {client_addr_str}: {ex}")
                    break

                if self.close_event.is_set():
                    break
        except Exception as ex:
            # Failure while setting the client up.
            if not self.is_closing:
                print(f"Errore iniziale nel thread di servizio client ({client_addr_str}): {ex}")
                if self.on_msg_state:
                    self.on_msg_state(self, f"ERROR setting up client {client_addr_str}: {ex}")
        finally:
            try:
                client.shutdown(socket.SHUT_RDWR)
            except OSError:
                pass  # already closed/disconnected
            try:
                client.close()
            except OSError:
                pass
            if c is not None:
                with self.lock:
                    if c in self.clients:
                        self.clients.remove(c)
                    if c in self.client_buffers:
                        del self.client_buffers[c]

    def _process_data(self, client, data):
        """Append *data* to the client's buffer and dispatch every complete TLV message.

        Type 0x01 payloads are decoded as UTF-8 and passed to on_msg_arrived;
        a decode failure is reported through on_msg_error. Other types are
        consumed and dropped. A header announcing more than twice
        MAX_MESSAGE_SIZE clears the buffer and flags the client for
        disconnection.

        Args:
            client: the Client the bytes came from.
            data: the bytes just received.
        """
        buffer_info = self.client_buffers.get(client)
        if buffer_info is None:
            # The client was already dropped (e.g. after a send failure).
            return

        header_size = self.HEADER_SIZE
        buffer_info["fragments"].append(data)
        buffer_info["total_length"] += len(data)

        # Loop because one recv() may carry several messages.
        while True:
            # State 1: parse the header.
            if buffer_info["expected_message_length"] == -1:
                if buffer_info["total_length"] < header_size:
                    break  # wait for more bytes

                full_buffer = b"".join(buffer_info["fragments"])
                buffer_info["fragments"] = [full_buffer]  # avoid re-joining later
                try:
                    message_type, message_length = struct.unpack(
                        self.HEADER_FORMAT, full_buffer[:header_size]
                    )
                except struct.error as e:
                    print(f"Error unpacking header from client {client.endpoint}: {e}. Buffer content (partial): {full_buffer[:20]}")
                    client.keep_alive = False  # ask the service thread to close
                    buffer_info["fragments"] = []
                    buffer_info["total_length"] = 0
                    return

                # Reject absurd lengths before buffering that much data.
                if message_length > self.MAX_MESSAGE_SIZE * 2:
                    print(f"Error: Received excessive message length ({message_length}) from {client.endpoint}. Closing connection.")
                    client.keep_alive = False
                    buffer_info["fragments"] = []
                    buffer_info["total_length"] = 0
                    return

                buffer_info["message_type"] = message_type
                buffer_info["expected_message_length"] = message_length
                buffer_info["header_bytes_received"] = header_size
                # The header stays in the buffer until the body is complete.

            # State 2: wait for / extract the body.
            total_data_needed = header_size + buffer_info["expected_message_length"]
            if buffer_info["total_length"] < total_data_needed:
                break  # body not complete yet

            full_buffer = b"".join(buffer_info["fragments"])
            message_data = full_buffer[header_size:total_data_needed]
            message_type = buffer_info["message_type"]

            if message_type == 0x01:  # text message
                try:
                    # No rstrip(): trailing whitespace may be significant.
                    message = message_data.decode('utf-8')
                    if self.on_msg_arrived:
                        self.on_msg_arrived(self, message)
                except UnicodeDecodeError:
                    if self.on_msg_error:
                        self.on_msg_error(self, f"Errore di decodifica UTF-8 from {client.endpoint}")
            # Handlers for further message types would go here.

            # Drop the consumed message and reset the state for the next one.
            remaining_data = full_buffer[total_data_needed:]
            buffer_info["fragments"] = [remaining_data] if remaining_data else []
            buffer_info["total_length"] = len(remaining_data)
            buffer_info["message_type"] = 0
            buffer_info["expected_message_length"] = -1
            buffer_info["header_bytes_received"] = 0

    def _send_to_client(self, client, message_type, message_data):
        """Send one TLV message to *client*, in chunks of at most MAX_FRAME_SIZE bytes.

        On any send error the client's socket is closed and the client is
        removed from the server (this acquires ``self.lock``, so the caller
        must not hold it).

        Args:
            client: destination Client.
            message_type: integer written into the 1-byte type field.
            message_data: payload bytes.

        Returns:
            True if the whole message was written, False otherwise.
        """
        if not client or not client.sock or client.sock.fileno() == -1:
            print(f"Attempted to send to an invalid or closed client socket: {client.name if client else 'N/A'}")
            return False

        try:
            if len(message_data) > self.MAX_MESSAGE_SIZE:
                print(f"Error: Message size ({len(message_data)}) exceeds MAX_MESSAGE_SIZE ({self.MAX_MESSAGE_SIZE}) for client {client.endpoint}. Message not sent.")
                if self.on_msg_error:
                    self.on_msg_error(self, f"Message too large to send ({len(message_data)} bytes)")
                return False

            header = struct.pack('!BI', message_type, len(message_data))
            full_message = header + message_data
            bytes_sent_total = 0

            # send() may write fewer bytes than requested, so advance by the
            # count it actually reports.
            while bytes_sent_total < len(full_message):
                chunk_size = min(self.MAX_FRAME_SIZE, len(full_message) - bytes_sent_total)
                fragment = full_message[bytes_sent_total:bytes_sent_total + chunk_size]
                try:
                    sent = client.sock.send(fragment)
                    if sent == 0:
                        raise OSError("Socket connection broken (send returned 0)")
                    bytes_sent_total += sent
                except socket.timeout:
                    print(f"Timeout sending fragment to {client.endpoint}. Retrying or aborting might be needed.")
                    raise  # treated as a failure by the outer handler
                except BlockingIOError:
                    # Defensive: only expected with non-blocking sockets.
                    print(f"Warning: BlockingIOError sending to {client.endpoint}. Waiting briefly.")
                    time.sleep(0.01)
                    continue  # retry the same fragment
                except Exception as e_send:
                    print(f"Error sending fragment to {client.endpoint}: {e_send}")
                    raise

            return True
        except Exception as e:
            print(f"Failed to send message to client {client.endpoint}: {e}")
            client_addr_str = f"{client.endpoint[0]}:{client.endpoint[1]}"

            # Drop the client: stop its service thread and close the socket.
            try:
                client.keep_alive = False
                client.sock.shutdown(socket.SHUT_RDWR)
            except OSError:
                pass  # already closed/invalid
            try:
                client.sock.close()
            except OSError:
                pass

            with self.lock:
                if client in self.clients:
                    self.clients.remove(client)
                if client in self.client_buffers:
                    del self.client_buffers[client]

            if self.on_msg_state:
                self.on_msg_state(self, f"DISCONNECT (Send Error) {client_addr_str}: {e}")
            return False

    def send(self, message):
        """Broadcast *message* as a UTF-8 text message (type 0x01) to every connected client.

        Nothing is sent when the server is not active or when the encoded
        message exceeds MAX_MESSAGE_SIZE. Clients whose send fails are
        dropped by _send_to_client.
        """
        if not self.active_control:
            print("Server not active, cannot send message.")
            return

        try:
            message_bytes = message.encode('utf-8')
            if len(message_bytes) > self.MAX_MESSAGE_SIZE:
                print(f"Error: Message too large ({len(message_bytes)} bytes) to send. Max is {self.MAX_MESSAGE_SIZE}.")
                if self.on_msg_error:
                    self.on_msg_error(self, f"Message too large to send ({len(message_bytes)} bytes)")
                return

            # Only the snapshot is taken under self.lock. _send_to_client
            # re-acquires that non-reentrant lock when a send fails, so
            # holding it across the loop would deadlock on the first error.
            with self.lock:
                clients_to_send = self.clients[:]

            sent_to_all = True
            with self._send_lock:
                for client in clients_to_send:
                    if not self._send_to_client(client, 0x01, message_bytes):
                        sent_to_all = False

            if not sent_to_all:
                print("Note: Message sending failed for one or more clients.")
        except UnicodeEncodeError as ex:
            print(f"Errore durante la codifica del messaggio in UTF-8: {ex}")
            if self.on_msg_error:
                self.on_msg_error(self, f"Encoding error: {ex}")
        except Exception as ex:
            print(f"Errore generale durante l'invio del messaggio broadcast: {ex}")


class Client:
    """A connected client: its name, remote endpoint, service thread and socket."""

    def __init__(self, name, endpoint, thread, sock):
        self.name = name
        self.endpoint = endpoint  # (ip, port) tuple
        self.thread = thread
        self.sock = sock
        self.keep_alive = True  # set to False to ask the service thread to stop

    def __eq__(self, other):
        """Two clients are equal when they wrap the very same socket object."""
        if not isinstance(other, Client):
            return NotImplemented
        return self.sock is other.sock

    def __hash__(self):
        # Hash on the same attribute used for equality so Client objects can
        # be used as dictionary keys.
        return hash(self.sock)


# --- Command-line driver (originally main_servertcp_TLV.py) ---

def handle_message(sender, message):
    """Print a received text message; *sender* is the ServerTCP instance."""
    print(f"Messaggio ricevuto: {message}")


def handle_error(sender, message):
    """Print an error reported by the server."""
    print(f"Errore gestito: {message}")


def handle_state(sender, message):
    """Print a server state change."""
    print(f"Stato del server: {message}\n")


if __name__ == '__main__':
    server = ServerTCP(port=5000)
    server.on_msg_arrived = handle_message
    server.on_msg_error = handle_error
    server.on_msg_state = handle_state
    server.start()
    try:
        while True:
            try:
                command = input("Inserisci un messaggio da inviare a tutti (o 'quit' per uscire): \n")
            except EOFError:
                print("\nInput chiuso (EOF), uscendo...")
                break
            if command.lower() == 'quit':
                break
            if command:  # empty input sends nothing
                server.send(command)
    except KeyboardInterrupt:
        print("\nInterruzione da tastiera ricevuta (Ctrl+C)...")
    finally:
        print("Avvio chiusura server...")
        server.close()
        print("Server chiuso.")