# compatibile Windows 11 # compatibile Ubuntu 24.10 # compatibile python 3.12.7 import argparse import logging import os import platform import sys import threading # Ancora necessario per l'input utente, ma non per gestire sniff import time # import queue # Non più strettamente necessario con AsyncSniffer, ma può rimanere se si vuole disaccoppiare # --- Blocco per importare Scapy e gestire avvisi --- logging.getLogger("scapy.runtime").setLevel(logging.ERROR) logging.getLogger("scapy.interactive").setLevel(logging.ERROR) try: from scapy.all import (Ether, Raw, sendp, conf, AsyncSniffer, # <-- Cambiato sniff con AsyncSniffer get_if_list, get_if_hwaddr) if platform.system() == "Windows": from scapy.arch.windows import get_windows_if_list except ImportError: print("\nERRORE: Scapy non sembra essere installato.") print("Esegui: pip install scapy") sys.exit(1) # --- Blocco per controllo permessi (Invariato) --- def check_permissions(): # ... (Codice identico) ... if platform.system() == "Windows": try: import ctypes; return ctypes.windll.shell32.IsUserAnAdmin() != 0 except Exception: return False else: try: return os.geteuid() == 0 except AttributeError: return False # --- Funzione per Elencare Interfacce (Invariata) --- def list_interfaces(): # ... (Codice identico) ... print("\nInterfacce di rete disponibili:") interfaces = {} if platform.system() == "Windows": try: win_ifaces = get_windows_if_list() if not win_ifaces: print("Nessuna interfaccia trovata."); return None for i, iface in enumerate(win_ifaces): name_for_scapy = iface.get('name', iface.get('guid')) if name_for_scapy: desc=iface.get('description','N/D'); ip=iface.get('ip_addresses',['N/D'])[0] if iface.get('ip_addresses') else 'N/D'; mac=iface.get('mac','N/D') print(f" [{i}] {desc} (Nome Scapy: {name_for_scapy}, IP: {ip}, MAC: {mac})"); interfaces[i]=name_for_scapy else: print(f" [{i}] Interfaccia senza nome/GUID: {iface.get('description', 'N/D')}") except Exception as e: print(f"Errore elenco Win: {e}. Tentativo generico..."); if_names=get_if_list() for i, name in enumerate(if_names): interfaces[i]=name; print(f" [{i}] {name}") else: if_names = get_if_list() if not if_names: print("Nessuna interfaccia trovata."); return None for i, name in enumerate(if_names): interfaces[i]=name; mac=get_if_hwaddr(name); ip=conf.ifaces.get(name,{}).ip if conf.ifaces.get(name) else 'N/D' print(f" [{i}] {name} (IP: {ip}, MAC: {mac})") return interfaces # --- Funzione per processare i pacchetti ricevuti (Invariata) --- # Questa funzione viene ora chiamata dal thread interno di AsyncSniffer def process_packet(packet): """Processa un pacchetto ricevuto.""" if packet.haslayer(Ether) and packet.haslayer(Raw): src_mac = packet[Ether].src payload_bytes = packet[Raw].load try: payload_str = payload_bytes.decode('utf-8', errors='replace') print(f"\n[RECV] Da {src_mac}: {payload_str}") print("Enter message, 'dest MAC=...', or 'quit': ", end='', flush=True) # Ristampa prompt except Exception as e: print(f"\n[RECV] Errore decodifica da {src_mac}: {e}") print("Enter message, 'dest MAC=...', or 'quit': ", end='', flush=True) # --- Funzione Mittente (Invariata) --- def send_ether_frame(interface, dst_mac, src_mac, ether_type, payload): """Invia un frame Ethernet raw.""" try: payload_bytes = payload.encode('utf-8') ether_frame = Ether(dst=dst_mac, src=src_mac, type=ether_type) packet = ether_frame / Raw(load=payload_bytes) sendp(packet, iface=interface, verbose=False) return True except Exception as e: print(f"\n[SEND] ERRORE invio: {e}") return False # --- Main Execution --- if __name__ == "__main__": if not check_permissions(): print("\nATTENZIONE: Privilegi elevati (sudo/Admin) necessari.") if input("Continuare? (s/n): ").lower() != 's': sys.exit(1) parser = argparse.ArgumentParser(description="Chat Ethernet minimale con AsyncSniffer.") parser.add_argument('--iface', help='Indice o Nome interfaccia') parser.add_argument('--etype', type=lambda x: int(x, 0), default=0xAAAA, help='EtherType (default: 0xAAAA)') parser.add_argument('--dst', default="FF:FF:FF:FF:FF:FF", help='MAC Destinazione iniziale (default: broadcast)') # --- Selezione Interfaccia (Invariata) --- available_interfaces = list_interfaces() if not available_interfaces: sys.exit(1) args = parser.parse_args() interface_name_for_scapy = None # ... (Logica di selezione interattiva o da argomenti identica a prima) ... if args.iface: selected_iface_input = args.iface try: iface_index = int(selected_iface_input) if iface_index in available_interfaces: interface_name_for_scapy = available_interfaces[iface_index] else: print(f"ERRORE: Indice interfaccia '{iface_index}' non valido.") except ValueError: is_valid_name = any(name == selected_iface_input for name in available_interfaces.values()) if is_valid_name: interface_name_for_scapy = selected_iface_input else: print(f"ATTENZIONE: Nome '{selected_iface_input}' non nell'elenco. Uso diretto."); interface_name_for_scapy = selected_iface_input else: while not interface_name_for_scapy: try: idx_str = input("Indice [X] interfaccia: "); iface_index = int(idx_str) if iface_index in available_interfaces: interface_name_for_scapy = available_interfaces[iface_index] else: print("Indice non valido.") except ValueError: print("Inserisci numero.") except (EOFError, KeyboardInterrupt): print("\nUscita."); sys.exit(0) if not interface_name_for_scapy: sys.exit(1) print(f"\n[*] Uso interfaccia: '{interface_name_for_scapy}'") # --- Ottieni MAC Sorgente (Invariato) --- try: src_mac = get_if_hwaddr(interface_name_for_scapy) print(f"[*] MAC Sorgente: {src_mac}") except Exception as e: print(f"\nERRORE MAC sorgente: {e}"); sys.exit(1) ether_type = args.etype destination_mac = args.dst.strip() print(f"[*] EtherType: {hex(ether_type)}") print(f"[*] Destinazione iniziale: {destination_mac}") # --- Configurazione e Avvio AsyncSniffer --- sniffer = None # Inizializza a None try: print("[MAIN] Avvio sniffer asincrono...") sniffer = AsyncSniffer( iface=interface_name_for_scapy, prn=process_packet, filter=f"ether proto {ether_type}", store=0 # Non memorizzare pacchetti in memoria ) sniffer.start() # Avvia lo sniffing nel suo thread print("[MAIN] Sniffer avviato.") time.sleep(1) # Dai tempo allo sniffer di avviarsi except OSError as e: print(f"\nERRORE Avvio sniffer: {e}") print(" -> Verifica Npcap/libpcap, permessi e interfaccia.") sys.exit(1) except Exception as e: print(f"\nERRORE Avvio sniffer: {e}") sys.exit(1) # --- Loop Input Utente (Thread Principale - Invariato) --- print("\n--- Chat Ethernet ---") print("Messaggio | dest MAC=XX:XX:XX:XX:XX:XX | quit") try: while True: user_input = input("Enter command: ") if not sniffer or not sniffer.running: print("\n[MAIN] Sniffer non è in esecuzione. Uscita.") break if user_input.lower() == 'quit': print("\n[MAIN] Richiesta di uscita...") break elif user_input.lower().startswith("dest mac="): new_dest = user_input[len("dest mac="):].strip() if len(new_dest.split(':')) == 6: destination_mac = new_dest print(f"[MAIN] Nuova destinazione: {destination_mac}") else: print("[MAIN] Formato MAC non valido.") elif user_input: send_ether_frame(interface_name_for_scapy, destination_mac, src_mac, ether_type, user_input) except (KeyboardInterrupt, EOFError): print("\n[MAIN] Interruzione ricevuta...") finally: # --- Sequenza di Shutdown --- print("\n[MAIN] Avvio procedura di uscita...") if sniffer and sniffer.running: print("[MAIN] Arresto sniffer...") sniffer.stop() # Richiede allo sniffer di fermarsi print("[MAIN] Attesa terminazione sniffer...") sniffer.join(timeout=5) # Attende che il thread dello sniffer termini if sniffer.running: # Controlla se è ancora vivo dopo join print("[MAIN] ATTENZIONE: Lo sniffer non è terminato entro il timeout.") # In questo caso, il processo potrebbe bloccarsi qui o uscire comunque # lasciando il thread in esecuzione (se non era daemon), # ma AsyncSniffer dovrebbe gestire meglio lo stop. else: print("[MAIN] Sniffer terminato.") elif sniffer: print("[MAIN] Lo sniffer non era in esecuzione al momento dell'uscita.") else: print("[MAIN] Lo sniffer non è mai stato avviato correttamente.") print("[MAIN] Programma terminato.")