Wigle.py update

- Correct the CSV format 
- Add 2 missing columns: frequency(populated) and RCOI(not populated)
- Before sending to wigle, if cvs_dir is set, the CVS data ti written to that directory
- Update auth method to match the api: api_name + api_key
- Refactoring into only one plugin class


Signed-off-by: Frédéric <fmatray@users.noreply.github.com>
This commit is contained in:
Frédéric
2025-02-09 01:50:57 +01:00
committed by GitHub
parent d4874a18e1
commit 0b3b38bb44

View File

@ -5,6 +5,7 @@ import csv
import requests import requests
import pwnagotchi import pwnagotchi
import re import re
from glob import glob
from threading import Lock from threading import Lock
from io import StringIO from io import StringIO
from datetime import datetime, UTC from datetime import datetime, UTC
@ -22,10 +23,55 @@ from pwnagotchi._version import __version__ as __pwnagotchi_version__
from scapy.all import Scapy_Exception from scapy.all import Scapy_Exception
def _extract_gps_data(path): class Wigle(plugins.Plugin):
__author__ = "Dadav and updated by Jayofelony and fmatray"
__version__ = "4.0.0"
__license__ = "GPL3"
__description__ = "This plugin automatically uploads collected WiFi to wigle.net"
def __init__(self):
self.ready = False
self.report = StatusFile("/root/.wigle_uploads", data_format="json")
self.skip = list()
self.lock = Lock()
self.options = dict()
def on_config_changed(self, config):
api_name = self.options.get("api_name", None)
api_key = self.options.get("api_key", None)
if not (api_name and api_key):
logging.info("[WIGLE] api_name and api_key must be set.")
return
self.auth = (api_name, api_key)
self.donate = self.options.get("donate", False)
self.handshake_dir = config["bettercap"].get("handshakes", None)
self.cvs_dir = self.options.get("cvs_dir", None)
self.whitelist = config["main"].get("whitelist", None)
self.timeout = config["main"].get("timeout", 30)
self.ready = True
logging.info("[WIGLE] Ready for wardriving!!!")
def on_webhook(self, path, request):
return make_response(redirect("https://www.wigle.net/", code=302))
def get_new_gps_files(self, reported):
all_gps_files = glob(os.path.join(self.handshake_dir, "*.gps.json"))
all_gps_files += glob(os.path.join(self.handshake_dir, "*.geo.json"))
all_gps_files = remove_whitelisted(all_gps_files, self.whitelist)
return set(all_gps_files) - set(reported) - set(self.skip)
@staticmethod
def get_pcap_filename(gps_file):
pcap_filename = re.sub(r"\.(geo|gps)\.json$", ".pcap", gps_file)
if not os.path.exists(pcap_filename):
logging.debug("[WIGLE] Can't find pcap for %s", gps_file)
return None
return pcap_filename
@staticmethod
def extract_gps_data(path):
""" """
Extract data from gps-file Extract data from gps-file
return json-obj return json-obj
""" """
try: try:
@ -45,167 +91,20 @@ def _extract_gps_data(path):
except (OSError, json.JSONDecodeError) as exp: except (OSError, json.JSONDecodeError) as exp:
raise exp raise exp
def _transform_wigle_entry(gps_data, pcap_data): def get_gps_data(self, gps_file):
"""
Transform to wigle entry in file
"""
logging.info(f"transform to wigle")
dummy = StringIO()
writer = csv.writer(dummy, delimiter=",", quoting=csv.QUOTE_NONE, escapechar="\\")
writer.writerow(
[
pcap_data[WifiInfo.BSSID],
pcap_data[WifiInfo.ESSID],
list(pcap_data[WifiInfo.ENCRYPTION]),
datetime.strptime(
gps_data["Updated"].rsplit(".")[0], "%Y-%m-%dT%H:%M:%S"
).strftime("%Y-%m-%d %H:%M:%S"),
pcap_data[WifiInfo.CHANNEL],
pcap_data[WifiInfo.RSSI],
gps_data["Latitude"],
gps_data["Longitude"],
gps_data["Altitude"],
gps_data["Accuracy"],
"WIFI",
]
)
return dummy.getvalue()
def _generate_csv(lines, plugin_version):
"""
Generates the csv file
"""
dummy = StringIO()
# write kismet header
dummy.write(
f"WigleWifi-1.6,appRelease={plugin_version},model=pwnagotchi,release={__pwnagotchi_version__},"
f"device={pwnagotchi.name()},display=kismet,board=RaspberryPi,brand=pwnagotchi,star=Sol,body=3,subBody=0\n"
)
# write header
dummy.write(
"MAC,SSID,AuthMode,FirstSeen,Channel,RSSI,CurrentLatitude,CurrentLongitude,AltitudeMeters,AccuracyMeters,Type\n"
)
# write WIFIs
for line in lines:
dummy.write(f"{line}")
dummy.seek(0)
return dummy
def to_file(filename, content):
try: try:
with open(f"/tmp/{filename}", mode="w") as f: gps_data = self.extract_gps_data(gps_file)
f.write(content)
except Exception as exp:
logging.debug(f"WIGLE: {exp}")
pass
def _send_to_wigle(lines, api_name, api_key, plugin_version, donate=False, timeout=30):
"""
Uploads the file to wigle-net
"""
dummy = _generate_csv(lines, plugin_version)
date = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"{pwnagotchi.name()}_{date}.csv"
payload = {
"file": (
filename,
dummy,
"text/csv",
)
}
to_file(filename, dummy.getvalue())
try:
res = requests.post(
"https://api.wigle.net/api/v2/file/upload",
headers={"Accept": "application/json"},
auth=(api_name, api_key),
data={"donate": "on" if donate else "false"},
files=payload,
timeout=timeout,
)
json_res = res.json()
logging.info(f"Request result: {json_res}")
if not json_res["success"]:
raise requests.exceptions.RequestException(json_res["message"])
except requests.exceptions.RequestException as re_e:
raise re_e
class Wigle(plugins.Plugin):
__author__ = "Dadav and updated by Jayofelony"
__version__ = "4.0.0"
__license__ = "GPL3"
__description__ = "This plugin automatically uploads collected WiFi to wigle.net"
def __init__(self):
self.ready = False
self.report = StatusFile("/root/.wigle_uploads", data_format="json")
self.skip = list()
self.lock = Lock()
self.options = dict()
self.api_name = None
self.api_key = None
self.donate = False
self.handshake_dir = None
self.whitelist = None
def on_config_changed(self, config):
self.api_name = self.options.get("api_name", None)
self.api_key = self.options.get("api_key", None)
if not (self.api_name and self.api_key):
logging.debug(
"WIGLE: api_name and/or api_key isn't set. Can't upload to wigle.net"
)
return
self.donate = self.options.get("donate", False)
self.handshake_dir = config["bettercap"].get("handshakes", None)
self.whitelist = config["main"].get("whitelist", None)
self.ready = True
logging.info("WIGLE: ready")
def on_webhook(self, path, request):
response = make_response(redirect("https://www.wigle.net/", code=302))
return response
def get_new_gps_files(self, reported):
all_files = os.listdir(self.handshake_dir)
all_gps_files = [
os.path.join(self.handshake_dir, filename)
for filename in all_files
if filename.endswith(".gps.json") or filename.endswith(".geo.json")
]
all_gps_files = remove_whitelisted(all_gps_files, self.whitelist)
return set(all_gps_files) - set(reported) - set(self.skip)
@staticmethod
def get_pcap_filename(gps_file):
pcap_filename = re.sub(r"\.(geo|gps)\.json$", ".pcap", gps_file)
if not os.path.exists(pcap_filename):
logging.debug("WIGLE: Can't find pcap for %s", gps_file)
return None
return pcap_filename
@staticmethod
def get_gps_data(gps_file):
try:
gps_data = _extract_gps_data(gps_file)
except (OSError, json.JSONDecodeError) as exp: except (OSError, json.JSONDecodeError) as exp:
logging.debug(f"WIGLE: {exp}") logging.debug(f"[WIGLE] Error while extracting GPS data: {exp}")
return None return None
if gps_data["Latitude"] == 0 and gps_data["Longitude"] == 0: if gps_data["Latitude"] == 0 and gps_data["Longitude"] == 0:
logging.debug( logging.debug(f"[WIGLE] Not enough gps data for {gps_file}. Next time.")
f"WIGLE: Not enough gps-information for {gps_file}. Trying again next time."
)
return None return None
return gps_data return gps_data
@staticmethod @staticmethod
def get_pcap_data(pcap_filename): def get_pcap_data(pcap_filename):
try: try:
logging.info(f"Extracting PCAP for {pcap_filename}")
pcap_data = extract_from_pcap( pcap_data = extract_from_pcap(
pcap_filename, pcap_filename,
[ [
@ -213,67 +112,113 @@ class Wigle(plugins.Plugin):
WifiInfo.ESSID, WifiInfo.ESSID,
WifiInfo.ENCRYPTION, WifiInfo.ENCRYPTION,
WifiInfo.CHANNEL, WifiInfo.CHANNEL,
WifiInfo.FREQUENCY,
WifiInfo.RSSI, WifiInfo.RSSI,
], ],
) )
logging.info(f"PCAP DATA for {pcap_data}") logging.debug(f"[WIGLE] PCAP data for {pcap_filename}: {pcap_data}")
logging.info(f"Extracting PCAP for {pcap_filename} DONE: {pcap_data}")
except FieldNotFoundError: except FieldNotFoundError:
logging.debug( logging.debug(f"[WIGLE] Cannot extract all data: {pcap_filename} (skipped)")
f"WIGLE: Could not extract all information. Skip {pcap_filename}"
)
return None return None
except Scapy_Exception as sc_e: except Scapy_Exception as sc_e:
logging.debug(f"WIGLE: {sc_e}") logging.debug(f"[WIGLE] {sc_e}")
return None return None
return pcap_data return pcap_data
def upload(self, reported, csv_entries, no_err_entries): def generate_csv(self, data):
try: date = datetime.now().strftime("%Y%m%d_%H%M%S")
logging.info("Uploading to Wigle") filename = f"{pwnagotchi.name()}_{date}.csv"
_send_to_wigle(
csv_entries, self.api_name, self.api_key, self.__version__, donate=self.donate content = StringIO()
# write kismet header + header
content.write(
f"WigleWifi-1.6,appRelease={self.__version__},model=pwnagotchi,release={__pwnagotchi_version__},"
f"device={pwnagotchi.name()},display=kismet,board=RaspberryPi,brand=pwnagotchi,star=Sol,body=3,subBody=0\n"
f"MAC,SSID,AuthMode,FirstSeen,Channel,Frequency,RSSI,CurrentLatitude,CurrentLongitude,AltitudeMeters,AccuracyMeters,RCOIs,MfgrId,Type\n"
) )
writer = csv.writer(
content, delimiter=",", quoting=csv.QUOTE_NONE, escapechar="\\"
)
for gps_data, pcap_data in data: # write WIFIs
writer.writerow(
[
pcap_data[WifiInfo.BSSID],
pcap_data[WifiInfo.ESSID],
f"[{']['.join(pcap_data[WifiInfo.ENCRYPTION])}]",
datetime.strptime(
gps_data["Updated"].rsplit(".")[0], "%Y-%m-%dT%H:%M:%S"
).strftime("%Y-%m-%d %H:%M:%S"),
pcap_data[WifiInfo.CHANNEL],
pcap_data[WifiInfo.FREQUENCY],
pcap_data[WifiInfo.RSSI],
gps_data["Latitude"],
gps_data["Longitude"],
gps_data["Altitude"],
gps_data["Accuracy"],
"", # RCOIs to populate
"", # MfgrId always empty
"WIFI",
]
)
content.seek(0)
return filename, content
def save_to_file(self, cvs_filename, cvs_content):
if not self.cvs_dir:
return
filename = os.path.join(self.cvs_dir, cvs_filename)
logging.info(f"[WIGLE] Saving to file {filename}")
try:
with open(filename, mode="w") as f:
f.write(cvs_content.getvalue())
except Exception as exp:
logging.error(f"[WIGLE] Error while writing CSV file(skipping): {exp}")
def upload_to_wigle(self, reported, cvs_filename, cvs_content, no_err_entries):
try:
json_res = requests.post(
"https://api.wigle.net/api/v2/file/upload",
headers={"Accept": "application/json"},
auth=self.auth,
data={"donate": "on" if self.donate else "false"},
files=dict(file=(cvs_filename, cvs_content, "text/csv")),
timeout=self.timeout,
).json()
if not json_res["success"]:
raise requests.exceptions.RequestException(json_res["message"])
reported += no_err_entries reported += no_err_entries
self.report.update(data={"reported": reported}) self.report.update(data={"reported": reported})
logging.info("WIGLE: Successfully uploaded %d files", len(no_err_entries)) logging.info("[WIGLE] Successfully uploaded %d wifis", len(no_err_entries))
except requests.exceptions.RequestException as re_e: except (requests.exceptions.RequestException, OSError) as exp:
self.skip += no_err_entries self.skip += no_err_entries
logging.debug("WIGLE: Got an exception while uploading %s", re_e) logging.debug(f"[WIGLE] Exception while uploading: {exp}")
except OSError as os_e:
self.skip += no_err_entries
logging.debug("WIGLE: Got the following error: %s", os_e)
def on_internet_available(self, agent): def on_internet_available(self, agent):
"""
Called when there's internet connectivity
"""
if not self.ready: if not self.ready:
return return
with self.lock: with self.lock:
reported = self.report.data_field_or("reported", default=list()) reported = self.report.data_field_or("reported", default=list())
if new_gps_files := self.get_new_gps_files(reported): if new_gps_files := self.get_new_gps_files(reported):
logging.info( logging.info("[WIGLE] Uploading new handshakes to wigle.net")
"WIGLE: Internet connectivity detected. Uploading new handshakes to wigle.net" csv_entries, no_err_entries = list(), list()
)
csv_entries = list()
no_err_entries = list()
for gps_file in new_gps_files: for gps_file in new_gps_files:
logging.info(f"WIGLE: handeling {gps_file}") logging.info(f"[WIGLE] Processing {os.path.basename(gps_file)}")
if not (pcap_filename := self.get_pcap_filename(gps_file)): if (
self.skip.append(gps_file) (pcap_filename := self.get_pcap_filename(gps_file))
continue and (gps_data := self.get_gps_data(gps_file))
if not (gps_data := self.get_gps_data(gps_file)): and (pcap_data := self.get_pcap_data(pcap_filename))
self.skip.append(gps_file) ):
continue csv_entries.append((gps_data, pcap_data))
if not (pcap_data := self.get_pcap_data(pcap_filename)):
self.skip.append(gps_file)
continue
new_entry = _transform_wigle_entry(gps_data, pcap_data)
csv_entries.append(new_entry)
no_err_entries.append(gps_file) no_err_entries.append(gps_file)
else:
self.skip.append(gps_file)
logging.info(f"[WIGLE] Wifi to upload: {len(csv_entries)}")
if csv_entries: if csv_entries:
cvs_filename, cvs_content = self.generate_csv(csv_entries)
self.save_to_file(cvs_filename, cvs_content)
display = agent.view() display = agent.view()
display.on_uploading("wigle.net") display.on_uploading("wigle.net")
self.upload(reported, csv_entries, no_err_entries) self.upload_to_wigle(
reported, cvs_filename, cvs_content, no_err_entries
)
display.on_normal() display.on_normal()