# compatibile Windows 11 # compatibile Ubuntu 24.10 # compatibile python 3.12.7 import argparse # Lo manteniamo se vuoi aggiungere altri argomenti in futuro import logging import os import platform import sys import threading import time # --- Blocco per importare Scapy e gestire avvisi --- logging.getLogger("scapy.runtime").setLevel(logging.ERROR) logging.getLogger("scapy.interactive").setLevel(logging.ERROR) try: from scapy.all import (IP, UDP, Raw, send, conf, AsyncSniffer, get_if_list, get_if_addr, get_if_hwaddr) if platform.system() == "Windows": from scapy.arch.windows import get_windows_if_list except ImportError: print("\nERRORE: Scapy non sembra essere installato.") print("Esegui: pip install scapy") sys.exit(1) # --- Blocco per controllo permessi --- def check_permissions(): if platform.system() == "Windows": try: import ctypes; return ctypes.windll.shell32.IsUserAnAdmin() != 0 except Exception: return False else: try: return os.geteuid() == 0 except AttributeError: return False # --- Funzione per Elencare Interfacce --- def list_interfaces(): """Elenca le interfacce di rete mostrando anche l'indirizzo IP.""" print("\nInterfacce di rete disponibili:") interfaces = {} try: if platform.system() == "Windows": win_ifaces = get_windows_if_list() if not win_ifaces: print("Nessuna interfaccia trovata."); return None for i, iface in enumerate(win_ifaces): name_for_scapy = iface.get('name', iface.get('guid')) if name_for_scapy: ip = iface.get('ip_addresses', ['N/D'])[0] if iface.get('ip_addresses') else 'N/D' mac = iface.get('mac', 'N/D') desc = iface.get('description', 'N/D') print(f" [{i}] {desc} (Nome Scapy: {name_for_scapy}, IP: {ip}, MAC: {mac})") interfaces[i] = name_for_scapy else: print(f" [{i}] Interfaccia senza nome/GUID: {iface.get('description', 'N/D')}") else: # Linux/Unix if_names = get_if_list() if not if_names: print("Nessuna interfaccia trovata."); return None for i, name in enumerate(if_names): interfaces[i] = name try: ip = get_if_addr(name) mac = get_if_hwaddr(name) print(f" [{i}] {name} (IP: {ip}, MAC: {mac})") except Exception: print(f" [{i}] {name} (IP/MAC non ottenibili)") except Exception as e: print(f"\nERRORE durante l'elenco delle interfacce: {e}") print(" -> Potrebbe essere un problema con Npcap/libpcap o permessi.") return None return interfaces # --- Funzione per processare i pacchetti ricevuti --- def process_packet(packet): """Processa un pacchetto UDP ricevuto sulla porta corretta.""" if packet.haslayer(IP) and packet.haslayer(UDP) and packet.haslayer(Raw): src_ip = packet[IP].src dst_ip = packet[IP].dst sport = packet[UDP].sport dport = packet[UDP].dport payload_bytes = packet[Raw].load try: payload_str = payload_bytes.decode('utf-8', errors='replace') print(f"\n[RECV UDP] Da {src_ip}:{sport}: {payload_str}") print("Inserisci comando: ", end='', flush=True) # Ristampa prompt except Exception as e: print(f"\n[RECV UDP] Errore decodifica da {src_ip}:{sport}: {e}") print("Inserisci comando: ", end='', flush=True) # Ristampa prompt # --- Funzione Mittente (IP/UDP) --- def send_ip_udp_packet(dst_ip, dst_port, src_ip=None, src_port=9999, payload=""): """Invia un pacchetto IP/UDP.""" try: payload_bytes = payload.encode('utf-8') ip_layer = IP(dst=dst_ip, src=src_ip) udp_layer = UDP(dport=dst_port, sport=src_port) packet = ip_layer / udp_layer / Raw(load=payload_bytes) send(packet, verbose=False) return True except ValueError as e: print(f"\n[SEND UDP] ERRORE Indirizzo IP/Porta non valido?: {e}") except OSError as e: # Spesso indica problemi di permessi o interfaccia non valida per l'invio print(f"\n[SEND UDP] ERRORE Sistema Operativo durante l'invio: {e}") print("[SEND UDP] -> Verifica l'interfaccia selezionata (se presente), IP/Porte e permessi.") except Exception as e: print(f"\n[SEND UDP] ERRORE invio generico: {e}") return False # --- Main Execution --- if __name__ == "__main__": if not check_permissions(): print("\nATTENZIONE: Privilegi elevati (sudo/Admin) potrebbero essere necessari per inviare/ricevere IP/UDP o usare certe interfacce.") if input("Continuare? (s/n): ").lower() != 's': sys.exit(1) # --- Selezione Interattiva Interfaccia --- available_interfaces = list_interfaces() interface_name_for_scapy = None if available_interfaces: # Se sono state trovate interfacce while True: choice = input("Inserisci l'indice o il nome dell'interfaccia da usare (premi Invio per lasciare scegliere a Scapy): ").strip() if not choice: # L'utente ha premuto Invio print("[*] Nessuna interfaccia specifica selezionata. Scapy userà la tabella di routing.") break try: # Prova a interpretare come indice iface_index = int(choice) if iface_index in available_interfaces: interface_name_for_scapy = available_interfaces[iface_index] print(f"[*] Interfaccia selezionata per indice: '{interface_name_for_scapy}'") break else: print(f"ERRORE: Indice '{iface_index}' non valido. Riprova.") except ValueError: # Interpreta come nome is_valid_name = any(name == choice for name in available_interfaces.values()) if is_valid_name: interface_name_for_scapy = choice print(f"[*] Interfaccia selezionata per nome: '{interface_name_for_scapy}'") break else: # Nome non trovato, chiedi conferma se usarlo comunque confirm = input(f"ATTENZIONE: Nome '{choice}' non trovato nell'elenco. Usarlo comunque? (s/n): ").lower() if confirm == 's': interface_name_for_scapy = choice print(f"[*] Uso dell'interfaccia '{interface_name_for_scapy}' come specificato.") break else: print("Selezione annullata. Riprova.") else: print("[!] Impossibile elencare interfacce. Scapy proverà a usare un default, ma potrebbe fallire.") # Continua comunque, lasciando interface_name_for_scapy a None # --- Impostazione Interfaccia e IP Sorgente (se selezionata) --- source_ip = None # Inizializza source_ip if interface_name_for_scapy: print(f"\n[*] Uso specifico dell'interfaccia: '{interface_name_for_scapy}'") conf.iface = interface_name_for_scapy try: source_ip = get_if_addr(interface_name_for_scapy) if source_ip == "0.0.0.0": # A volte get_if_addr restituisce 0.0.0.0 print(f"[*] Rilevato IP 0.0.0.0 per '{interface_name_for_scapy}'. Lasceremo Scapy scegliere l'IP sorgente.") source_ip = None # Tratta come se non fossimo riusciti a trovarlo else: print(f"[*] IP Sorgente (potenziale) dall'interfaccia: {source_ip}") except Exception: print(f"[*] Impossibile rilevare IP per l'interfaccia '{interface_name_for_scapy}'. Scapy sceglierà l'IP sorgente.") source_ip = None # Lascia che Scapy scelga else: # Nessuna interfaccia selezionata o scelta non valida pass # source_ip rimane None, Scapy sceglierà # --- Selezione Interattiva Porte e IP Destinazione --- DEFAULT_PORT = 9999 DEFAULT_DEST_IP = "127.0.0.1" while True: try: lport_str = input(f"Inserisci la porta UDP locale su cui ascoltare (default: {DEFAULT_PORT}): ").strip() listen_port = int(lport_str) if lport_str else DEFAULT_PORT if not (0 < listen_port <= 65535): raise ValueError("La porta deve essere tra 1 e 65535.") break except ValueError as e: print(f"ERRORE: Input non valido. {e}. Inserisci un numero di porta valido.") while True: try: dport_str = input(f"Inserisci la porta UDP di destinazione iniziale (default: {DEFAULT_PORT}): ").strip() destination_port = int(dport_str) if dport_str else DEFAULT_PORT if not (0 < destination_port <= 65535): raise ValueError("La porta deve essere tra 1 e 65535.") break except ValueError as e: print(f"ERRORE: Input non valido. {e}. Inserisci un numero di porta valido.") while True: dest_ip_str = input(f"Inserisci l'indirizzo IP di destinazione iniziale (default: {DEFAULT_DEST_IP}): ").strip() destination_ip = dest_ip_str if dest_ip_str else DEFAULT_DEST_IP if not destination_ip: # Controllo semplice che non sia vuoto se non si usa il default print("ERRORE: L'indirizzo IP non può essere vuoto.") else: # Qui potresti aggiungere una validazione più robusta dell'IP se necessario # import ipaddress # try: ipaddress.ip_address(destination_ip); break # except ValueError: print("ERRORE: Formato IP non valido.") break # Per ora, accettiamo qualsiasi stringa non vuota print(f"\n[*] Configurazione:") print(f"[*] Interfaccia: {conf.iface if conf.iface else 'Scelta da Scapy'}") if source_ip: print(f"[*] IP Sorgente (rilevato): {source_ip}") print(f"[*] Porta Ascolto: {listen_port}") print(f"[*] Destinazione Iniziale: {destination_ip}:{destination_port}") # --- Configurazione e Avvio AsyncSniffer --- sniffer = None bpf_filter = f"udp and dst port {listen_port}" print(f"[*] Filtro Sniffer: \"{bpf_filter}\"") try: print("[MAIN] Avvio sniffer asincrono...") sniffer = AsyncSniffer( iface=interface_name_for_scapy, # Usa l'interfaccia scelta o None prn=process_packet, filter=bpf_filter, store=0 ) sniffer.start() print("[MAIN] Sniffer avviato.") time.sleep(1) # Dai tempo allo sniffer di avviarsi completamente except OSError as e: print(f"\nERRORE Avvio sniffer: {e}") print(" -> Verifica Npcap/libpcap, permessi e interfaccia (se specificata).") sys.exit(1) except Exception as e: print(f"\nERRORE Avvio sniffer generico: {e}") sys.exit(1) # --- Loop Input Utente (Thread Principale) --- print("\n--- Chat IP/UDP ---") print("Inserisci messaggio | dest ip=A.B.C.D port=N | quit") try: while True: # Verifica se lo sniffer è ancora attivo prima di chiedere input # Usa sniffer.thread.is_alive() come nella correzione precedente if not sniffer or not sniffer.thread or not sniffer.thread.is_alive(): print("\n[MAIN] Lo sniffer ha smesso di funzionare. Uscita.") break user_input = input("Inserisci comando: ") # Modificato prompt if user_input.lower() == 'quit': print("\n[MAIN] Richiesta di uscita...") break elif user_input.lower().startswith("dest "): parts = user_input.split() new_ip = None new_port = None valid_change = True for part in parts[1:]: if part.lower().startswith("ip="): new_ip = part[len("ip="):].strip() # Aggiungi validazione IP se vuoi essere più sicuro elif part.lower().startswith("port="): try: port_val = int(part[len("port="):].strip()) if 0 < port_val <= 65535: new_port = port_val else: print("[MAIN] Porta non valida (1-65535). Modifica ignorata.") valid_change = False except ValueError: print("[MAIN] Porta non numerica. Modifica ignorata.") valid_change = False if valid_change: if new_ip: destination_ip = new_ip print(f"[MAIN] Nuovo IP Destinazione: {destination_ip}") if new_port is not None: destination_port = new_port print(f"[MAIN] Nuova Porta Destinazione: {destination_port}") elif user_input: # Se l'utente ha digitato un messaggio # Determina IP sorgente da usare: quello rilevato se disponibile, altrimenti None effective_src_ip = source_ip if source_ip else None send_ip_udp_packet(destination_ip, destination_port, src_ip=effective_src_ip, # Usa IP rilevato se possibile src_port=listen_port, # Invia dalla nostra porta di ascolto payload=user_input) except (KeyboardInterrupt, EOFError): print("\n[MAIN] Interruzione ricevuta...") finally: # --- Sequenza di Shutdown --- print("\n[MAIN] Avvio procedura di uscita...") if sniffer and sniffer.thread and sniffer.thread.is_alive(): print("[MAIN] Arresto sniffer...") sniffer.stop() print("[MAIN] Attesa terminazione sniffer...") sniffer.join(timeout=5) if sniffer.thread.is_alive(): print("[MAIN] ATTENZIONE: Lo sniffer non è terminato entro il timeout.") else: print("[MAIN] Sniffer terminato.") elif sniffer and sniffer.thread: print("[MAIN] Lo sniffer non era in esecuzione al momento dell'uscita.") elif sniffer: print("[MAIN] Lo sniffer esiste ma il suo thread interno non è stato trovato/avviato correttamente.") else: print("[MAIN] Lo sniffer non è mai stato avviato correttamente.") print("[MAIN] Programma terminato.")