diff --git a/pwnagotchi/plugins/default/wigle.py b/pwnagotchi/plugins/default/wigle.py index 0e498d94..a4b79970 100644 --- a/pwnagotchi/plugins/default/wigle.py +++ b/pwnagotchi/plugins/default/wigle.py @@ -5,6 +5,7 @@ import csv import requests import pwnagotchi import re +from glob import glob from threading import Lock from io import StringIO from datetime import datetime, UTC @@ -22,119 +23,8 @@ from pwnagotchi._version import __version__ as __pwnagotchi_version__ from scapy.all import Scapy_Exception -def _extract_gps_data(path): - """ - Extract data from gps-file - - return json-obj - """ - try: - if path.endswith(".geo.json"): - with open(path, "r") as json_file: - tempJson = json.load(json_file) - d = datetime.fromtimestamp(int(tempJson["ts"]), tz=UTC) - return { - "Latitude": tempJson["location"]["lat"], - "Longitude": tempJson["location"]["lng"], - "Altitude": 10, - "Accuracy": tempJson["accuracy"], - "Updated": d.strftime("%Y-%m-%dT%H:%M:%S.%f"), - } - with open(path, "r") as json_file: - return json.load(json_file) - except (OSError, json.JSONDecodeError) as exp: - raise exp - -def _transform_wigle_entry(gps_data, pcap_data): - """ - Transform to wigle entry in file - """ - logging.info(f"transform to wigle") - dummy = StringIO() - writer = csv.writer(dummy, delimiter=",", quoting=csv.QUOTE_NONE, escapechar="\\") - writer.writerow( - [ - pcap_data[WifiInfo.BSSID], - pcap_data[WifiInfo.ESSID], - list(pcap_data[WifiInfo.ENCRYPTION]), - datetime.strptime( - gps_data["Updated"].rsplit(".")[0], "%Y-%m-%dT%H:%M:%S" - ).strftime("%Y-%m-%d %H:%M:%S"), - pcap_data[WifiInfo.CHANNEL], - pcap_data[WifiInfo.RSSI], - gps_data["Latitude"], - gps_data["Longitude"], - gps_data["Altitude"], - gps_data["Accuracy"], - "WIFI", - ] - ) - return dummy.getvalue() - - -def _generate_csv(lines, plugin_version): - """ - Generates the csv file - """ - dummy = StringIO() - # write kismet header - dummy.write( - f"WigleWifi-1.6,appRelease={plugin_version},model=pwnagotchi,release={__pwnagotchi_version__}," - f"device={pwnagotchi.name()},display=kismet,board=RaspberryPi,brand=pwnagotchi,star=Sol,body=3,subBody=0\n" - ) - # write header - dummy.write( - "MAC,SSID,AuthMode,FirstSeen,Channel,RSSI,CurrentLatitude,CurrentLongitude,AltitudeMeters,AccuracyMeters,Type\n" - ) - # write WIFIs - for line in lines: - dummy.write(f"{line}") - dummy.seek(0) - return dummy - -def to_file(filename, content): - try: - with open(f"/tmp/{filename}", mode="w") as f: - f.write(content) - except Exception as exp: - logging.debug(f"WIGLE: {exp}") - pass - -def _send_to_wigle(lines, api_name, api_key, plugin_version, donate=False, timeout=30): - """ - Uploads the file to wigle-net - """ - dummy = _generate_csv(lines, plugin_version) - - date = datetime.now().strftime("%Y%m%d_%H%M%S") - filename = f"{pwnagotchi.name()}_{date}.csv" - payload = { - "file": ( - filename, - dummy, - "text/csv", - ) - } - to_file(filename, dummy.getvalue()) - try: - res = requests.post( - "https://api.wigle.net/api/v2/file/upload", - headers={"Accept": "application/json"}, - auth=(api_name, api_key), - data={"donate": "on" if donate else "false"}, - files=payload, - timeout=timeout, - ) - json_res = res.json() - logging.info(f"Request result: {json_res}") - if not json_res["success"]: - raise requests.exceptions.RequestException(json_res["message"]) - except requests.exceptions.RequestException as re_e: - raise re_e - - class Wigle(plugins.Plugin): - __author__ = "Dadav and updated by Jayofelony" + __author__ = "Dadav and updated by Jayofelony and fmatray" __version__ = "4.0.0" __license__ = "GPL3" __description__ = "This plugin automatically uploads collected WiFi to wigle.net" @@ -145,38 +35,28 @@ class Wigle(plugins.Plugin): self.skip = list() self.lock = Lock() self.options = dict() - self.api_name = None - self.api_key = None - self.donate = False - self.handshake_dir = None - self.whitelist = None def on_config_changed(self, config): - self.api_name = self.options.get("api_name", None) - self.api_key = self.options.get("api_key", None) - if not (self.api_name and self.api_key): - logging.debug( - "WIGLE: api_name and/or api_key isn't set. Can't upload to wigle.net" - ) + api_name = self.options.get("api_name", None) + api_key = self.options.get("api_key", None) + if not (api_name and api_key): + logging.info("[WIGLE] api_name and api_key must be set.") return + self.auth = (api_name, api_key) self.donate = self.options.get("donate", False) self.handshake_dir = config["bettercap"].get("handshakes", None) + self.cvs_dir = self.options.get("cvs_dir", None) self.whitelist = config["main"].get("whitelist", None) + self.timeout = config["main"].get("timeout", 30) self.ready = True - logging.info("WIGLE: ready") + logging.info("[WIGLE] Ready for wardriving!!!") def on_webhook(self, path, request): - response = make_response(redirect("https://www.wigle.net/", code=302)) - return response + return make_response(redirect("https://www.wigle.net/", code=302)) def get_new_gps_files(self, reported): - all_files = os.listdir(self.handshake_dir) - all_gps_files = [ - os.path.join(self.handshake_dir, filename) - for filename in all_files - if filename.endswith(".gps.json") or filename.endswith(".geo.json") - ] - + all_gps_files = glob(os.path.join(self.handshake_dir, "*.gps.json")) + all_gps_files += glob(os.path.join(self.handshake_dir, "*.geo.json")) all_gps_files = remove_whitelisted(all_gps_files, self.whitelist) return set(all_gps_files) - set(reported) - set(self.skip) @@ -184,28 +64,47 @@ class Wigle(plugins.Plugin): def get_pcap_filename(gps_file): pcap_filename = re.sub(r"\.(geo|gps)\.json$", ".pcap", gps_file) if not os.path.exists(pcap_filename): - logging.debug("WIGLE: Can't find pcap for %s", gps_file) + logging.debug("[WIGLE] Can't find pcap for %s", gps_file) return None return pcap_filename @staticmethod - def get_gps_data(gps_file): + def extract_gps_data(path): + """ + Extract data from gps-file + return json-obj + """ try: - gps_data = _extract_gps_data(gps_file) + if path.endswith(".geo.json"): + with open(path, "r") as json_file: + tempJson = json.load(json_file) + d = datetime.fromtimestamp(int(tempJson["ts"]), tz=UTC) + return { + "Latitude": tempJson["location"]["lat"], + "Longitude": tempJson["location"]["lng"], + "Altitude": 10, + "Accuracy": tempJson["accuracy"], + "Updated": d.strftime("%Y-%m-%dT%H:%M:%S.%f"), + } + with open(path, "r") as json_file: + return json.load(json_file) except (OSError, json.JSONDecodeError) as exp: - logging.debug(f"WIGLE: {exp}") + raise exp + + def get_gps_data(self, gps_file): + try: + gps_data = self.extract_gps_data(gps_file) + except (OSError, json.JSONDecodeError) as exp: + logging.debug(f"[WIGLE] Error while extracting GPS data: {exp}") return None if gps_data["Latitude"] == 0 and gps_data["Longitude"] == 0: - logging.debug( - f"WIGLE: Not enough gps-information for {gps_file}. Trying again next time." - ) + logging.debug(f"[WIGLE] Not enough gps data for {gps_file}. Next time.") return None return gps_data @staticmethod def get_pcap_data(pcap_filename): try: - logging.info(f"Extracting PCAP for {pcap_filename}") pcap_data = extract_from_pcap( pcap_filename, [ @@ -213,67 +112,113 @@ class Wigle(plugins.Plugin): WifiInfo.ESSID, WifiInfo.ENCRYPTION, WifiInfo.CHANNEL, + WifiInfo.FREQUENCY, WifiInfo.RSSI, ], ) - logging.info(f"PCAP DATA for {pcap_data}") - logging.info(f"Extracting PCAP for {pcap_filename} DONE: {pcap_data}") + logging.debug(f"[WIGLE] PCAP data for {pcap_filename}: {pcap_data}") except FieldNotFoundError: - logging.debug( - f"WIGLE: Could not extract all information. Skip {pcap_filename}" - ) + logging.debug(f"[WIGLE] Cannot extract all data: {pcap_filename} (skipped)") return None except Scapy_Exception as sc_e: - logging.debug(f"WIGLE: {sc_e}") + logging.debug(f"[WIGLE] {sc_e}") return None return pcap_data - def upload(self, reported, csv_entries, no_err_entries): - try: - logging.info("Uploading to Wigle") - _send_to_wigle( - csv_entries, self.api_name, self.api_key, self.__version__, donate=self.donate + def generate_csv(self, data): + date = datetime.now().strftime("%Y%m%d_%H%M%S") + filename = f"{pwnagotchi.name()}_{date}.csv" + + content = StringIO() + # write kismet header + header + content.write( + f"WigleWifi-1.6,appRelease={self.__version__},model=pwnagotchi,release={__pwnagotchi_version__}," + f"device={pwnagotchi.name()},display=kismet,board=RaspberryPi,brand=pwnagotchi,star=Sol,body=3,subBody=0\n" + f"MAC,SSID,AuthMode,FirstSeen,Channel,Frequency,RSSI,CurrentLatitude,CurrentLongitude,AltitudeMeters,AccuracyMeters,RCOIs,MfgrId,Type\n" + ) + writer = csv.writer( + content, delimiter=",", quoting=csv.QUOTE_NONE, escapechar="\\" + ) + for gps_data, pcap_data in data: # write WIFIs + writer.writerow( + [ + pcap_data[WifiInfo.BSSID], + pcap_data[WifiInfo.ESSID], + f"[{']['.join(pcap_data[WifiInfo.ENCRYPTION])}]", + datetime.strptime( + gps_data["Updated"].rsplit(".")[0], "%Y-%m-%dT%H:%M:%S" + ).strftime("%Y-%m-%d %H:%M:%S"), + pcap_data[WifiInfo.CHANNEL], + pcap_data[WifiInfo.FREQUENCY], + pcap_data[WifiInfo.RSSI], + gps_data["Latitude"], + gps_data["Longitude"], + gps_data["Altitude"], + gps_data["Accuracy"], + "", # RCOIs to populate + "", # MfgrId always empty + "WIFI", + ] ) + content.seek(0) + return filename, content + + def save_to_file(self, cvs_filename, cvs_content): + if not self.cvs_dir: + return + filename = os.path.join(self.cvs_dir, cvs_filename) + logging.info(f"[WIGLE] Saving to file {filename}") + try: + with open(filename, mode="w") as f: + f.write(cvs_content.getvalue()) + except Exception as exp: + logging.error(f"[WIGLE] Error while writing CSV file(skipping): {exp}") + + def upload_to_wigle(self, reported, cvs_filename, cvs_content, no_err_entries): + try: + json_res = requests.post( + "https://api.wigle.net/api/v2/file/upload", + headers={"Accept": "application/json"}, + auth=self.auth, + data={"donate": "on" if self.donate else "false"}, + files=dict(file=(cvs_filename, cvs_content, "text/csv")), + timeout=self.timeout, + ).json() + if not json_res["success"]: + raise requests.exceptions.RequestException(json_res["message"]) reported += no_err_entries self.report.update(data={"reported": reported}) - logging.info("WIGLE: Successfully uploaded %d files", len(no_err_entries)) - except requests.exceptions.RequestException as re_e: + logging.info("[WIGLE] Successfully uploaded %d wifis", len(no_err_entries)) + except (requests.exceptions.RequestException, OSError) as exp: self.skip += no_err_entries - logging.debug("WIGLE: Got an exception while uploading %s", re_e) - except OSError as os_e: - self.skip += no_err_entries - logging.debug("WIGLE: Got the following error: %s", os_e) + logging.debug(f"[WIGLE] Exception while uploading: {exp}") def on_internet_available(self, agent): - """ - Called when there's internet connectivity - """ if not self.ready: return with self.lock: reported = self.report.data_field_or("reported", default=list()) if new_gps_files := self.get_new_gps_files(reported): - logging.info( - "WIGLE: Internet connectivity detected. Uploading new handshakes to wigle.net" - ) - csv_entries = list() - no_err_entries = list() + logging.info("[WIGLE] Uploading new handshakes to wigle.net") + csv_entries, no_err_entries = list(), list() for gps_file in new_gps_files: - logging.info(f"WIGLE: handeling {gps_file}") - if not (pcap_filename := self.get_pcap_filename(gps_file)): + logging.info(f"[WIGLE] Processing {os.path.basename(gps_file)}") + if ( + (pcap_filename := self.get_pcap_filename(gps_file)) + and (gps_data := self.get_gps_data(gps_file)) + and (pcap_data := self.get_pcap_data(pcap_filename)) + ): + csv_entries.append((gps_data, pcap_data)) + no_err_entries.append(gps_file) + else: self.skip.append(gps_file) - continue - if not (gps_data := self.get_gps_data(gps_file)): - self.skip.append(gps_file) - continue - if not (pcap_data := self.get_pcap_data(pcap_filename)): - self.skip.append(gps_file) - continue - new_entry = _transform_wigle_entry(gps_data, pcap_data) - csv_entries.append(new_entry) - no_err_entries.append(gps_file) + logging.info(f"[WIGLE] Wifi to upload: {len(csv_entries)}") if csv_entries: + cvs_filename, cvs_content = self.generate_csv(csv_entries) + self.save_to_file(cvs_filename, cvs_content) display = agent.view() display.on_uploading("wigle.net") - self.upload(reported, csv_entries, no_err_entries) + self.upload_to_wigle( + reported, cvs_filename, cvs_content, no_err_entries + ) display.on_normal()