# compatibile Windows 11 # compatibile Ubuntu 24.10 # compatibile python 3.12.7 import argparse import logging import os import platform import sys import codecs import threading import time import string # --- Blocco per importare Scapy e gestire avvisi --- logging.getLogger("scapy.runtime").setLevel(logging.ERROR) logging.getLogger("scapy.interactive").setLevel(logging.ERROR) try: # Aggiunto ARP per l'identificazione del protocollo L2 comune from scapy.all import (Ether, IP, TCP, UDP, ICMP, ARP, Raw, sniff, conf, get_if_list, get_if_addr, get_if_hwaddr) if platform.system() == "Windows": from scapy.arch.windows import get_windows_if_list except ImportError: print("\nERRORE: Scapy non sembra essere installato.") print("Esegui: pip install scapy") sys.exit(1) # --- Blocchi controllo permessi, elenco interfacce, evento, sanificazione (INVARIATI) --- def check_permissions(): # ... (invariato) if platform.system() == "Windows": try: import ctypes; return ctypes.windll.shell32.IsUserAnAdmin() != 0 except Exception: return False else: try: return os.geteuid() == 0 except AttributeError: return False def list_interfaces(): # ... (invariato) print("\nInterfacce di rete disponibili:") interfaces = {} try: # ... (logica elenco interfacce) ... if platform.system() == "Windows": win_ifaces = get_windows_if_list(); # ... if not win_ifaces: print("Nessuna interfaccia trovata."); return None for i, iface in enumerate(win_ifaces): name_for_scapy = iface.get('name', iface.get('guid')) if name_for_scapy: ip = iface.get('ip_addresses', ['N/D'])[0] if iface.get('ip_addresses') else 'N/D' mac = iface.get('mac', 'N/D'); desc = iface.get('description', 'N/D') print(f" [{i}] {desc} (Nome Scapy: {name_for_scapy}, IP: {ip}, MAC: {mac})"); interfaces[i] = name_for_scapy else: print(f" [{i}] Interfaccia senza nome/GUID: {iface.get('description', 'N/D')}") else: # Linux/Unix if_names = get_if_list(); # ... if not if_names: print("Nessuna interfaccia trovata."); return None for i, name in enumerate(if_names): interfaces[i] = name; ip, mac = "N/D", "N/D" try: ip = get_if_addr(name); mac = get_if_hwaddr(name) except Exception: pass print(f" [{i}] {name} (IP: {ip}, MAC: {mac})") except Exception as e: print(f"\nERRORE elenco interfacce: {e}"); return None return interfaces stop_event = threading.Event() def sanitize_string(input_str): # ... (invariato) return "".join(char if char in string.printable else '.' for char in input_str) # --- Funzione per processare i pacchetti (MODIFICATA per gestire L2-only) --- def process_packet_func(packet, ip1, ip2, mac1, mac2): """ Analizza un pacchetto. Se ha IP e corrisponde a ip1/ip2, stampa L2/L3/L4/Payload. Se NON ha IP ma ha Ethernet e corrisponde a mac1/mac2 (se forniti), stampa L2 e L2-Payload. """ if not packet.haslayer(Ether): # Ignora pacchetti senza layer Ethernet (raro su reti comuni) return # Estrai sempre info L2 se disponibili eth_src = packet[Ether].src eth_dst = packet[Ether].dst eth_type_val = packet[Ether].type eth_type_hex = hex(eth_type_val) # Dizionario EtherTypes comuni per riferimento eth_type_map = {0x0800: "IPv4", 0x0806: "ARP", 0x86DD: "IPv6", 0x8100: "802.1Q VLAN"} eth_type_name = eth_type_map.get(eth_type_val, f"Tipo={eth_type_hex}") # CASO 1: Pacchetto IP (Filtra per IP) if packet.haslayer(IP): ip_src = packet[IP].src ip_dst = packet[IP].dst # Applica filtro IP if not ((ip_src == ip1 and ip_dst == ip2) or \ (ip_src == ip2 and ip_dst == ip1)): # Correzione: # if not ((ip_src == ip1 and ip_dst == ip2) or \ # (ip_src == ip2 and ip_dst == ip1)): return # Ignora se non è tra gli IP specificati # --- Logica esistente per estrarre L3/L4/Payload --- ip_proto = packet[IP].proto ip_ttl = packet[IP].ttl; ip_len = packet[IP].len proto_map = {1: "ICMP", 6: "TCP", 17: "UDP"} ip_proto_name = proto_map.get(ip_proto, f"Proto={ip_proto}") transport_proto = None; sport = 0; dport = 0; icmp_type = -1; icmp_code = -1 payload_bytes = None; sanitized_payload = "" if ip_proto_name == "TCP" and packet.haslayer(TCP): transport_proto = "TCP"; sport = packet[TCP].sport; dport = packet[TCP].dport if packet[TCP].payload and isinstance(packet[TCP].payload, Raw): payload_bytes = bytes(packet[TCP].payload) elif ip_proto_name == "UDP" and packet.haslayer(UDP): transport_proto = "UDP"; sport = packet[UDP].sport; dport = packet[UDP].dport if packet[UDP].payload and isinstance(packet[UDP].payload, Raw): payload_bytes = bytes(packet[UDP].payload) elif ip_proto_name == "ICMP" and packet.haslayer(ICMP): transport_proto = "ICMP"; icmp_type = packet[ICMP].type; icmp_code = packet[ICMP].code if packet[ICMP].payload and isinstance(packet[ICMP].payload, Raw): payload_bytes = bytes(packet[ICMP].payload) if payload_bytes: try: payload_str = payload_bytes.decode('utf-8', errors='replace'); sanitized_payload = sanitize_string(payload_str) except Exception: sanitized_payload = f"[Errore decodifica]"; payload_bytes = repr(payload_bytes) # Stampa per pacchetto IP with print_lock: print("\n--- Pacchetto IP Rilevato ---") print(f" L2 Ethernet: {eth_src} -> {eth_dst} (Tipo: {eth_type_name} - {eth_type_hex})") print(f" L3 IP: {ip_src} -> {ip_dst} (Protocollo: {ip_proto_name}, TTL: {ip_ttl}, Lunghezza IP: {ip_len})") if transport_proto == "TCP" or transport_proto == "UDP": print(f" L4 {transport_proto}: Porta {sport} -> Porta {dport}") elif transport_proto == "ICMP": type_desc = {0: "Echo Reply", 3: "Dest Unreachable", 8: "Echo Request", 11: "Time Exceeded"}.get(icmp_type, f"Type={icmp_type}"); code_desc = f"Code={icmp_code}"; print(f" L4 ICMP: {type_desc}, {code_desc}") else: print(f" L4 Protocol: {ip_proto_name} (Dettagli non estratti)") if sanitized_payload: print(f" Payload: >> {sanitized_payload}") elif payload_bytes: print(f" Payload: >> [Binario/Non stampabile o Errore: {len(payload_bytes)} bytes]") sys.stdout.flush() # CASO 2: Pacchetto Non-IP (Filtra per MAC se specificati) else: # Controlla solo se sono stati forniti MAC per il filtro L2 if mac1 and mac2: # Applica filtro MAC if not ((eth_src == mac1 and eth_dst == mac2) or \ (eth_src == mac2 and eth_dst == mac1)): return # Ignora se non è tra i MAC specificati # --- Pacchetto L2 Rilevato e Filtrato --- with print_lock: print("\n--- Pacchetto L2 (Non-IP) Rilevato ---") print(f" L2 Ethernet: {eth_src} -> {eth_dst} (Tipo: {eth_type_name} - {eth_type_hex})") # Tenta di identificare protocolli L2 comuni (es. ARP) if eth_type_val == 0x0806 and packet.haslayer(ARP): arp_op = packet[ARP].op # 1=request, 2=reply arp_op_str = {1: "Request", 2: "Reply"}.get(arp_op, f"Op={arp_op}") print(f" L2 Payload: ARP {arp_op_str}") print(f" ARP Sender: MAC={packet[ARP].hwsrc}, IP={packet[ARP].psrc}") print(f" ARP Target: MAC={packet[ARP].hwdst}, IP={packet[ARP].pdst}") else: # Altri tipi L2 o payload non riconosciuto facilmente # Estrai payload dopo Ethernet (se c'è) l2_payload = packet[Ether].payload if l2_payload: # Mostra rappresentazione del layer successivo o Raw if isinstance(l2_payload, Raw): payload_bytes = bytes(l2_payload) try: payload_str = payload_bytes.decode('utf-8', errors='replace'); sanitized_payload = sanitize_string(payload_str) except Exception: sanitized_payload = "[Errore decodifica]"; payload_bytes = repr(payload_bytes) print(f" L2 Payload: Tipo={eth_type_name}. Dati grezzi:") if sanitized_payload: print(f" >> {sanitized_payload}") elif payload_bytes: print(f" >> [Binario/Errore: {len(payload_bytes)} bytes]") else: # Mostra il riassunto del layer successivo fornito da Scapy print(f" L2 Payload: {l2_payload.summary()}") else: print(" L2 Payload: (Nessun payload dopo Ethernet)") sys.stdout.flush() # Se MAC non sono stati forniti, non stampiamo nulla per pacchetti non-IP # per evitare flooding (a meno che non si cambi il filtro BPF) # --- Funzione target per il thread dello sniffer (INVARIATA nella struttura) --- def run_sniffer(iface, bpf_filter, ip1, ip2, mac1, mac2, stop_event_obj): print("[Sniffer Thread] Avviato.") try: # Passa anche mac1 e mac2 alla funzione di processing process_wrapper = lambda pkt: process_packet_func(pkt, ip1, ip2, mac1, mac2) stop_filter_func = lambda pkt: stop_event_obj.is_set() sniff(iface=iface, filter=bpf_filter, prn=process_wrapper, stop_filter=stop_filter_func, store=0) # ... (Gestione errori e finally come prima) ... except PermissionError: # ... with print_lock: print("\n[Sniffer Thread] ERRORE: Permessi insufficienti.") except OSError as e: # ... with print_lock: print(f"\n[Sniffer Thread] ERRORE durante lo sniffing: {e}") except Exception as e: # ... with print_lock: print(f"\n[Sniffer Thread] ERRORE imprevisto: {e}"); import traceback; traceback.print_exc() finally: if not stop_event_obj.is_set(): with print_lock: print("[Sniffer Thread] Thread terminato inaspettatamente...") stop_event_obj.set() with print_lock: print("[Sniffer Thread] Terminato.") # --- Lock per la stampa (INVARIATO) --- print_lock = threading.Lock() # --- Main Execution (MODIFICATO per input MAC e filtro BPF) --- if __name__ == "__main__": # ... (Controllo permessi come prima) ... if not check_permissions(): print("\nATTENZIONE: Sono necessari privilegi elevati...") user_choice = input("Continuare comunque? (s/n): ").lower(); if user_choice != 's': print("Uscita richiesta dall'utente."); sys.exit(1) # --- Selezione Interattiva Interfaccia --- available_interfaces = list_interfaces() interface_name_for_scapy = None # ... (selezione interfaccia interattiva - INVARIATA) ... if available_interfaces: while True: choice = input("Inserisci l'indice o il nome dell'interfaccia... (Invio per default): ").strip() if not choice: break try: # Indice iface_index = int(choice); # ... if iface_index in available_interfaces: interface_name_for_scapy = available_interfaces[iface_index]; print(f"[*] Interfaccia selezionata per indice: '{interface_name_for_scapy}'"); break else: print(f"ERRORE: Indice '{iface_index}' non valido.") except ValueError: # Nome is_valid_name = any(name == choice for name in available_interfaces.values()); # ... if is_valid_name: interface_name_for_scapy = choice; print(f"[*] Interfaccia selezionata per nome: '{interface_name_for_scapy}'"); break else: confirm = input(f"ATTENZIONE: Nome '{choice}' non trovato... Usarlo comunque? (s/n): ").lower() if confirm == 's': interface_name_for_scapy = choice; print(f"[*] Uso dell'interfaccia '{interface_name_for_scapy}' come specificato."); break else: print("Selezione annullata. Riprova.") if interface_name_for_scapy: conf.iface = interface_name_for_scapy else: print("[!] Impossibile elencare interfacce...") # --- Selezione Interattiva Indirizzi IP (Rimane uguale) --- ip_address_1 = ""; ip_address_2 = "" print("\n--- Filtro Livello 3 (IP) ---") while not ip_address_1: ip_address_1 = input("Inserisci il primo indirizzo IP (o Invio per non filtrare su IP): ").strip() if ip_address_1: # Chiedi il secondo solo se il primo è stato dato while not ip_address_2: ip_address_2 = input("Inserisci il secondo indirizzo IP: ").strip() if not ip_address_2: print("Il secondo IP è necessario se hai specificato il primo.") elif ip_address_1 == ip_address_2: print("I due indirizzi IP devono essere diversi."); ip_address_2 = "" else: print("[*] Non verrà applicato un filtro specifico sugli indirizzi IP.") ip_address_2 = None # Assicura che sia None se il primo è vuoto # --- Selezione Interattiva Indirizzi MAC (NUOVO) --- mac_address_1 = ""; mac_address_2 = ""; print("\n--- Filtro Livello 2 (Ethernet) ---") while True: # Ciclo per validare formato MAC (semplice) mac_address_1 = input("Inserisci il primo indirizzo MAC (es. aa:bb:cc:11:22:33) (o Invio per non filtrare su MAC): ").strip().lower() if not mac_address_1: break # Utente non vuole filtro MAC # Aggiungere qui una validazione più robusta del formato MAC se necessario if len(mac_address_1) != 17 or mac_address_1.count(':') != 5: print("Formato MAC non valido. Usa xx:xx:xx:xx:xx:xx. Riprova.") mac_address_1 = "" # Resetta per richiederlo else: break # Formato base OK if mac_address_1: # Chiedi il secondo solo se il primo è stato dato while True: mac_address_2 = input("Inserisci il secondo indirizzo MAC: ").strip().lower() if not mac_address_2: print("Il secondo MAC è necessario se hai specificato il primo."); continue if len(mac_address_2) != 17 or mac_address_2.count(':') != 5: print("Formato MAC non valido. Riprova."); continue if mac_address_1 == mac_address_2: print("I due indirizzi MAC devono essere diversi."); continue break # Tutto OK else: print("[*] Non verrà applicato un filtro specifico sugli indirizzi MAC per i pacchetti non-IP.") mac_address_2 = None # --- Costruzione Filtro BPF Dinamico --- ip_filter_part = "" mac_filter_part = "" final_filter = "" if ip_address_1 and ip_address_2: # Filtro IP specifico (includendo TCP/UDP/ICMP per efficienza) ip_filter_part = f"((tcp or udp or icmp) and host {ip_address_1} and host {ip_address_2})" if mac_address_1 and mac_address_2: # Filtro MAC specifico mac_filter_part = f"(ether host {mac_address_1} and ether host {mac_address_2})" # Combina i filtri if ip_filter_part and mac_filter_part: # Filtra per MAC *o* per IP (cattura entrambi i tipi di traffico specifici) final_filter = f"{mac_filter_part} or {ip_filter_part}" elif ip_filter_part: # Filtra solo per IP final_filter = ip_filter_part elif mac_filter_part: # Filtra solo per MAC (catturerà anche IP tra quei MAC, ma va bene) final_filter = mac_filter_part else: # Nessun filtro specifico richiesto, cattura tutto (potrebbe essere molto!) # Potremmo decidere di non permetterlo o mettere un filtro base tipo 'ip or arp' print("[ATTENZIONE] Nessun filtro IP o MAC specificato. Lo sniffing catturerà molto traffico.") final_filter = "" # Cattura tutto sull'interfaccia (o usa un filtro più generico) # final_filter = "ip or arp" # Alternativa: cattura solo IP e ARP print(f"\n[*] Configurazione Finale:") print(f"[*] Interfaccia: {conf.iface if conf.iface else 'Default (Scelta da Scapy)'}") print(f"[*] Filtro IP: {ip_address_1 if ip_address_1 else 'N/D'} <-> {ip_address_2 if ip_address_2 else 'N/D'}") print(f"[*] Filtro MAC (per non-IP): {mac_address_1 if mac_address_1 else 'N/D'} <-> {mac_address_2 if mac_address_2 else 'N/D'}") print(f"[*] Filtro BPF Applicato: '{final_filter if final_filter else 'Nessuno (Cattura Tutto!)'}'") # --- Creazione e Avvio Thread Sniffer --- sniffer_thread = threading.Thread( target=run_sniffer, # Passa anche i MAC (possono essere None) args=(interface_name_for_scapy, final_filter, ip_address_1, ip_address_2, mac_address_1, mac_address_2, stop_event), daemon=True ) sniffer_thread.start() print("[Main Thread] Sniffer avviato in background.") print("[Main Thread] Digita 'quit' e premi Invio per fermare.") # --- Loop Input Utente --- try: # ... (loop input invariato) ... while True: if stop_event.is_set(): time.sleep(0.2) if not sniffer_thread.is_alive(): with print_lock: print("\n[Main Thread] Il thread dello sniffer è terminato.") break elif not sniffer_thread.is_alive(): with print_lock: print("\n[Main Thread] Il thread dello sniffer è terminato inaspettatamente.") stop_event.set(); break try: user_input = input("Comando ('quit' per uscire): ").strip().lower() if user_input == 'quit': with print_lock: print("\n[Main Thread] Richiesta di uscita...") stop_event.set(); break except EOFError: with print_lock: print("\n[Main Thread] Rilevata fine input (EOF). Uscita...") stop_event.set(); break except KeyboardInterrupt: # ... (gestione Ctrl+C invariata) ... with print_lock: print("\n[Main Thread] Interruzione forzata ricevuta (Ctrl+C)...") stop_event.set() finally: # ... (gestione uscita e join invariata, con correzione sintassi) ... if sniffer_thread.is_alive(): with print_lock: print("[Main Thread] In attesa della terminazione dello sniffer...") sniffer_thread.join(timeout=5.0) if sniffer_thread.is_alive(): with print_lock: print("[Main Thread] ATTENZIONE: Timeout attesa sniffer.") else: with print_lock: print("[Main Thread] Thread sniffer terminato.") print("[Main Thread] Programma terminato.")