From dd1750b9f4612a28cd95cb1aea21f9aba194ae4b Mon Sep 17 00:00:00 2001 From: AlienMajik <118037572+AlienMajik@users.noreply.github.com> Date: Sun, 18 Feb 2024 23:21:23 -0800 Subject: [PATCH] Add files via upload --- adsbsniffer.py | 88 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 88 insertions(+) create mode 100644 adsbsniffer.py diff --git a/adsbsniffer.py b/adsbsniffer.py new file mode 100644 index 0000000..0521d65 --- /dev/null +++ b/adsbsniffer.py @@ -0,0 +1,88 @@ +import logging +import os +import subprocess +import json +import time +from datetime import datetime + +import pwnagotchi.plugins as plugins +import pwnagotchi.ui.fonts as fonts +from pwnagotchi.ui.components import LabeledValue +from pwnagotchi.ui.view import BLACK + +class ADSBSniffer(plugins.Plugin): + __author__ = '4li3nMaJ1k' + __version__ = '0.1.0' + __license__ = 'GPL3' + __description__ = 'A plugin that captures ADS-B data from aircraft using RTL-SDR and logs it.' + + def __init__(self): + self.options = { + 'timer': 60, # Time interval in seconds for checking for new aircraft + 'aircraft_file': '/root/handshakes/adsb_aircraft.json', # File to store detected aircraft information + 'adsb_x_coord': 160, + 'adsb_y_coord': 80 + } + self.last_scan_time = 0 + self.data = {} + + def on_loaded(self): + logging.info("[ADSB] ADSBSniffer plugin loaded.") + if not os.path.exists(os.path.dirname(self.options['aircraft_file'])): + os.makedirs(os.path.dirname(self.options['aircraft_file'])) + if not os.path.exists(self.options['aircraft_file']): + with open(self.options['aircraft_file'], 'w') as f: + json.dump({}, f) + with open(self.options['aircraft_file'], 'r') as f: + self.data = json.load(f) + + def on_ui_setup(self, ui): + ui.add_element('ADSB', LabeledValue(color=BLACK, + label='ADSB', + value=" ", + position=(self.options["adsb_x_coord"], + self.options["adsb_y_coord"]), + label_font=fonts.Small, + text_font=fonts.Small)) + + def on_ui_update(self, ui): + current_time = time.time() + if current_time - self.last_scan_time >= self.options['timer']: + self.last_scan_time = current_time + result = self.scan() + ui.set('ADSB', result) + + def scan(self): + logging.info("[ADSB] Scanning for ADS-B signals...") + cmd = "timeout 10s rtl_adsb" + try: + output = subprocess.check_output(cmd, stderr=subprocess.STDOUT, shell=True) + aircrafts = self.parse_output(output.decode('utf-8')) + return f"{len(aircrafts)} aircrafts detected" + except subprocess.CalledProcessError as e: + if e.returncode == 124: # Graceful handling of the timeout exit status + logging.info("[ADSB] Successfully completed ADS-B scan.") + output = e.output.decode('utf-8') + aircrafts = self.parse_output(output) + return f"{len(aircrafts)} aircrafts detected" + else: + logging.error("[ADSB] Error running rtl_adsb: %s, output: %s", e, e.output.decode('utf-8')) + return "Scan error" + + def parse_output(self, raw_data): + aircrafts = [] + for line in raw_data.split('\n'): + if line.strip(): + aircraft_data = line.split(',') + if len(aircraft_data) >= 2: + hex_id, signal = aircraft_data[0], aircraft_data[1] + aircrafts.append({'hex': hex_id, 'signal_strength': signal}) + self.data[hex_id] = {'last_seen': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), + 'signal_strength': signal} + with open(self.options['aircraft_file'], 'w') as f: + json.dump(self.data, f) + return aircrafts + + def on_unload(self, ui): + with ui._lock: + ui.remove_element('ADSB')