Files
pwnagotchi_plugins/neurolyzer.py
AlienMajik 6903ab88e1 Update neurolyzer.py
Neurolyzer 1.5.2 elevates Pwnagotchi’s stealth and privacy with advanced WIDS/WIPS evasion, hardware-aware operations, realistic MAC generation, and flexible modes. Compared to earlier versions, it offers superior reliability (via retries and error handling), deeper stealth (traffic throttling, probe sanitization), and better usability (enhanced UI and logging). Whether you’re testing security or keeping a low profile, Neurolyzer 1.5.2 is a significant upgrade—more versatile, stealthy, and robust than ever.
2025-03-16 20:59:21 -07:00

533 lines
21 KiB
Python

import logging
import subprocess
import time
import random
import os
import re
import fcntl
import pwnagotchi.plugins as plugins
from pwnagotchi.ui.components import LabeledValue
from pwnagotchi.ui.view import BLACK
import pwnagotchi.ui.fonts as fonts
class Neurolyzer(plugins.Plugin):
__author__ = 'AlienMajik'
__version__ = '1.5.2'
__license__ = 'GPL3'
__description__ = "Advanced WIDS/WIPS evasion system with hardware-aware adaptive countermeasures"
DEFAULT_OUI = [
'00:14:22', '34:AB:95', 'DC:A6:32',
'00:1A:11', '08:74:02', '50:32:37'
]
DEFAULT_WIDS = ['wids-guardian', 'airdefense', 'cisco-ips', 'cisco-awips', 'fortinet-wids', 'aruba-widp', 'kismet']
SAFE_CHANNELS = [1, 6, 11]
MIN_MAC_CHANGE_INTERVAL = 30 # Minimum seconds between MAC changes
LOCK_FILE = '/tmp/neurolyzer.lock'
def __init__(self):
self.enabled = False
self.wifi_interface = 'wlan0'
self.operation_mode = 'stealth'
self.mac_change_interval = 3600 # Default interval
self.last_operations = {
'mac_change': 0,
'wids_check': 0,
'channel_hop': 0,
'tx_power_change': 0
}
# Hardware capabilities cache
self.hw_caps = {
'tx_power': {'min': 1, 'max': 20, 'supported': True},
'supported_channels': self.SAFE_CHANNELS,
'monitor_mode': True,
'mac_spoofing': True,
'iproute2': True
}
# State tracking
self.current_channel = 1
self.current_tx_power = 20
self.probe_blacklist = []
self.current_mac = None
self.lock_fd = None
# UI configuration
self.ui_config = {
'mode': (0, 0),
'mac_timer': (0, 10),
'tx_power': (0, 20),
'channel': (0, 30)
}
def _acquire_lock(self):
"""Acquire exclusive lock for atomic operations"""
try:
self.lock_fd = open(self.LOCK_FILE, 'w')
fcntl.flock(self.lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
return True
except (IOError, BlockingIOError):
return False
def _release_lock(self):
"""Release exclusive lock"""
if self.lock_fd:
try:
fcntl.flock(self.lock_fd, fcntl.LOCK_UN)
self.lock_fd.close()
os.remove(self.LOCK_FILE)
except:
pass
def _execute(self, command, critical=False, retries=2, timeout=8):
"""Robust command execution with retries and alternate methods"""
methods = [
command,
['iwconfig' if cmd == 'iw' else cmd for cmd in command] # Fallback to iwconfig
]
for attempt in range(retries + 1):
for method in methods:
try:
result = subprocess.run(
['sudo'] + method,
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
timeout=timeout
)
return result
except subprocess.CalledProcessError as e:
error = e.stderr.strip()
if attempt == retries:
if "Device or resource busy" in error:
logging.debug(f"[Neurolyzer] Interface busy, retrying {method[0]}")
time.sleep(random.uniform(0.5, 1.5))
continue
if "Operation not supported" in error:
self._update_hw_capability(method[0], False)
logging.debug(f"[Neurolyzer] Attempt {attempt+1} failed: {' '.join(method)} - {error}")
except Exception as e:
logging.debug(f"[Neurolyzer] Unexpected error: {str(e)}")
return None
def _update_hw_capability(self, feature, supported):
"""Dynamically adjust hardware capabilities"""
if feature == 'txpower':
self.hw_caps['tx_power']['supported'] = supported
elif feature == 'mac':
self.hw_caps['mac_spoofing'] = supported
elif feature == 'iproute2':
self.hw_caps['iproute2'] = supported
def _current_tx_power(self):
"""Get current TX power level"""
try:
result = self._execute(['iw', 'dev', self.wifi_interface, 'get', 'txpower'])
if result:
match = re.search(r'txpower (\d+) dBm', result.stdout)
return int(match.group(1)) if match else self.current_tx_power
return self.current_tx_power
except Exception as e:
logging.debug(f"[Neurolyzer] TX power read failed: {str(e)}")
return self.current_tx_power
def _safe_mac_change(self):
"""Atomic MAC rotation with locking"""
if time.time() - self.last_operations['mac_change'] < self.MIN_MAC_CHANGE_INTERVAL:
return
if not self._acquire_lock():
logging.debug("[Neurolyzer] MAC change skipped - operation in progress")
return
try:
original_mac = self._get_current_mac()
new_mac = self._generate_valid_mac()
# Use alternate method if primary fails
sequence = [
['ip', 'link', 'set', 'dev', self.wifi_interface, 'down'],
['ip', 'link', 'set', 'dev', self.wifi_interface, 'address', new_mac],
['ip', 'link', 'set', 'dev', self.wifi_interface, 'up'],
['ifconfig', self.wifi_interface, 'down'],
['ifconfig', self.wifi_interface, 'hw', 'ether', new_mac],
['ifconfig', self.wifi_interface, 'up']
]
for cmd in sequence[:3] if self.hw_caps['iproute2'] else sequence[3:]:
if not self._execute(cmd):
break
else:
verified_mac = self._get_current_mac()
if verified_mac.lower() != new_mac.lower(): # Case-insensitive comparison
logging.error(f"[Neurolyzer] MAC verification failed (expected: {new_mac.lower()}, got: {verified_mac.lower()})")
self._release_lock()
return
self.last_operations['mac_change'] = time.time()
self.current_mac = new_mac
logging.info(f"[Neurolyzer] MAC rotated to {new_mac.lower()}") # Log in lowercase for consistency
except Exception as e:
logging.error(f"[Neurolyzer] MAC rotation failed: {str(e)}")
finally:
self._release_lock()
def _adjust_tx_power(self):
"""Hardware-adaptive TX power control"""
if not self.hw_caps['tx_power']['supported']:
return
try:
current_power = self._current_tx_power()
valid_powers = [p for p in self.options.get('tx_power_levels', [])
if self.hw_caps['tx_power']['min'] <= p <= self.hw_caps['tx_power']['max']]
if valid_powers:
new_power = random.choice(valid_powers)
if new_power != current_power:
# Try multiple control methods
self._execute(['iw', 'dev', self.wifi_interface, 'set', 'txpower', 'fixed', str(new_power)]) or \
self._execute(['iwconfig', self.wifi_interface, 'txpower', str(new_power)])
except Exception as e:
logging.debug(f"[Neurolyzer] TX power adjustment skipped: {str(e)}")
def _throttle_traffic(self):
"""Compatibility-focused traffic shaping"""
try:
# Try modern qdisc
result = self._execute([
'tc', 'qdisc', 'replace', 'dev', self.wifi_interface,
'root', 'netem', 'delay', '100ms', '10ms', 'distribution', 'normal'
])
if not result:
# Fallback to simple shaping
self._execute([
'tc', 'qdisc', 'replace', 'dev', self.wifi_interface,
'root', 'pfifo', 'limit', '1000'
])
except Exception as e:
logging.debug(f"[Neurolyzer] Traffic shaping unavailable: {str(e)}")
def _validate_interface(self):
"""Ensure interface exists and is ready"""
retries = 0
while retries < 3:
if os.path.exists(f'/sys/class/net/{self.wifi_interface}'):
return True
logging.warning(f"[Neurolyzer] Interface missing, retrying... ({retries+1}/3)")
time.sleep(2 ** retries)
retries += 1
return False
def on_loaded(self):
if not hasattr(self, 'options'):
logging.warning("[Neurolyzer] Options not provided by framework, using defaults")
self.options = {}
else:
logging.debug("[Neurolyzer] Options loaded: {}".format(self.options))
valid_modes = ['normal', 'stealth', 'noided']
self.operation_mode = self.options.get('operation_mode', 'stealth')
if self.operation_mode not in valid_modes:
logging.error(f"[Neurolyzer] Invalid mode: {self.operation_mode}")
self.enabled = False
return
self.wifi_interface = self.options.get('wifi_interface', 'wlan0')
self.mac_change_interval = self.options.get('mac_change_interval', 3600)
self.enabled = self.options.get('enabled', True)
# Enhanced initialization sequence
try:
if not self._validate_interface():
raise RuntimeError("Network interface unavailable")
# Dynamic capability discovery
self._discover_hardware_capabilities()
# Fallback to sane defaults if detection failed
if not self.hw_caps['supported_channels']:
self.hw_caps['supported_channels'] = self.SAFE_CHANNELS
if not self.hw_caps['tx_power']['supported']:
self.hw_caps['tx_power'] = {'min': 1, 'max': 20, 'supported': False}
self._apply_initial_config()
logging.info(f"[Neurolyzer] Active in {self.operation_mode} mode")
except Exception as e:
logging.error(f"[Neurolyzer] Initialization failed: {str(e)}")
self.enabled = False
def _discover_hardware_capabilities(self):
"""Comprehensive hardware capability discovery"""
try:
# Get interface info
info = self._execute(['iw', 'dev', self.wifi_interface, 'info'])
if not info:
raise RuntimeError("Interface information unavailable")
# TX power capabilities
tx_info = self._execute(['iw', 'dev', self.wifi_interface, 'get', 'txpower'])
if tx_info:
tx_matches = re.findall(r'(\d+) dBm', tx_info.stdout)
if tx_matches:
self.hw_caps['tx_power']['min'] = min(int(m) for m in tx_matches)
self.hw_caps['tx_power']['max'] = max(int(m) for m in tx_matches)
else:
self.hw_caps['tx_power']['supported'] = False
# Supported channels
phy_match = re.search(r'phy#(\d+)', info.stdout)
if phy_match:
phy = phy_match.group(1)
chan_info = self._execute(['iw', 'phy', phy, 'info'])
if chan_info:
self.hw_caps['supported_channels'] = [
int(m) for m in re.findall(r'(\d+) MHz', chan_info.stdout)
]
# Monitor mode support
self.hw_caps['monitor_mode'] = 'monitor' in info.stdout
# MAC spoofing test
self.hw_caps['mac_spoofing'] = self._test_mac_spoofing()
except Exception as e:
logging.error(f"[Neurolyzer] Hardware discovery failed: {str(e)}")
self.enabled = False
def _test_mac_spoofing(self):
"""Verify MAC address spoofing capability"""
try:
original_mac = self._get_current_mac()
test_mac = '00:11:22:33:44:55'
# Try to change MAC
self._execute(['ip', 'link', 'set', 'dev', self.wifi_interface, 'down'])
self._execute(['ip', 'link', 'set', 'dev', self.wifi_interface, 'address', test_mac])
self._execute(['ip', 'link', 'set', 'dev', self.wifi_interface, 'up'])
# Verify change
new_mac = self._get_current_mac()
# Restore original MAC
self._execute(['ip', 'link', 'set', 'dev', self.wifi_interface, 'down'])
self._execute(['ip', 'link', 'set', 'dev', self.wifi_interface, 'address', original_mac])
self._execute(['ip', 'link', 'set', 'dev', self.wifi_interface, 'up'])
return new_mac.lower() == test_mac.lower()
except:
return False
def _get_current_mac(self):
"""Get current MAC address"""
try:
with open(f'/sys/class/net/{self.wifi_interface}/address') as f:
return f.read().strip()
except:
return None
def _apply_initial_config(self):
"""Initial setup with error protection"""
if not self._validate_interface():
raise RuntimeError("Network interface not available")
if self.operation_mode == 'noided':
self._safe_mac_change()
self._set_interface_mode('monitor')
self._adjust_tx_power()
self._channel_hop()
self._sanitize_probes()
self._throttle_traffic()
def on_ui_setup(self, ui):
if not self.enabled:
return
elements = [
('neuro_mode', 'Mode:', 'mode', self.operation_mode.capitalize()),
('neuro_mac', 'Next MAC:', 'mac_timer', 'Calculating...'),
('neuro_tx', 'TX:', 'tx_power', f'{self.current_tx_power}dBm'),
('neuro_chan', 'CH:', 'channel', str(self.current_channel))
]
for elem_id, label, pos_key, value in elements:
if pos_key in self.ui_config:
ui.add_element(elem_id, LabeledValue(
color=BLACK,
label=label,
value=value,
position=self.ui_config[pos_key],
label_font=fonts.Small,
text_font=fonts.Small
))
def on_ui_update(self, ui):
if not self.enabled:
return
ui.set('neuro_mode', self.operation_mode.capitalize())
ui.set('neuro_mac', f"{self._next_mac_time()}m")
ui.set('neuro_tx', f"{self.current_tx_power}dBm")
ui.set('neuro_chan', str(self.current_channel))
def on_wifi_update(self, agent, access_points):
if self.enabled:
if self.operation_mode == 'noided':
# Keep existing behavior for noided mode
self._safe_mac_change()
self._channel_hop()
self._adjust_tx_power()
self._sanitize_probes()
self._throttle_traffic()
elif self.operation_mode == 'stealth':
# Only perform MAC randomization in stealth mode
self._safe_mac_change()
def _check_wids(self, access_points):
"""WIDS detection with multiple fingerprint checks"""
if time.time() - self.last_operations['wids_check'] < 300:
return
wids_triggers = set(wids.lower() for wids in self.options.get('wids_ssids', self.DEFAULT_WIDS))
for ap in access_points:
essid = ap.get('essid', '')
if essid.lower() in wids_triggers:
logging.warning(f"[Neurolyzer] WIDS detected: {essid}")
self._evasion_protocol()
break
self.last_operations['wids_check'] = time.time()
def _evasion_protocol(self):
"""Execute coordinated evasion measures"""
logging.info("[Neurolyzer] Initiating evasion sequence")
measures = [
self._safe_mac_change,
self._channel_hop,
self._adjust_tx_power,
lambda: time.sleep(random.randint(10, 30))
]
try:
for measure in random.sample(measures, k=3):
measure()
except Exception as e:
logging.error(f"[Neurolyzer] Evasion protocol failed: {str(e)}")
def _generate_valid_mac(self):
"""Create manufacturer-plausible MAC address"""
try:
if self.operation_mode == 'noided':
oui = random.choice(self.DEFAULT_OUI).replace(':', '')
return f"{oui[:2]}:{oui[2:4]}:{oui[4:6]}:" \
f"{random.randint(0,255):02x}:" \
f"{random.randint(0,255):02x}:" \
f"{random.randint(0,255):02x}"
return ':'.join(f"{random.randint(0,255):02x}" for _ in range(6))
except Exception as e:
logging.error(f"[Neurolyzer] MAC generation failed: {str(e)}")
return '00:00:00:00:00:00'
def _channel_hop(self):
"""Channel selection with interference avoidance"""
try:
if time.time() - self.last_operations['channel_hop'] < 60:
return
safe_channels = [c for c in self.SAFE_CHANNELS if c in self.hw_caps['supported_channels']]
valid_channels = safe_channels or self.hw_caps['supported_channels'][-3:]
if valid_channels:
new_channel = random.choice(valid_channels)
if self._execute(['iw', 'dev', self.wifi_interface, 'set', 'channel', str(new_channel)]):
self.current_channel = new_channel
self.last_operations['channel_hop'] = time.time()
except Exception as e:
logging.error(f"[Neurolyzer] Channel hop failed: {str(e)}")
def _set_interface_mode(self, mode):
"""Safe mode transition with validation"""
try:
if mode not in ['managed', 'monitor'] or not self.hw_caps['monitor_mode']:
return
current_mode = self._current_interface_mode()
if current_mode != mode:
if self._execute(['iw', 'dev', self.wifi_interface, 'set', 'type', mode]):
logging.debug(f"[Neurolyzer] Interface mode set to {mode}")
except Exception as e:
logging.error(f"[Neurolyzer] Mode change failed: {str(e)}")
def _current_interface_mode(self):
"""Detect current interface mode safely"""
try:
info = self._execute(['iw', 'dev', self.wifi_interface, 'info'])
return 'monitor' if info and 'monitor' in info.stdout else 'managed'
except:
return 'managed'
def _sanitize_probes(self):
"""Filter sensitive probe requests"""
try:
if self.probe_blacklist:
with open('/tmp/neuro_filter', 'w') as f:
f.write('\n'.join(self.probe_blacklist))
self._execute([
'hcxdumptool', '-i', self.wifi_interface,
'--filterlist=/tmp/neuro_filter', '--filtermode=2'
])
except Exception as e:
logging.error(f"[Neurolyzer] Probe filtering error: {str(e)}")
def _throttle_traffic(self):
"""Limit packet rates for stealth"""
try:
self._execute([
'tc', 'qdisc', 'replace',
'dev', self.wifi_interface, 'root', 'pfifo_fast', 'limit', '100'
])
except Exception as e:
logging.error(f"[Neurolyzer] Traffic shaping failed: {str(e)}")
def _random_operation(self):
"""Randomize operational parameters safely"""
try:
actions = [
self._adjust_tx_power,
self._channel_hop,
self._safe_mac_change,
lambda: None
]
random.choice(actions)()
except Exception as e:
logging.error(f"[Neurolyzer] Random operation failed: {str(e)}")
def _next_mac_time(self):
try:
return max(0, int((self.last_operations['mac_change'] + self.mac_change_interval - time.time()) // 60))
except:
return 0
def on_unload(self, ui=None):
"""Enhanced cleanup with resource release"""
try:
if self.enabled:
logging.info("[Neurolyzer] Restoring network configurations")
self._execute(['tc', 'qdisc', 'del', 'dev', self.wifi_interface, 'root'])
self._set_interface_mode('monitor')
except Exception as e:
logging.error(f"[Neurolyzer] Cleanup failed: {str(e)}")
finally:
self._release_lock()