import logging import os import json import time from datetime import datetime, timedelta import requests from threading import Lock from flask import render_template_string, request, jsonify import pwnagotchi.plugins as plugins import pwnagotchi.ui.fonts as fonts from pwnagotchi.ui.components import LabeledValue from pwnagotchi.ui.view import BLACK class SkyHigh(plugins.Plugin): __author__ = 'AlienMajik' __version__ = '1.0.9' __license__ = 'GPL3' __description__ = 'A plugin that fetches aircraft data from the OpenSky API using GPS coordinates, logs it, prunes old entries, and provides a webhook with aircraft type, flight path visualization, and enhanced iconography.' def __init__(self): self.options = { 'timer': 60, # Time interval in seconds for fetching new aircraft data 'aircraft_file': '/root/handshakes/skyhigh_aircraft.json', # File to store detected aircraft information 'adsb_x_coord': 160, 'adsb_y_coord': 80, 'latitude': -66.273334, # Default latitude (Flying Saucer) 'longitude': 100.984166, # Default longitude (Flying Saucer) 'radius': 50, # Radius in miles to fetch aircraft data 'prune_minutes': 5, # Default pruning interval in minutes 'opensky_username': None, # Optional OpenSky username for authenticated requests 'opensky_password': None # Optional OpenSky password for authenticated requests } self.last_fetch_time = 0 self.data = {} self.data_lock = Lock() self.last_gps = {'latitude': None, 'longitude': None} self.credentials_valid = True self.flight_path_access = True self.metadata_access = True self.historical_positions = {} def on_loaded(self): logging.info("[SkyHigh] Plugin loaded.") if not os.path.exists(os.path.dirname(self.options['aircraft_file'])): os.makedirs(os.path.dirname(self.options['aircraft_file'])) if not os.path.exists(self.options['aircraft_file']): with open(self.options['aircraft_file'], 'w') as f: json.dump({}, f) with open(self.options['aircraft_file'], 'r') as f: self.data = json.load(f) def on_ui_setup(self, ui): ui.add_element('SkyHigh', LabeledValue(color=BLACK, label='SkyHigh', value=" ", position=(self.options["adsb_x_coord"], self.options["adsb_y_coord"]), label_font=fonts.Small, text_font=fonts.Small)) def on_ui_update(self, ui): current_time = time.time() if current_time - self.last_fetch_time >= self.options['timer']: ui.set('SkyHigh', "Updating...") self.last_fetch_time = current_time result = self.fetch_aircraft_data() ui.set('SkyHigh', result) else: with self.data_lock: aircraft_count = len(self.data) minutes_ago = int((current_time - self.last_fetch_time) / 60) ui.set('SkyHigh', f"{aircraft_count} aircrafts (Last: {minutes_ago}m)") def fetch_aircraft_data(self): logging.debug("[SkyHigh] Fetching aircraft data from API...") try: lat, lon, radius = self.options['latitude'], self.options['longitude'], self.options['radius'] lat_min, lat_max = lat - (radius / 69), lat + (radius / 69) lon_min, lon_max = lon - (radius / 69), lon + (radius / 69) url = f"https://opensky-network.org/api/states/all?lamin={lat_min}&lomin={lon_min}&lamax={lat_max}&lomax={lon_max}" headers = {} if self.options['opensky_username'] and self.options['opensky_password'] and self.credentials_valid: import base64 auth_str = f"{self.options['opensky_username']}:{self.options['opensky_password']}" auth_encoded = base64.b64encode(auth_str.encode()).decode() headers['Authorization'] = f'Basic {auth_encoded}' logging.debug(f"[SkyHigh] Attempting authenticated request to {url}") response = requests.get(url, headers=headers if headers else None, timeout=10) if response.status_code == 200: aircrafts = self.parse_api_response(response.json()) self.prune_old_data() with self.data_lock: with open(self.options['aircraft_file'], 'w') as f: json.dump(self.data, f) logging.debug("[SkyHigh] Fetch completed successfully.") return f"{len(aircrafts)} aircrafts detected" elif response.status_code == 401: logging.warning("[SkyHigh] Authentication failed for /states/all. Falling back to anonymous access.") self.credentials_valid = False response = requests.get(url, timeout=10) if response.status_code == 200: aircrafts = self.parse_api_response(response.json()) self.prune_old_data() with self.data_lock: with open(self.options['aircraft_file'], 'w') as f: json.dump(self.data, f) logging.debug("[SkyHigh] Fetch completed successfully (anonymous).") return f"{len(aircrafts)} aircrafts detected" else: logging.error("[SkyHigh] Anonymous fetch failed with status code %d", response.status_code) return "Fetch error" else: logging.error("[SkyHigh] API returned status code %d", response.status_code) return "Fetch error" except requests.exceptions.RequestException as e: logging.error("[SkyHigh] Error fetching data from API: %s", e) return "Fetch failed" def fetch_aircraft_metadata(self, icao24): """Fetch metadata for a specific aircraft using its ICAO24 code.""" try: if not self.metadata_access: logging.warning(f"[SkyHigh] Metadata access unavailable for {icao24}. Attempting anonymous access.") response = requests.get(f"https://opensky-network.org/api/metadata/aircraft/icao/{icao24}", timeout=5) if response.status_code == 200: data = response.json() manufacturer = data.get('manufacturerName', '') or '' model = data.get('model', 'Unknown') or 'Unknown' registration = data.get('registration', 'Unknown') or 'Unknown' db_flags = data.get('special_flags', []) or [] typecode = data.get('typecode', '') or '' # Safely handle None values before calling lower() manufacturer_lower = manufacturer.lower() if isinstance(manufacturer, str) else '' model_lower = model.lower() if isinstance(model, str) else '' typecode_lower = typecode.lower() if isinstance(typecode, str) else '' db_flags_lower = ', '.join([flag.lower() for flag in db_flags if isinstance(flag, str)]) # Categorization based on manufacturer, model, typecode, and DB flags is_helicopter = 'helicopter' in model_lower is_commercial_jet = ( any(jet in manufacturer_lower for jet in ['airbus', 'boeing', 'embraer', 'bombardier']) or any(model_lower.startswith(prefix) for prefix in ['a', 'b', 'e', 'crj', 'erj']) or any(typecode_lower.startswith(prefix) for prefix in ['a', 'b', 'e', 'crj', 'erj']) ) is_small_plane = ( any(small in manufacturer_lower for small in ['cessna', 'piper', 'beechcraft', 'cirrus']) or any(model_lower.startswith(prefix) for prefix in ['c', 'p', 'be', 'sr']) or any(typecode_lower.startswith(prefix) for prefix in ['c', 'p', 'be', 'sr']) ) is_drone = 'drone' in model_lower or 'uav' in model_lower is_glider = 'glider' in model_lower is_military = 'military' in db_flags_lower logging.debug(f"[SkyHigh] Metadata for {icao24}: manufacturer={manufacturer}, model={model}, typecode={typecode}, db_flags={db_flags_lower}, is_helicopter={is_helicopter}, is_commercial_jet={is_commercial_jet}, is_small_plane={is_small_plane}, is_drone={is_drone}, is_glider={is_glider}, is_military={is_military}") return { 'model': model, 'registration': registration, 'db_flags': db_flags_lower, 'is_helicopter': is_helicopter, 'is_commercial_jet': is_commercial_jet, 'is_small_plane': is_small_plane, 'is_drone': is_drone, 'is_glider': is_glider, 'is_military': is_military } else: logging.error(f"[SkyHigh] Anonymous metadata fetch failed for {icao24}: {response.status_code}") return None url = f"https://opensky-network.org/api/metadata/aircraft/icao/{icao24}" auth = None if self.options['opensky_username'] and self.options['opensky_password'] and self.credentials_valid: auth = (self.options['opensky_username'], self.options['opensky_password']) logging.debug(f"[SkyHigh] Using OpenSky credentials: username={self.options['opensky_username']}") else: logging.warning("[SkyHigh] OpenSky credentials not provided or invalid. Attempting anonymous metadata fetch.") response = requests.get(url, auth=auth, timeout=5) if response.status_code == 200: data = response.json() manufacturer = data.get('manufacturerName', '') or '' model = data.get('model', 'Unknown') or 'Unknown' registration = data.get('registration', 'Unknown') or 'Unknown' db_flags = data.get('special_flags', []) or [] typecode = data.get('typecode', '') or '' # Safely handle None values before calling lower() manufacturer_lower = manufacturer.lower() if isinstance(manufacturer, str) else '' model_lower = model.lower() if isinstance(model, str) else '' typecode_lower = typecode.lower() if isinstance(typecode, str) else '' db_flags_lower = ', '.join([flag.lower() for flag in db_flags if isinstance(flag, str)]) # Categorization based on manufacturer, model, typecode, and DB flags is_helicopter = 'helicopter' in model_lower is_commercial_jet = ( any(jet in manufacturer_lower for jet in ['airbus', 'boeing', 'embraer', 'bombardier']) or any(model_lower.startswith(prefix) for prefix in ['a', 'b', 'e', 'crj', 'erj']) or any(typecode_lower.startswith(prefix) for prefix in ['a', 'b', 'e', 'crj', 'erj']) ) is_small_plane = ( any(small in manufacturer_lower for small in ['cessna', 'piper', 'beechcraft', 'cirrus']) or any(model_lower.startswith(prefix) for prefix in ['c', 'p', 'be', 'sr']) or any(typecode_lower.startswith(prefix) for prefix in ['c', 'p', 'be', 'sr']) ) is_drone = 'drone' in model_lower or 'uav' in model_lower is_glider = 'glider' in model_lower is_military = 'military' in db_flags_lower logging.debug(f"[SkyHigh] Metadata for {icao24}: manufacturer={manufacturer}, model={model}, typecode={typecode}, db_flags={db_flags_lower}, is_helicopter={is_helicopter}, is_commercial_jet={is_commercial_jet}, is_small_plane={is_small_plane}, is_drone={is_drone}, is_glider={is_glider}, is_military={is_military}") return { 'model': model, 'registration': registration, 'db_flags': db_flags_lower, 'is_helicopter': is_helicopter, 'is_commercial_jet': is_commercial_jet, 'is_small_plane': is_small_plane, 'is_drone': is_drone, 'is_glider': is_glider, 'is_military': is_military } elif response.status_code == 401: self.credentials_valid = False self.metadata_access = False logging.warning("[SkyHigh] Invalid OpenSky credentials or insufficient permissions for metadata fetch. Falling back to anonymous access.") response = requests.get(url, timeout=5) if response.status_code == 200: data = response.json() manufacturer = data.get('manufacturerName', '') or '' model = data.get('model', 'Unknown') or 'Unknown' registration = data.get('registration', 'Unknown') or 'Unknown' db_flags = data.get('special_flags', []) or [] typecode = data.get('typecode', '') or '' # Safely handle None values before calling lower() manufacturer_lower = manufacturer.lower() if isinstance(manufacturer, str) else '' model_lower = model.lower() if isinstance(model, str) else '' typecode_lower = typecode.lower() if isinstance(typecode, str) else '' db_flags_lower = ', '.join([flag.lower() for flag in db_flags if isinstance(flag, str)]) # Categorization based on manufacturer, model, typecode, and DB flags is_helicopter = 'helicopter' in model_lower is_commercial_jet = ( any(jet in manufacturer_lower for jet in ['airbus', 'boeing', 'embraer', 'bombardier']) or any(model_lower.startswith(prefix) for prefix in ['a', 'b', 'e', 'crj', 'erj']) or any(typecode_lower.startswith(prefix) for prefix in ['a', 'b', 'e', 'crj', 'erj']) ) is_small_plane = ( any(small in manufacturer_lower for small in ['cessna', 'piper', 'beechcraft', 'cirrus']) or any(model_lower.startswith(prefix) for prefix in ['c', 'p', 'be', 'sr']) or any(typecode_lower.startswith(prefix) for prefix in ['c', 'p', 'be', 'sr']) ) is_drone = 'drone' in model_lower or 'uav' in model_lower is_glider = 'glider' in model_lower is_military = 'military' in db_flags_lower logging.debug(f"[SkyHigh] Metadata for {icao24}: manufacturer={manufacturer}, model={model}, typecode={typecode}, db_flags={db_flags_lower}, is_helicopter={is_helicopter}, is_commercial_jet={is_commercial_jet}, is_small_plane={is_small_plane}, is_drone={is_drone}, is_glider={is_glider}, is_military={is_military}") return { 'model': model, 'registration': registration, 'db_flags': db_flags_lower, 'is_helicopter': is_helicopter, 'is_commercial_jet': is_commercial_jet, 'is_small_plane': is_small_plane, 'is_drone': is_drone, 'is_glider': is_glider, 'is_military': is_military } else: logging.error(f"[SkyHigh] Anonymous metadata fetch failed for {icao24}: {response.status_code}") return None else: logging.warning(f"[SkyHigh] Failed to fetch metadata for {icao24}: {response.status_code}") return None except requests.exceptions.RequestException as e: logging.error(f"[SkyHigh] Error fetching metadata for {icao24}: {e}") return None def fetch_flight_path(self, icao24): """Fetch flight path data for a specific aircraft.""" try: if not self.credentials_valid: return {'error': 'OpenSky credentials invalid or insufficient permissions. Falling back to historical positions.'} if not self.flight_path_access: return {'error': 'Flight path access unavailable. Your OpenSky account lacks permissions for flight tracks. Using historical positions instead.'} time_ranges = [3600, 7200, 14400] headers = {} if self.options['opensky_username'] and self.options['opensky_password']: import base64 auth_str = f"{self.options['opensky_username']}:{self.options['opensky_password']}" auth_encoded = base64.b64encode(auth_str.encode()).decode() headers['Authorization'] = f'Basic {auth_encoded}' logging.debug(f"[SkyHigh] Using OpenSky credentials: username={self.options['opensky_username']}") else: return {'error': 'Authentication credentials not provided'} for time_range in time_ranges: current_time = int(time.time()) url = f"https://opensky-network.org/api/tracks/all?icao24={icao24}&time={current_time - time_range}" response = requests.get(url, headers=headers, timeout=5) if response.status_code == 200: data = response.json() if 'path' in data and data['path']: return data elif response.status_code == 404: logging.debug(f"[SkyHigh] No flight path data for {icao24} in the last {time_range} seconds") continue elif response.status_code == 401: self.flight_path_access = False return {'error': 'Flight path access unavailable. Your OpenSky account lacks permissions for flight tracks. Using historical positions instead.'} else: logging.warning(f"[SkyHigh] Failed to fetch flight path for {icao24}: {response.status_code}") continue return {'error': 'No recent flight path data available'} except requests.exceptions.RequestException as e: logging.error(f"[SkyHigh] Error fetching flight path for {icao24}: {e}") return {'error': 'Network error'} def get_historical_path(self, icao24): """Retrieve historical positions for an aircraft to approximate its flight path.""" with self.data_lock: if icao24 in self.historical_positions and len(self.historical_positions[icao24]) > 1: path = [] for pos in self.historical_positions[icao24]: path.append([int(pos['timestamp']), pos['latitude'], pos['longitude'], pos['baro_altitude'], pos['true_track'], False]) return {'path': path} return {'error': 'Insufficient historical data to plot flight path'} def parse_api_response(self, api_data): aircrafts = [] if 'states' in api_data and api_data['states'] is not None: for state in api_data['states']: icao24 = state[0] callsign = state[1].strip() if state[1] else "Unknown" latitude = state[6] if state[6] is not None else 'N/A' longitude = state[5] if state[5] is not None else 'N/A' baro_altitude = state[7] if state[7] is not None else 'N/A' geo_altitude = state[13] if state[13] is not None else 'N/A' velocity = state[9] if state[9] is not None else 'N/A' true_track = state[10] if state[10] is not None else 'N/A' vertical_rate = state[11] if state[11] is not None else 'N/A' on_ground = state[8] squawk = state[14] if state[14] is not None else 'N/A' timestamp = state[4] if state[4] is not None else int(time.time()) with self.data_lock: if latitude != 'N/A' and longitude != 'N/A': position = { 'timestamp': timestamp, 'latitude': latitude, 'longitude': longitude, 'baro_altitude': baro_altitude, 'true_track': true_track } if icao24 not in self.historical_positions: self.historical_positions[icao24] = [] self.historical_positions[icao24].append(position) self.historical_positions[icao24] = self.historical_positions[icao24][-10:] if icao24 not in self.data or 'model' not in self.data[icao24]: metadata = self.fetch_aircraft_metadata(icao24) if metadata: if 'error' in metadata: self.data[icao24] = { 'model': 'Unknown', 'registration': 'Unknown', 'db_flags': '', 'is_helicopter': False, 'is_commercial_jet': False, 'is_small_plane': False, 'is_drone': False, 'is_glider': False, 'is_military': False } else: self.data[icao24] = metadata else: self.data[icao24] = { 'model': 'Unknown', 'registration': 'Unknown', 'db_flags': '', 'is_helicopter': False, 'is_commercial_jet': False, 'is_small_plane': False, 'is_drone': False, 'is_glider': False, 'is_military': False } time.sleep(0.1) self.data[icao24].update({ 'callsign': callsign, 'latitude': latitude, 'longitude': longitude, 'baro_altitude': baro_altitude, 'geo_altitude': geo_altitude, 'velocity': velocity, 'true_track': true_track, 'vertical_rate': vertical_rate, 'on_ground': on_ground, 'squawk': squawk, 'last_seen': datetime.now().strftime('%Y-%m-%d %H:%M:%S') }) aircrafts.append({ 'icao24': icao24, 'callsign': callsign, 'latitude': latitude, 'longitude': longitude, 'baro_altitude': baro_altitude, 'geo_altitude': geo_altitude, 'velocity': velocity, 'true_track': true_track, 'vertical_rate': vertical_rate, 'on_ground': on_ground, 'squawk': squawk, 'is_helicopter': self.data[icao24].get('is_helicopter', False), 'is_commercial_jet': self.data[icao24].get('is_commercial_jet', False), 'is_small_plane': self.data[icao24].get('is_small_plane', False), 'is_drone': self.data[icao24].get('is_drone', False), 'is_glider': self.data[icao24].get('is_glider', False), 'is_military': self.data[icao24].get('is_military', False), 'model': self.data[icao24].get('model', 'Unknown'), 'registration': self.data[icao24].get('registration', 'Unknown'), 'db_flags': self.data[icao24].get('db_flags', '') }) return aircrafts def prune_old_data(self): """Remove aircraft entries older than the configured prune_minutes interval.""" prune_minutes = self.options.get('prune_minutes', 0) if prune_minutes <= 0: return now = datetime.now() cutoff = now - timedelta(minutes=prune_minutes) keys_to_remove = [] with self.data_lock: for icao24, info in self.data.items(): last_seen = datetime.strptime(info['last_seen'], '%Y-%m-%d %H:%M:%S') if last_seen < cutoff: keys_to_remove.append(icao24) for key in keys_to_remove: del self.data[key] if key in self.historical_positions: del self.historical_positions[key] logging.debug(f"[SkyHigh] Pruned {len(keys_to_remove)} old aircraft entries.") def on_webhook(self, path, request): if request.method == 'GET': if path == '/' or not path: try: with open(self.options['aircraft_file'], 'r') as f: aircraft_dict = json.load(f) aircrafts = list(aircraft_dict.values()) center = [self.last_gps['latitude'] or self.options['latitude'], self.last_gps['longitude'] or self.options['longitude']] return render_template_string(HTML_TEMPLATE, aircrafts=aircrafts, center=center) except (FileNotFoundError, json.JSONDecodeError): aircrafts = [] center = [self.options['latitude'], self.options['longitude']] return render_template_string(HTML_TEMPLATE, aircrafts=aircrafts, center=center) elif path.startswith('flightpath/'): icao24 = path.split('/')[-1] flight_path_data = self.fetch_flight_path(icao24) if 'error' in flight_path_data and 'Using historical positions' in flight_path_data['error']: flight_path_data = self.get_historical_path(icao24) return jsonify(flight_path_data) return "Not found", 404 def on_unload(self, ui): with ui._lock: ui.remove_element('SkyHigh') HTML_TEMPLATE = ''' SkyHigh - Aircraft Map

Aircraft Detected by SkyHigh

{% for aircraft in aircrafts %} {% endfor %}
Callsign Model Registration Latitude Longitude Baro Altitude Geo Altitude Velocity True Track Vertical Rate On Ground Squawk DB Flags Last Seen
{{ aircraft.callsign }} {{ aircraft.model }} {{ aircraft.registration }} {{ aircraft.latitude }} {{ aircraft.longitude }} {{ aircraft.baro_altitude }} {{ aircraft.geo_altitude }} {{ aircraft.velocity }} {{ aircraft.true_track }} {{ aircraft.vertical_rate }} {{ 'Yes' if aircraft.on_ground else 'No' }} {{ aircraft.squawk }} {{ aircraft.db_flags }} {{ aircraft.last_seen }}
'''