diff --git a/pwnagotchi/plugins/default/bt-tether.py b/pwnagotchi/plugins/default/bt-tether.py index ecd0ed06..afced4d3 100644 --- a/pwnagotchi/plugins/default/bt-tether.py +++ b/pwnagotchi/plugins/default/bt-tether.py @@ -144,7 +144,10 @@ class BTTether(plugins.Plugin): return result.stdout.find(pattern) return result except Exception as exp: - logging.error(f"[BT-Tether] Error with {cmd} : {exp}") + logging.error(f"[BT-Tether] Error with {cmd}") + logging.error(f"[BT-Tether] Exception : {exp}") + logging.error(f"[BT-Tether] STDOUT : {result.stdout}") + logging.error(f"[BT-Tether] STDERR : {result.stderr}") raise exp def bluetoothctl(self, args, pattern=None): @@ -205,6 +208,12 @@ class BTTether(plugins.Plugin): "ipv4.route-metric", "200", ] ) + # Configure Device to autoconnect + self.nmcli([ + "device", "set", f"{self.mac}", + "autoconnect", "yes", + "managed", "yes" + ]) self.nmcli(["connection", "reload"]) self.ready = True logging.info(f"[BT-Tether] Connection {self.phone_name} configured") diff --git a/pwnagotchi/plugins/default/cache.py b/pwnagotchi/plugins/default/cache.py index 0ab0368d..a94bc3dc 100644 --- a/pwnagotchi/plugins/default/cache.py +++ b/pwnagotchi/plugins/default/cache.py @@ -2,14 +2,25 @@ import logging import json import os import re - +import pathlib import pwnagotchi.plugins as plugins -from pwnagotchi.ui.components import LabeledValue -from pwnagotchi.ui.view import BLACK -import pwnagotchi.ui.fonts as fonts +from datetime import datetime, UTC +from threading import Lock + + +def read_ap_cache(cache_dir, file): + cache_filename = os.path.basename(re.sub(r"\.(pcap|gps\.json|geo\.json)$", ".cache", file)) + cache_filename = os.path.join(cache_dir, cache_filename) + if not os.path.exists(cache_filename): + logging.info("Cache not exist") + return None + try: + with open(cache_filename, "r") as f: + return json.load(f) + except Exception as e: + logging.info(f"Exception {e}") + return None -def get_cache(): - return None class Cache(plugins.Plugin): __author__ = "fmatray" @@ -19,37 +30,87 @@ class Cache(plugins.Plugin): def __init__(self): self.options = dict() + self.ready = False + self.lock = Lock() + + def on_loaded(self): + logging.info("[CACHE] plugin loaded.") def on_config_changed(self, config): - self.handshake_dir = config["bettercap"].get("handshakes") - self.cache_dir = os.path.join(self.handshake_dir, "cache") - if not (os.path.exists(self.cache_dir)): - os.mkdir(self.cache_dir) - - def get_cache(self, file): - cache_filename = os.path.basename( - re.sub(r"\.(pcap|gps\.json|geo\.json)$", ".cache", file) - ) - cache_filename = os.path.join(self.cache_dir, cache_filename) - if not os.path.exists(cache_filename): - return None try: - with open(cache_filename, "r") as f: - return json.load(f) - except Exception as e: - return None + handshake_dir = config["bettercap"].get("handshakes") + self.cache_dir = os.path.join(handshake_dir, "cache") + os.makedirs(self.cache_dir, exist_ok=True) + except Exception: + logging.info(f"[CACHE] Cannot access to the cache directory") + return + self.last_clean = datetime.now(tz=UTC) + self.ready = True + logging.info(f"[CACHE] Cache plugin configured") + self.clean_ap_cache() - def cache_ap(self, ap): - mac = ap["mac"].replace(":", "") - hostname = re.sub(r"[^a-zA-Z0-9]", "", ap["hostname"]) - filename = os.path.join(self.cache_dir, f"{hostname}_{mac}.cache") - with open(filename, "w") as f: - json.dump(ap, f) + def on_unload(self, ui): + self.clean_ap_cache() + + def clean_ap_cache(self): + if not self.ready: + return + with self.lock: + ctime = datetime.now(tz=UTC) + cache_to_delete = list() + for cache_file in pathlib.Path(self.cache_dir).glob("*.apcache"): + mtime = datetime.fromtimestamp(cache_file.lstat().st_mtime, tz=UTC) + if (ctime - mtime).total_seconds() > 60 * 5: + cache_to_delete.append(cache_file) + if cache_to_delete: + logging.info(f"[CACHE] Cleaning {len(cache_to_delete)} files") + for cache_file in cache_to_delete: + try: + cache_file.unlink() + except Exception as e: + logging.error(f"[CACHE] Cannot delete {cache_file}: {e}") + + def write_ap_cache(self, access_point): + with self.lock: + try: + mac = access_point["mac"].replace(":", "") + hostname = re.sub(r"[^a-zA-Z0-9]", "", access_point["hostname"]) + except KeyError: + return + cache_file = os.path.join(self.cache_dir, f"{hostname}_{mac}.apcache") + try: + with open(cache_file, "w") as f: + json.dump(access_point, f) + except Exception as e: + logging.error(f"[CACHE] Cannot write {cache_file}: {e}") + pass + + def on_wifi_update(self, agent, access_points): + if self.ready: + for ap in filter(lambda ap: ap["hostname"] not in ["", ""], access_points): + self.write_ap_cache(ap) def on_unfiltered_ap_list(self, agent, aps): - for ap in filter(lambda ap: ap["hostname"] not in ["", ""], aps): - self.cache_ap(ap) + if self.ready: + for ap in filter(lambda ap: ap["hostname"] not in ["", ""], aps): + self.write_ap_cache(ap) + + def on_association(self, agent, access_point): + if self.ready: + self.write_ap_cache(access_point) + + def on_deauthentication(self, agent, access_point, client_station): + if self.ready: + self.write_ap_cache(access_point) def on_handshake(self, agent, filename, access_point, client_station): - logging.info(f"[WIGLE] on_handshake") - self.cache_ap(access_point) + if self.ready: + self.write_ap_cache(access_point) + + def on_ui_update(self, ui): + if not self.ready: + return + current_time = datetime.now(tz=UTC) + if (current_time - self.last_clean).total_seconds() > 60: + self.clean_ap_cache() + self.last_clean = current_time diff --git a/pwnagotchi/plugins/default/wigle.py b/pwnagotchi/plugins/default/wigle.py index 2b5a367c..5000629d 100644 --- a/pwnagotchi/plugins/default/wigle.py +++ b/pwnagotchi/plugins/default/wigle.py @@ -20,6 +20,7 @@ from pwnagotchi.utils import ( remove_whitelisted, ) from pwnagotchi import plugins +from pwnagotchi.plugins.default.cache import read_ap_cache from pwnagotchi._version import __version__ as __pwnagotchi_version__ import pwnagotchi.ui.fonts as fonts @@ -64,7 +65,7 @@ class WigleStatistics: class Wigle(plugins.Plugin): __author__ = "Dadav and updated by Jayofelony and fmatray" - __version__ = "4.0.0" + __version__ = "4.1.0" __license__ = "GPL3" __description__ = "This plugin automatically uploads collected WiFi to wigle.net" LABEL_SPACING = 0 @@ -79,6 +80,9 @@ class Wigle(plugins.Plugin): self.last_stat = datetime.now(tz=UTC) self.ui_counter = 0 + def on_loaded(self): + logging.info("[WIGLE] plugin loaded.") + def on_config_changed(self, config): self.api_key = self.options.get("api_key", None) if not self.api_key: @@ -88,6 +92,7 @@ class Wigle(plugins.Plugin): self.handshake_dir = config["bettercap"].get("handshakes") report_filename = os.path.join(self.handshake_dir, ".wigle_uploads") self.report = StatusFile(report_filename, data_format="json") + self.cache_dir = os.path.join(self.handshake_dir, "cache") self.cvs_dir = self.options.get("cvs_dir", None) self.whitelist = config["main"].get("whitelist", []) self.timeout = self.options.get("timeout", 30) @@ -148,16 +153,19 @@ class Wigle(plugins.Plugin): return gps_data def get_pcap_data(self, pcap_filename): - if cache := self.get_cache(pcap_filename): - logging.info(f"[WIGLE] Using cache for {pcap_filename}") - return { - WifiInfo.BSSID: cache["mac"], - WifiInfo.ESSID: cache["hostname"], - WifiInfo.ENCRYPTION: cache["encryption"], - WifiInfo.CHANNEL: cache["channel"], - WifiInfo.FREQUENCY: cache["frequency"], - WifiInfo.RSSI: cache["rssi"], - } + try: + if cache := read_ap_cache(self.cache_dir, self.pcap_filename): + logging.info(f"[WIGLE] Using cache for {pcap_filename}") + return { + WifiInfo.BSSID: cache["mac"], + WifiInfo.ESSID: cache["hostname"], + WifiInfo.ENCRYPTION: cache["encryption"], + WifiInfo.CHANNEL: cache["channel"], + WifiInfo.FREQUENCY: cache["frequency"], + WifiInfo.RSSI: cache["rssi"], + } + except (AttributeError, KeyError): + pass try: pcap_data = extract_from_pcap( pcap_filename, @@ -190,14 +198,16 @@ class Wigle(plugins.Plugin): f"device={pwnagotchi.name()},display=kismet,board=RaspberryPi,brand=pwnagotchi,star=Sol,body=3,subBody=0\n" f"MAC,SSID,AuthMode,FirstSeen,Channel,Frequency,RSSI,CurrentLatitude,CurrentLongitude,AltitudeMeters,AccuracyMeters,RCOIs,MfgrId,Type\n" ) - writer = csv.writer( - content, delimiter=",", quoting=csv.QUOTE_NONE, escapechar="\\" - ) + writer = csv.writer(content, delimiter=",", quoting=csv.QUOTE_NONE, escapechar="\\") for gps_data, pcap_data in data: # write WIFIs try: - timestamp = datetime.strptime(gps_data["Updated"].rsplit(".")[0], "%Y-%m-%dT%H:%M:%S").strftime("%Y-%m-%d %H:%M:%S") + timestamp = datetime.strptime( + gps_data["Updated"].rsplit(".")[0], "%Y-%m-%dT%H:%M:%S" + ).strftime("%Y-%m-%d %H:%M:%S") except ValueError: - timestamp = datetime.strptime(gps_data["Updated"].rsplit(".")[0], "%Y-%m-%d %H:%M:%S").strftime("%Y-%m-%d %H:%M:%S") + timestamp = datetime.strptime( + gps_data["Updated"].rsplit(".")[0], "%Y-%m-%d %H:%M:%S" + ).strftime("%Y-%m-%d %H:%M:%S") writer.writerow( [ pcap_data[WifiInfo.BSSID], @@ -334,7 +344,10 @@ class Wigle(plugins.Plugin): def on_unload(self, ui): with ui._lock: - ui.remove_element("wigle") + try: + ui.remove_element("wigle") + except KeyError: + pass def on_ui_update(self, ui): with ui._lock: