From e1f22cd6a04fcf0b4f0e09489ab4ce9f857ded95 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?fr=C3=A9d=C3=A9ric?= Date: Wed, 12 Feb 2025 13:02:22 +0100 Subject: [PATCH] modified: .gitignore new file: pwnagotchi/plugins/default/cache.py modified: pwnagotchi/plugins/default/wigle.py - Add a cache plugin - Add statistics to wigle --- .gitignore | 3 +- pwnagotchi/plugins/default/cache.py | 53 ++++++++++ pwnagotchi/plugins/default/wigle.py | 153 +++++++++++++++++++++------- 3 files changed, 173 insertions(+), 36 deletions(-) create mode 100644 pwnagotchi/plugins/default/cache.py diff --git a/.gitignore b/.gitignore index 7e99e367..2a45ed72 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ -*.pyc \ No newline at end of file +*.pyc +.vscode diff --git a/pwnagotchi/plugins/default/cache.py b/pwnagotchi/plugins/default/cache.py new file mode 100644 index 00000000..92feff12 --- /dev/null +++ b/pwnagotchi/plugins/default/cache.py @@ -0,0 +1,53 @@ +import logging +import json +import os +import re + +import pwnagotchi.plugins as plugins +from pwnagotchi.ui.components import LabeledValue +from pwnagotchi.ui.view import BLACK +import pwnagotchi.ui.fonts as fonts + + +class Cache(plugins.Plugin): + __author__ = "fmatray" + __version__ = "1.0.0" + __license__ = "GPL3" + __description__ = "A simple plugin to cache AP informations" + + def __init__(self): + self.options = dict() + + def on_config_changed(self, config): + self.handshake_dir = config["bettercap"].get("handshakes") + self.cache_dir = os.path.join(self.handshake_dir, "cache") + if not (os.path.exists(self.cache_dir)): + os.mkdir(self.cache_dir) + + def get_cache(self, file): + cache_filename = os.path.basename( + re.sub(r"\.(pcap|gps\.json|geo\.json)$", ".cache", file) + ) + cache_filename = os.path.join(self.cache_dir, cache_filename) + if not os.path.exists(cache_filename): + return None + try: + with open(cache_filename, "r") as f: + return json.load(f) + except Exception as e: + return None + + def cache_ap(self, ap): + mac = ap["mac"].replace(":", "") + hostname = re.sub(r"[^a-zA-Z0-9]", "", ap["hostname"]) + filename = os.path.join(self.cache_dir, f"{hostname}_{mac}.cache") + with open(filename, "w") as f: + json.dump(ap, f) + + def on_unfiltered_ap_list(self, agent, aps): + for ap in filter(lambda ap: ap["hostname"] not in ["", ""], aps): + self.cache_ap(ap) + + def on_handshake(self, agent, filename, access_point, client_station): + logging.info(f"[WIGLE] on_handshake") + self.cache_ap(access_point) diff --git a/pwnagotchi/plugins/default/wigle.py b/pwnagotchi/plugins/default/wigle.py index 5f872dc5..2d9cb7a7 100644 --- a/pwnagotchi/plugins/default/wigle.py +++ b/pwnagotchi/plugins/default/wigle.py @@ -9,6 +9,7 @@ from glob import glob from threading import Lock from io import StringIO from datetime import datetime, UTC +from dataclasses import dataclass from flask import make_response, redirect from pwnagotchi.utils import ( @@ -28,6 +29,40 @@ from pwnagotchi.ui.view import BLACK from scapy.all import Scapy_Exception +@dataclass +class WigleStatistics: + ready: bool = False + username: str = None + rank: int = None + monthrank: int = None + discoveredwiFi: int = None + last: str = None + groupID: str = None + groupname: str = None + grouprank: int = None + + def update_user(self, json_res): + self.ready = True + self.username = json_res["user"] + self.rank = json_res["rank"] + self.monthrank = json_res["monthRank"] + self.discoveredwiFi = json_res["statistics"]["discoveredWiFi"] + last = json_res["statistics"]["last"] + self.last = f"{last[6:8]}/{last[4:6]}/{last[0:4]}" + + def update_user_group(self, json_res): + self.groupID = json_res["groupId"] + self.groupname = json_res["groupName"] + + def update_group(self, json_res): + rank = 1 + for group in json_res["groups"]: + if group['groupId'] == self.groupID: + self.grouprank = rank + rank += 1 + + + class Wigle(plugins.Plugin): __author__ = "Dadav and updated by Jayofelony and fmatray" __version__ = "4.0.0" @@ -41,14 +76,7 @@ class Wigle(plugins.Plugin): self.skip = list() self.lock = Lock() self.options = dict() - self.statistics = dict( - ready=False, - username=None, - rank=None, - monthrank=None, - discoveredwiFi=None, - last=None, - ) + self.statistics = WigleStatistics() self.last_stat = datetime.now(tz=UTC) self.ui_counter = 0 @@ -61,6 +89,9 @@ class Wigle(plugins.Plugin): self.handshake_dir = config["bettercap"].get("handshakes") report_filename = os.path.join(self.handshake_dir, ".wigle_uploads") self.report = StatusFile(report_filename, data_format="json") + self.cache_dir = os.path.join(self.handshake_dir, "cache") + if not (os.path.exists(self.cache_dir)): + os.mkdir(self.cache_dir) self.cvs_dir = self.options.get("cvs_dir", None) self.whitelist = config["main"].get("whitelist", []) self.timeout = self.options.get("timeout", 30) @@ -86,6 +117,17 @@ class Wigle(plugins.Plugin): return None return pcap_filename + def get_cache(self, pcap_file): + cache_filename = os.path.basename(pcap_file.replace(".pcap", ".cache")) + cache_filename = os.path.join(self.cache_dir, cache_filename) + if not os.path.exists(cache_filename): + return None + try: + with open(cache_filename, "r") as f: + return json.load(f) + except Exception as e: + return None + @staticmethod def extract_gps_data(path): """ @@ -120,8 +162,17 @@ class Wigle(plugins.Plugin): return None return gps_data - @staticmethod - def get_pcap_data(pcap_filename): + def get_pcap_data(self, pcap_filename): + if cache := self.get_cache(pcap_filename): + logging.info(f"[WIGLE] Using cache for {pcap_filename}") + return { + WifiInfo.BSSID: cache["mac"], + WifiInfo.ESSID: cache["hostname"], + WifiInfo.ENCRYPTION: cache["encryption"], + WifiInfo.CHANNEL: cache["channel"], + WifiInfo.FREQUENCY: cache["frequency"], + WifiInfo.RSSI: cache["rssi"], + } try: pcap_data = extract_from_pcap( pcap_filename, @@ -236,31 +287,46 @@ class Wigle(plugins.Plugin): self.post_wigle(reported, cvs_filename, cvs_content, no_err_entries) display.on_normal() - def get_statistics(self, force=False): - if not force and (datetime.now(tz=UTC) - self.last_stat).total_seconds() < 30: - return - self.last_stat = datetime.now(tz=UTC) + def request_statistics(self, url): try: - self.statistics["ready"] = False - json_res = requests.get( - "https://api.wigle.net/api/v2/stats/user", + return requests.get( + url, headers={ "Authorization": f"Basic {self.api_key}", "Accept": "application/json", }, timeout=self.timeout, ).json() - if not json_res["success"]: - return - self.statistics["ready"] = True - self.statistics["username"] = json_res["user"] - self.statistics["rank"] = json_res["rank"] - self.statistics["monthrank"] = json_res["monthRank"] - self.statistics["discoveredwiFi"] = json_res["statistics"]["discoveredWiFi"] - last = json_res["statistics"]["last"] - self.statistics["last"] = f"{last[6:8]}/{last[4:6]}/{last[0:4]}" except (requests.exceptions.RequestException, OSError) as exp: - pass + return None + + def get_user_statistics(self): + json_res = self.request_statistics( + "https://api.wigle.net/api/v2/stats/user", + ) + if json_res and json_res["success"]: + self.statistics.update_user(json_res) + + def get_usergroup_statistics(self): + if not self.statistics.username or self.statistics.groupID: + return + url = f"https://api.wigle.net/api/v2/group/groupForUser/{self.statistics.username}" + if json_res := self.request_statistics(url): + self.statistics.update_user_group(json_res) + + def get_group_statistics(self): + if not self.statistics.groupID: + return + json_res = self.request_statistics("https://api.wigle.net/api/v2/stats/group") + if json_res and json_res["success"]: + self.statistics.update_group(json_res) + + def get_statistics(self, force=False): + if force or (datetime.now(tz=UTC) - self.last_stat).total_seconds() > 30: + self.last_stat = datetime.now(tz=UTC) + self.get_user_statistics() + self.get_usergroup_statistics() + self.get_group_statistics() def on_internet_available(self, agent): if not self.ready: @@ -272,6 +338,21 @@ class Wigle(plugins.Plugin): else: self.get_statistics() + def cache_ap(self, ap): + mac = ap["mac"].replace(":", "") + hostname = re.sub(r"[^a-zA-Z0-9]", "", ap["hostname"]) + filename = os.path.join(self.cache_dir, f"{hostname}_{mac}.cache") + with open(filename, "w") as f: + json.dump(ap, f) + + def on_unfiltered_ap_list(self, agent, aps): + for ap in filter(lambda ap: ap["hostname"] not in ["", ""], aps): + self.cache_ap(ap) + + def on_handshake(self, agent, filename, access_point, client_station): + logging.info(f"[WIGLE] on_handshake") + self.cache_ap(access_point) + def on_ui_setup(self, ui): with ui._lock: ui.add_element( @@ -284,20 +365,22 @@ class Wigle(plugins.Plugin): ui.remove_element("wigle") def on_ui_update(self, ui): - if not self.ready: - return with ui._lock: - if not self.statistics["ready"]: + if not (self.ready and self.statistics.ready): ui.set("wigle", "We Will Wait Wigle") return msg = "-" - self.ui_counter = (self.ui_counter + 1) % 4 + self.ui_counter = (self.ui_counter + 1) % 6 if self.ui_counter == 0: - msg = f"User:{self.statistics['username']}" + msg = f"User:{self.statistics.username}" if self.ui_counter == 1: - msg = f"Rank:{self.statistics['rank']} Month:{self.statistics['monthrank']}" + msg = f"Rank:{self.statistics.rank} Month:{self.statistics.monthrank}" elif self.ui_counter == 2: - msg = f"{self.statistics['discoveredwiFi']} discovered WiFis" + msg = f"{self.statistics.discoveredwiFi} discovered WiFis" elif self.ui_counter == 3: - msg = f"Last upl.:{self.statistics['last']}" + msg = f"Last upl.:{self.statistics.last}" + elif self.ui_counter == 4: + msg = f"Grp:{self.statistics.groupname}" + elif self.ui_counter == 5: + msg = f"Grp rank:{self.statistics.grouprank}" ui.set("wigle", msg)