From e1f22cd6a04fcf0b4f0e09489ab4ce9f857ded95 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?fr=C3=A9d=C3=A9ric?= Date: Wed, 12 Feb 2025 13:02:22 +0100 Subject: [PATCH 01/10] modified: .gitignore new file: pwnagotchi/plugins/default/cache.py modified: pwnagotchi/plugins/default/wigle.py - Add a cache plugin - Add statistics to wigle --- .gitignore | 3 +- pwnagotchi/plugins/default/cache.py | 53 ++++++++++ pwnagotchi/plugins/default/wigle.py | 153 +++++++++++++++++++++------- 3 files changed, 173 insertions(+), 36 deletions(-) create mode 100644 pwnagotchi/plugins/default/cache.py diff --git a/.gitignore b/.gitignore index 7e99e367..2a45ed72 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ -*.pyc \ No newline at end of file +*.pyc +.vscode diff --git a/pwnagotchi/plugins/default/cache.py b/pwnagotchi/plugins/default/cache.py new file mode 100644 index 00000000..92feff12 --- /dev/null +++ b/pwnagotchi/plugins/default/cache.py @@ -0,0 +1,53 @@ +import logging +import json +import os +import re + +import pwnagotchi.plugins as plugins +from pwnagotchi.ui.components import LabeledValue +from pwnagotchi.ui.view import BLACK +import pwnagotchi.ui.fonts as fonts + + +class Cache(plugins.Plugin): + __author__ = "fmatray" + __version__ = "1.0.0" + __license__ = "GPL3" + __description__ = "A simple plugin to cache AP informations" + + def __init__(self): + self.options = dict() + + def on_config_changed(self, config): + self.handshake_dir = config["bettercap"].get("handshakes") + self.cache_dir = os.path.join(self.handshake_dir, "cache") + if not (os.path.exists(self.cache_dir)): + os.mkdir(self.cache_dir) + + def get_cache(self, file): + cache_filename = os.path.basename( + re.sub(r"\.(pcap|gps\.json|geo\.json)$", ".cache", file) + ) + cache_filename = os.path.join(self.cache_dir, cache_filename) + if not os.path.exists(cache_filename): + return None + try: + with open(cache_filename, "r") as f: + return json.load(f) + except Exception as e: + return None + + def cache_ap(self, ap): + mac = ap["mac"].replace(":", "") + hostname = re.sub(r"[^a-zA-Z0-9]", "", ap["hostname"]) + filename = os.path.join(self.cache_dir, f"{hostname}_{mac}.cache") + with open(filename, "w") as f: + json.dump(ap, f) + + def on_unfiltered_ap_list(self, agent, aps): + for ap in filter(lambda ap: ap["hostname"] not in ["", ""], aps): + self.cache_ap(ap) + + def on_handshake(self, agent, filename, access_point, client_station): + logging.info(f"[WIGLE] on_handshake") + self.cache_ap(access_point) diff --git a/pwnagotchi/plugins/default/wigle.py b/pwnagotchi/plugins/default/wigle.py index 5f872dc5..2d9cb7a7 100644 --- a/pwnagotchi/plugins/default/wigle.py +++ b/pwnagotchi/plugins/default/wigle.py @@ -9,6 +9,7 @@ from glob import glob from threading import Lock from io import StringIO from datetime import datetime, UTC +from dataclasses import dataclass from flask import make_response, redirect from pwnagotchi.utils import ( @@ -28,6 +29,40 @@ from pwnagotchi.ui.view import BLACK from scapy.all import Scapy_Exception +@dataclass +class WigleStatistics: + ready: bool = False + username: str = None + rank: int = None + monthrank: int = None + discoveredwiFi: int = None + last: str = None + groupID: str = None + groupname: str = None + grouprank: int = None + + def update_user(self, json_res): + self.ready = True + self.username = json_res["user"] + self.rank = json_res["rank"] + self.monthrank = json_res["monthRank"] + self.discoveredwiFi = json_res["statistics"]["discoveredWiFi"] + last = json_res["statistics"]["last"] + self.last = f"{last[6:8]}/{last[4:6]}/{last[0:4]}" + + def update_user_group(self, json_res): + self.groupID = json_res["groupId"] + self.groupname = json_res["groupName"] + + def update_group(self, json_res): + rank = 1 + for group in json_res["groups"]: + if group['groupId'] == self.groupID: + self.grouprank = rank + rank += 1 + + + class Wigle(plugins.Plugin): __author__ = "Dadav and updated by Jayofelony and fmatray" __version__ = "4.0.0" @@ -41,14 +76,7 @@ class Wigle(plugins.Plugin): self.skip = list() self.lock = Lock() self.options = dict() - self.statistics = dict( - ready=False, - username=None, - rank=None, - monthrank=None, - discoveredwiFi=None, - last=None, - ) + self.statistics = WigleStatistics() self.last_stat = datetime.now(tz=UTC) self.ui_counter = 0 @@ -61,6 +89,9 @@ class Wigle(plugins.Plugin): self.handshake_dir = config["bettercap"].get("handshakes") report_filename = os.path.join(self.handshake_dir, ".wigle_uploads") self.report = StatusFile(report_filename, data_format="json") + self.cache_dir = os.path.join(self.handshake_dir, "cache") + if not (os.path.exists(self.cache_dir)): + os.mkdir(self.cache_dir) self.cvs_dir = self.options.get("cvs_dir", None) self.whitelist = config["main"].get("whitelist", []) self.timeout = self.options.get("timeout", 30) @@ -86,6 +117,17 @@ class Wigle(plugins.Plugin): return None return pcap_filename + def get_cache(self, pcap_file): + cache_filename = os.path.basename(pcap_file.replace(".pcap", ".cache")) + cache_filename = os.path.join(self.cache_dir, cache_filename) + if not os.path.exists(cache_filename): + return None + try: + with open(cache_filename, "r") as f: + return json.load(f) + except Exception as e: + return None + @staticmethod def extract_gps_data(path): """ @@ -120,8 +162,17 @@ class Wigle(plugins.Plugin): return None return gps_data - @staticmethod - def get_pcap_data(pcap_filename): + def get_pcap_data(self, pcap_filename): + if cache := self.get_cache(pcap_filename): + logging.info(f"[WIGLE] Using cache for {pcap_filename}") + return { + WifiInfo.BSSID: cache["mac"], + WifiInfo.ESSID: cache["hostname"], + WifiInfo.ENCRYPTION: cache["encryption"], + WifiInfo.CHANNEL: cache["channel"], + WifiInfo.FREQUENCY: cache["frequency"], + WifiInfo.RSSI: cache["rssi"], + } try: pcap_data = extract_from_pcap( pcap_filename, @@ -236,31 +287,46 @@ class Wigle(plugins.Plugin): self.post_wigle(reported, cvs_filename, cvs_content, no_err_entries) display.on_normal() - def get_statistics(self, force=False): - if not force and (datetime.now(tz=UTC) - self.last_stat).total_seconds() < 30: - return - self.last_stat = datetime.now(tz=UTC) + def request_statistics(self, url): try: - self.statistics["ready"] = False - json_res = requests.get( - "https://api.wigle.net/api/v2/stats/user", + return requests.get( + url, headers={ "Authorization": f"Basic {self.api_key}", "Accept": "application/json", }, timeout=self.timeout, ).json() - if not json_res["success"]: - return - self.statistics["ready"] = True - self.statistics["username"] = json_res["user"] - self.statistics["rank"] = json_res["rank"] - self.statistics["monthrank"] = json_res["monthRank"] - self.statistics["discoveredwiFi"] = json_res["statistics"]["discoveredWiFi"] - last = json_res["statistics"]["last"] - self.statistics["last"] = f"{last[6:8]}/{last[4:6]}/{last[0:4]}" except (requests.exceptions.RequestException, OSError) as exp: - pass + return None + + def get_user_statistics(self): + json_res = self.request_statistics( + "https://api.wigle.net/api/v2/stats/user", + ) + if json_res and json_res["success"]: + self.statistics.update_user(json_res) + + def get_usergroup_statistics(self): + if not self.statistics.username or self.statistics.groupID: + return + url = f"https://api.wigle.net/api/v2/group/groupForUser/{self.statistics.username}" + if json_res := self.request_statistics(url): + self.statistics.update_user_group(json_res) + + def get_group_statistics(self): + if not self.statistics.groupID: + return + json_res = self.request_statistics("https://api.wigle.net/api/v2/stats/group") + if json_res and json_res["success"]: + self.statistics.update_group(json_res) + + def get_statistics(self, force=False): + if force or (datetime.now(tz=UTC) - self.last_stat).total_seconds() > 30: + self.last_stat = datetime.now(tz=UTC) + self.get_user_statistics() + self.get_usergroup_statistics() + self.get_group_statistics() def on_internet_available(self, agent): if not self.ready: @@ -272,6 +338,21 @@ class Wigle(plugins.Plugin): else: self.get_statistics() + def cache_ap(self, ap): + mac = ap["mac"].replace(":", "") + hostname = re.sub(r"[^a-zA-Z0-9]", "", ap["hostname"]) + filename = os.path.join(self.cache_dir, f"{hostname}_{mac}.cache") + with open(filename, "w") as f: + json.dump(ap, f) + + def on_unfiltered_ap_list(self, agent, aps): + for ap in filter(lambda ap: ap["hostname"] not in ["", ""], aps): + self.cache_ap(ap) + + def on_handshake(self, agent, filename, access_point, client_station): + logging.info(f"[WIGLE] on_handshake") + self.cache_ap(access_point) + def on_ui_setup(self, ui): with ui._lock: ui.add_element( @@ -284,20 +365,22 @@ class Wigle(plugins.Plugin): ui.remove_element("wigle") def on_ui_update(self, ui): - if not self.ready: - return with ui._lock: - if not self.statistics["ready"]: + if not (self.ready and self.statistics.ready): ui.set("wigle", "We Will Wait Wigle") return msg = "-" - self.ui_counter = (self.ui_counter + 1) % 4 + self.ui_counter = (self.ui_counter + 1) % 6 if self.ui_counter == 0: - msg = f"User:{self.statistics['username']}" + msg = f"User:{self.statistics.username}" if self.ui_counter == 1: - msg = f"Rank:{self.statistics['rank']} Month:{self.statistics['monthrank']}" + msg = f"Rank:{self.statistics.rank} Month:{self.statistics.monthrank}" elif self.ui_counter == 2: - msg = f"{self.statistics['discoveredwiFi']} discovered WiFis" + msg = f"{self.statistics.discoveredwiFi} discovered WiFis" elif self.ui_counter == 3: - msg = f"Last upl.:{self.statistics['last']}" + msg = f"Last upl.:{self.statistics.last}" + elif self.ui_counter == 4: + msg = f"Grp:{self.statistics.groupname}" + elif self.ui_counter == 5: + msg = f"Grp rank:{self.statistics.grouprank}" ui.set("wigle", msg) From 9d0df49fb6e0d79082c3a4485f7595ef3338f8ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?fr=C3=A9d=C3=A9ric?= Date: Wed, 12 Feb 2025 13:04:14 +0100 Subject: [PATCH 02/10] add cache.py --- pwnagotchi/plugins/default/cache.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pwnagotchi/plugins/default/cache.py b/pwnagotchi/plugins/default/cache.py index 92feff12..0ab0368d 100644 --- a/pwnagotchi/plugins/default/cache.py +++ b/pwnagotchi/plugins/default/cache.py @@ -8,6 +8,8 @@ from pwnagotchi.ui.components import LabeledValue from pwnagotchi.ui.view import BLACK import pwnagotchi.ui.fonts as fonts +def get_cache(): + return None class Cache(plugins.Plugin): __author__ = "fmatray" From 5d668ae34ef0ee94af5f85118bd9eae936ed32d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?fr=C3=A9d=C3=A9ric?= Date: Wed, 12 Feb 2025 13:04:37 +0100 Subject: [PATCH 03/10] update wigle.py --- pwnagotchi/plugins/default/wigle.py | 32 +---------------------------- 1 file changed, 1 insertion(+), 31 deletions(-) diff --git a/pwnagotchi/plugins/default/wigle.py b/pwnagotchi/plugins/default/wigle.py index 2d9cb7a7..ec120a67 100644 --- a/pwnagotchi/plugins/default/wigle.py +++ b/pwnagotchi/plugins/default/wigle.py @@ -57,12 +57,11 @@ class WigleStatistics: def update_group(self, json_res): rank = 1 for group in json_res["groups"]: - if group['groupId'] == self.groupID: + if group["groupId"] == self.groupID: self.grouprank = rank rank += 1 - class Wigle(plugins.Plugin): __author__ = "Dadav and updated by Jayofelony and fmatray" __version__ = "4.0.0" @@ -89,9 +88,6 @@ class Wigle(plugins.Plugin): self.handshake_dir = config["bettercap"].get("handshakes") report_filename = os.path.join(self.handshake_dir, ".wigle_uploads") self.report = StatusFile(report_filename, data_format="json") - self.cache_dir = os.path.join(self.handshake_dir, "cache") - if not (os.path.exists(self.cache_dir)): - os.mkdir(self.cache_dir) self.cvs_dir = self.options.get("cvs_dir", None) self.whitelist = config["main"].get("whitelist", []) self.timeout = self.options.get("timeout", 30) @@ -117,17 +113,6 @@ class Wigle(plugins.Plugin): return None return pcap_filename - def get_cache(self, pcap_file): - cache_filename = os.path.basename(pcap_file.replace(".pcap", ".cache")) - cache_filename = os.path.join(self.cache_dir, cache_filename) - if not os.path.exists(cache_filename): - return None - try: - with open(cache_filename, "r") as f: - return json.load(f) - except Exception as e: - return None - @staticmethod def extract_gps_data(path): """ @@ -338,21 +323,6 @@ class Wigle(plugins.Plugin): else: self.get_statistics() - def cache_ap(self, ap): - mac = ap["mac"].replace(":", "") - hostname = re.sub(r"[^a-zA-Z0-9]", "", ap["hostname"]) - filename = os.path.join(self.cache_dir, f"{hostname}_{mac}.cache") - with open(filename, "w") as f: - json.dump(ap, f) - - def on_unfiltered_ap_list(self, agent, aps): - for ap in filter(lambda ap: ap["hostname"] not in ["", ""], aps): - self.cache_ap(ap) - - def on_handshake(self, agent, filename, access_point, client_station): - logging.info(f"[WIGLE] on_handshake") - self.cache_ap(access_point) - def on_ui_setup(self, ui): with ui._lock: ui.add_element( From 1d635d955bd6c90e4660d54348eacd5056aaf41c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?fr=C3=A9d=C3=A9ric?= Date: Wed, 12 Feb 2025 14:10:33 +0100 Subject: [PATCH 04/10] BT-Tether: add a check to DNS config --- pwnagotchi/plugins/default/bt-tether.py | 107 +++++++++++++++--------- 1 file changed, 67 insertions(+), 40 deletions(-) diff --git a/pwnagotchi/plugins/default/bt-tether.py b/pwnagotchi/plugins/default/bt-tether.py index 7d797c1f..ecd0ed06 100644 --- a/pwnagotchi/plugins/default/bt-tether.py +++ b/pwnagotchi/plugins/default/bt-tether.py @@ -123,6 +123,7 @@ MAC_PTTRN = r"^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})$" IP_PTTRN = r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$" DNS_PTTRN = r"^\s*((\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\s*[ ,;]\s*)+((\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\s*[ ,;]?\s*)$" + class BTTether(plugins.Plugin): __author__ = "Jayofelony, modified my fmatray" __version__ = "1.4" @@ -138,8 +139,7 @@ class BTTether(plugins.Plugin): @staticmethod def exec_cmd(cmd, args, pattern=None): try: - result = subprocess.run([cmd] + args, - check=True, capture_output=True, text=True) + result = subprocess.run([cmd] + args, check=True, capture_output=True, text=True) if pattern: return result.stdout.find(pattern) return result @@ -152,7 +152,7 @@ class BTTether(plugins.Plugin): def nmcli(self, args, pattern=None): return self.exec_cmd("nmcli", args, pattern) - + def on_loaded(self): logging.info("[BT-Tether] plugin loaded.") @@ -162,7 +162,7 @@ class BTTether(plugins.Plugin): return if not ("mac" in self.options and re.match(MAC_PTTRN, self.options["mac"])): logging.error("[BT-Tether] Error with mac address") - return + return if not ("phone" in self.options and self.options["phone"].lower() in ["android", "ios"]): logging.error("[BT-Tether] Phone type not supported") @@ -181,23 +181,30 @@ class BTTether(plugins.Plugin): self.mac = self.options["mac"] dns = self.options.get("dns", "8.8.8.8 1.1.1.1") if not re.match(DNS_PTTRN, dns): - logging.error(f"[BT-Tether] DNS error: {dns}") + if dns == "": + logging.error(f"[BT-Tether] Empty DNS setting") + else: + logging.error(f"[BT-Tether] Wrong DNS setting: '{dns}'") return - dns = re.sub("[\s,;]+", " ", dns).strip() # DNS cleaning + dns = re.sub("[\s,;]+", " ", dns).strip() # DNS cleaning try: # Configure connection. Metric is set to 200 to prefer connection over USB - self.nmcli(["connection", "modify", f"{self.phone_name}", - "connection.type", "bluetooth", - "bluetooth.type", "panu", - "bluetooth.bdaddr", f"{self.mac}", - "connection.autoconnect", "yes", - "connection.autoconnect-retries", "0", - "ipv4.method", "manual", - "ipv4.dns", f"{dns}", - "ipv4.addresses", f"{address}/24", - "ipv4.gateway", f"{gateway}", - "ipv4.route-metric", "200" ]) + self.nmcli( + [ + "connection", "modify", f"{self.phone_name}", + "connection.type", "bluetooth", + "bluetooth.type", "panu", + "bluetooth.bdaddr", f"{self.mac}", + "connection.autoconnect", "yes", + "connection.autoconnect-retries", "0", + "ipv4.method", "manual", + "ipv4.dns", f"{dns}", + "ipv4.addresses", f"{address}/24", + "ipv4.gateway", f"{gateway}", + "ipv4.route-metric", "200", + ] + ) self.nmcli(["connection", "reload"]) self.ready = True logging.info(f"[BT-Tether] Connection {self.phone_name} configured") @@ -205,7 +212,7 @@ class BTTether(plugins.Plugin): logging.error(f"[BT-Tether] Error while configuring: {e}") return try: - time.sleep(5) # Give some delay to configure before going up + time.sleep(5) # Give some delay to configure before going up self.nmcli(["connection", "up", f"{self.phone_name}"]) except Exception as e: logging.error(f"[BT-Tether] Failed to connect to device: {e}") @@ -230,9 +237,18 @@ class BTTether(plugins.Plugin): def on_ui_setup(self, ui): with ui._lock: - ui.add_element('bluetooth', LabeledValue(color=BLACK, label='BT', value='-', - position=(ui.width() / 2 - 10, 0), - label_font=fonts.Bold, text_font=fonts.Medium)) + ui.add_element( + "bluetooth", + LabeledValue( + color=BLACK, + label="BT", + value="-", + position=(ui.width() / 2 - 10, 0), + label_font=fonts.Bold, + text_font=fonts.Medium, + ), + ) + def on_ui_update(self, ui): if not self.ready: return @@ -240,17 +256,26 @@ class BTTether(plugins.Plugin): status = "" try: # Checking connection - if self.nmcli(["-w", "0", "-g", "GENERAL.STATE", "connection", "show", self.phone_name], - "activated") != -1: + if ( + self.nmcli(["-w", "0", "-g", "GENERAL.STATE", "connection", "show", self.phone_name], + "activated", + ) + != -1 + ): ui.set("bluetooth", "U") return else: ui.set("bluetooth", "D") status = "BT Conn. down" - + # Checking device - if self.nmcli(["-w", "0", "-g", "GENERAL.STATE", "device", "show", self.mac], - "(connected)") != -1: + if ( + self.nmcli( + ["-w", "0", "-g", "GENERAL.STATE", "device", "show", self.mac], + "(connected)", + ) + != -1 + ): ui.set("bluetooth", "C") status += "\nBT dev conn." else: @@ -269,26 +294,28 @@ class BTTether(plugins.Plugin): if path == "/" or not path: try: bluetooth = self.bluetoothctl(["info", self.mac]) - bluetooth = bluetooth.stdout.replace('\n', '
') + bluetooth = bluetooth.stdout.replace("\n", "
") except Exception as e: bluetooth = "Error while checking bluetoothctl" - - try: - device = self.nmcli(["-w", "0","device", "show", self.mac]) - device = device.stdout.replace('\n', '
') + + try: + device = self.nmcli(["-w", "0", "device", "show", self.mac]) + device = device.stdout.replace("\n", "
") except Exception as e: device = "Error while checking nmcli device" - + try: - connection = self.nmcli(["-w", "0","connection", "show", self.phone_name]) - connection = connection.stdout.replace('\n', '
') + connection = self.nmcli(["-w", "0", "connection", "show", self.phone_name]) + connection = connection.stdout.replace("\n", "
") except Exception as e: connection = "Error while checking nmcli connection" logging.debug(device) - return render_template_string(TEMPLATE, - title="BT-Tether", - bluetooth=bluetooth, - device=device, - connection=connection) - abort(404) \ No newline at end of file + return render_template_string( + TEMPLATE, + title="BT-Tether", + bluetooth=bluetooth, + device=device, + connection=connection, + ) + abort(404) From 29d1ca6728f0f9fb6cb4b54aba2e185249587ee8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?fr=C3=A9d=C3=A9ric?= Date: Thu, 13 Feb 2025 22:17:09 +0100 Subject: [PATCH 05/10] Bt-tether: add autoconnect to device Cache: - add cache to all hools with access point - add cleaning Wigle: use cache --- pwnagotchi/plugins/default/bt-tether.py | 11 ++- pwnagotchi/plugins/default/cache.py | 125 ++++++++++++++++++------ pwnagotchi/plugins/default/wigle.py | 47 +++++---- 3 files changed, 133 insertions(+), 50 deletions(-) diff --git a/pwnagotchi/plugins/default/bt-tether.py b/pwnagotchi/plugins/default/bt-tether.py index ecd0ed06..afced4d3 100644 --- a/pwnagotchi/plugins/default/bt-tether.py +++ b/pwnagotchi/plugins/default/bt-tether.py @@ -144,7 +144,10 @@ class BTTether(plugins.Plugin): return result.stdout.find(pattern) return result except Exception as exp: - logging.error(f"[BT-Tether] Error with {cmd} : {exp}") + logging.error(f"[BT-Tether] Error with {cmd}") + logging.error(f"[BT-Tether] Exception : {exp}") + logging.error(f"[BT-Tether] STDOUT : {result.stdout}") + logging.error(f"[BT-Tether] STDERR : {result.stderr}") raise exp def bluetoothctl(self, args, pattern=None): @@ -205,6 +208,12 @@ class BTTether(plugins.Plugin): "ipv4.route-metric", "200", ] ) + # Configure Device to autoconnect + self.nmcli([ + "device", "set", f"{self.mac}", + "autoconnect", "yes", + "managed", "yes" + ]) self.nmcli(["connection", "reload"]) self.ready = True logging.info(f"[BT-Tether] Connection {self.phone_name} configured") diff --git a/pwnagotchi/plugins/default/cache.py b/pwnagotchi/plugins/default/cache.py index 0ab0368d..a94bc3dc 100644 --- a/pwnagotchi/plugins/default/cache.py +++ b/pwnagotchi/plugins/default/cache.py @@ -2,14 +2,25 @@ import logging import json import os import re - +import pathlib import pwnagotchi.plugins as plugins -from pwnagotchi.ui.components import LabeledValue -from pwnagotchi.ui.view import BLACK -import pwnagotchi.ui.fonts as fonts +from datetime import datetime, UTC +from threading import Lock + + +def read_ap_cache(cache_dir, file): + cache_filename = os.path.basename(re.sub(r"\.(pcap|gps\.json|geo\.json)$", ".cache", file)) + cache_filename = os.path.join(cache_dir, cache_filename) + if not os.path.exists(cache_filename): + logging.info("Cache not exist") + return None + try: + with open(cache_filename, "r") as f: + return json.load(f) + except Exception as e: + logging.info(f"Exception {e}") + return None -def get_cache(): - return None class Cache(plugins.Plugin): __author__ = "fmatray" @@ -19,37 +30,87 @@ class Cache(plugins.Plugin): def __init__(self): self.options = dict() + self.ready = False + self.lock = Lock() + + def on_loaded(self): + logging.info("[CACHE] plugin loaded.") def on_config_changed(self, config): - self.handshake_dir = config["bettercap"].get("handshakes") - self.cache_dir = os.path.join(self.handshake_dir, "cache") - if not (os.path.exists(self.cache_dir)): - os.mkdir(self.cache_dir) - - def get_cache(self, file): - cache_filename = os.path.basename( - re.sub(r"\.(pcap|gps\.json|geo\.json)$", ".cache", file) - ) - cache_filename = os.path.join(self.cache_dir, cache_filename) - if not os.path.exists(cache_filename): - return None try: - with open(cache_filename, "r") as f: - return json.load(f) - except Exception as e: - return None + handshake_dir = config["bettercap"].get("handshakes") + self.cache_dir = os.path.join(handshake_dir, "cache") + os.makedirs(self.cache_dir, exist_ok=True) + except Exception: + logging.info(f"[CACHE] Cannot access to the cache directory") + return + self.last_clean = datetime.now(tz=UTC) + self.ready = True + logging.info(f"[CACHE] Cache plugin configured") + self.clean_ap_cache() - def cache_ap(self, ap): - mac = ap["mac"].replace(":", "") - hostname = re.sub(r"[^a-zA-Z0-9]", "", ap["hostname"]) - filename = os.path.join(self.cache_dir, f"{hostname}_{mac}.cache") - with open(filename, "w") as f: - json.dump(ap, f) + def on_unload(self, ui): + self.clean_ap_cache() + + def clean_ap_cache(self): + if not self.ready: + return + with self.lock: + ctime = datetime.now(tz=UTC) + cache_to_delete = list() + for cache_file in pathlib.Path(self.cache_dir).glob("*.apcache"): + mtime = datetime.fromtimestamp(cache_file.lstat().st_mtime, tz=UTC) + if (ctime - mtime).total_seconds() > 60 * 5: + cache_to_delete.append(cache_file) + if cache_to_delete: + logging.info(f"[CACHE] Cleaning {len(cache_to_delete)} files") + for cache_file in cache_to_delete: + try: + cache_file.unlink() + except Exception as e: + logging.error(f"[CACHE] Cannot delete {cache_file}: {e}") + + def write_ap_cache(self, access_point): + with self.lock: + try: + mac = access_point["mac"].replace(":", "") + hostname = re.sub(r"[^a-zA-Z0-9]", "", access_point["hostname"]) + except KeyError: + return + cache_file = os.path.join(self.cache_dir, f"{hostname}_{mac}.apcache") + try: + with open(cache_file, "w") as f: + json.dump(access_point, f) + except Exception as e: + logging.error(f"[CACHE] Cannot write {cache_file}: {e}") + pass + + def on_wifi_update(self, agent, access_points): + if self.ready: + for ap in filter(lambda ap: ap["hostname"] not in ["", ""], access_points): + self.write_ap_cache(ap) def on_unfiltered_ap_list(self, agent, aps): - for ap in filter(lambda ap: ap["hostname"] not in ["", ""], aps): - self.cache_ap(ap) + if self.ready: + for ap in filter(lambda ap: ap["hostname"] not in ["", ""], aps): + self.write_ap_cache(ap) + + def on_association(self, agent, access_point): + if self.ready: + self.write_ap_cache(access_point) + + def on_deauthentication(self, agent, access_point, client_station): + if self.ready: + self.write_ap_cache(access_point) def on_handshake(self, agent, filename, access_point, client_station): - logging.info(f"[WIGLE] on_handshake") - self.cache_ap(access_point) + if self.ready: + self.write_ap_cache(access_point) + + def on_ui_update(self, ui): + if not self.ready: + return + current_time = datetime.now(tz=UTC) + if (current_time - self.last_clean).total_seconds() > 60: + self.clean_ap_cache() + self.last_clean = current_time diff --git a/pwnagotchi/plugins/default/wigle.py b/pwnagotchi/plugins/default/wigle.py index 2b5a367c..5000629d 100644 --- a/pwnagotchi/plugins/default/wigle.py +++ b/pwnagotchi/plugins/default/wigle.py @@ -20,6 +20,7 @@ from pwnagotchi.utils import ( remove_whitelisted, ) from pwnagotchi import plugins +from pwnagotchi.plugins.default.cache import read_ap_cache from pwnagotchi._version import __version__ as __pwnagotchi_version__ import pwnagotchi.ui.fonts as fonts @@ -64,7 +65,7 @@ class WigleStatistics: class Wigle(plugins.Plugin): __author__ = "Dadav and updated by Jayofelony and fmatray" - __version__ = "4.0.0" + __version__ = "4.1.0" __license__ = "GPL3" __description__ = "This plugin automatically uploads collected WiFi to wigle.net" LABEL_SPACING = 0 @@ -79,6 +80,9 @@ class Wigle(plugins.Plugin): self.last_stat = datetime.now(tz=UTC) self.ui_counter = 0 + def on_loaded(self): + logging.info("[WIGLE] plugin loaded.") + def on_config_changed(self, config): self.api_key = self.options.get("api_key", None) if not self.api_key: @@ -88,6 +92,7 @@ class Wigle(plugins.Plugin): self.handshake_dir = config["bettercap"].get("handshakes") report_filename = os.path.join(self.handshake_dir, ".wigle_uploads") self.report = StatusFile(report_filename, data_format="json") + self.cache_dir = os.path.join(self.handshake_dir, "cache") self.cvs_dir = self.options.get("cvs_dir", None) self.whitelist = config["main"].get("whitelist", []) self.timeout = self.options.get("timeout", 30) @@ -148,16 +153,19 @@ class Wigle(plugins.Plugin): return gps_data def get_pcap_data(self, pcap_filename): - if cache := self.get_cache(pcap_filename): - logging.info(f"[WIGLE] Using cache for {pcap_filename}") - return { - WifiInfo.BSSID: cache["mac"], - WifiInfo.ESSID: cache["hostname"], - WifiInfo.ENCRYPTION: cache["encryption"], - WifiInfo.CHANNEL: cache["channel"], - WifiInfo.FREQUENCY: cache["frequency"], - WifiInfo.RSSI: cache["rssi"], - } + try: + if cache := read_ap_cache(self.cache_dir, self.pcap_filename): + logging.info(f"[WIGLE] Using cache for {pcap_filename}") + return { + WifiInfo.BSSID: cache["mac"], + WifiInfo.ESSID: cache["hostname"], + WifiInfo.ENCRYPTION: cache["encryption"], + WifiInfo.CHANNEL: cache["channel"], + WifiInfo.FREQUENCY: cache["frequency"], + WifiInfo.RSSI: cache["rssi"], + } + except (AttributeError, KeyError): + pass try: pcap_data = extract_from_pcap( pcap_filename, @@ -190,14 +198,16 @@ class Wigle(plugins.Plugin): f"device={pwnagotchi.name()},display=kismet,board=RaspberryPi,brand=pwnagotchi,star=Sol,body=3,subBody=0\n" f"MAC,SSID,AuthMode,FirstSeen,Channel,Frequency,RSSI,CurrentLatitude,CurrentLongitude,AltitudeMeters,AccuracyMeters,RCOIs,MfgrId,Type\n" ) - writer = csv.writer( - content, delimiter=",", quoting=csv.QUOTE_NONE, escapechar="\\" - ) + writer = csv.writer(content, delimiter=",", quoting=csv.QUOTE_NONE, escapechar="\\") for gps_data, pcap_data in data: # write WIFIs try: - timestamp = datetime.strptime(gps_data["Updated"].rsplit(".")[0], "%Y-%m-%dT%H:%M:%S").strftime("%Y-%m-%d %H:%M:%S") + timestamp = datetime.strptime( + gps_data["Updated"].rsplit(".")[0], "%Y-%m-%dT%H:%M:%S" + ).strftime("%Y-%m-%d %H:%M:%S") except ValueError: - timestamp = datetime.strptime(gps_data["Updated"].rsplit(".")[0], "%Y-%m-%d %H:%M:%S").strftime("%Y-%m-%d %H:%M:%S") + timestamp = datetime.strptime( + gps_data["Updated"].rsplit(".")[0], "%Y-%m-%d %H:%M:%S" + ).strftime("%Y-%m-%d %H:%M:%S") writer.writerow( [ pcap_data[WifiInfo.BSSID], @@ -334,7 +344,10 @@ class Wigle(plugins.Plugin): def on_unload(self, ui): with ui._lock: - ui.remove_element("wigle") + try: + ui.remove_element("wigle") + except KeyError: + pass def on_ui_update(self, ui): with ui._lock: From 2dc45bc4b49a63df0144ee13d5802e6a1c2f8708 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?fr=C3=A9d=C3=A9ric?= Date: Sun, 16 Feb 2025 20:07:22 +0100 Subject: [PATCH 06/10] - Add more sentences to voice.py, with some geek references --- pwnagotchi/voice.py | 69 ++++++++++++++++++++++++++++++++++----------- 1 file changed, 52 insertions(+), 17 deletions(-) diff --git a/pwnagotchi/voice.py b/pwnagotchi/voice.py index fad1eea8..38dbaefd 100644 --- a/pwnagotchi/voice.py +++ b/pwnagotchi/voice.py @@ -27,11 +27,19 @@ class Voice: self._('Hack the Planet!'), self._('No more mister Wi-Fi!!'), self._('Pretty fly 4 a Wi-Fi!'), + self._('Good Pwning!'), # Battlestar Galactica + self._('Ensign, Engage!'), # Star trek + self._('Free your Wi-Fi!'), # Matrix + self._('Chevron Seven, locked.'), # Stargate + self._('May the Wi-fi be with you'), # Star wars ]) def on_keys_generation(self): return random.choice([ - self._('Generating keys, do not turn off ...')]) + self._('Generating keys, do not turn off ...'), + self._('Are you the keymaster?'), # Ghostbusters + self._('I am the keymaster!'), # Ghostbusters + ]) def on_normal(self): return random.choice([ @@ -44,8 +52,7 @@ class Voice: def on_reading_logs(self, lines_so_far=0): if lines_so_far == 0: return self._('Reading last session logs ...') - else: - return self._('Read {lines_so_far} log lines so far ...').format(lines_so_far=lines_so_far) + return self._('Read {lines_so_far} log lines so far ...').format(lines_so_far=lines_so_far) def on_bored(self): return random.choice([ @@ -53,7 +60,11 @@ class Voice: self._('Let\'s go for a walk!')]) def on_motivated(self, reward): - return self._('This is the best day of my life!') + return random.choice([ + self._('This is the best day of my life!'), + self._('All your base are belong to us'), + self._('Fascinating!'), # Star trek + ]) def on_demotivated(self, reward): return self._('Shitty day :/') @@ -63,6 +74,8 @@ class Voice: self._('I\'m extremely bored ...'), self._('I\'m very sad ...'), self._('I\'m sad'), + self._('I\'m so happy ...'), # Marvin in H2G2 + self._('Life? Don\'t talk to me about life.'), # Also Marvin in H2G2 '...']) def on_angry(self): @@ -78,17 +91,17 @@ class Voice: self._('I pwn therefore I am.'), self._('So many networks!!!'), self._('I\'m having so much fun!'), + self._('It\'s a Wi-Fi system! I know this!'), # Jurassic park self._('My crime is that of curiosity ...')]) def on_new_peer(self, peer): if peer.first_encounter(): return random.choice([ self._('Hello {name}! Nice to meet you.').format(name=peer.name())]) - else: - return random.choice([ - self._('Yo {name}! Sup?').format(name=peer.name()), - self._('Hey {name} how are you doing?').format(name=peer.name()), - self._('Unit {name} is nearby!').format(name=peer.name())]) + return random.choice([ + self._('Yo {name}! Sup?').format(name=peer.name()), + self._('Hey {name} how are you doing?').format(name=peer.name()), + self._('Unit {name} is nearby!').format(name=peer.name())]) def on_lost_peer(self, peer): return random.choice([ @@ -104,19 +117,23 @@ class Voice: def on_grateful(self): return random.choice([ self._('Good friends are a blessing!'), - self._('I love my friends!')]) + self._('I love my friends!') + ]) def on_lonely(self): return random.choice([ self._('Nobody wants to play with me ...'), self._('I feel so alone ...'), + self._('Let\'s find friends'), self._('Where\'s everybody?!')]) def on_napping(self, secs): return random.choice([ self._('Napping for {secs}s ...').format(secs=secs), self._('Zzzzz'), - self._('ZzzZzzz ({secs}s)').format(secs=secs)]) + self._('Snoring ...'), + self._('ZzzZzzz ({secs}s)').format(secs=secs), + ]) def on_shutdown(self): return random.choice([ @@ -124,12 +141,17 @@ class Voice: self._('Zzz')]) def on_awakening(self): - return random.choice(['...', '!']) + return random.choice([ + '...', + '!', + 'Hello World!', + self._('I dreamed of electric sheep'), + ]) def on_waiting(self, secs): return random.choice([ - self._('Waiting for {secs}s ...').format(secs=secs), '...', + self._('Waiting for {secs}s ...').format(secs=secs), self._('Looking around ({secs}s)').format(secs=secs)]) def on_assoc(self, ap): @@ -138,12 +160,16 @@ class Voice: return random.choice([ self._('Hey {what} let\'s be friends!').format(what=what), self._('Associating to {what}').format(what=what), - self._('Yo {what}!').format(what=what)]) + self._('Yo {what}!').format(what=what), + self._('Rise and Shine Mr. {what}!').format(what=what), # Half Life + ]) def on_deauth(self, sta): return random.choice([ - self._('Just decided that {mac} needs no WiFi!').format(mac=sta['mac']), + self._('Just decided that {mac} needs no Wi-Fi!').format(mac=sta['mac']), self._('Deauthenticating {mac}').format(mac=sta['mac']), + self._('No more Wi-Fi for {mac}').format(mac=sta['mac']), + self._('It\'s a trap! {mac}').format(mac=sta['mac']), # Star wars self._('Kickbanning {mac}!').format(mac=sta['mac'])]) def on_handshakes(self, new_shakes): @@ -155,10 +181,19 @@ class Voice: return self._('You have {count} new message{plural}!').format(count=count, plural=s) def on_rebooting(self): - return self._("Oops, something went wrong ... Rebooting ...") + return random.choice([ + self._("Oops, something went wrong ... Rebooting ..."), + self._("Have you tried turning it off and on again?"), # The IT crew + self._("I\'m afraid Dave"), # 2001 Space Odyssey + self._("I\'m dead, Jim!"), # Star Trek + self._("I have a bad feeling about this"), # Star wars + ]) def on_uploading(self, to): - return self._("Uploading data to {to} ...").format(to=to) + return random.choice([ + self._("Uploading data to {to} ...").format(to=to), + self._("Beam me up to {to}").format(to=to), + ]) def on_downloading(self, name): return self._("Downloading from {name} ...").format(name=name) From d7f7dac0d743c205b999edc42c9d72aa84de8a06 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?fr=C3=A9d=C3=A9ric?= Date: Sun, 16 Feb 2025 20:13:01 +0100 Subject: [PATCH 07/10] - Catch an exception on lstat() --- pwnagotchi/plugins/default/cache.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/pwnagotchi/plugins/default/cache.py b/pwnagotchi/plugins/default/cache.py index a94bc3dc..64c8f566 100644 --- a/pwnagotchi/plugins/default/cache.py +++ b/pwnagotchi/plugins/default/cache.py @@ -59,9 +59,12 @@ class Cache(plugins.Plugin): ctime = datetime.now(tz=UTC) cache_to_delete = list() for cache_file in pathlib.Path(self.cache_dir).glob("*.apcache"): - mtime = datetime.fromtimestamp(cache_file.lstat().st_mtime, tz=UTC) - if (ctime - mtime).total_seconds() > 60 * 5: - cache_to_delete.append(cache_file) + try: + mtime = datetime.fromtimestamp(cache_file.lstat().st_mtime, tz=UTC) + if (ctime - mtime).total_seconds() > 60 * 5: + cache_to_delete.append(cache_file) + except FileNotFoundError: + pass if cache_to_delete: logging.info(f"[CACHE] Cleaning {len(cache_to_delete)} files") for cache_file in cache_to_delete: From 7dd809afe01acfd5e4bd1c57d0d75f4eff618d23 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?fr=C3=A9d=C3=A9ric?= Date: Sun, 16 Feb 2025 20:29:06 +0100 Subject: [PATCH 08/10] add otion for cache --- pwnagotchi/defaults.toml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pwnagotchi/defaults.toml b/pwnagotchi/defaults.toml index 532e9d6a..8bdb5955 100644 --- a/pwnagotchi/defaults.toml +++ b/pwnagotchi/defaults.toml @@ -33,6 +33,8 @@ main.plugins.bt-tether.dns = "" # optional, default (google): "8.8.8.8 1.1.1.1". main.plugins.fix_services.enabled = true +main.plugins.cache.enabled = true + main.plugins.gdrivesync.enabled = false main.plugins.gdrivesync.backupfiles = [''] main.plugins.gdrivesync.backup_folder = "PwnagotchiBackups" From c433a6c2d54c56c63d38e7529a36321cc006f0c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?fr=C3=A9d=C3=A9ric?= Date: Mon, 17 Feb 2025 00:49:46 +0100 Subject: [PATCH 09/10] remove debug messages --- pwnagotchi/plugins/default/bt-tether.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/pwnagotchi/plugins/default/bt-tether.py b/pwnagotchi/plugins/default/bt-tether.py index afced4d3..dc12e71d 100644 --- a/pwnagotchi/plugins/default/bt-tether.py +++ b/pwnagotchi/plugins/default/bt-tether.py @@ -146,8 +146,6 @@ class BTTether(plugins.Plugin): except Exception as exp: logging.error(f"[BT-Tether] Error with {cmd}") logging.error(f"[BT-Tether] Exception : {exp}") - logging.error(f"[BT-Tether] STDOUT : {result.stdout}") - logging.error(f"[BT-Tether] STDERR : {result.stderr}") raise exp def bluetoothctl(self, args, pattern=None): From 393981e0ba27a0cd63aa66fb479292f2bccb23df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?fr=C3=A9d=C3=A9ric?= Date: Mon, 17 Feb 2025 11:01:16 +0100 Subject: [PATCH 10/10] Remove debgging mesasge in cache --- pwnagotchi/plugins/default/cache.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pwnagotchi/plugins/default/cache.py b/pwnagotchi/plugins/default/cache.py index 64c8f566..f3b826ed 100644 --- a/pwnagotchi/plugins/default/cache.py +++ b/pwnagotchi/plugins/default/cache.py @@ -70,8 +70,8 @@ class Cache(plugins.Plugin): for cache_file in cache_to_delete: try: cache_file.unlink() - except Exception as e: - logging.error(f"[CACHE] Cannot delete {cache_file}: {e}") + except FileNotFoundError as e: + pass def write_ap_cache(self, access_point): with self.lock: