Merge pull request #7 from fmatray/main

Probenpwn improvements.


    move toml reading by standard function on_config_changed()

    removed unnecessary _agent and _ui => Those variables were already availables

    properly exit watchdog loop (_watchdog_thread_running + join) => The plugin was waiting for the never coming threads end.

    retrieve debug log path from config => Cleaner if this files is moved in conf for some reason (new version, new plateform, etc.)

    simplify name changing => pwnagotchi's name is in main.name retrieved in on_config_changed() and the old one could mess the displayed name is another plugin wanted to change it.

    remove sudo to restart as pwnagotchi is already root

   shoutout to fmatray
This commit is contained in:
AlienMajik
2025-02-25 20:50:34 -08:00
committed by GitHub

View File

@ -4,7 +4,6 @@ import threading
import os
import subprocess
import random
import toml
import pwnagotchi.plugins as plugins
class probenpwn(plugins.Plugin):
@ -19,12 +18,12 @@ class probenpwn(plugins.Plugin):
def __init__(self):
logging.debug("ProbeNpwn plugin created")
self._agent = None
self.old_name = None
self.recents = {}
self.attack_threads = []
self.epoch_duration = 60 # default epoch duration in seconds
self._watchdog_thread = None
self._watchdog_thread_running = True
# Track number of attack attempts per AP MAC address
self.attack_attempts = {}
# Optionally, track the number of successful handshakes per AP
@ -35,59 +34,61 @@ class probenpwn(plugins.Plugin):
# Track the performance of each AP for dynamic adjustments
self.performance_stats = {}
self.whitelist = set()
# Load whitelist from the global Pwnagotchi config
self.load_whitelist()
def on_loaded(self):
logging.info(f"Plugin ProbeNpwn loaded")
def load_whitelist(self):
def on_config_changed(self, config):
"""Load the whitelist from Pwnagotchi's global config."""
try:
with open('/etc/pwnagotchi/config.toml', 'r') as config_file:
data = toml.load(config_file)
# Load SSIDs from the whitelist in the global config
self.whitelist = set(data.get('main', {}).get('whitelist', []))
logging.info(f"Whitelist loaded from Pwnagotchi config: {self.whitelist}")
except Exception as e:
logging.error(f"Failed to load whitelist from config: {e}")
self.whitelist = set(config["main"].get("whitelist", []))
except KeyError:
self.whitelist = set()
logging.info(f"Whitelist loaded from Pwnagotchi config: {self.whitelist}")
try:
self.debug_log_path = config["main"]["log"].get("path-debug", None)
except KeyError:
logging.error(f"Failed to configure debug log path")
self.old_name = config.get("main").get("name", "")
def on_unload(self, ui):
if self.old_name:
ui.set('name', "%s " % self.old_name)
else:
ui.set('name', "%s> " % ui.get('name')[:-3])
self.old_name = None
with ui._lock:
if self.old_name:
ui.set('name', f"{self.old_name}>")
try:
self._watchdog_thread_running = False # properly exit the thread
self._watchdog_thread.join()
except AttributeError: # Handle unload before on_ready()
pass
logging.info("Probing out.")
def on_ui_setup(self, ui):
self._ui = ui
def on_ui_update(self, ui):
if self.old_name is None:
self.old_name = ui.get('name')
if self.old_name:
i = self.old_name.find('>')
if i:
ui.set('name', "%s%s" % (self.old_name[:i], "!!!"))
if ui.get('name').endswith("!!!"): # No need to to update
return
if self.old_name:
with ui._lock:
ui.set('name', f"{self.old_name}!!!")
def on_ready(self, agent):
self._agent = agent
logging.info("Probed and Pwnd!")
agent.run("wifi.clear")
if self._ui:
self._ui.set("status", "Probe engaged... \nPWNing your signals, Earthlings!")
self._watchdog_thread = threading.Thread(target=self._watchdog, daemon=True)
self._watchdog_thread.start()
with agent._view._lock: # agent._view is the same as th variable "ui"
agent._view.set("status", "Probe engaged... \nPWNing your signals, Earthlings!")
def _watchdog(self):
CHECK_INTERVAL = 5 # seconds between checks
while True:
while self._watchdog_thread_running:
# Check for wlan0mon interface missing
if not os.path.exists("/sys/class/net/wlan0mon"):
logging.error("wlan0mon interface missing! This likely indicates a WiFi adapter crash. "
"Executing 'sudo systemctl restart pwnagotchi' to recover.")
try:
subprocess.run(["sudo", "systemctl", "restart", "pwnagotchi"], check=True)
subprocess.run(["systemctl", "restart", "pwnagotchi"], check=True)
logging.info("pwnagotchi service restarted successfully.")
except Exception as e:
logging.error("Failed to execute restart command: %s", e)
@ -95,12 +96,12 @@ class probenpwn(plugins.Plugin):
# Check for 'wifi.interface not set or not found' error in logs
try:
with open('/etc/pwnagotchi/log/pwnagotchi-debug.log', 'r') as log_file:
with open(self.debug_log_path, 'r') as log_file:
logs = log_file.read()
if "error 400: wifi.interface not set or not found" in logs:
logging.error("wifi.interface not set or not found! Restarting pwnagotchi to recover.")
try:
subprocess.run(["sudo", "systemctl", "restart", "pwnagotchi"], check=True)
subprocess.run(["systemctl", "restart", "pwnagotchi"], check=True)
logging.info("pwnagotchi service restarted successfully.")
except Exception as e:
logging.error("Failed to restart pwnagotchi service: %s", e)
@ -117,16 +118,14 @@ class probenpwn(plugins.Plugin):
cl['_track_time'] = ap['_track_time']
self.recents[cl['mac'].lower()] = cl
def ok_to_attack(self, ap):
if not self._agent:
return False
def ok_to_attack(self, agent, ap):
# Check if the AP is in the whitelist loaded from the global config
if ap.get('hostname', '').lower() in self.whitelist or ap['mac'].lower() in self.whitelist:
return False
return True
def attack_target(self, agent, ap, cl):
if not self.ok_to_attack(ap):
if not self.ok_to_attack(agent, ap):
return
ap_mac = ap['mac'].lower()
self.attack_attempts[ap_mac] = self.attack_attempts.get(ap_mac, 0) + 1
@ -191,7 +190,7 @@ class probenpwn(plugins.Plugin):
def on_bcap_wifi_ap_new(self, agent, event):
try:
ap = event['data']
if agent._config['personality']['associate'] and self.ok_to_attack(ap):
if agent._config['personality']['associate'] and self.ok_to_attack(agent, ap):
logging.debug("insta-associate: %s (%s)", ap.get('hostname', 'Unknown AP'), ap['mac'])
attack_thread = threading.Thread(target=self.attack_target, args=(agent, ap, None))
attack_thread.start()
@ -204,8 +203,8 @@ class probenpwn(plugins.Plugin):
ap = event['data']['AP']
cl = event['data']['Client']
if (agent._config['personality']['deauth'] and
self.ok_to_attack(ap) and
self.ok_to_attack(cl)):
self.ok_to_attack(agent, ap) and
self.ok_to_attack(agent, cl)):
logging.debug("insta-deauth: %s (%s) -> '%s' (%s) (%s)",
ap.get('hostname', 'Unknown AP'), ap['mac'],
cl.get('hostname', 'Unknown Client'), cl['mac'], cl['vendor'])
@ -246,7 +245,7 @@ class probenpwn(plugins.Plugin):
def on_bcap_wifi_ap_updated(self, agent, event):
try:
ap = event['data']
if self.ok_to_attack(ap):
if self.ok_to_attack(agent, ap):
logging.debug("AP updated: %s (%s)", ap.get('hostname', 'Unknown AP'), ap['mac'])
self.track_recent(ap)
except Exception as e:
@ -256,7 +255,7 @@ class probenpwn(plugins.Plugin):
try:
ap = event['data']['AP']
cl = event['data']['Client']
if self.ok_to_attack(ap) and self.ok_to_attack(cl):
if self.ok_to_attack(agent, ap) and self.ok_to_attack(agent, cl):
logging.debug("Client updated: %s (%s) -> '%s' (%s) (%s)",
ap.get('hostname', 'Unknown AP'), ap['mac'], cl.get('hostname', 'Unknown Client'), cl['mac'], cl['vendor'])
self.track_recent(ap, cl)