import logging import sqlite3 import os from threading import Lock, Thread from datetime import datetime, timedelta import time from math import radians, sin, cos, sqrt, atan2 import subprocess import pwnagotchi.plugins as plugins from pwnagotchi.ui.components import LabeledValue from pwnagotchi.ui.view import BLACK import pwnagotchi.ui.fonts as fonts from flask import render_template_string, request, jsonify import json import socket import select import requests class Database: def __init__(self, path): self.__path = path self.__db_connect() def __db_connect(self): logging.info('[SnoopR] Setting up database connection...') self.__connection = sqlite3.connect(self.__path, check_same_thread=False) cursor = self.__connection.cursor() cursor.execute(''' CREATE TABLE IF NOT EXISTS sessions ( id INTEGER PRIMARY KEY AUTOINCREMENT, created_at TEXT DEFAULT CURRENT_TIMESTAMP ) ''') cursor.execute(''' CREATE TABLE IF NOT EXISTS networks ( id INTEGER PRIMARY KEY AUTOINCREMENT, mac TEXT NOT NULL UNIQUE, type TEXT NOT NULL, name TEXT, device_type TEXT NOT NULL, is_snooper INTEGER DEFAULT 0 ) ''') cursor.execute(''' CREATE TABLE IF NOT EXISTS detections ( id INTEGER PRIMARY KEY AUTOINCREMENT, session_id INTEGER NOT NULL, network_id INTEGER NOT NULL, encryption TEXT, signal_strength INTEGER, latitude TEXT, longitude TEXT, channel INTEGER, auth_mode TEXT, timestamp TEXT DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY(session_id) REFERENCES sessions(id), FOREIGN KEY(network_id) REFERENCES networks(id) ) ''') cursor.execute('CREATE INDEX IF NOT EXISTS idx_detections_network_id ON detections(network_id)') cursor.execute('CREATE INDEX IF NOT EXISTS idx_networks_mac ON networks(mac)') cursor.execute('CREATE INDEX IF NOT EXISTS idx_detections_timestamp ON detections(timestamp)') cursor.execute("PRAGMA table_info(detections)") columns = [column[1] for column in cursor.fetchall()] if 'channel' not in columns: cursor.execute("ALTER TABLE detections ADD COLUMN channel INTEGER") logging.info('[SnoopR] Added "channel" column to detections table') if 'auth_mode' not in columns: cursor.execute("ALTER TABLE detections ADD COLUMN auth_mode TEXT") logging.info('[SnoopR] Added "auth_mode" column to detections table') cursor.close() self.__connection.commit() logging.info('[SnoopR] Successfully connected to db') def disconnect(self): self.__connection.commit() self.__connection.close() logging.info('[SnoopR] Closed db connection') def new_session(self): cursor = self.__connection.cursor() cursor.execute('INSERT INTO sessions DEFAULT VALUES') session_id = cursor.lastrowid cursor.close() self.__connection.commit() return session_id def add_detection(self, session_id, mac, type_, name, device_type, encryption, signal_strength, latitude, longitude, channel, auth_mode): cursor = self.__connection.cursor() try: cursor.execute('SELECT id FROM networks WHERE mac = ? AND device_type = ?', (mac, device_type)) result = cursor.fetchone() if result: network_id = result[0] else: cursor.execute('INSERT INTO networks (mac, type, name, device_type) VALUES (?, ?, ?, ?)', (mac, type_, name, device_type)) network_id = cursor.lastrowid cursor.execute(''' INSERT INTO detections (session_id, network_id, encryption, signal_strength, latitude, longitude, channel, auth_mode) VALUES (?, ?, ?, ?, ?, ?, ?, ?) ''', (session_id, network_id, encryption, signal_strength, latitude, longitude, channel, auth_mode)) self.__connection.commit() logging.debug(f'[SnoopR] Added detection: {mac} ({device_type}) at {latitude},{longitude}') return network_id except Exception as e: logging.error(f'[SnoopR] Database error: {e}') raise finally: cursor.close() def add_detection_batch(self, detections): cursor = self.__connection.cursor() try: for mac, type_, name, device_type, encryption, signal_strength, latitude, longitude, channel, auth_mode, session_id in detections: cursor.execute('SELECT id FROM networks WHERE mac = ? AND device_type = ?', (mac, device_type)) result = cursor.fetchone() if result: network_id = result[0] else: cursor.execute('INSERT INTO networks (mac, type, name, device_type) VALUES (?, ?, ?, ?)', (mac, type_, name, device_type)) network_id = cursor.lastrowid cursor.execute(''' INSERT INTO detections (session_id, network_id, encryption, signal_strength, latitude, longitude, channel, auth_mode) VALUES (?, ?, ?, ?, ?, ?, ?, ?) ''', (session_id, network_id, encryption, signal_strength, latitude, longitude, channel, auth_mode)) logging.debug(f'[SnoopR] Batch added: {mac} ({device_type}) at {latitude},{longitude}') self.__connection.commit() except Exception as e: logging.error(f'[SnoopR] Batch database error: {e}') raise finally: cursor.close() def get_network_details(self, mac, device_type): cursor = self.__connection.cursor() try: cursor.execute(''' SELECT n.mac, n.type, n.name, n.device_type, n.is_snooper, COUNT(DISTINCT d.session_id) as sessions_count, MIN(d.timestamp) as first_seen, MAX(d.timestamp) as last_seen FROM networks n LEFT JOIN detections d ON n.id = d.network_id WHERE n.mac = ? AND n.device_type = ? GROUP BY n.id ''', (mac, device_type)) network = cursor.fetchone() if not network: return None cursor.execute(''' SELECT latitude, longitude, timestamp, signal_strength FROM detections d JOIN networks n ON d.network_id = n.id WHERE n.mac = ? AND n.device_type = ? ORDER BY d.timestamp ''', (mac, device_type)) detections = cursor.fetchall() result = { 'mac': network[0], 'type': network[1], 'name': network[2], 'device_type': network[3], 'is_snooper': bool(network[4]), 'sessions_count': network[5], 'first_seen': network[6], 'last_seen': network[7], 'detections': [{'latitude': float(d[0]) if d[0] != '-' else None, 'longitude': float(d[1]) if d[1] != '-' else None, 'timestamp': d[2], 'signal_strength': d[3]} for d in detections] } return result finally: cursor.close() def get_all_networks(self, sort_by=None, filter_by=None): cursor = self.__connection.cursor() try: query = ''' SELECT n.mac, n.type, n.name, n.device_type, MIN(d.timestamp) as first_seen, MIN(d.session_id) as first_session, MAX(d.timestamp) as last_seen, MAX(d.session_id) as last_session, COUNT(DISTINCT d.session_id) as sessions_count, d.latitude, d.longitude, n.is_snooper FROM networks n JOIN detections d ON n.id = d.network_id WHERE 1=1 ''' if filter_by == 'snoopers': query += ' AND n.is_snooper = 1' elif filter_by == 'bluetooth': query += ' AND n.device_type = "bluetooth"' elif filter_by == 'aircraft': query += ' AND n.device_type = "aircraft"' query += ' GROUP BY n.id, n.mac, n.type, n.name, n.device_type' if sort_by == 'device_type': query += ' ORDER BY n.device_type' elif sort_by == 'is_snooper': query += ' ORDER BY n.is_snooper DESC' cursor.execute(query) rows = cursor.fetchall() networks = [] for row in rows: mac, type_, name, device_type, first_seen, first_session, last_seen, last_session, sessions_count, latitude, longitude, is_snooper = row lat = float(latitude) if latitude and latitude != '-' else None lon = float(longitude) if longitude and longitude != '-' else None networks.append({ 'mac': mac, 'type': type_, 'name': name, 'device_type': device_type, 'first_seen': first_seen, 'first_session': first_session, 'last_seen': last_seen, 'last_session': last_session, 'sessions_count': sessions_count, 'latitude': lat, 'longitude': lon, 'is_snooper': bool(is_snooper) }) logging.debug(f'[SnoopR] Web UI networks: {len(networks)} entries, first: {networks[0] if networks else "None"}') return networks except Exception as e: logging.error(f'[SnoopR] get_all_networks error: {e}') return [] finally: cursor.close() def network_count(self, device_type=None): cursor = self.__connection.cursor() try: if device_type: cursor.execute('SELECT COUNT(DISTINCT mac) FROM networks WHERE device_type = ?', (device_type,)) else: cursor.execute('SELECT COUNT(DISTINCT mac) FROM networks') count = cursor.fetchone()[0] return count finally: cursor.close() def snooper_count(self, device_type=None): cursor = self.__connection.cursor() try: if device_type: cursor.execute('SELECT COUNT(*) FROM networks WHERE is_snooper = 1 AND device_type = ?', (device_type,)) else: cursor.execute('SELECT COUNT(*) FROM networks WHERE is_snooper = 1') count = cursor.fetchone()[0] return count finally: cursor.close() def update_snooper_status(self, mac, device_type, is_snooper): cursor = self.__connection.cursor() try: cursor.execute('UPDATE networks SET is_snooper = ? WHERE mac = ? AND device_type = ?', (is_snooper, mac, device_type)) self.__connection.commit() finally: cursor.close() def prune_old_data(self, days): cursor = self.__connection.cursor() try: cutoff_date = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d %H:%M:%S') cursor.execute('DELETE FROM detections WHERE timestamp < ?', (cutoff_date,)) self.__connection.commit() logging.info(f'[SnoopR] Pruned data older than {days} days') finally: cursor.close() class MeshNetwork: def __init__(self, host_ip, port, peers): self.host_ip = host_ip self.port = port self.peers = peers self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.socket.bind((host_ip, port)) self.socket.setblocking(False) def broadcast_detection(self, detection): data = json.dumps(detection).encode('utf-8') for peer in self.peers: try: self.socket.sendto(data, (peer, self.port)) except Exception as e: logging.error(f'[SnoopR] Failed to send to {peer}: {e}') def receive_detections(self, db): ready, _, _ = select.select([self.socket], [], [], 0.1) if ready: data, _ = self.socket.recvfrom(4096) detection = json.loads(data.decode('utf-8')) db.add_detection_batch([( detection['mac'], detection['type'], detection['name'], detection['device_type'], detection['encryption'], detection['signal_strength'], detection['latitude'], detection['longitude'], detection['channel'], detection['auth_mode'], detection['session_id'] )]) logging.debug(f'[SnoopR] Received mesh detection: {detection["mac"]}') def close(self): self.socket.close() class SnoopR(plugins.Plugin): __author__ = 'AlienMajik' __version__ = '2.0.1' __license__ = 'GPL3' __description__ = 'Enhanced wardriving plugin with robust GPS/Bluetooth/Wi-Fi and SkyHigh integration, including aircraft tracking.' DEFAULT_PATH = '/root/snoopr' DATABASE_NAME = 'snoopr.db' def __init__(self): self.__db = None self.ready = False self.__gps_available = True self.__lock = Lock() self.__last_gps = {'latitude': '-', 'longitude': '-', 'altitude': '-'} self.__last_valid_gps = None self.__session_id = None self.__bluetooth_enabled = False self.last_scan_time = 0 self.__whitelist = [] self.prune_days = 30 self.__mesh = None self.__ap_batch = [] self.__recent_aircraft = {} def on_loaded(self): logging.info('[SnoopR] Plugin loaded.') self.__path = self.options.get('path', self.DEFAULT_PATH) self.__ui_enabled = self.options.get('ui', {}).get('enabled', True) self.__gps_config = {'method': self.options.get('gps', {}).get('method', 'bettercap')} self.movement_threshold = self.options.get('movement_threshold', 0.1) self.time_threshold_minutes = self.options.get('time_threshold_minutes', 5) self.__bluetooth_enabled = self.options.get('bluetooth_enabled', False) self.timer = self.options.get('timer', 45) self.__whitelist = self.options.get('whitelist', []) self.prune_days = self.options.get('prune_days', 30) self.mesh_enabled = self.options.get('mesh_enabled', False) self.mesh_host_ip = self.options.get('mesh_host_ip', '192.168.1.1') self.mesh_port = self.options.get('mesh_port', 9999) self.mesh_peers = self.options.get('mesh_peers', []) self.aircraft_file = self.options.get('aircraft_file', '/root/handshakes/skyhigh_aircraft.json') self.log_without_gps = self.options.get('log_without_gps', True) if not os.path.exists(self.__path): os.makedirs(self.__path) self.__db = Database(os.path.join(self.__path, self.DATABASE_NAME)) self.__session_id = self.__db.new_session() self.__db.prune_old_data(self.prune_days) if self.mesh_enabled: try: self.__mesh = MeshNetwork(self.mesh_host_ip, self.mesh_port, self.mesh_peers) except Exception as e: logging.error(f'[SnoopR] Mesh setup failed: {e}') self.__mesh = None self.ready = True def on_ui_setup(self, ui): if self.__ui_enabled: ui.add_element('snoopr_wifi_networks', LabeledValue( color=BLACK, label='WiFi Nets:', value='0', position=(7, 95), label_font=fonts.Small, text_font=fonts.Small)) ui.add_element('snoopr_wifi_snoopers', LabeledValue( color=BLACK, label='WiFi Snoopers:', value='0', position=(7, 105), label_font=fonts.Small, text_font=fonts.Small)) ui.add_element('snoopr_last_scan', LabeledValue( color=BLACK, label='Last Scan:', value='N/A', position=(7, 135), label_font=fonts.Small, text_font=fonts.Small)) if self.__bluetooth_enabled: ui.add_element('snoopr_bt_networks', LabeledValue( color=BLACK, label='BT Nets:', value='0', position=(7, 115), label_font=fonts.Small, text_font=fonts.Small)) ui.add_element('snoopr_bt_snoopers', LabeledValue( color=BLACK, label='BT Snoopers:', value='0', position=(7, 125), label_font=fonts.Small, text_font=fonts.Small)) def on_ui_update(self, ui): if self.__ui_enabled and self.ready: current_time = time.time() if current_time - self.last_scan_time >= self.timer: self.last_scan_time = current_time Thread(target=self.on_bluetooth_scan).start() if self.mesh_enabled and self.__mesh: self.__mesh.receive_detections(self.__db) if os.path.exists(self.aircraft_file): try: with open(self.aircraft_file, 'r') as f: aircraft = json.load(f) if isinstance(aircraft, dict): aircraft_data = list(aircraft.values()) logging.debug(f'[SnoopR] Aircraft loaded: {len(aircraft_data)} entries, first: {aircraft_data[0] if aircraft_data else "None"}') elif not isinstance(aircraft, list): logging.warning(f"[SnoopR] Invalid aircraft file format: {type(aircraft)}") aircraft_data = [] else: aircraft_data = aircraft for plane in aircraft_data: if not isinstance(plane, dict): logging.warning(f"[SnoopR] Invalid aircraft entry: {plane}") continue icao = plane.get('icao24', 'UNKNOWN') callsign = plane.get('callsign', 'UNKNOWN').strip() lat = plane.get('latitude') lon = plane.get('longitude') is_drone = plane.get('is_drone', False) altitude = plane.get('alt', 10000) if icao not in self.__recent_aircraft: self.__recent_aircraft[icao] = {'last_seen': current_time, 'positions': [], 'snooper': is_drone} self.__recent_aircraft[icao]['positions'].append((lat, lon, current_time, altitude)) self.__recent_aircraft[icao]['last_seen'] = current_time # Log aircraft detection to database if lat not in (None, 'N/A') and lon not in (None, 'N/A'): try: network_id = self.__db.add_detection( self.__session_id, icao, 'aircraft', callsign, 'aircraft', '', 0, str(lat), str(lon), 0, '' ) self.check_aircraft_snooper_status(icao, 'aircraft') except Exception as e: logging.error(f'[SnoopR] Failed to log aircraft detection: {e}') # Proximity alert if self.__last_gps['latitude'] != '-' and lat not in (None, 'N/A') and lon not in (None, 'N/A'): try: dist = self.__calculate_distance( float(self.__last_gps['latitude']), float(self.__last_gps['longitude']), float(lat), float(lon) ) if dist < 5 and (icao not in self.__recent_aircraft or current_time - self.__recent_aircraft[icao]['last_seen'] > 60): alert_type = 'Drone' if is_drone else 'Aircraft' snooper_flag = 'Snooper ' if self.__recent_aircraft[icao]['snooper'] else '' logging.critical(f"[SnoopR] {snooper_flag}{alert_type} {icao} ({callsign}) detected {dist:.2f} miles away!") self.__notify(f"{snooper_flag}{alert_type} {icao} ({callsign}) {dist:.2f}mi away!") self.__recent_aircraft[icao]['last_seen'] = current_time except (ValueError, TypeError) as e: logging.warning(f"[SnoopR] Invalid aircraft coords: {e}") self.__recent_aircraft = {k: v for k, v in self.__recent_aircraft.items() if current_time - v['last_seen'] < 3600} except Exception as e: logging.error(f"[SnoopR] Failed to process aircraft file: {e}") ui.set('snoopr_wifi_networks', str(self.__db.network_count('wifi'))) ui.set('snoopr_wifi_snoopers', str(self.__db.snooper_count('wifi'))) if self.last_scan_time == 0: ui.set('snoopr_last_scan', 'N/A') else: ui.set('snoopr_last_scan', datetime.fromtimestamp(self.last_scan_time).strftime('%H:%M:%S')) if self.__bluetooth_enabled: ui.set('snoopr_bt_networks', str(self.__db.network_count('bluetooth'))) ui.set('snoopr_bt_snoopers', str(self.__db.snooper_count('bluetooth'))) def on_unload(self, ui): if self.__ui_enabled: with ui._lock: ui.remove_element('snoopr_wifi_networks') ui.remove_element('snoopr_wifi_snoopers') ui.remove_element('snoopr_last_scan') if self.__bluetooth_enabled: ui.remove_element('snoopr_bt_networks') ui.remove_element('snoopr_bt_snoopers') self.__db.disconnect() if self.mesh_enabled and self.__mesh: self.__mesh.close() logging.info('[SnoopR] Plugin unloaded') def __get_gps(self, agent): for attempt in range(5): try: time.sleep(5) # Increased delay for bettercap API info = agent.session() gps_data = info.get('gps', None) if gps_data and gps_data.get('Latitude') and gps_data.get('Longitude'): logging.debug(f'[SnoopR] Bettercap GPS acquired: {gps_data["Latitude"]}, {gps_data["Longitude"]}') self.__last_valid_gps = gps_data return gps_data except Exception as e: logging.warning(f'[SnoopR] Bettercap GPS failed (attempt {attempt + 1}/5): {e}') logging.warning('[SnoopR] No GPS data after retries') return self.__last_valid_gps if self.__last_valid_gps else None def on_unfiltered_ap_list(self, agent, aps): if not self.ready: return with self.__lock: gps_data = self.__get_gps(agent) if gps_data and all([gps_data['Latitude'], gps_data['Longitude']]): self.__last_gps = { 'latitude': gps_data['Latitude'], 'longitude': gps_data['Longitude'], 'altitude': gps_data['Altitude'] or '-' } coordinates = {'latitude': str(gps_data['Latitude']), 'longitude': str(gps_data['Longitude'])} else: coordinates = {'latitude': '-', 'longitude': '-'} self.__ap_batch = [] for ap in aps: mac = ap['mac'] ssid = ap['hostname'] if ap['hostname'] != '' else '' if ssid in self.__whitelist: logging.debug(f'[SnoopR] Skipping whitelisted AP: {ssid}') continue encryption = f"{ap['encryption']}{ap.get('cipher', '')}{ap.get('authentication', '')}" rssi = ap['rssi'] channel = ap.get('channel', 0) auth_mode = ap.get('authentication', '') self.__ap_batch.append(( mac, 'wi-fi ap', ssid, 'wifi', encryption, rssi, coordinates['latitude'], coordinates['longitude'], channel, auth_mode, self.__session_id )) logging.info(f'[SnoopR] Detected Wi-Fi AP: {mac} ({ssid}) at {coordinates["latitude"]},{coordinates["longitude"]}') self.check_and_update_snooper_status(mac, 'wifi') if self.__ap_batch: try: self.__db.add_detection_batch(self.__ap_batch) logging.info(f'[SnoopR] Saved {len(self.__ap_batch)} Wi-Fi detections') if self.mesh_enabled and self.__mesh: for detection in self.__ap_batch: self.__mesh.broadcast_detection({ 'mac': detection[0], 'type': detection[1], 'name': detection[2], 'device_type': detection[3], 'encryption': detection[4], 'signal_strength': detection[5], 'latitude': detection[6], 'longitude': detection[7], 'channel': detection[8], 'auth_mode': detection[9], 'session_id': detection[10] }) except Exception as e: logging.error(f'[SnoopR] Failed to save Wi-Fi batch: {e}') def on_bluetooth_scan(self): if not self.ready or not self.__bluetooth_enabled: logging.debug('[SnoopR] Bluetooth scan skipped: not ready or disabled') return with self.__lock: gps_data = self.__get_gps(None) if gps_data and all([gps_data['Latitude'], gps_data['Longitude']]): self.__last_gps = { 'latitude': gps_data['Latitude'], 'longitude': gps_data['Longitude'], 'altitude': gps_data['Altitude'] or '-' } coordinates = {'latitude': str(gps_data['Latitude']), 'longitude': str(gps_data['Longitude'])} else: if not self.log_without_gps: logging.warning("[SnoopR] No valid GPS data available, skipping Bluetooth scan.") return coordinates = {'latitude': '-', 'longitude': '-'} for attempt in range(3): try: logging.debug(f'[SnoopR] Attempting Bluetooth scan (attempt {attempt + 1}/3)') cmd_inq = "hcitool inq --flush" inq_output = subprocess.check_output(cmd_inq.split(), stderr=subprocess.STDOUT).decode().splitlines() for line in inq_output[1:]: fields = line.split() if len(fields) < 1: continue mac_address = fields[0] name = self.get_device_name(mac_address) if name in self.__whitelist: logging.debug(f'[SnoopR] Skipping whitelisted Bluetooth: {name}') continue network_id = self.__db.add_detection( self.__session_id, mac_address, 'bluetooth', name, 'bluetooth', '', 0, coordinates['latitude'], coordinates['longitude'], 0, '' ) self.check_and_update_snooper_status(mac_address, 'bluetooth') logging.info(f'[SnoopR] Logged Bluetooth device: {mac_address} ({name}) at {coordinates["latitude"]},{coordinates["longitude"]}') break # Exit loop on success except subprocess.CalledProcessError as e: logging.error(f"[SnoopR] hcitool scan failed (attempt {attempt + 1}/3): {e.output.decode()}") if attempt < 2: time.sleep(1) continue logging.error("[SnoopR] Bluetooth scan failed after 3 attempts") self.__notify("Bluetooth scan failed!") def get_device_name(self, mac_address): for attempt in range(3): try: cmd_name = f"hcitool name {mac_address}" name_output = subprocess.check_output(cmd_name.split(), stderr=subprocess.STDOUT).decode().strip() return name_output if name_output else 'Unknown' except subprocess.CalledProcessError: if attempt < 2: time.sleep(1) continue logging.warning(f"[SnoopR] Failed to get name for {mac_address} after 3 attempts") return 'Unknown' def check_and_update_snooper_status(self, mac, device_type): cursor = self.__db._Database__connection.cursor() try: cursor.execute(''' SELECT d.latitude, d.longitude, d.timestamp FROM detections d JOIN networks n ON d.network_id = n.id WHERE n.mac = ? AND n.device_type = ? ORDER BY d.timestamp ''', (mac, device_type)) rows = cursor.fetchall() cursor.execute('SELECT COUNT(DISTINCT session_id) FROM detections d JOIN networks n ON d.network_id = n.id WHERE n.mac = ? AND n.device_type = ?', (mac, device_type)) session_count = cursor.fetchone()[0] if len(rows) < 3 or session_count < 2: return is_snooper = False for i in range(1, len(rows)): lat1, lon1, t1 = rows[i-1] lat2, lon2, t2 = rows[i] if lat1 == '-' or lon1 == '-' or lat2 == '-' or lon2 == '-': continue lat1, lon1, lat2, lon2 = map(float, [lat1, lon1, lat2, lon2]) t1 = datetime.strptime(t1, '%Y-%m-%d %H:%M:%S') t2 = datetime.strptime(t2, '%Y-%m-%d %H:%M:%S') time_diff = (t2 - t1).total_seconds() / 60.0 if time_diff > self.time_threshold_minutes: dist = self.__calculate_distance(lat1, lon1, lat2, lon2) velocity = (dist * 1609.34) / (time_diff * 60) if dist > self.movement_threshold or velocity > 1.5: is_snooper = True break self.__db.update_snooper_status(mac, device_type, int(is_snooper)) finally: cursor.close() def check_aircraft_snooper_status(self, icao, device_type): cursor = self.__db._Database__connection.cursor() try: cursor.execute(''' SELECT d.latitude, d.longitude, d.timestamp FROM detections d JOIN networks n ON d.network_id = n.id WHERE n.mac = ? AND n.device_type = ? ORDER BY d.timestamp ''', (icao, device_type)) rows = cursor.fetchall() cursor.execute('SELECT COUNT(DISTINCT session_id) FROM detections d JOIN networks n ON d.network_id = n.id WHERE n.mac = ? AND n.device_type = ?', (icao, device_type)) session_count = cursor.fetchone()[0] if len(rows) < 3 or session_count < 2: return is_snooper = False for i in range(1, len(rows)): lat1, lon1, t1 = rows[i-1] lat2, lon2, t2 = rows[i] if lat1 == '-' or lon1 == '-' or lat2 == '-' or lon2 == '-': continue lat1, lon1, lat2, lon2 = map(float, [lat1, lon1, lat2, lon2]) t1 = datetime.strptime(t1, '%Y-%m-%d %H:%M:%S') t2 = datetime.strptime(t2, '%Y-%m-%d %H:%M:%S') time_diff = (t2 - t1).total_seconds() / 3600.0 # Hours for aircraft if time_diff > self.time_threshold_minutes / 60.0: dist = self.__calculate_distance(lat1, lon1, lat2, lon2) velocity = (dist * 1609.34) / (time_diff * 3600) # mph if dist < 5 and velocity < 50: # Loitering or slow-moving is_snooper = True break self.__db.update_snooper_status(icao, device_type, int(is_snooper)) finally: cursor.close() def __calculate_distance(self, lat1, lon1, lat2, lon2): R = 3958.8 lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2]) dlat = lat2 - lat1 dlon = lon2 - lon1 a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2 c = 2 * atan2(sqrt(a), sqrt(1-a)) return R * c def __notify(self, message): logging.info(f"[SnoopR] Notification: {message}") def on_webhook(self, path, request): if request.method == 'GET': sort_by = request.args.get('sort_by', None) filter_by = request.args.get('filter_by', None) if path == 'network': mac = request.args.get('mac') device_type = request.args.get('device_type') if mac and device_type: details = self.__db.get_network_details(mac, device_type) return jsonify(details) if details else ("Not found.", 404) if path == '/' or not path: all_networks = self.__db.get_all_networks(sort_by=sort_by, filter_by=filter_by) snoopers = [n for n in all_networks if n['is_snooper']] center = [float(self.__last_gps['latitude']), float(self.__last_gps['longitude'])] if self.__last_gps['latitude'] != '-' else [37.7177, -122.4393] # Add aircraft to map aircraft_markers = [] if os.path.exists(self.aircraft_file): try: with open(self.aircraft_file, 'r') as f: aircraft = json.load(f) if isinstance(aircraft, dict): aircraft_data = list(aircraft.values()) else: aircraft_data = aircraft if isinstance(aircraft, list) else [] for plane in aircraft_data: if plane.get('latitude') and plane.get('longitude'): icao = plane.get('icao24', 'UNKNOWN') is_snooper = self.__recent_aircraft.get(icao, {}).get('snooper', plane.get('is_drone', False)) aircraft_markers.append({ 'mac': icao, 'type': 'aircraft', 'name': plane.get('callsign', 'UNKNOWN').strip(), 'device_type': 'aircraft', 'first_seen': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), 'first_session': self.__session_id, 'last_seen': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), 'last_session': self.__session_id, 'sessions_count': 1, 'latitude': float(plane['latitude']), 'longitude': float(plane['longitude']), 'is_snooper': is_snooper }) except Exception as e: logging.error(f'[SnoopR] Failed to load aircraft for map: {e}') all_networks.extend(aircraft_markers) logging.debug(f'[SnoopR] Web UI center: {center}, networks: {len(all_networks)}') return render_template_string(HTML_PAGE, networks=all_networks, snoopers=snoopers, center=center, sort_by=sort_by, filter_by=filter_by) return "Not found.", 404 HTML_PAGE = ''' SnoopR - Wardrived Networks & Aircraft

SnoopR - Wardrived Networks & Aircraft

{% for network in networks %} {% endfor %}
Device Type MAC/ICAO24 Type Name/Callsign First Seen Last Seen # Sessions Snooper
{{ network.device_type }} {{ network.mac }} {{ network.type }} {{ network.name }} {{ network.first_seen }} {{ network.last_seen }} {{ network.sessions_count }} {{ 'Yes' if network.is_snooper else 'No' }}
'''