diff --git a/probenpwn.py b/probenpwn.py index 19ac82b..0819b65 100644 --- a/probenpwn.py +++ b/probenpwn.py @@ -4,7 +4,6 @@ import threading import os import subprocess import random -import toml import pwnagotchi.plugins as plugins class probenpwn(plugins.Plugin): @@ -19,12 +18,12 @@ class probenpwn(plugins.Plugin): def __init__(self): logging.debug("ProbeNpwn plugin created") - self._agent = None self.old_name = None self.recents = {} self.attack_threads = [] self.epoch_duration = 60 # default epoch duration in seconds self._watchdog_thread = None + self._watchdog_thread_running = True # Track number of attack attempts per AP MAC address self.attack_attempts = {} # Optionally, track the number of successful handshakes per AP @@ -35,59 +34,61 @@ class probenpwn(plugins.Plugin): # Track the performance of each AP for dynamic adjustments self.performance_stats = {} self.whitelist = set() - - # Load whitelist from the global Pwnagotchi config - self.load_whitelist() + + def on_loaded(self): + logging.info(f"Plugin ProbeNpwn loaded") - def load_whitelist(self): + def on_config_changed(self, config): """Load the whitelist from Pwnagotchi's global config.""" try: - with open('/etc/pwnagotchi/config.toml', 'r') as config_file: - data = toml.load(config_file) - # Load SSIDs from the whitelist in the global config - self.whitelist = set(data.get('main', {}).get('whitelist', [])) - logging.info(f"Whitelist loaded from Pwnagotchi config: {self.whitelist}") - except Exception as e: - logging.error(f"Failed to load whitelist from config: {e}") + self.whitelist = set(config["main"].get("whitelist", [])) + except KeyError: self.whitelist = set() + logging.info(f"Whitelist loaded from Pwnagotchi config: {self.whitelist}") + + try: + self.debug_log_path = config["main"]["log"].get("path-debug", None) + except KeyError: + logging.error(f"Failed to configure debug log path") + + self.old_name = config.get("main").get("name", "") def on_unload(self, ui): - if self.old_name: - ui.set('name', "%s " % self.old_name) - else: - ui.set('name', "%s> " % ui.get('name')[:-3]) - self.old_name = None + with ui._lock: + if self.old_name: + ui.set('name', f"{self.old_name}>") + + try: + self._watchdog_thread_running = False # properly exit the thread + self._watchdog_thread.join() + except AttributeError: # Handle unload before on_ready() + pass logging.info("Probing out.") - def on_ui_setup(self, ui): - self._ui = ui - def on_ui_update(self, ui): - if self.old_name is None: - self.old_name = ui.get('name') - if self.old_name: - i = self.old_name.find('>') - if i: - ui.set('name', "%s%s" % (self.old_name[:i], "!!!")) + if ui.get('name').endswith("!!!"): # No need to to update + return + if self.old_name: + with ui._lock: + ui.set('name', f"{self.old_name}!!!") def on_ready(self, agent): - self._agent = agent logging.info("Probed and Pwnd!") agent.run("wifi.clear") - if self._ui: - self._ui.set("status", "Probe engaged... \nPWNing your signals, Earthlings!") self._watchdog_thread = threading.Thread(target=self._watchdog, daemon=True) self._watchdog_thread.start() + with agent._view._lock: # agent._view is the same as th variable "ui" + agent._view.set("status", "Probe engaged... \nPWNing your signals, Earthlings!") def _watchdog(self): CHECK_INTERVAL = 5 # seconds between checks - while True: + while self._watchdog_thread_running: # Check for wlan0mon interface missing if not os.path.exists("/sys/class/net/wlan0mon"): logging.error("wlan0mon interface missing! This likely indicates a Wi‑Fi adapter crash. " "Executing 'sudo systemctl restart pwnagotchi' to recover.") try: - subprocess.run(["sudo", "systemctl", "restart", "pwnagotchi"], check=True) + subprocess.run(["systemctl", "restart", "pwnagotchi"], check=True) logging.info("pwnagotchi service restarted successfully.") except Exception as e: logging.error("Failed to execute restart command: %s", e) @@ -95,12 +96,12 @@ class probenpwn(plugins.Plugin): # Check for 'wifi.interface not set or not found' error in logs try: - with open('/etc/pwnagotchi/log/pwnagotchi-debug.log', 'r') as log_file: + with open(self.debug_log_path, 'r') as log_file: logs = log_file.read() if "error 400: wifi.interface not set or not found" in logs: logging.error("wifi.interface not set or not found! Restarting pwnagotchi to recover.") try: - subprocess.run(["sudo", "systemctl", "restart", "pwnagotchi"], check=True) + subprocess.run(["systemctl", "restart", "pwnagotchi"], check=True) logging.info("pwnagotchi service restarted successfully.") except Exception as e: logging.error("Failed to restart pwnagotchi service: %s", e) @@ -117,16 +118,14 @@ class probenpwn(plugins.Plugin): cl['_track_time'] = ap['_track_time'] self.recents[cl['mac'].lower()] = cl - def ok_to_attack(self, ap): - if not self._agent: - return False + def ok_to_attack(self, agent, ap): # Check if the AP is in the whitelist loaded from the global config if ap.get('hostname', '').lower() in self.whitelist or ap['mac'].lower() in self.whitelist: return False return True def attack_target(self, agent, ap, cl): - if not self.ok_to_attack(ap): + if not self.ok_to_attack(agent, ap): return ap_mac = ap['mac'].lower() self.attack_attempts[ap_mac] = self.attack_attempts.get(ap_mac, 0) + 1 @@ -191,7 +190,7 @@ class probenpwn(plugins.Plugin): def on_bcap_wifi_ap_new(self, agent, event): try: ap = event['data'] - if agent._config['personality']['associate'] and self.ok_to_attack(ap): + if agent._config['personality']['associate'] and self.ok_to_attack(agent, ap): logging.debug("insta-associate: %s (%s)", ap.get('hostname', 'Unknown AP'), ap['mac']) attack_thread = threading.Thread(target=self.attack_target, args=(agent, ap, None)) attack_thread.start() @@ -204,8 +203,8 @@ class probenpwn(plugins.Plugin): ap = event['data']['AP'] cl = event['data']['Client'] if (agent._config['personality']['deauth'] and - self.ok_to_attack(ap) and - self.ok_to_attack(cl)): + self.ok_to_attack(agent, ap) and + self.ok_to_attack(agent, cl)): logging.debug("insta-deauth: %s (%s) -> '%s' (%s) (%s)", ap.get('hostname', 'Unknown AP'), ap['mac'], cl.get('hostname', 'Unknown Client'), cl['mac'], cl['vendor']) @@ -246,7 +245,7 @@ class probenpwn(plugins.Plugin): def on_bcap_wifi_ap_updated(self, agent, event): try: ap = event['data'] - if self.ok_to_attack(ap): + if self.ok_to_attack(agent, ap): logging.debug("AP updated: %s (%s)", ap.get('hostname', 'Unknown AP'), ap['mac']) self.track_recent(ap) except Exception as e: @@ -256,7 +255,7 @@ class probenpwn(plugins.Plugin): try: ap = event['data']['AP'] cl = event['data']['Client'] - if self.ok_to_attack(ap) and self.ok_to_attack(cl): + if self.ok_to_attack(agent, ap) and self.ok_to_attack(agent, cl): logging.debug("Client updated: %s (%s) -> '%s' (%s) (%s)", ap.get('hostname', 'Unknown AP'), ap['mac'], cl.get('hostname', 'Unknown Client'), cl['mac'], cl['vendor']) self.track_recent(ap, cl)