From 28335419f6d69667d98793ae1a77bbc79fba4050 Mon Sep 17 00:00:00 2001 From: jayofelony Date: Sun, 30 Jun 2024 20:07:41 +0200 Subject: [PATCH] MLS is offline Signed-off-by: jayofelony --- pwnagotchi/plugins/default/net-pos.py | 150 -------------------------- 1 file changed, 150 deletions(-) delete mode 100644 pwnagotchi/plugins/default/net-pos.py diff --git a/pwnagotchi/plugins/default/net-pos.py b/pwnagotchi/plugins/default/net-pos.py deleted file mode 100644 index 3f226204..00000000 --- a/pwnagotchi/plugins/default/net-pos.py +++ /dev/null @@ -1,150 +0,0 @@ -import logging -import json -import os -import threading -import requests -import time -import pwnagotchi.plugins as plugins -from pwnagotchi.utils import StatusFile - - -class NetPos(plugins.Plugin): - __author__ = 'zenzen san' - __version__ = '2.0.3' - __license__ = 'GPL3' - __description__ = """Saves a json file with the access points with more signal - whenever a handshake is captured. - When internet is available the files are converted in geo locations - using Mozilla LocationService """ - - API_URL = 'https://location.services.mozilla.com/v1/geolocate?key={api}' - - def __init__(self): - self.report = StatusFile('/root/.net_pos_saved', data_format='json') - self.skip = list() - self.ready = False - self.lock = threading.Lock() - self.options = dict() - - def on_loaded(self): - if 'api_key' not in self.options or ('api_key' in self.options and not self.options['api_key']): - logging.error("NET-POS: api_key isn't set. Can't use mozilla's api.") - return - if 'api_url' in self.options: - self.API_URL = self.options['api_url'] - self.ready = True - logging.info("net-pos plugin loaded.") - logging.debug(f"net-pos: use api_url: {self.API_URL}") - - def _append_saved(self, path): - to_save = list() - if isinstance(path, str): - to_save.append(path) - elif isinstance(path, list): - to_save += path - else: - raise TypeError("Expected list or str, got %s" % type(path)) - - with open('/root/.net_pos_saved', 'a') as saved_file: - for x in to_save: - saved_file.write(x + "\n") - - def on_internet_available(self, agent): - if self.lock.locked(): - return - with self.lock: - if self.ready: - config = agent.config() - display = agent.view() - reported = self.report.data_field_or('reported', default=list()) - handshake_dir = config['bettercap']['handshakes'] - - all_files = os.listdir(handshake_dir) - all_np_files = [os.path.join(handshake_dir, filename) - for filename in all_files - if filename.endswith('.net-pos.json')] - new_np_files = set(all_np_files) - set(reported) - set(self.skip) - - if new_np_files: - logging.debug("NET-POS: Found %d new net-pos files. Fetching positions ...", len(new_np_files)) - display.set('status', f"Found {len(new_np_files)} new net-pos files. Fetching positions ...") - display.update(force=True) - for idx, np_file in enumerate(new_np_files): - - geo_file = np_file.replace('.net-pos.json', '.geo.json') - if os.path.exists(geo_file): - # got already the position - reported.append(np_file) - self.report.update(data={'reported': reported}) - continue - - try: - geo_data = self._get_geo_data(np_file) # returns json obj - except requests.exceptions.RequestException as req_e: - logging.error("NET-POS: %s - RequestException: %s", np_file, req_e) - self.skip += np_file - continue - except json.JSONDecodeError as js_e: - logging.error("NET-POS: %s - JSONDecodeError: %s, removing it...", np_file, js_e) - os.remove(np_file) - continue - except OSError as os_e: - logging.error("NET-POS: %s - OSError: %s", np_file, os_e) - self.skip += np_file - continue - - with open(geo_file, 'w+t') as sf: - json.dump(geo_data, sf) - - reported.append(np_file) - self.report.update(data={'reported': reported}) - - display.set('status', f"Fetching positions ({idx + 1}/{len(new_np_files)})") - display.update(force=True) - - def on_handshake(self, agent, filename, access_point, client_station): - netpos = self._get_netpos(agent) - if not netpos['wifiAccessPoints']: - return - - netpos["ts"] = int("%.0f" % time.time()) - netpos_filename = filename.replace('.pcap', '.net-pos.json') - logging.debug("NET-POS: Saving net-location to %s", netpos_filename) - - try: - with open(netpos_filename, 'w+t') as net_pos_file: - json.dump(netpos, net_pos_file) - except OSError as os_e: - logging.error("NET-POS: %s", os_e) - - def _get_netpos(self, agent): - aps = agent.get_access_points() - netpos = dict() - netpos['wifiAccessPoints'] = list() - # 6 seems a good number to save a wifi networks location - for access_point in sorted(aps, key=lambda i: i['rssi'], reverse=True)[:6]: - netpos['wifiAccessPoints'].append({'macAddress': access_point['mac'], - 'signalStrength': access_point['rssi']}) - return netpos - - def _get_geo_data(self, path, timeout=30): - geourl = self.API_URL.format(api=self.options['api_key']) - - try: - with open(path, "r") as json_file: - data = json.load(json_file) - except json.JSONDecodeError as js_e: - raise js_e - except OSError as os_e: - raise os_e - - try: - result = requests.post(geourl, - json=data, - timeout=timeout) - return_geo = result.json() - if data["ts"]: - return_geo["ts"] = data["ts"] - return return_geo - except requests.exceptions.RequestException as req_e: - raise req_e