diff --git a/pwnagotchi/plugins/default/wigle.py b/pwnagotchi/plugins/default/wigle.py index a4b79970..d446973d 100644 --- a/pwnagotchi/plugins/default/wigle.py +++ b/pwnagotchi/plugins/default/wigle.py @@ -20,6 +20,11 @@ from pwnagotchi.utils import ( ) from pwnagotchi import plugins from pwnagotchi._version import __version__ as __pwnagotchi_version__ + +import pwnagotchi.ui.fonts as fonts +from pwnagotchi.ui.components import Text +from pwnagotchi.ui.view import BLACK + from scapy.all import Scapy_Exception @@ -28,6 +33,7 @@ class Wigle(plugins.Plugin): __version__ = "4.0.0" __license__ = "GPL3" __description__ = "This plugin automatically uploads collected WiFi to wigle.net" + LABEL_SPACING = 0 def __init__(self): self.ready = False @@ -35,6 +41,16 @@ class Wigle(plugins.Plugin): self.skip = list() self.lock = Lock() self.options = dict() + self.statistics = dict( + ready=False, + username=None, + rank=None, + monthrank=None, + discoveredwiFi=None, + last=None, + ) + self.last_stat = datetime.now(tz=UTC) + self.ui_counter = 0 def on_config_changed(self, config): api_name = self.options.get("api_name", None) @@ -44,10 +60,11 @@ class Wigle(plugins.Plugin): return self.auth = (api_name, api_key) self.donate = self.options.get("donate", False) - self.handshake_dir = config["bettercap"].get("handshakes", None) + self.handshake_dir = config["bettercap"].get("handshakes") self.cvs_dir = self.options.get("cvs_dir", None) - self.whitelist = config["main"].get("whitelist", None) - self.timeout = config["main"].get("timeout", 30) + self.whitelist = config["main"].get("whitelist", []) + self.timeout = self.options.get("timeout", 30) + self.position = self.options.get("position", (10, 10)) self.ready = True logging.info("[WIGLE] Ready for wardriving!!!") @@ -174,7 +191,7 @@ class Wigle(plugins.Plugin): except Exception as exp: logging.error(f"[WIGLE] Error while writing CSV file(skipping): {exp}") - def upload_to_wigle(self, reported, cvs_filename, cvs_content, no_err_entries): + def post_wigle(self, reported, cvs_filename, cvs_content, no_err_entries): try: json_res = requests.post( "https://api.wigle.net/api/v2/file/upload", @@ -188,37 +205,94 @@ class Wigle(plugins.Plugin): raise requests.exceptions.RequestException(json_res["message"]) reported += no_err_entries self.report.update(data={"reported": reported}) - logging.info("[WIGLE] Successfully uploaded %d wifis", len(no_err_entries)) + logging.info(f"[WIGLE] Successfully uploaded {len(no_err_entries)} wifis") except (requests.exceptions.RequestException, OSError) as exp: self.skip += no_err_entries logging.debug(f"[WIGLE] Exception while uploading: {exp}") + def upload_new_handshakes(self, reported, new_gps_files, agent): + logging.info("[WIGLE] Uploading new handshakes to wigle.net") + csv_entries, no_err_entries = list(), list() + for gps_file in new_gps_files: + logging.info(f"[WIGLE] Processing {os.path.basename(gps_file)}") + if ( + (pcap_filename := self.get_pcap_filename(gps_file)) + and (gps_data := self.get_gps_data(gps_file)) + and (pcap_data := self.get_pcap_data(pcap_filename)) + ): + csv_entries.append((gps_data, pcap_data)) + no_err_entries.append(gps_file) + else: + self.skip.append(gps_file) + logging.info(f"[WIGLE] Wifi to upload: {len(csv_entries)}") + if csv_entries: + cvs_filename, cvs_content = self.generate_csv(csv_entries) + self.save_to_file(cvs_filename, cvs_content) + display = agent.view() + display.on_uploading("wigle.net") + self.post_wigle(reported, cvs_filename, cvs_content, no_err_entries) + display.on_normal() + + def get_statistics(self): + if (datetime.now(tz=UTC) - self.last_stat).total_seconds() < 30: + return + self.last_stat = datetime.now(tz=UTC) + try: + self.statistics["ready"] = False + json_res = requests.get( + "https://api.wigle.net/api/v2/stats/user", + headers={"Accept": "application/json"}, + auth=self.auth, + timeout=self.timeout, + ).json() + if not json_res["success"]: + return + self.statistics["ready"] = True + self.statistics["username"] = json_res["user"] + self.statistics["rank"] = json_res["rank"] + self.statistics["monthrank"] = json_res["monthRank"] + self.statistics["discoveredwiFi"] = json_res["statistics"]["discoveredWiFi"] + last = json_res["statistics"]["last"] + self.statistics["last"] = f"{last[6:8]}/{last[4:6]}/{last[0:4]}" + except (requests.exceptions.RequestException, OSError) as exp: + pass + def on_internet_available(self, agent): if not self.ready: return with self.lock: reported = self.report.data_field_or("reported", default=list()) if new_gps_files := self.get_new_gps_files(reported): - logging.info("[WIGLE] Uploading new handshakes to wigle.net") - csv_entries, no_err_entries = list(), list() - for gps_file in new_gps_files: - logging.info(f"[WIGLE] Processing {os.path.basename(gps_file)}") - if ( - (pcap_filename := self.get_pcap_filename(gps_file)) - and (gps_data := self.get_gps_data(gps_file)) - and (pcap_data := self.get_pcap_data(pcap_filename)) - ): - csv_entries.append((gps_data, pcap_data)) - no_err_entries.append(gps_file) - else: - self.skip.append(gps_file) - logging.info(f"[WIGLE] Wifi to upload: {len(csv_entries)}") - if csv_entries: - cvs_filename, cvs_content = self.generate_csv(csv_entries) - self.save_to_file(cvs_filename, cvs_content) - display = agent.view() - display.on_uploading("wigle.net") - self.upload_to_wigle( - reported, cvs_filename, cvs_content, no_err_entries - ) - display.on_normal() + self.upload_new_handshakes(reported, new_gps_files, agent) + else: + self.get_statistics() + + def on_ui_setup(self, ui): + with ui._lock: + ui.add_element( + "wigle", + Text(value="-", position=self.position, font=fonts.Small, color=BLACK), + ) + + def on_unload(self, ui): + with ui._lock: + ui.remove_element("wigle") + + def on_ui_update(self, ui): + if not self.ready: + return + with ui._lock: + if not self.statistics["ready"]: + ui.set("wigle", "We Will Wait Wigle") + return + msg = "-" + self.ui_counter = (self.ui_counter + 1) % 4 + if self.ui_counter == 0: + msg = f"User:{self.statistics['username']}" + if self.ui_counter == 1: + msg = f"Rank:{self.statistics['rank']} Monthly:{self.statistics['monthrank']}" + elif self.ui_counter == 2: + msg = f"{self.statistics['discoveredwiFi']} discovered WiFis" + elif self.ui_counter == 3: + msg = f"Last report:{self.statistics['last']}" + ui.set("wigle", msg)