From ed769f591db8687c449a2b59e645317e2673d458 Mon Sep 17 00:00:00 2001 From: AlienMajik <118037572+AlienMajik@users.noreply.github.com> Date: Sun, 4 May 2025 15:07:32 -0700 Subject: [PATCH] Update snoopr.py This updated version (2.0.0) brings a host of new features, including richer data collection, smarter snooper detection, whitelisting, automatic data pruning, and an improved web interface. --- snoopr.py | 295 +++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 225 insertions(+), 70 deletions(-) diff --git a/snoopr.py b/snoopr.py index 8eefe7c..ad004d4 100644 --- a/snoopr.py +++ b/snoopr.py @@ -1,16 +1,16 @@ import logging import sqlite3 import os -from threading import Lock +from threading import Lock, Thread +from datetime import datetime, timedelta +import time +from math import radians, sin, cos, sqrt, atan2 +import subprocess import pwnagotchi.plugins as plugins from pwnagotchi.ui.components import LabeledValue from pwnagotchi.ui.view import BLACK import pwnagotchi.ui.fonts as fonts -from flask import render_template_string -import time -from math import radians, sin, cos, sqrt, atan2 -from datetime import datetime -import subprocess +from flask import render_template_string, request, jsonify class Database: def __init__(self, path): @@ -21,6 +21,7 @@ class Database: logging.info('[SnoopR] Setting up database connection...') self.__connection = sqlite3.connect(self.__path, check_same_thread=False) cursor = self.__connection.cursor() + cursor.execute(''' CREATE TABLE IF NOT EXISTS sessions ( id INTEGER PRIMARY KEY AUTOINCREMENT, @@ -33,7 +34,7 @@ class Database: mac TEXT NOT NULL UNIQUE, type TEXT NOT NULL, name TEXT, - device_type TEXT NOT NULL, -- 'wifi' or 'bluetooth' + device_type TEXT NOT NULL, is_snooper INTEGER DEFAULT 0 ) ''') @@ -46,11 +47,23 @@ class Database: signal_strength INTEGER, latitude TEXT, longitude TEXT, + channel INTEGER, + auth_mode TEXT, timestamp TEXT DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY(session_id) REFERENCES sessions(id), FOREIGN KEY(network_id) REFERENCES networks(id) ) ''') + + cursor.execute("PRAGMA table_info(detections)") + columns = [column[1] for column in cursor.fetchall()] + if 'channel' not in columns: + cursor.execute("ALTER TABLE detections ADD COLUMN channel INTEGER") + logging.info('[SnoopR] Added "channel" column to detections table') + if 'auth_mode' not in columns: + cursor.execute("ALTER TABLE detections ADD COLUMN auth_mode TEXT") + logging.info('[SnoopR] Added "auth_mode" column to detections table') + cursor.close() self.__connection.commit() logging.info('[SnoopR] Successfully connected to db') @@ -68,7 +81,7 @@ class Database: self.__connection.commit() return session_id - def add_detection(self, session_id, mac, type_, name, device_type, encryption, signal_strength, latitude, longitude): + def add_detection(self, session_id, mac, type_, name, device_type, encryption, signal_strength, latitude, longitude, channel, auth_mode): cursor = self.__connection.cursor() cursor.execute('SELECT id FROM networks WHERE mac = ? AND device_type = ?', (mac, device_type)) result = cursor.fetchone() @@ -78,16 +91,16 @@ class Database: cursor.execute('INSERT INTO networks (mac, type, name, device_type) VALUES (?, ?, ?, ?)', (mac, type_, name, device_type)) network_id = cursor.lastrowid cursor.execute(''' - INSERT INTO detections (session_id, network_id, encryption, signal_strength, latitude, longitude) - VALUES (?, ?, ?, ?, ?, ?) - ''', (session_id, network_id, encryption, signal_strength, latitude, longitude)) + INSERT INTO detections (session_id, network_id, encryption, signal_strength, latitude, longitude, channel, auth_mode) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + ''', (session_id, network_id, encryption, signal_strength, latitude, longitude, channel, auth_mode)) cursor.close() self.__connection.commit() return network_id - def get_all_networks(self): + def get_all_networks(self, sort_by=None, filter_by=None): cursor = self.__connection.cursor() - cursor.execute(''' + query = ''' SELECT n.mac, n.type, n.name, n.device_type, MIN(d.timestamp) as first_seen, MIN(d.session_id) as first_session, MAX(d.timestamp) as last_seen, MAX(d.session_id) as last_session, COUNT(DISTINCT d.session_id) as sessions_count, d2.latitude, d2.longitude, n.is_snooper @@ -96,8 +109,18 @@ class Database: LEFT JOIN detections d2 ON n.id = d2.network_id AND d2.timestamp = ( SELECT MAX(timestamp) FROM detections WHERE network_id = n.id ) - GROUP BY n.id, n.mac, n.type, n.name, n.device_type - ''') + WHERE 1=1 + ''' + if filter_by == 'snoopers': + query += ' AND n.is_snooper = 1' + elif filter_by == 'bluetooth': + query += ' AND n.device_type = "bluetooth"' + query += ' GROUP BY n.id, n.mac, n.type, n.name, n.device_type' + if sort_by == 'device_type': + query += ' ORDER BY n.device_type' + elif sort_by == 'is_snooper': + query += ' ORDER BY n.is_snooper DESC' + cursor.execute(query) rows = cursor.fetchall() networks = [] for row in rows: @@ -145,11 +168,19 @@ class Database: cursor.close() self.__connection.commit() + def prune_old_data(self, days): + cursor = self.__connection.cursor() + cutoff_date = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d %H:%M:%S') + cursor.execute('DELETE FROM detections WHERE timestamp < ?', (cutoff_date,)) + cursor.close() + self.__connection.commit() + logging.info(f'[SnoopR] Pruned data older than {days} days') + class SnoopR(plugins.Plugin): __author__ = 'AlienMajik' - __version__ = '1.0.0' + __version__ = '2.0.0' __license__ = 'GPL3' - __description__ = 'A plugin for wardriving Wi-Fi and Bluetooth networks and detecting snoopers.' + __description__ = 'A plugin for wardriving Wi-Fi and Bluetooth networks and detecting snoopers with enhanced functionality.' DEFAULT_PATH = '/root/snoopr' DATABASE_NAME = 'snoopr.db' @@ -163,21 +194,26 @@ class SnoopR(plugins.Plugin): self.__session_id = None self.__bluetooth_enabled = False self.last_scan_time = 0 + self.__whitelist = [] + self.prune_days = 30 def on_loaded(self): logging.info('[SnoopR] Plugin loaded.') self.__path = self.options.get('path', self.DEFAULT_PATH) self.__ui_enabled = self.options.get('ui', {}).get('enabled', True) self.__gps_config = {'method': self.options.get('gps', {}).get('method', 'bettercap')} - self.movement_threshold = self.options.get('movement_threshold', 0.1) # miles - self.time_threshold_minutes = self.options.get('time_threshold_minutes', 5) # minutes + self.movement_threshold = self.options.get('movement_threshold', 0.1) + self.time_threshold_minutes = self.options.get('time_threshold_minutes', 5) self.__bluetooth_enabled = self.options.get('bluetooth_enabled', False) - self.timer = self.options.get('timer', 45) # Scan interval in seconds + self.timer = self.options.get('timer', 45) + self.__whitelist = self.options.get('whitelist', []) + self.prune_days = self.options.get('prune_days', 30) if not os.path.exists(self.__path): os.makedirs(self.__path) self.__db = Database(os.path.join(self.__path, self.DATABASE_NAME)) self.__session_id = self.__db.new_session() + self.__db.prune_old_data(self.prune_days) self.ready = True def on_ui_setup(self, ui): @@ -188,6 +224,9 @@ class SnoopR(plugins.Plugin): ui.add_element('snoopr_wifi_snoopers', LabeledValue( color=BLACK, label='WiFi Snoopers:', value='0', position=(7, 105), label_font=fonts.Small, text_font=fonts.Small)) + ui.add_element('snoopr_last_scan', LabeledValue( + color=BLACK, label='Last Scan:', value='N/A', position=(7, 135), + label_font=fonts.Small, text_font=fonts.Small)) if self.__bluetooth_enabled: ui.add_element('snoopr_bt_networks', LabeledValue( color=BLACK, label='BT Nets:', value='0', position=(7, 115), @@ -201,9 +240,13 @@ class SnoopR(plugins.Plugin): current_time = time.time() if current_time - self.last_scan_time >= self.timer: self.last_scan_time = current_time - self.on_bluetooth_scan() + Thread(target=self.on_bluetooth_scan).start() ui.set('snoopr_wifi_networks', str(self.__db.network_count('wifi'))) ui.set('snoopr_wifi_snoopers', str(self.__db.snooper_count('wifi'))) + if self.last_scan_time == 0: + ui.set('snoopr_last_scan', 'N/A') + else: + ui.set('snoopr_last_scan', datetime.fromtimestamp(self.last_scan_time).strftime('%H:%M:%S')) if self.__bluetooth_enabled: ui.set('snoopr_bt_networks', str(self.__db.network_count('bluetooth'))) ui.set('snoopr_bt_snoopers', str(self.__db.snooper_count('bluetooth'))) @@ -213,6 +256,7 @@ class SnoopR(plugins.Plugin): with ui._lock: ui.remove_element('snoopr_wifi_networks') ui.remove_element('snoopr_wifi_snoopers') + ui.remove_element('snoopr_last_scan') if self.__bluetooth_enabled: ui.remove_element('snoopr_bt_networks') ui.remove_element('snoopr_bt_snoopers') @@ -222,55 +266,68 @@ class SnoopR(plugins.Plugin): def on_unfiltered_ap_list(self, agent, aps): if not self.ready: return - gps_data = self.__get_gps(agent) - if gps_data and all([gps_data['Latitude'], gps_data['Longitude']]): - self.__last_gps = { - 'latitude': gps_data['Latitude'], - 'longitude': gps_data['Longitude'], - 'altitude': gps_data['Altitude'] or '-' - } - coordinates = {'latitude': str(gps_data['Latitude']), 'longitude': str(gps_data['Longitude'])} + with self.__lock: + gps_data = self.__get_gps(agent) + if gps_data and all([gps_data['Latitude'], gps_data['Longitude']]): + self.__last_gps = { + 'latitude': gps_data['Latitude'], + 'longitude': gps_data['Longitude'], + 'altitude': gps_data['Altitude'] or '-' + } + coordinates = {'latitude': str(gps_data['Latitude']), 'longitude': str(gps_data['Longitude'])} + else: + coordinates = {'latitude': '-', 'longitude': '-'} for ap in aps: mac = ap['mac'] ssid = ap['hostname'] if ap['hostname'] != '' else '' + if ssid in self.__whitelist: + continue encryption = f"{ap['encryption']}{ap.get('cipher', '')}{ap.get('authentication', '')}" rssi = ap['rssi'] - self.__db.add_detection(self.__session_id, mac, 'wi-fi ap', ssid, 'wifi', encryption, rssi, coordinates['latitude'], coordinates['longitude']) + channel = ap.get('channel', 0) + auth_mode = ap.get('authentication', '') + network_id = self.__db.add_detection(self.__session_id, mac, 'wi-fi ap', ssid, 'wifi', encryption, rssi, coordinates['latitude'], coordinates['longitude'], channel, auth_mode) self.check_and_update_snooper_status(mac, 'wifi') - else: - self.__gps_available = False - self.__last_gps = {'latitude': '-', 'longitude': '-', 'altitude': '-'} def on_bluetooth_scan(self): if not self.ready or not self.__bluetooth_enabled: return - gps_data = self.__last_gps - if gps_data['latitude'] == '-' or gps_data['longitude'] == '-': - logging.warning('[SnoopR] GPS not available... skipping Bluetooth scan') - return - coordinates = {'latitude': gps_data['latitude'], 'longitude': gps_data['longitude']} - try: - cmd_inq = "hcitool inq --flush" - inq_output = subprocess.check_output(cmd_inq.split(), stderr=subprocess.DEVNULL).decode().splitlines() - for line in inq_output[1:]: - fields = line.split() - if len(fields) < 1: - continue - mac_address = fields[0] - name = self.get_device_name(mac_address) - self.__db.add_detection(self.__session_id, mac_address, 'bluetooth', name, 'bluetooth', '', 0, coordinates['latitude'], coordinates['longitude']) - self.check_and_update_snooper_status(mac_address, 'bluetooth') - logging.debug(f'[SnoopR] Logged Bluetooth device: {mac_address} ({name})') - except subprocess.CalledProcessError as e: - logging.error(f"[SnoopR] Error running hcitool: {e}") + with self.__lock: + gps_data = self.__last_gps + if gps_data['latitude'] == '-' or gps_data['longitude'] == '-': + logging.warning("[SnoopR] No valid GPS data available, skipping Bluetooth scan.") + return + coordinates = {'latitude': gps_data['latitude'], 'longitude': gps_data['longitude']} + try: + cmd_inq = "hcitool inq --flush" + inq_output = subprocess.check_output(cmd_inq.split(), stderr=subprocess.DEVNULL).decode().splitlines() + for line in inq_output[1:]: + fields = line.split() + if len(fields) < 1: + continue + mac_address = fields[0] + name = self.get_device_name(mac_address) + if name in self.__whitelist: + continue + network_id = self.__db.add_detection(self.__session_id, mac_address, 'bluetooth', name, 'bluetooth', '', 0, coordinates['latitude'], coordinates['longitude'], 0, '') + self.check_and_update_snooper_status(mac_address, 'bluetooth') + logging.debug(f'[SnoopR] Logged Bluetooth device: {mac_address} ({name})') + except subprocess.CalledProcessError as e: + logging.error(f"[SnoopR] Error running hcitool: {e}") def get_device_name(self, mac_address): - try: - cmd_name = f"hcitool name {mac_address}" - name_output = subprocess.check_output(cmd_name.split(), stderr=subprocess.DEVNULL).decode().strip() - return name_output if name_output else 'Unknown' - except subprocess.CalledProcessError: - return 'Unknown' + for attempt in range(3): + try: + cmd_name = f"hcitool name {mac_address}" + name_output = subprocess.check_output(cmd_name.split(), stderr=subprocess.DEVNULL).decode().strip() + return name_output if name_output else 'Unknown' + except subprocess.CalledProcessError: + if attempt < 2: + time.sleep(1) + continue + else: + logging.warning(f"[SnoopR] Failed to get name for {mac_address} after 3 attempts") + return 'Unknown' def check_and_update_snooper_status(self, mac, device_type): cursor = self.__db._Database__connection.cursor() @@ -282,7 +339,7 @@ class SnoopR(plugins.Plugin): ORDER BY d.timestamp ''', (mac, device_type)) rows = cursor.fetchall() - if len(rows) < 2: + if len(rows) < 3: return is_snooper = False for i in range(1, len(rows)): @@ -317,11 +374,14 @@ class SnoopR(plugins.Plugin): return R * c def on_webhook(self, path, request): - if request.method == 'GET' and (path == '/' or not path): - all_networks = self.__db.get_all_networks() - snoopers = [n for n in all_networks if n['is_snooper']] - center = [float(self.__last_gps['latitude']), float(self.__last_gps['longitude'])] if self.__last_gps['latitude'] != '-' else [0, 0] - return render_template_string(HTML_PAGE, networks=all_networks, snoopers=snoopers, center=center) + if request.method == 'GET': + sort_by = request.args.get('sort_by', None) + filter_by = request.args.get('filter_by', None) + if path == '/' or not path: + all_networks = self.__db.get_all_networks(sort_by=sort_by, filter_by=filter_by) + snoopers = [n for n in all_networks if n['is_snooper']] + center = [float(self.__last_gps['latitude']), float(self.__last_gps['longitude'])] if self.__last_gps['latitude'] != '-' else [0, 0] + return render_template_string(HTML_PAGE, networks=all_networks, snoopers=snoopers, center=center, sort_by=sort_by, filter_by=filter_by) return "Not found.", 404 HTML_PAGE = ''' @@ -335,30 +395,63 @@ HTML_PAGE = ''' +

SnoopR - Wardrived Networks

+
+ + + +
- + - + {% for network in networks %} - + @@ -372,25 +465,87 @@ HTML_PAGE = '''
Device TypeDevice Type MAC Address Type Name First Seen Last Seen # SessionsSnooperSnooper
{{ network.device_type }} {{ network.mac }} {{ network.type }}
+