From 0234c63a2af0450d8d665c3465ef4f8f90f6c382 Mon Sep 17 00:00:00 2001
From: AlienMajik <118037572+AlienMajik@users.noreply.github.com>
Date: Mon, 12 May 2025 03:42:40 -0700
Subject: [PATCH] Update skyhigh.py
---
skyhigh.py | 487 +++++++++++++++++++++++++++++++++++++++++++++++++----
1 file changed, 453 insertions(+), 34 deletions(-)
diff --git a/skyhigh.py b/skyhigh.py
index 7072924..1de47b4 100644
--- a/skyhigh.py
+++ b/skyhigh.py
@@ -5,7 +5,7 @@ import time
from datetime import datetime, timedelta
import requests
from threading import Lock
-from flask import render_template_string
+from flask import render_template_string, request, jsonify
import pwnagotchi.plugins as plugins
import pwnagotchi.ui.fonts as fonts
@@ -14,9 +14,9 @@ from pwnagotchi.ui.view import BLACK
class SkyHigh(plugins.Plugin):
__author__ = 'AlienMajik'
- __version__ = '1.0.0'
+ __version__ = '1.0.9'
__license__ = 'GPL3'
- __description__ = 'A plugin that fetches aircraft data from an API using GPS coordinates, logs it, prunes old entries, and provides a webhook with aircraft type and origin country visualization.'
+ __description__ = 'A plugin that fetches aircraft data from the OpenSky API using GPS coordinates, logs it, prunes old entries, and provides a webhook with aircraft type, flight path visualization, and enhanced iconography.'
def __init__(self):
self.options = {
@@ -27,12 +27,18 @@ class SkyHigh(plugins.Plugin):
'latitude': -66.273334, # Default latitude (Flying Saucer)
'longitude': 100.984166, # Default longitude (Flying Saucer)
'radius': 50, # Radius in miles to fetch aircraft data
- 'prune_minutes': 5 # Default pruning interval in minutes
+ 'prune_minutes': 5, # Default pruning interval in minutes
+ 'opensky_username': None, # Optional OpenSky username for authenticated requests
+ 'opensky_password': None # Optional OpenSky password for authenticated requests
}
self.last_fetch_time = 0
self.data = {}
self.data_lock = Lock()
- self.last_gps = {'latitude': None, 'longitude': None} # To store last known GPS coordinates
+ self.last_gps = {'latitude': None, 'longitude': None}
+ self.credentials_valid = True
+ self.flight_path_access = True
+ self.metadata_access = True
+ self.historical_positions = {}
def on_loaded(self):
logging.info("[SkyHigh] Plugin loaded.")
@@ -73,7 +79,17 @@ class SkyHigh(plugins.Plugin):
lat_min, lat_max = lat - (radius / 69), lat + (radius / 69)
lon_min, lon_max = lon - (radius / 69), lon + (radius / 69)
url = f"https://opensky-network.org/api/states/all?lamin={lat_min}&lomin={lon_min}&lamax={lat_max}&lomax={lon_max}"
- response = requests.get(url, timeout=10)
+
+ headers = {}
+ if self.options['opensky_username'] and self.options['opensky_password'] and self.credentials_valid:
+ import base64
+ auth_str = f"{self.options['opensky_username']}:{self.options['opensky_password']}"
+ auth_encoded = base64.b64encode(auth_str.encode()).decode()
+ headers['Authorization'] = f'Basic {auth_encoded}'
+ logging.debug(f"[SkyHigh] Attempting authenticated request to {url}")
+
+ response = requests.get(url, headers=headers if headers else None, timeout=10)
+
if response.status_code == 200:
aircrafts = self.parse_api_response(response.json())
self.prune_old_data()
@@ -82,6 +98,21 @@ class SkyHigh(plugins.Plugin):
json.dump(self.data, f)
logging.debug("[SkyHigh] Fetch completed successfully.")
return f"{len(aircrafts)} aircrafts detected"
+ elif response.status_code == 401:
+ logging.warning("[SkyHigh] Authentication failed for /states/all. Falling back to anonymous access.")
+ self.credentials_valid = False
+ response = requests.get(url, timeout=10)
+ if response.status_code == 200:
+ aircrafts = self.parse_api_response(response.json())
+ self.prune_old_data()
+ with self.data_lock:
+ with open(self.options['aircraft_file'], 'w') as f:
+ json.dump(self.data, f)
+ logging.debug("[SkyHigh] Fetch completed successfully (anonymous).")
+ return f"{len(aircrafts)} aircrafts detected"
+ else:
+ logging.error("[SkyHigh] Anonymous fetch failed with status code %d", response.status_code)
+ return "Fetch error"
else:
logging.error("[SkyHigh] API returned status code %d", response.status_code)
return "Fetch error"
@@ -92,20 +123,156 @@ class SkyHigh(plugins.Plugin):
def fetch_aircraft_metadata(self, icao24):
"""Fetch metadata for a specific aircraft using its ICAO24 code."""
try:
+ if not self.metadata_access:
+ logging.warning(f"[SkyHigh] Metadata access unavailable for {icao24}. Attempting anonymous access.")
+ response = requests.get(f"https://opensky-network.org/api/metadata/aircraft/icao/{icao24}", timeout=5)
+ if response.status_code == 200:
+ data = response.json()
+ manufacturer = data.get('manufacturerName', '') or ''
+ model = data.get('model', 'Unknown') or 'Unknown'
+ registration = data.get('registration', 'Unknown') or 'Unknown'
+ db_flags = data.get('special_flags', []) or []
+ typecode = data.get('typecode', '') or ''
+
+ # Safely handle None values before calling lower()
+ manufacturer_lower = manufacturer.lower() if isinstance(manufacturer, str) else ''
+ model_lower = model.lower() if isinstance(model, str) else ''
+ typecode_lower = typecode.lower() if isinstance(typecode, str) else ''
+ db_flags_lower = ', '.join([flag.lower() for flag in db_flags if isinstance(flag, str)])
+
+ # Categorization based on manufacturer, model, typecode, and DB flags
+ is_helicopter = 'helicopter' in model_lower
+ is_commercial_jet = (
+ any(jet in manufacturer_lower for jet in ['airbus', 'boeing', 'embraer', 'bombardier']) or
+ any(model_lower.startswith(prefix) for prefix in ['a', 'b', 'e', 'crj', 'erj']) or
+ any(typecode_lower.startswith(prefix) for prefix in ['a', 'b', 'e', 'crj', 'erj'])
+ )
+ is_small_plane = (
+ any(small in manufacturer_lower for small in ['cessna', 'piper', 'beechcraft', 'cirrus']) or
+ any(model_lower.startswith(prefix) for prefix in ['c', 'p', 'be', 'sr']) or
+ any(typecode_lower.startswith(prefix) for prefix in ['c', 'p', 'be', 'sr'])
+ )
+ is_drone = 'drone' in model_lower or 'uav' in model_lower
+ is_glider = 'glider' in model_lower
+ is_military = 'military' in db_flags_lower
+
+ logging.debug(f"[SkyHigh] Metadata for {icao24}: manufacturer={manufacturer}, model={model}, typecode={typecode}, db_flags={db_flags_lower}, is_helicopter={is_helicopter}, is_commercial_jet={is_commercial_jet}, is_small_plane={is_small_plane}, is_drone={is_drone}, is_glider={is_glider}, is_military={is_military}")
+ return {
+ 'model': model,
+ 'registration': registration,
+ 'db_flags': db_flags_lower,
+ 'is_helicopter': is_helicopter,
+ 'is_commercial_jet': is_commercial_jet,
+ 'is_small_plane': is_small_plane,
+ 'is_drone': is_drone,
+ 'is_glider': is_glider,
+ 'is_military': is_military
+ }
+ else:
+ logging.error(f"[SkyHigh] Anonymous metadata fetch failed for {icao24}: {response.status_code}")
+ return None
+
url = f"https://opensky-network.org/api/metadata/aircraft/icao/{icao24}"
- response = requests.get(url, timeout=5)
+ auth = None
+ if self.options['opensky_username'] and self.options['opensky_password'] and self.credentials_valid:
+ auth = (self.options['opensky_username'], self.options['opensky_password'])
+ logging.debug(f"[SkyHigh] Using OpenSky credentials: username={self.options['opensky_username']}")
+ else:
+ logging.warning("[SkyHigh] OpenSky credentials not provided or invalid. Attempting anonymous metadata fetch.")
+
+ response = requests.get(url, auth=auth, timeout=5)
if response.status_code == 200:
data = response.json()
- model = data.get('model', 'Unknown')
- origin_country = data.get('registered', 'Unknown')
- db_flags = ', '.join(data.get('special_flags', [])) # DB flags like military, PIA, LAD
- is_helicopter = 'helicopter' in model.lower()
+ manufacturer = data.get('manufacturerName', '') or ''
+ model = data.get('model', 'Unknown') or 'Unknown'
+ registration = data.get('registration', 'Unknown') or 'Unknown'
+ db_flags = data.get('special_flags', []) or []
+ typecode = data.get('typecode', '') or ''
+
+ # Safely handle None values before calling lower()
+ manufacturer_lower = manufacturer.lower() if isinstance(manufacturer, str) else ''
+ model_lower = model.lower() if isinstance(model, str) else ''
+ typecode_lower = typecode.lower() if isinstance(typecode, str) else ''
+ db_flags_lower = ', '.join([flag.lower() for flag in db_flags if isinstance(flag, str)])
+
+ # Categorization based on manufacturer, model, typecode, and DB flags
+ is_helicopter = 'helicopter' in model_lower
+ is_commercial_jet = (
+ any(jet in manufacturer_lower for jet in ['airbus', 'boeing', 'embraer', 'bombardier']) or
+ any(model_lower.startswith(prefix) for prefix in ['a', 'b', 'e', 'crj', 'erj']) or
+ any(typecode_lower.startswith(prefix) for prefix in ['a', 'b', 'e', 'crj', 'erj'])
+ )
+ is_small_plane = (
+ any(small in manufacturer_lower for small in ['cessna', 'piper', 'beechcraft', 'cirrus']) or
+ any(model_lower.startswith(prefix) for prefix in ['c', 'p', 'be', 'sr']) or
+ any(typecode_lower.startswith(prefix) for prefix in ['c', 'p', 'be', 'sr'])
+ )
+ is_drone = 'drone' in model_lower or 'uav' in model_lower
+ is_glider = 'glider' in model_lower
+ is_military = 'military' in db_flags_lower
+
+ logging.debug(f"[SkyHigh] Metadata for {icao24}: manufacturer={manufacturer}, model={model}, typecode={typecode}, db_flags={db_flags_lower}, is_helicopter={is_helicopter}, is_commercial_jet={is_commercial_jet}, is_small_plane={is_small_plane}, is_drone={is_drone}, is_glider={is_glider}, is_military={is_military}")
return {
'model': model,
- 'origin_country': origin_country,
- 'db_flags': db_flags,
- 'is_helicopter': is_helicopter
+ 'registration': registration,
+ 'db_flags': db_flags_lower,
+ 'is_helicopter': is_helicopter,
+ 'is_commercial_jet': is_commercial_jet,
+ 'is_small_plane': is_small_plane,
+ 'is_drone': is_drone,
+ 'is_glider': is_glider,
+ 'is_military': is_military
}
+ elif response.status_code == 401:
+ self.credentials_valid = False
+ self.metadata_access = False
+ logging.warning("[SkyHigh] Invalid OpenSky credentials or insufficient permissions for metadata fetch. Falling back to anonymous access.")
+ response = requests.get(url, timeout=5)
+ if response.status_code == 200:
+ data = response.json()
+ manufacturer = data.get('manufacturerName', '') or ''
+ model = data.get('model', 'Unknown') or 'Unknown'
+ registration = data.get('registration', 'Unknown') or 'Unknown'
+ db_flags = data.get('special_flags', []) or []
+ typecode = data.get('typecode', '') or ''
+
+ # Safely handle None values before calling lower()
+ manufacturer_lower = manufacturer.lower() if isinstance(manufacturer, str) else ''
+ model_lower = model.lower() if isinstance(model, str) else ''
+ typecode_lower = typecode.lower() if isinstance(typecode, str) else ''
+ db_flags_lower = ', '.join([flag.lower() for flag in db_flags if isinstance(flag, str)])
+
+ # Categorization based on manufacturer, model, typecode, and DB flags
+ is_helicopter = 'helicopter' in model_lower
+ is_commercial_jet = (
+ any(jet in manufacturer_lower for jet in ['airbus', 'boeing', 'embraer', 'bombardier']) or
+ any(model_lower.startswith(prefix) for prefix in ['a', 'b', 'e', 'crj', 'erj']) or
+ any(typecode_lower.startswith(prefix) for prefix in ['a', 'b', 'e', 'crj', 'erj'])
+ )
+ is_small_plane = (
+ any(small in manufacturer_lower for small in ['cessna', 'piper', 'beechcraft', 'cirrus']) or
+ any(model_lower.startswith(prefix) for prefix in ['c', 'p', 'be', 'sr']) or
+ any(typecode_lower.startswith(prefix) for prefix in ['c', 'p', 'be', 'sr'])
+ )
+ is_drone = 'drone' in model_lower or 'uav' in model_lower
+ is_glider = 'glider' in model_lower
+ is_military = 'military' in db_flags_lower
+
+ logging.debug(f"[SkyHigh] Metadata for {icao24}: manufacturer={manufacturer}, model={model}, typecode={typecode}, db_flags={db_flags_lower}, is_helicopter={is_helicopter}, is_commercial_jet={is_commercial_jet}, is_small_plane={is_small_plane}, is_drone={is_drone}, is_glider={is_glider}, is_military={is_military}")
+ return {
+ 'model': model,
+ 'registration': registration,
+ 'db_flags': db_flags_lower,
+ 'is_helicopter': is_helicopter,
+ 'is_commercial_jet': is_commercial_jet,
+ 'is_small_plane': is_small_plane,
+ 'is_drone': is_drone,
+ 'is_glider': is_glider,
+ 'is_military': is_military
+ }
+ else:
+ logging.error(f"[SkyHigh] Anonymous metadata fetch failed for {icao24}: {response.status_code}")
+ return None
else:
logging.warning(f"[SkyHigh] Failed to fetch metadata for {icao24}: {response.status_code}")
return None
@@ -113,28 +280,132 @@ class SkyHigh(plugins.Plugin):
logging.error(f"[SkyHigh] Error fetching metadata for {icao24}: {e}")
return None
+ def fetch_flight_path(self, icao24):
+ """Fetch flight path data for a specific aircraft."""
+ try:
+ if not self.credentials_valid:
+ return {'error': 'OpenSky credentials invalid or insufficient permissions. Falling back to historical positions.'}
+
+ if not self.flight_path_access:
+ return {'error': 'Flight path access unavailable. Your OpenSky account lacks permissions for flight tracks. Using historical positions instead.'}
+
+ time_ranges = [3600, 7200, 14400]
+ headers = {}
+ if self.options['opensky_username'] and self.options['opensky_password']:
+ import base64
+ auth_str = f"{self.options['opensky_username']}:{self.options['opensky_password']}"
+ auth_encoded = base64.b64encode(auth_str.encode()).decode()
+ headers['Authorization'] = f'Basic {auth_encoded}'
+ logging.debug(f"[SkyHigh] Using OpenSky credentials: username={self.options['opensky_username']}")
+ else:
+ return {'error': 'Authentication credentials not provided'}
+
+ for time_range in time_ranges:
+ current_time = int(time.time())
+ url = f"https://opensky-network.org/api/tracks/all?icao24={icao24}&time={current_time - time_range}"
+ response = requests.get(url, headers=headers, timeout=5)
+
+ if response.status_code == 200:
+ data = response.json()
+ if 'path' in data and data['path']:
+ return data
+ elif response.status_code == 404:
+ logging.debug(f"[SkyHigh] No flight path data for {icao24} in the last {time_range} seconds")
+ continue
+ elif response.status_code == 401:
+ self.flight_path_access = False
+ return {'error': 'Flight path access unavailable. Your OpenSky account lacks permissions for flight tracks. Using historical positions instead.'}
+ else:
+ logging.warning(f"[SkyHigh] Failed to fetch flight path for {icao24}: {response.status_code}")
+ continue
+
+ return {'error': 'No recent flight path data available'}
+ except requests.exceptions.RequestException as e:
+ logging.error(f"[SkyHigh] Error fetching flight path for {icao24}: {e}")
+ return {'error': 'Network error'}
+
+ def get_historical_path(self, icao24):
+ """Retrieve historical positions for an aircraft to approximate its flight path."""
+ with self.data_lock:
+ if icao24 in self.historical_positions and len(self.historical_positions[icao24]) > 1:
+ path = []
+ for pos in self.historical_positions[icao24]:
+ path.append([int(pos['timestamp']), pos['latitude'], pos['longitude'], pos['baro_altitude'], pos['true_track'], False])
+ return {'path': path}
+ return {'error': 'Insufficient historical data to plot flight path'}
+
def parse_api_response(self, api_data):
aircrafts = []
- if 'states' in api_data:
+ if 'states' in api_data and api_data['states'] is not None:
for state in api_data['states']:
icao24 = state[0]
callsign = state[1].strip() if state[1] else "Unknown"
- latitude = state[6]
- longitude = state[5]
- altitude = state[7]
+ latitude = state[6] if state[6] is not None else 'N/A'
+ longitude = state[5] if state[5] is not None else 'N/A'
+ baro_altitude = state[7] if state[7] is not None else 'N/A'
+ geo_altitude = state[13] if state[13] is not None else 'N/A'
+ velocity = state[9] if state[9] is not None else 'N/A'
+ true_track = state[10] if state[10] is not None else 'N/A'
+ vertical_rate = state[11] if state[11] is not None else 'N/A'
+ on_ground = state[8]
+ squawk = state[14] if state[14] is not None else 'N/A'
+ timestamp = state[4] if state[4] is not None else int(time.time())
+
with self.data_lock:
+ if latitude != 'N/A' and longitude != 'N/A':
+ position = {
+ 'timestamp': timestamp,
+ 'latitude': latitude,
+ 'longitude': longitude,
+ 'baro_altitude': baro_altitude,
+ 'true_track': true_track
+ }
+ if icao24 not in self.historical_positions:
+ self.historical_positions[icao24] = []
+ self.historical_positions[icao24].append(position)
+ self.historical_positions[icao24] = self.historical_positions[icao24][-10:]
+
if icao24 not in self.data or 'model' not in self.data[icao24]:
metadata = self.fetch_aircraft_metadata(icao24)
if metadata:
- self.data[icao24] = metadata
+ if 'error' in metadata:
+ self.data[icao24] = {
+ 'model': 'Unknown',
+ 'registration': 'Unknown',
+ 'db_flags': '',
+ 'is_helicopter': False,
+ 'is_commercial_jet': False,
+ 'is_small_plane': False,
+ 'is_drone': False,
+ 'is_glider': False,
+ 'is_military': False
+ }
+ else:
+ self.data[icao24] = metadata
else:
- self.data[icao24] = {'model': 'Unknown', 'origin_country': 'Unknown', 'db_flags': '', 'is_helicopter': False}
- time.sleep(0.1) # Small delay to avoid rate limits
+ self.data[icao24] = {
+ 'model': 'Unknown',
+ 'registration': 'Unknown',
+ 'db_flags': '',
+ 'is_helicopter': False,
+ 'is_commercial_jet': False,
+ 'is_small_plane': False,
+ 'is_drone': False,
+ 'is_glider': False,
+ 'is_military': False
+ }
+ time.sleep(0.1)
self.data[icao24].update({
'callsign': callsign,
'latitude': latitude,
'longitude': longitude,
- 'altitude': altitude,
+ 'baro_altitude': baro_altitude,
+ 'geo_altitude': geo_altitude,
+ 'velocity': velocity,
+ 'true_track': true_track,
+ 'vertical_rate': vertical_rate,
+ 'on_ground': on_ground,
+ 'squawk': squawk,
'last_seen': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
})
aircrafts.append({
@@ -142,7 +413,22 @@ class SkyHigh(plugins.Plugin):
'callsign': callsign,
'latitude': latitude,
'longitude': longitude,
- 'altitude': altitude
+ 'baro_altitude': baro_altitude,
+ 'geo_altitude': geo_altitude,
+ 'velocity': velocity,
+ 'true_track': true_track,
+ 'vertical_rate': vertical_rate,
+ 'on_ground': on_ground,
+ 'squawk': squawk,
+ 'is_helicopter': self.data[icao24].get('is_helicopter', False),
+ 'is_commercial_jet': self.data[icao24].get('is_commercial_jet', False),
+ 'is_small_plane': self.data[icao24].get('is_small_plane', False),
+ 'is_drone': self.data[icao24].get('is_drone', False),
+ 'is_glider': self.data[icao24].get('is_glider', False),
+ 'is_military': self.data[icao24].get('is_military', False),
+ 'model': self.data[icao24].get('model', 'Unknown'),
+ 'registration': self.data[icao24].get('registration', 'Unknown'),
+ 'db_flags': self.data[icao24].get('db_flags', '')
})
return aircrafts
@@ -161,6 +447,8 @@ class SkyHigh(plugins.Plugin):
keys_to_remove.append(icao24)
for key in keys_to_remove:
del self.data[key]
+ if key in self.historical_positions:
+ del self.historical_positions[key]
logging.debug(f"[SkyHigh] Pruned {len(keys_to_remove)} old aircraft entries.")
def on_webhook(self, path, request):
@@ -177,6 +465,12 @@ class SkyHigh(plugins.Plugin):
aircrafts = []
center = [self.options['latitude'], self.options['longitude']]
return render_template_string(HTML_TEMPLATE, aircrafts=aircrafts, center=center)
+ elif path.startswith('flightpath/'):
+ icao24 = path.split('/')[-1]
+ flight_path_data = self.fetch_flight_path(icao24)
+ if 'error' in flight_path_data and 'Using historical positions' in flight_path_data['error']:
+ flight_path_data = self.get_historical_path(icao24)
+ return jsonify(flight_path_data)
return "Not found", 404
def on_unload(self, ui):
@@ -204,11 +498,18 @@ HTML_TEMPLATE = '''
@@ -216,11 +517,18 @@ HTML_TEMPLATE = '''
{% for aircraft in aircrafts %}
Callsign
- DB Flags
- Type
+ Model
+ Registration
Latitude
Longitude
- Altitude
+ Baro Altitude
+ Geo Altitude
+ Velocity
+ True Track
+ Vertical Rate
+ On Ground
+ Squawk
+ DB Flags
Last Seen