Update snoopr.py

This updated version (2.0.0) brings a host of new features, including richer data collection, smarter snooper detection, whitelisting, automatic data pruning, and an improved web interface.
This commit is contained in:
AlienMajik
2025-05-04 15:07:32 -07:00
committed by GitHub
parent 5bfa59c226
commit ed769f591d

229
snoopr.py
View File

@ -1,16 +1,16 @@
import logging
import sqlite3
import os
from threading import Lock
from threading import Lock, Thread
from datetime import datetime, timedelta
import time
from math import radians, sin, cos, sqrt, atan2
import subprocess
import pwnagotchi.plugins as plugins
from pwnagotchi.ui.components import LabeledValue
from pwnagotchi.ui.view import BLACK
import pwnagotchi.ui.fonts as fonts
from flask import render_template_string
import time
from math import radians, sin, cos, sqrt, atan2
from datetime import datetime
import subprocess
from flask import render_template_string, request, jsonify
class Database:
def __init__(self, path):
@ -21,6 +21,7 @@ class Database:
logging.info('[SnoopR] Setting up database connection...')
self.__connection = sqlite3.connect(self.__path, check_same_thread=False)
cursor = self.__connection.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS sessions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
@ -33,7 +34,7 @@ class Database:
mac TEXT NOT NULL UNIQUE,
type TEXT NOT NULL,
name TEXT,
device_type TEXT NOT NULL, -- 'wifi' or 'bluetooth'
device_type TEXT NOT NULL,
is_snooper INTEGER DEFAULT 0
)
''')
@ -46,11 +47,23 @@ class Database:
signal_strength INTEGER,
latitude TEXT,
longitude TEXT,
channel INTEGER,
auth_mode TEXT,
timestamp TEXT DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY(session_id) REFERENCES sessions(id),
FOREIGN KEY(network_id) REFERENCES networks(id)
)
''')
cursor.execute("PRAGMA table_info(detections)")
columns = [column[1] for column in cursor.fetchall()]
if 'channel' not in columns:
cursor.execute("ALTER TABLE detections ADD COLUMN channel INTEGER")
logging.info('[SnoopR] Added "channel" column to detections table')
if 'auth_mode' not in columns:
cursor.execute("ALTER TABLE detections ADD COLUMN auth_mode TEXT")
logging.info('[SnoopR] Added "auth_mode" column to detections table')
cursor.close()
self.__connection.commit()
logging.info('[SnoopR] Successfully connected to db')
@ -68,7 +81,7 @@ class Database:
self.__connection.commit()
return session_id
def add_detection(self, session_id, mac, type_, name, device_type, encryption, signal_strength, latitude, longitude):
def add_detection(self, session_id, mac, type_, name, device_type, encryption, signal_strength, latitude, longitude, channel, auth_mode):
cursor = self.__connection.cursor()
cursor.execute('SELECT id FROM networks WHERE mac = ? AND device_type = ?', (mac, device_type))
result = cursor.fetchone()
@ -78,16 +91,16 @@ class Database:
cursor.execute('INSERT INTO networks (mac, type, name, device_type) VALUES (?, ?, ?, ?)', (mac, type_, name, device_type))
network_id = cursor.lastrowid
cursor.execute('''
INSERT INTO detections (session_id, network_id, encryption, signal_strength, latitude, longitude)
VALUES (?, ?, ?, ?, ?, ?)
''', (session_id, network_id, encryption, signal_strength, latitude, longitude))
INSERT INTO detections (session_id, network_id, encryption, signal_strength, latitude, longitude, channel, auth_mode)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
''', (session_id, network_id, encryption, signal_strength, latitude, longitude, channel, auth_mode))
cursor.close()
self.__connection.commit()
return network_id
def get_all_networks(self):
def get_all_networks(self, sort_by=None, filter_by=None):
cursor = self.__connection.cursor()
cursor.execute('''
query = '''
SELECT n.mac, n.type, n.name, n.device_type, MIN(d.timestamp) as first_seen, MIN(d.session_id) as first_session,
MAX(d.timestamp) as last_seen, MAX(d.session_id) as last_session, COUNT(DISTINCT d.session_id) as sessions_count,
d2.latitude, d2.longitude, n.is_snooper
@ -96,8 +109,18 @@ class Database:
LEFT JOIN detections d2 ON n.id = d2.network_id AND d2.timestamp = (
SELECT MAX(timestamp) FROM detections WHERE network_id = n.id
)
GROUP BY n.id, n.mac, n.type, n.name, n.device_type
''')
WHERE 1=1
'''
if filter_by == 'snoopers':
query += ' AND n.is_snooper = 1'
elif filter_by == 'bluetooth':
query += ' AND n.device_type = "bluetooth"'
query += ' GROUP BY n.id, n.mac, n.type, n.name, n.device_type'
if sort_by == 'device_type':
query += ' ORDER BY n.device_type'
elif sort_by == 'is_snooper':
query += ' ORDER BY n.is_snooper DESC'
cursor.execute(query)
rows = cursor.fetchall()
networks = []
for row in rows:
@ -145,11 +168,19 @@ class Database:
cursor.close()
self.__connection.commit()
def prune_old_data(self, days):
cursor = self.__connection.cursor()
cutoff_date = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d %H:%M:%S')
cursor.execute('DELETE FROM detections WHERE timestamp < ?', (cutoff_date,))
cursor.close()
self.__connection.commit()
logging.info(f'[SnoopR] Pruned data older than {days} days')
class SnoopR(plugins.Plugin):
__author__ = 'AlienMajik'
__version__ = '1.0.0'
__version__ = '2.0.0'
__license__ = 'GPL3'
__description__ = 'A plugin for wardriving Wi-Fi and Bluetooth networks and detecting snoopers.'
__description__ = 'A plugin for wardriving Wi-Fi and Bluetooth networks and detecting snoopers with enhanced functionality.'
DEFAULT_PATH = '/root/snoopr'
DATABASE_NAME = 'snoopr.db'
@ -163,21 +194,26 @@ class SnoopR(plugins.Plugin):
self.__session_id = None
self.__bluetooth_enabled = False
self.last_scan_time = 0
self.__whitelist = []
self.prune_days = 30
def on_loaded(self):
logging.info('[SnoopR] Plugin loaded.')
self.__path = self.options.get('path', self.DEFAULT_PATH)
self.__ui_enabled = self.options.get('ui', {}).get('enabled', True)
self.__gps_config = {'method': self.options.get('gps', {}).get('method', 'bettercap')}
self.movement_threshold = self.options.get('movement_threshold', 0.1) # miles
self.time_threshold_minutes = self.options.get('time_threshold_minutes', 5) # minutes
self.movement_threshold = self.options.get('movement_threshold', 0.1)
self.time_threshold_minutes = self.options.get('time_threshold_minutes', 5)
self.__bluetooth_enabled = self.options.get('bluetooth_enabled', False)
self.timer = self.options.get('timer', 45) # Scan interval in seconds
self.timer = self.options.get('timer', 45)
self.__whitelist = self.options.get('whitelist', [])
self.prune_days = self.options.get('prune_days', 30)
if not os.path.exists(self.__path):
os.makedirs(self.__path)
self.__db = Database(os.path.join(self.__path, self.DATABASE_NAME))
self.__session_id = self.__db.new_session()
self.__db.prune_old_data(self.prune_days)
self.ready = True
def on_ui_setup(self, ui):
@ -188,6 +224,9 @@ class SnoopR(plugins.Plugin):
ui.add_element('snoopr_wifi_snoopers', LabeledValue(
color=BLACK, label='WiFi Snoopers:', value='0', position=(7, 105),
label_font=fonts.Small, text_font=fonts.Small))
ui.add_element('snoopr_last_scan', LabeledValue(
color=BLACK, label='Last Scan:', value='N/A', position=(7, 135),
label_font=fonts.Small, text_font=fonts.Small))
if self.__bluetooth_enabled:
ui.add_element('snoopr_bt_networks', LabeledValue(
color=BLACK, label='BT Nets:', value='0', position=(7, 115),
@ -201,9 +240,13 @@ class SnoopR(plugins.Plugin):
current_time = time.time()
if current_time - self.last_scan_time >= self.timer:
self.last_scan_time = current_time
self.on_bluetooth_scan()
Thread(target=self.on_bluetooth_scan).start()
ui.set('snoopr_wifi_networks', str(self.__db.network_count('wifi')))
ui.set('snoopr_wifi_snoopers', str(self.__db.snooper_count('wifi')))
if self.last_scan_time == 0:
ui.set('snoopr_last_scan', 'N/A')
else:
ui.set('snoopr_last_scan', datetime.fromtimestamp(self.last_scan_time).strftime('%H:%M:%S'))
if self.__bluetooth_enabled:
ui.set('snoopr_bt_networks', str(self.__db.network_count('bluetooth')))
ui.set('snoopr_bt_snoopers', str(self.__db.snooper_count('bluetooth')))
@ -213,6 +256,7 @@ class SnoopR(plugins.Plugin):
with ui._lock:
ui.remove_element('snoopr_wifi_networks')
ui.remove_element('snoopr_wifi_snoopers')
ui.remove_element('snoopr_last_scan')
if self.__bluetooth_enabled:
ui.remove_element('snoopr_bt_networks')
ui.remove_element('snoopr_bt_snoopers')
@ -222,6 +266,7 @@ class SnoopR(plugins.Plugin):
def on_unfiltered_ap_list(self, agent, aps):
if not self.ready:
return
with self.__lock:
gps_data = self.__get_gps(agent)
if gps_data and all([gps_data['Latitude'], gps_data['Longitude']]):
self.__last_gps = {
@ -230,23 +275,27 @@ class SnoopR(plugins.Plugin):
'altitude': gps_data['Altitude'] or '-'
}
coordinates = {'latitude': str(gps_data['Latitude']), 'longitude': str(gps_data['Longitude'])}
else:
coordinates = {'latitude': '-', 'longitude': '-'}
for ap in aps:
mac = ap['mac']
ssid = ap['hostname'] if ap['hostname'] != '<hidden>' else ''
if ssid in self.__whitelist:
continue
encryption = f"{ap['encryption']}{ap.get('cipher', '')}{ap.get('authentication', '')}"
rssi = ap['rssi']
self.__db.add_detection(self.__session_id, mac, 'wi-fi ap', ssid, 'wifi', encryption, rssi, coordinates['latitude'], coordinates['longitude'])
channel = ap.get('channel', 0)
auth_mode = ap.get('authentication', '')
network_id = self.__db.add_detection(self.__session_id, mac, 'wi-fi ap', ssid, 'wifi', encryption, rssi, coordinates['latitude'], coordinates['longitude'], channel, auth_mode)
self.check_and_update_snooper_status(mac, 'wifi')
else:
self.__gps_available = False
self.__last_gps = {'latitude': '-', 'longitude': '-', 'altitude': '-'}
def on_bluetooth_scan(self):
if not self.ready or not self.__bluetooth_enabled:
return
with self.__lock:
gps_data = self.__last_gps
if gps_data['latitude'] == '-' or gps_data['longitude'] == '-':
logging.warning('[SnoopR] GPS not available... skipping Bluetooth scan')
logging.warning("[SnoopR] No valid GPS data available, skipping Bluetooth scan.")
return
coordinates = {'latitude': gps_data['latitude'], 'longitude': gps_data['longitude']}
try:
@ -258,18 +307,26 @@ class SnoopR(plugins.Plugin):
continue
mac_address = fields[0]
name = self.get_device_name(mac_address)
self.__db.add_detection(self.__session_id, mac_address, 'bluetooth', name, 'bluetooth', '', 0, coordinates['latitude'], coordinates['longitude'])
if name in self.__whitelist:
continue
network_id = self.__db.add_detection(self.__session_id, mac_address, 'bluetooth', name, 'bluetooth', '', 0, coordinates['latitude'], coordinates['longitude'], 0, '')
self.check_and_update_snooper_status(mac_address, 'bluetooth')
logging.debug(f'[SnoopR] Logged Bluetooth device: {mac_address} ({name})')
except subprocess.CalledProcessError as e:
logging.error(f"[SnoopR] Error running hcitool: {e}")
def get_device_name(self, mac_address):
for attempt in range(3):
try:
cmd_name = f"hcitool name {mac_address}"
name_output = subprocess.check_output(cmd_name.split(), stderr=subprocess.DEVNULL).decode().strip()
return name_output if name_output else 'Unknown'
except subprocess.CalledProcessError:
if attempt < 2:
time.sleep(1)
continue
else:
logging.warning(f"[SnoopR] Failed to get name for {mac_address} after 3 attempts")
return 'Unknown'
def check_and_update_snooper_status(self, mac, device_type):
@ -282,7 +339,7 @@ class SnoopR(plugins.Plugin):
ORDER BY d.timestamp
''', (mac, device_type))
rows = cursor.fetchall()
if len(rows) < 2:
if len(rows) < 3:
return
is_snooper = False
for i in range(1, len(rows)):
@ -317,11 +374,14 @@ class SnoopR(plugins.Plugin):
return R * c
def on_webhook(self, path, request):
if request.method == 'GET' and (path == '/' or not path):
all_networks = self.__db.get_all_networks()
if request.method == 'GET':
sort_by = request.args.get('sort_by', None)
filter_by = request.args.get('filter_by', None)
if path == '/' or not path:
all_networks = self.__db.get_all_networks(sort_by=sort_by, filter_by=filter_by)
snoopers = [n for n in all_networks if n['is_snooper']]
center = [float(self.__last_gps['latitude']), float(self.__last_gps['longitude'])] if self.__last_gps['latitude'] != '-' else [0, 0]
return render_template_string(HTML_PAGE, networks=all_networks, snoopers=snoopers, center=center)
return render_template_string(HTML_PAGE, networks=all_networks, snoopers=snoopers, center=center, sort_by=sort_by, filter_by=filter_by)
return "Not found.", 404
HTML_PAGE = '''
@ -335,30 +395,63 @@ HTML_PAGE = '''
<style>
body { font-family: Arial, sans-serif; margin: 20px; }
table { width: 100%; border-collapse: collapse; }
th, td { padding: 8px; text-align: left; border-bottom: 1px solid #ddd; }
th, td { padding: 8px; text-align: left; border-bottom: 1px solid #ddd; cursor: pointer; }
th { background-color: #f2f2f2; }
.snooper { background-color: #ffcccc; }
#map { height: 400px; margin-top: 20px; }
.filter-container { margin-bottom: 20px; }
.filter-btn { padding: 5px 10px; margin-right: 10px; cursor: pointer; }
.filter-btn.active { background-color: #4CAF50; color: white; }
#scroll-to-top-button {
position: fixed;
top: 10px;
left: 50%;
transform: translateX(-50%);
padding: 5px 10px;
background-color: #4CAF50;
color: white;
border: none;
cursor: pointer;
z-index: 1000;
}
#scroll-to-bottom-button {
position: fixed;
bottom: 10px;
left: 50%;
transform: translateX(-50%);
padding: 5px 10px;
background-color: #4CAF50;
color: white;
border: none;
cursor: pointer;
z-index: 1000;
}
</style>
</head>
<body>
<button id="scroll-to-top-button">Scroll to Top</button>
<h1>SnoopR - Wardrived Networks</h1>
<div class="filter-container">
<button class="filter-btn active" onclick="toggleFilter('all')">All Networks</button>
<button class="filter-btn" onclick="toggleFilter('snoopers')">Snoopers</button>
<button class="filter-btn" onclick="toggleFilter('bluetooth')">Bluetooth Networks</button>
</div>
<table>
<thead>
<tr>
<th>Device Type</th>
<th><a href="?sort_by=device_type&filter_by={{ filter_by }}">Device Type</a></th>
<th>MAC Address</th>
<th>Type</th>
<th>Name</th>
<th>First Seen</th>
<th>Last Seen</th>
<th># Sessions</th>
<th>Snooper</th>
<th><a href="?sort_by=is_snooper&filter_by={{ filter_by }}">Snooper</a></th>
</tr>
</thead>
<tbody>
{% for network in networks %}
<tr class="{{ 'snooper' if network.is_snooper else '' }}">
<tr onclick="panToNetwork({{ network.latitude }}, {{ network.longitude }})" class="{{ 'snooper' if network.is_snooper else '' }}">
<td>{{ network.device_type }}</td>
<td>{{ network.mac }}</td>
<td>{{ network.type }}</td>
@ -372,25 +465,87 @@ HTML_PAGE = '''
</tbody>
</table>
<div id="map"></div>
<button id="scroll-to-bottom-button">Scroll to Bottom</button>
<script src="https://unpkg.com/leaflet@1.9.4/dist/leaflet.js"></script>
<script>
document.getElementById('scroll-to-top-button').addEventListener('click', function() {
window.scrollTo({
top: 0,
behavior: 'smooth'
});
});
document.getElementById('scroll-to-bottom-button').addEventListener('click', function() {
window.scrollTo({
top: document.body.scrollHeight,
behavior: 'smooth'
});
});
var networks = {{ networks | tojson }};
var center = {{ center | tojson }};
var map = L.map('map').setView(center, 13);
L.tileLayer('https://{s}.tile.openstreetmap.org/{z}/{x}/{y}.png', {
attribution: '© OpenStreetMap contributors'
}).addTo(map);
var markers = [];
networks.forEach(function(network) {
if (network.latitude && network.longitude) {
var color = network.is_snooper ? 'red' : 'blue';
L.circleMarker([network.latitude, network.longitude], {
var marker = L.circleMarker([network.latitude, network.longitude], {
color: color,
radius: 5
}).addTo(map).bindPopup(
}).bindPopup(
`Device Type: ${network.device_type}<br>MAC: ${network.mac}<br>Name: ${network.name}<br>Snooper: ${network.is_snooper ? 'Yes' : 'No'}`
);
markers.push({marker: marker, network: network});
marker.addTo(map);
}
});
function panToNetwork(lat, lon) {
if (lat && lon) {
map.panTo([lat, lon]);
}
}
var currentFilter = 'all';
function toggleFilter(filter) {
if (currentFilter === filter) return;
currentFilter = filter;
// Update button styles
document.querySelectorAll('.filter-btn').forEach(btn => {
btn.classList.remove('active');
if (btn.textContent.toLowerCase().includes(filter)) {
btn.classList.add('active');
}
});
// Update map markers
markers.forEach(function(item) {
var network = item.network;
map.removeLayer(item.marker);
if (filter === 'all' ||
(filter === 'snoopers' && network.is_snooper) ||
(filter === 'bluetooth' && network.device_type === 'bluetooth')) {
item.marker.addTo(map);
}
});
}
var bounds = [];
networks.forEach(function(network) {
if (network.latitude && network.longitude) {
bounds.push([network.latitude, network.longitude]);
}
});
if (bounds.length > 0) {
map.fitBounds(bounds);
} else {
map.setView(center, 13);
}
</script>
</body>
</html>