mirror of
https://github.com/AlienMajik/pwnagotchi_plugins.git
synced 2025-07-01 10:27:27 -04:00
89 lines
3.7 KiB
Python
89 lines
3.7 KiB
Python
import logging
|
|
import os
|
|
import subprocess
|
|
import json
|
|
import time
|
|
from datetime import datetime
|
|
|
|
import pwnagotchi.plugins as plugins
|
|
import pwnagotchi.ui.fonts as fonts
|
|
from pwnagotchi.ui.components import LabeledValue
|
|
from pwnagotchi.ui.view import BLACK
|
|
|
|
class ADSBSniffer(plugins.Plugin):
|
|
__author__ = '4li3nMaJ1k'
|
|
__version__ = '0.1.0'
|
|
__license__ = 'GPL3'
|
|
__description__ = 'A plugin that captures ADS-B data from aircraft using RTL-SDR and logs it.'
|
|
|
|
def __init__(self):
|
|
self.options = {
|
|
'timer': 60, # Time interval in seconds for checking for new aircraft
|
|
'aircraft_file': '/root/handshakes/adsb_aircraft.json', # File to store detected aircraft information
|
|
'adsb_x_coord': 160,
|
|
'adsb_y_coord': 80
|
|
}
|
|
self.last_scan_time = 0
|
|
self.data = {}
|
|
|
|
def on_loaded(self):
|
|
logging.info("[ADSB] ADSBSniffer plugin loaded.")
|
|
if not os.path.exists(os.path.dirname(self.options['aircraft_file'])):
|
|
os.makedirs(os.path.dirname(self.options['aircraft_file']))
|
|
if not os.path.exists(self.options['aircraft_file']):
|
|
with open(self.options['aircraft_file'], 'w') as f:
|
|
json.dump({}, f)
|
|
with open(self.options['aircraft_file'], 'r') as f:
|
|
self.data = json.load(f)
|
|
|
|
def on_ui_setup(self, ui):
|
|
ui.add_element('ADSB', LabeledValue(color=BLACK,
|
|
label='ADSB',
|
|
value=" ",
|
|
position=(self.options["adsb_x_coord"],
|
|
self.options["adsb_y_coord"]),
|
|
label_font=fonts.Small,
|
|
text_font=fonts.Small))
|
|
|
|
def on_ui_update(self, ui):
|
|
current_time = time.time()
|
|
if current_time - self.last_scan_time >= self.options['timer']:
|
|
self.last_scan_time = current_time
|
|
result = self.scan()
|
|
ui.set('ADSB', result)
|
|
|
|
def scan(self):
|
|
logging.info("[ADSB] Scanning for ADS-B signals...")
|
|
cmd = "timeout 10s rtl_adsb"
|
|
try:
|
|
output = subprocess.check_output(cmd, stderr=subprocess.STDOUT, shell=True)
|
|
aircrafts = self.parse_output(output.decode('utf-8'))
|
|
return f"{len(aircrafts)} aircrafts detected"
|
|
except subprocess.CalledProcessError as e:
|
|
if e.returncode == 124: # Graceful handling of the timeout exit status
|
|
logging.info("[ADSB] Successfully completed ADS-B scan.")
|
|
output = e.output.decode('utf-8')
|
|
aircrafts = self.parse_output(output)
|
|
return f"{len(aircrafts)} aircrafts detected"
|
|
else:
|
|
logging.error("[ADSB] Error running rtl_adsb: %s, output: %s", e, e.output.decode('utf-8'))
|
|
return "Scan error"
|
|
|
|
def parse_output(self, raw_data):
|
|
aircrafts = []
|
|
for line in raw_data.split('\n'):
|
|
if line.strip():
|
|
aircraft_data = line.split(',')
|
|
if len(aircraft_data) >= 2:
|
|
hex_id, signal = aircraft_data[0], aircraft_data[1]
|
|
aircrafts.append({'hex': hex_id, 'signal_strength': signal})
|
|
self.data[hex_id] = {'last_seen': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
|
|
'signal_strength': signal}
|
|
with open(self.options['aircraft_file'], 'w') as f:
|
|
json.dump(self.data, f)
|
|
return aircrafts
|
|
|
|
def on_unload(self, ui):
|
|
with ui._lock:
|
|
ui.remove_element('ADSB')
|