# compatibile Windows 11 # compatibile Ubuntu 24.10 # compatibile python 3.12.7 import argparse import logging import os import platform import sys import codecs import threading import time import string # --- Blocco per importare Scapy e gestire avvisi --- logging.getLogger("scapy.runtime").setLevel(logging.ERROR) logging.getLogger("scapy.interactive").setLevel(logging.ERROR) try: # *** AGGIUNTO Ether ALL'IMPORT *** from scapy.all import (Ether, IP, TCP, UDP, ICMP, Raw, sniff, conf, get_if_list, get_if_addr, get_if_hwaddr) if platform.system() == "Windows": from scapy.arch.windows import get_windows_if_list except ImportError: print("\nERRORE: Scapy non sembra essere installato.") print("Esegui: pip install scapy") sys.exit(1) # --- Blocchi controllo permessi, elenco interfacce, evento, sanificazione (INVARIATI) --- def check_permissions(): # ... (codice invariato) ... if platform.system() == "Windows": try: import ctypes; return ctypes.windll.shell32.IsUserAnAdmin() != 0 except Exception: return False else: try: return os.geteuid() == 0 except AttributeError: return False def list_interfaces(): # ... (codice invariato) ... print("\nInterfacce di rete disponibili:") interfaces = {} try: if platform.system() == "Windows": win_ifaces = get_windows_if_list() if not win_ifaces: print("Nessuna interfaccia trovata."); return None for i, iface in enumerate(win_ifaces): name_for_scapy = iface.get('name', iface.get('guid')) if name_for_scapy: ip = iface.get('ip_addresses', ['N/D'])[0] if iface.get('ip_addresses') else 'N/D' mac = iface.get('mac', 'N/D') desc = iface.get('description', 'N/D') print(f" [{i}] {desc} (Nome Scapy: {name_for_scapy}, IP: {ip}, MAC: {mac})") interfaces[i] = name_for_scapy else: print(f" [{i}] Interfaccia senza nome/GUID: {iface.get('description', 'N/D')}") else: # Linux/Unix if_names = get_if_list() if not if_names: print("Nessuna interfaccia trovata."); return None for i, name in enumerate(if_names): interfaces[i] = name try: ip = get_if_addr(name) mac = get_if_hwaddr(name) print(f" [{i}] {name} (IP: {ip}, MAC: {mac})") except Exception: print(f" [{i}] {name} (IP/MAC non ottenibili)") except Exception as e: print(f"\nERRORE durante l'elenco delle interfacce: {e}") return None return interfaces stop_event = threading.Event() def sanitize_string(input_str): # ... (codice invariato) ... return "".join(char if char in string.printable else '.' for char in input_str) # --- Funzione per processare i pacchetti (MODIFICATA per mostrare livelli inferiori) --- def process_packet_func(packet, ip1, ip2): """ Analizza un pacchetto, stampa i livelli Ethernet, IP e Trasporto (TCP/UDP/ICMP) se il traffico è tra gli IP specificati. """ # Livello Ethernet (Livello 2) - Opzionale, potrebbe mancare (es. loopback) eth_src = "N/A" eth_dst = "N/A" eth_type = "N/A" if packet.haslayer(Ether): eth_src = packet[Ether].src eth_dst = packet[Ether].dst eth_type = hex(packet[Ether].type) # Mostra EtherType in esadecimale # Livello IP (Livello 3) - Filtriamo qui if packet.haslayer(IP): ip_src = packet[IP].src ip_dst = packet[IP].dst ip_proto = packet[IP].proto ip_ttl = packet[IP].ttl ip_len = packet[IP].len # Filtra per IP sorgente e destinazione if not ((ip_src == ip1 and ip_dst == ip2) or \ (ip_src == ip2 and ip_dst == ip1)): return # Ignora se non è tra gli IP specificati # Mappa numeri protocollo IP a nomi comuni proto_map = {1: "ICMP", 6: "TCP", 17: "UDP"} ip_proto_name = proto_map.get(ip_proto, f"Proto={ip_proto}") # Nome o numero # Variabili per Livello Trasporto (Livello 4) e Payload transport_proto = None sport = 0 dport = 0 icmp_type = -1 icmp_code = -1 payload_bytes = None sanitized_payload = "" # Estrai dettagli Trasporto e Payload if ip_proto_name == "TCP" and packet.haslayer(TCP): transport_proto = "TCP" sport = packet[TCP].sport dport = packet[TCP].dport # Controlla se c'è payload dopo TCP if packet[TCP].payload: # Accesso diretto al payload del layer # Verifica se il payload è Raw (potrebbe essere un altro layer) if isinstance(packet[TCP].payload, Raw): payload_bytes = bytes(packet[TCP].payload) # Converti Raw in bytes elif ip_proto_name == "UDP" and packet.haslayer(UDP): transport_proto = "UDP" sport = packet[UDP].sport dport = packet[UDP].dport if packet[UDP].payload: if isinstance(packet[UDP].payload, Raw): payload_bytes = bytes(packet[UDP].payload) elif ip_proto_name == "ICMP" and packet.haslayer(ICMP): transport_proto = "ICMP" icmp_type = packet[ICMP].type icmp_code = packet[ICMP].code if packet[ICMP].payload: if isinstance(packet[ICMP].payload, Raw): payload_bytes = bytes(packet[ICMP].payload) # Sanifica il payload se esiste if payload_bytes: try: payload_str = payload_bytes.decode('utf-8', errors='replace') sanitized_payload = sanitize_string(payload_str) except Exception: # Errore raro, ma gestiamolo sanitized_payload = f"[Errore decodifica grave]" payload_bytes = repr(payload_bytes) # Mostra repr sicuro # Stampa tutto con lock with print_lock: print("\n--- Pacchetto Rilevato ---") # Stampa Livello Ethernet print(f" L2 Ethernet: {eth_src} -> {eth_dst} (Tipo: {eth_type})") # Stampa Livello IP print(f" L3 IP: {ip_src} -> {ip_dst} (Protocollo: {ip_proto_name}, TTL: {ip_ttl}, Lunghezza IP: {ip_len})") # Stampa Livello Trasporto if transport_proto == "TCP" or transport_proto == "UDP": print(f" L4 {transport_proto}: Porta {sport} -> Porta {dport}") elif transport_proto == "ICMP": type_desc = {0: "Echo Reply", 3: "Dest Unreachable", 8: "Echo Request", 11: "Time Exceeded"}.get(icmp_type, f"Type={icmp_type}") code_desc = f"Code={icmp_code}" print(f" L4 ICMP: {type_desc}, {code_desc}") else: # Caso di protocollo IP non gestito specificamente sopra (es. IGMP) print(f" L4 Protocol: {ip_proto_name} (Dettagli non estratti)") # Stampa Payload if sanitized_payload: print(f" Payload: >> {sanitized_payload}") elif payload_bytes: # Payload esisteva ma non è stato sanificato (o errore) print(f" Payload: >> [Binario/Non stampabile o Errore: {len(payload_bytes)} bytes]") # else: Non c'era payload sys.stdout.flush() # --- Funzione target per il thread dello sniffer (INVARIATA) --- def run_sniffer(iface, bpf_filter, ip1, ip2, stop_event_obj): # ... (codice invariato, usa process_packet_func modificata) ... print("[Sniffer Thread] Avviato.") try: process_wrapper = lambda pkt: process_packet_func(pkt, ip1, ip2) stop_filter_func = lambda pkt: stop_event_obj.is_set() sniff(iface=iface, filter=bpf_filter, prn=process_wrapper, stop_filter=stop_filter_func, store=0) except PermissionError: with print_lock: print("\n[Sniffer Thread] ERRORE: Permessi insufficienti.") except OSError as e: with print_lock: print(f"\n[Sniffer Thread] ERRORE durante lo sniffing: {e}") except Exception as e: with print_lock: print(f"\n[Sniffer Thread] ERRORE imprevisto: {e}") import traceback; traceback.print_exc() finally: if not stop_event_obj.is_set(): with print_lock: print("[Sniffer Thread] Thread terminato inaspettatamente...") stop_event_obj.set() with print_lock: print("[Sniffer Thread] Terminato.") # --- Lock per la stampa (INVARIATO) --- print_lock = threading.Lock() # --- Main Execution (INVARIATO - setup e loop principale) --- if __name__ == "__main__": # ... (Controllo permessi, selezione interfaccia, selezione IP - INVARIATO) ... if not check_permissions(): print("\nATTENZIONE: Sono necessari privilegi elevati...") user_choice = input("Continuare comunque? (s/n): ").lower() if user_choice != 's': print("Uscita richiesta dall'utente.") sys.exit(1) available_interfaces = list_interfaces() interface_name_for_scapy = None # ... (selezione interfaccia interattiva) ... if available_interfaces: while True: choice = input("Inserisci l'indice o il nome dell'interfaccia... (Invio per default): ").strip() if not choice: break try: # Indice iface_index = int(choice) if iface_index in available_interfaces: interface_name_for_scapy = available_interfaces[iface_index] print(f"[*] Interfaccia selezionata per indice: '{interface_name_for_scapy}'") break else: print(f"ERRORE: Indice '{iface_index}' non valido.") except ValueError: # Nome is_valid_name = any(name == choice for name in available_interfaces.values()) if is_valid_name: interface_name_for_scapy = choice; print(f"[*] Interfaccia selezionata per nome: '{interface_name_for_scapy}'"); break else: confirm = input(f"ATTENZIONE: Nome '{choice}' non trovato... Usarlo comunque? (s/n): ").lower() if confirm == 's': interface_name_for_scapy = choice; print(f"[*] Uso dell'interfaccia '{interface_name_for_scapy}' come specificato."); break else: print("Selezione annullata. Riprova.") if interface_name_for_scapy: conf.iface = interface_name_for_scapy else: print("[!] Impossibile elencare interfacce...") ip_address_1 = ""; ip_address_2 = "" # ... (selezione IP interattiva) ... while not ip_address_1: ip_address_1 = input("Inserisci il primo indirizzo IP...: ").strip() while not ip_address_2: ip_address_2 = input("Inserisci il secondo indirizzo IP...: ").strip() #if ip_address_1 == ip_address_2: print("I due indirizzi IP devono essere diversi."); ip_address_2 = "" print(f"\n[*] Configurazione:") # ... (stampa config) print(f"[*] Interfaccia: {conf.iface if conf.iface else 'Default (Scelta da Scapy)'}") print(f"[*] Monitoraggio traffico tra: {ip_address_1} e {ip_address_2}") bpf_filter = f"(tcp or udp or icmp) and host {ip_address_1} and host {ip_address_2}" print(f"[*] Filtro BPF applicato: '{bpf_filter}'") sniffer_thread = threading.Thread( # ... (avvio thread) target=run_sniffer, args=(interface_name_for_scapy, bpf_filter, ip_address_1, ip_address_2, stop_event), daemon=True ) sniffer_thread.start() print("[Main Thread] Sniffer avviato in background.") print("[Main Thread] Digita 'quit' e premi Invio per fermare.") # --- Loop Input Utente --- try: # ... (loop input invariato) ... while True: if stop_event.is_set(): time.sleep(0.2) if not sniffer_thread.is_alive(): with print_lock: print("\n[Main Thread] Il thread dello sniffer è terminato.") break elif not sniffer_thread.is_alive(): with print_lock: print("\n[Main Thread] Il thread dello sniffer è terminato inaspettatamente.") stop_event.set(); break try: user_input = input("Comando ('quit' per uscire): ").strip().lower() if user_input == 'quit': with print_lock: print("\n[Main Thread] Richiesta di uscita...") stop_event.set(); break except EOFError: with print_lock: print("\n[Main Thread] Rilevata fine input (EOF). Uscita...") stop_event.set(); break except KeyboardInterrupt: # ... (gestione Ctrl+C invariata) ... with print_lock: print("\n[Main Thread] Interruzione forzata ricevuta (Ctrl+C)...") stop_event.set() finally: # ... (gestione uscita e join invariata, con correzione sintassi) ... if sniffer_thread.is_alive(): with print_lock: print("[Main Thread] In attesa della terminazione dello sniffer...") sniffer_thread.join(timeout=5.0) if sniffer_thread.is_alive(): with print_lock: print("[Main Thread] ATTENZIONE: Timeout attesa sniffer.") else: with print_lock: print("[Main Thread] Thread sniffer terminato.") print("[Main Thread] Programma terminato.")