- move toml reading by standard function on_config_changed()

- removed unecessary _agent and _ui
- properly exit watchdog loop (_watchdog_thread_running + join)
- retreive debug log path from config
- simplify name changing
- remove sudo as pwnagotchi is already root
This commit is contained in:
frédéric
2025-02-23 02:02:21 +01:00
parent d407590b62
commit 8aecbf3e55

View File

@ -4,7 +4,6 @@ import threading
import os
import subprocess
import random
import toml
import pwnagotchi.plugins as plugins
class probenpwn(plugins.Plugin):
@ -19,12 +18,12 @@ class probenpwn(plugins.Plugin):
def __init__(self):
logging.debug("ProbeNpwn plugin created")
self._agent = None
self.old_name = None
self.recents = {}
self.attack_threads = []
self.epoch_duration = 60 # default epoch duration in seconds
self._watchdog_thread = None
self._watchdog_thread_running = True
# Track number of attack attempts per AP MAC address
self.attack_attempts = {}
# Optionally, track the number of successful handshakes per AP
@ -36,58 +35,60 @@ class probenpwn(plugins.Plugin):
self.performance_stats = {}
self.whitelist = set()
# Load whitelist from the global Pwnagotchi config
self.load_whitelist()
def on_loaded(self):
logging.info(f"Plugin ProbeNpwn loaded")
def load_whitelist(self):
def on_config_changed(self, config):
"""Load the whitelist from Pwnagotchi's global config."""
try:
with open('/etc/pwnagotchi/config.toml', 'r') as config_file:
data = toml.load(config_file)
# Load SSIDs from the whitelist in the global config
self.whitelist = set(data.get('main', {}).get('whitelist', []))
logging.info(f"Whitelist loaded from Pwnagotchi config: {self.whitelist}")
except Exception as e:
logging.error(f"Failed to load whitelist from config: {e}")
self.whitelist = set(config["main"].get("whitelist", []))
except KeyError:
self.whitelist = set()
logging.info(f"Whitelist loaded from Pwnagotchi config: {self.whitelist}")
try:
self.debug_log_path = config["main"]["log"].get("path-debug", None)
except KeyError:
logging.error(f"Failed to configure debug log path")
self.old_name = config.get("main").get("name", "")
def on_unload(self, ui):
with ui._lock:
if self.old_name:
ui.set('name', "%s " % self.old_name)
else:
ui.set('name', "%s> " % ui.get('name')[:-3])
self.old_name = None
ui.set('name', f"{self.old_name}>")
try:
self._watchdog_thread_running = False # properly exit the thread
self._watchdog_thread.join()
except AttributeError: # Handle unload before on_ready()
pass
logging.info("Probing out.")
def on_ui_setup(self, ui):
self._ui = ui
def on_ui_update(self, ui):
if self.old_name is None:
self.old_name = ui.get('name')
if ui.get('name').endswith("!!!"): # No need to to update
return
if self.old_name:
i = self.old_name.find('>')
if i:
ui.set('name', "%s%s" % (self.old_name[:i], "!!!"))
with ui._lock:
ui.set('name', f"{self.old_name}!!!")
def on_ready(self, agent):
self._agent = agent
logging.info("Probed and Pwnd!")
agent.run("wifi.clear")
if self._ui:
self._ui.set("status", "Probe engaged... \nPWNing your signals, Earthlings!")
self._watchdog_thread = threading.Thread(target=self._watchdog, daemon=True)
self._watchdog_thread.start()
with agent._view._lock: # agent._view is the same as th variable "ui"
agent._view.set("status", "Probe engaged... \nPWNing your signals, Earthlings!")
def _watchdog(self):
CHECK_INTERVAL = 5 # seconds between checks
while True:
while self._watchdog_thread_running:
# Check for wlan0mon interface missing
if not os.path.exists("/sys/class/net/wlan0mon"):
logging.error("wlan0mon interface missing! This likely indicates a WiFi adapter crash. "
"Executing 'sudo systemctl restart pwnagotchi' to recover.")
try:
subprocess.run(["sudo", "systemctl", "restart", "pwnagotchi"], check=True)
subprocess.run(["systemctl", "restart", "pwnagotchi"], check=True)
logging.info("pwnagotchi service restarted successfully.")
except Exception as e:
logging.error("Failed to execute restart command: %s", e)
@ -95,12 +96,12 @@ class probenpwn(plugins.Plugin):
# Check for 'wifi.interface not set or not found' error in logs
try:
with open('/etc/pwnagotchi/log/pwnagotchi-debug.log', 'r') as log_file:
with open(self.debug_log_path, 'r') as log_file:
logs = log_file.read()
if "error 400: wifi.interface not set or not found" in logs:
logging.error("wifi.interface not set or not found! Restarting pwnagotchi to recover.")
try:
subprocess.run(["sudo", "systemctl", "restart", "pwnagotchi"], check=True)
subprocess.run(["systemctl", "restart", "pwnagotchi"], check=True)
logging.info("pwnagotchi service restarted successfully.")
except Exception as e:
logging.error("Failed to restart pwnagotchi service: %s", e)
@ -117,16 +118,14 @@ class probenpwn(plugins.Plugin):
cl['_track_time'] = ap['_track_time']
self.recents[cl['mac'].lower()] = cl
def ok_to_attack(self, ap):
if not self._agent:
return False
def ok_to_attack(self, agent, ap):
# Check if the AP is in the whitelist loaded from the global config
if ap.get('hostname', '').lower() in self.whitelist or ap['mac'].lower() in self.whitelist:
return False
return True
def attack_target(self, agent, ap, cl):
if not self.ok_to_attack(ap):
if not self.ok_to_attack(agent, ap):
return
ap_mac = ap['mac'].lower()
self.attack_attempts[ap_mac] = self.attack_attempts.get(ap_mac, 0) + 1
@ -191,7 +190,7 @@ class probenpwn(plugins.Plugin):
def on_bcap_wifi_ap_new(self, agent, event):
try:
ap = event['data']
if agent._config['personality']['associate'] and self.ok_to_attack(ap):
if agent._config['personality']['associate'] and self.ok_to_attack(agent, ap):
logging.debug("insta-associate: %s (%s)", ap.get('hostname', 'Unknown AP'), ap['mac'])
attack_thread = threading.Thread(target=self.attack_target, args=(agent, ap, None))
attack_thread.start()
@ -204,8 +203,8 @@ class probenpwn(plugins.Plugin):
ap = event['data']['AP']
cl = event['data']['Client']
if (agent._config['personality']['deauth'] and
self.ok_to_attack(ap) and
self.ok_to_attack(cl)):
self.ok_to_attack(agent, ap) and
self.ok_to_attack(agent, cl)):
logging.debug("insta-deauth: %s (%s) -> '%s' (%s) (%s)",
ap.get('hostname', 'Unknown AP'), ap['mac'],
cl.get('hostname', 'Unknown Client'), cl['mac'], cl['vendor'])
@ -246,7 +245,7 @@ class probenpwn(plugins.Plugin):
def on_bcap_wifi_ap_updated(self, agent, event):
try:
ap = event['data']
if self.ok_to_attack(ap):
if self.ok_to_attack(agent, ap):
logging.debug("AP updated: %s (%s)", ap.get('hostname', 'Unknown AP'), ap['mac'])
self.track_recent(ap)
except Exception as e:
@ -256,7 +255,7 @@ class probenpwn(plugins.Plugin):
try:
ap = event['data']['AP']
cl = event['data']['Client']
if self.ok_to_attack(ap) and self.ok_to_attack(cl):
if self.ok_to_attack(agent, ap) and self.ok_to_attack(agent, cl):
logging.debug("Client updated: %s (%s) -> '%s' (%s) (%s)",
ap.get('hostname', 'Unknown AP'), ap['mac'], cl.get('hostname', 'Unknown Client'), cl['mac'], cl['vendor'])
self.track_recent(ap, cl)