mirror of
https://github.com/jayofelony/pwnagotchi.git
synced 2025-07-01 18:37:27 -04:00
wigle.py update
- CSV format debug - Update auth method to API Name+key - Add logs - Code refactor and cleaning stil debugging Signed-off-by: Frédéric <fmatray@users.noreply.github.com>
This commit is contained in:
@ -4,13 +4,22 @@ import json
|
||||
import csv
|
||||
import requests
|
||||
import pwnagotchi
|
||||
|
||||
from io import StringIO
|
||||
from datetime import datetime
|
||||
from pwnagotchi.utils import WifiInfo, FieldNotFoundError, extract_from_pcap, StatusFile, remove_whitelisted
|
||||
import re
|
||||
from threading import Lock
|
||||
from io import StringIO
|
||||
from datetime import datetime, UTC
|
||||
|
||||
from flask import make_response, redirect
|
||||
from pwnagotchi.utils import (
|
||||
WifiInfo,
|
||||
FieldNotFoundError,
|
||||
extract_from_pcap,
|
||||
StatusFile,
|
||||
remove_whitelisted,
|
||||
)
|
||||
from pwnagotchi import plugins
|
||||
from pwnagotchi._version import __version__ as __pwnagotchi_version__
|
||||
from scapy.all import Scapy_Exception
|
||||
|
||||
|
||||
def _extract_gps_data(path):
|
||||
@ -19,197 +28,252 @@ def _extract_gps_data(path):
|
||||
|
||||
return json-obj
|
||||
"""
|
||||
|
||||
try:
|
||||
if path.endswith('.geo.json'):
|
||||
with open(path, 'r') as json_file:
|
||||
if path.endswith(".geo.json"):
|
||||
with open(path, "r") as json_file:
|
||||
tempJson = json.load(json_file)
|
||||
d = datetime.utcfromtimestamp(int(tempJson["ts"]))
|
||||
return {"Latitude": tempJson["location"]["lat"],
|
||||
"Longitude": tempJson["location"]["lng"],
|
||||
"Altitude": 10,
|
||||
"Accuracy": tempJson["accuracy"],
|
||||
"Updated": d.strftime('%Y-%m-%dT%H:%M:%S.%f')}
|
||||
else:
|
||||
with open(path, 'r') as json_file:
|
||||
return json.load(json_file)
|
||||
except OSError as os_err:
|
||||
raise os_err
|
||||
except json.JSONDecodeError as json_err:
|
||||
raise json_err
|
||||
d = datetime.fromtimestamp(int(tempJson["ts"]), tz=UTC)
|
||||
return {
|
||||
"Latitude": tempJson["location"]["lat"],
|
||||
"Longitude": tempJson["location"]["lng"],
|
||||
"Altitude": 10,
|
||||
"Accuracy": tempJson["accuracy"],
|
||||
"Updated": d.strftime("%Y-%m-%dT%H:%M:%S.%f"),
|
||||
}
|
||||
with open(path, "r") as json_file:
|
||||
return json.load(json_file)
|
||||
except (OSError, json.JSONDecodeError) as exp:
|
||||
raise exp
|
||||
|
||||
|
||||
def _format_auth(data):
|
||||
out = ""
|
||||
for auth in data:
|
||||
out = f"{out}[{auth}]"
|
||||
return [f"{auth}" for auth in data]
|
||||
|
||||
|
||||
def _transform_wigle_entry(gps_data, pcap_data, plugin_version):
|
||||
def _transform_wigle_entry(gps_data, pcap_data):
|
||||
"""
|
||||
Transform to wigle entry in file
|
||||
"""
|
||||
logging.info(f"transform to wigle")
|
||||
dummy = StringIO()
|
||||
# write kismet header
|
||||
dummy.write(f"WigleWifi-1.6,appRelease={plugin_version},model=pwnagotchi,release={__pwnagotchi_version__},"
|
||||
f"device={pwnagotchi.name()},display=kismet,board=RaspberryPi,brand=pwnagotchi,star=Sol,body=3,subBody=0\n")
|
||||
dummy.write(
|
||||
"MAC,SSID,AuthMode,FirstSeen,Channel,RSSI,CurrentLatitude,CurrentLongitude,AltitudeMeters,AccuracyMeters,Type\n")
|
||||
|
||||
writer = csv.writer(dummy, delimiter=",", quoting=csv.QUOTE_NONE, escapechar="\\")
|
||||
writer.writerow([
|
||||
pcap_data[WifiInfo.BSSID],
|
||||
pcap_data[WifiInfo.ESSID],
|
||||
_format_auth(pcap_data[WifiInfo.ENCRYPTION]),
|
||||
datetime.strptime(gps_data['Updated'].rsplit('.')[0],
|
||||
"%Y-%m-%dT%H:%M:%S").strftime('%Y-%m-%d %H:%M:%S'),
|
||||
pcap_data[WifiInfo.CHANNEL],
|
||||
pcap_data[WifiInfo.RSSI],
|
||||
gps_data['Latitude'],
|
||||
gps_data['Longitude'],
|
||||
gps_data['Altitude'],
|
||||
gps_data['Accuracy'],
|
||||
'WIFI'])
|
||||
writer.writerow(
|
||||
[
|
||||
pcap_data[WifiInfo.BSSID],
|
||||
pcap_data[WifiInfo.ESSID],
|
||||
list(pcap_data[WifiInfo.ENCRYPTION]),
|
||||
datetime.strptime(
|
||||
gps_data["Updated"].rsplit(".")[0], "%Y-%m-%dT%H:%M:%S"
|
||||
).strftime("%Y-%m-%d %H:%M:%S"),
|
||||
pcap_data[WifiInfo.CHANNEL],
|
||||
pcap_data[WifiInfo.RSSI],
|
||||
gps_data["Latitude"],
|
||||
gps_data["Longitude"],
|
||||
gps_data["Altitude"],
|
||||
gps_data["Accuracy"],
|
||||
"WIFI",
|
||||
]
|
||||
)
|
||||
return dummy.getvalue()
|
||||
|
||||
|
||||
def _send_to_wigle(lines, api_key, donate=True, timeout=30):
|
||||
def _generate_csv(lines, plugin_version):
|
||||
"""
|
||||
Generates the csv file
|
||||
"""
|
||||
dummy = StringIO()
|
||||
# write kismet header
|
||||
dummy.write(
|
||||
f"WigleWifi-1.6,appRelease={plugin_version},model=pwnagotchi,release={__pwnagotchi_version__},"
|
||||
f"device={pwnagotchi.name()},display=kismet,board=RaspberryPi,brand=pwnagotchi,star=Sol,body=3,subBody=0\n"
|
||||
)
|
||||
# write header
|
||||
dummy.write(
|
||||
"MAC,SSID,AuthMode,FirstSeen,Channel,RSSI,CurrentLatitude,CurrentLongitude,AltitudeMeters,AccuracyMeters,Type\n"
|
||||
)
|
||||
# write WIFIs
|
||||
for line in lines:
|
||||
dummy.write(f"{line}")
|
||||
dummy.seek(0)
|
||||
return dummy
|
||||
|
||||
def to_file(filename, content):
|
||||
try:
|
||||
with open(f"/tmp/{filename}", mode="w") as f:
|
||||
f.write(content)
|
||||
except Exception as exp:
|
||||
logging.debug(f"WIGLE: {exp}")
|
||||
pass
|
||||
|
||||
def _send_to_wigle(lines, api_name, api_key, plugin_version, donate=False, timeout=30):
|
||||
"""
|
||||
Uploads the file to wigle-net
|
||||
"""
|
||||
dummy = _generate_csv(lines, plugin_version)
|
||||
|
||||
dummy = StringIO()
|
||||
|
||||
for line in lines:
|
||||
dummy.write(f"{line}")
|
||||
|
||||
dummy.seek(0)
|
||||
|
||||
headers = {"Authorization": f"Basic {api_key}",
|
||||
"Accept": "application/json",
|
||||
"HTTP_USER_AGENT": "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:15.0) Gecko/20100101 Firefox/15.0.1"}
|
||||
data = {"donate": "on" if donate else "false"}
|
||||
payload = {"file": (pwnagotchi.name() + ".csv", dummy, "multipart/form-data", {"Expires": "0"})}
|
||||
date = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
filename = f"{pwnagotchi.name()}_{date}.csv"
|
||||
payload = {
|
||||
"file": (
|
||||
filename,
|
||||
dummy,
|
||||
"text/csv",
|
||||
)
|
||||
}
|
||||
to_file(filename, dummy.getvalue())
|
||||
try:
|
||||
res = requests.post('https://api.wigle.net/api/v2/file/upload',
|
||||
data=data,
|
||||
headers=headers,
|
||||
files=payload,
|
||||
timeout=timeout)
|
||||
res = requests.post(
|
||||
"https://api.wigle.net/api/v2/file/upload",
|
||||
headers={"Accept": "application/json"},
|
||||
auth=(api_name, api_key),
|
||||
data={"donate": "on" if donate else "false"},
|
||||
files=payload,
|
||||
timeout=timeout,
|
||||
)
|
||||
json_res = res.json()
|
||||
if not json_res['success']:
|
||||
raise requests.exceptions.RequestException(json_res['message'])
|
||||
logging.info(f"Request result: {json_res}")
|
||||
if not json_res["success"]:
|
||||
raise requests.exceptions.RequestException(json_res["message"])
|
||||
except requests.exceptions.RequestException as re_e:
|
||||
raise re_e
|
||||
|
||||
|
||||
class Wigle(plugins.Plugin):
|
||||
__author__ = "Dadav and updated by Jayofelony"
|
||||
__version__ = "3.1.0"
|
||||
__version__ = "4.0.0"
|
||||
__license__ = "GPL3"
|
||||
__description__ = "This plugin automatically uploads collected WiFi to wigle.net"
|
||||
|
||||
def __init__(self):
|
||||
self.ready = False
|
||||
self.report = StatusFile('/root/.wigle_uploads', data_format='json')
|
||||
self.report = StatusFile("/root/.wigle_uploads", data_format="json")
|
||||
self.skip = list()
|
||||
self.lock = Lock()
|
||||
self.options = dict()
|
||||
self.api_name = None
|
||||
self.api_key = None
|
||||
self.donate = False
|
||||
self.handshake_dir = None
|
||||
self.whitelist = None
|
||||
|
||||
def on_loaded(self):
|
||||
if 'api_key' not in self.options or ('api_key' in self.options and self.options['api_key'] is None):
|
||||
logging.debug("WIGLE: api_key isn't set. Can't upload to wigle.net")
|
||||
def on_config_changed(self, config):
|
||||
self.api_name = self.options.get("api_name", None)
|
||||
self.api_key = self.options.get("api_key", None)
|
||||
if not (self.api_name and self.api_key):
|
||||
logging.debug(
|
||||
"WIGLE: api_name and/or api_key isn't set. Can't upload to wigle.net"
|
||||
)
|
||||
return
|
||||
|
||||
if 'donate' not in self.options:
|
||||
self.options['donate'] = False
|
||||
|
||||
self.donate = self.options.get("donate", False)
|
||||
self.handshake_dir = config["bettercap"].get("handshakes", None)
|
||||
self.whitelist = config["main"].get("whitelist", None)
|
||||
self.ready = True
|
||||
logging.info("WIGLE: ready")
|
||||
|
||||
|
||||
def on_webhook(self, path, request):
|
||||
from flask import make_response, redirect
|
||||
response = make_response(redirect("https://www.wigle.net/", code=302))
|
||||
return response
|
||||
|
||||
def get_new_gps_files(self, reported):
|
||||
all_files = os.listdir(self.handshake_dir)
|
||||
all_gps_files = [
|
||||
os.path.join(self.handshake_dir, filename)
|
||||
for filename in all_files
|
||||
if filename.endswith(".gps.json") or filename.endswith(".geo.json")
|
||||
]
|
||||
|
||||
all_gps_files = remove_whitelisted(all_gps_files, self.whitelist)
|
||||
return set(all_gps_files) - set(reported) - set(self.skip)
|
||||
|
||||
@staticmethod
|
||||
def get_pcap_filename(gps_file):
|
||||
pcap_filename = re.sub(r"\.(geo|gps)\.json$", ".pcap", gps_file)
|
||||
if not os.path.exists(pcap_filename):
|
||||
logging.debug("WIGLE: Can't find pcap for %s", gps_file)
|
||||
return None
|
||||
return pcap_filename
|
||||
|
||||
@staticmethod
|
||||
def get_gps_data(gps_file):
|
||||
try:
|
||||
gps_data = _extract_gps_data(gps_file)
|
||||
except (OSError, json.JSONDecodeError) as exp:
|
||||
logging.debug(f"WIGLE: {exp}")
|
||||
return None
|
||||
if gps_data["Latitude"] == 0 and gps_data["Longitude"] == 0:
|
||||
logging.debug(
|
||||
f"WIGLE: Not enough gps-information for {gps_file}. Trying again next time."
|
||||
)
|
||||
return None
|
||||
return gps_data
|
||||
|
||||
@staticmethod
|
||||
def get_pcap_data(pcap_filename):
|
||||
try:
|
||||
logging.info(f"Extracting PCAP for {pcap_filename}")
|
||||
pcap_data = extract_from_pcap(
|
||||
pcap_filename,
|
||||
[
|
||||
WifiInfo.BSSID,
|
||||
WifiInfo.ESSID,
|
||||
WifiInfo.ENCRYPTION,
|
||||
WifiInfo.CHANNEL,
|
||||
WifiInfo.RSSI,
|
||||
],
|
||||
)
|
||||
logging.info(f"PCAP DATA for {pcap_data}")
|
||||
logging.info(f"Extracting PCAP for {pcap_filename} DONE: {pcap_data}")
|
||||
except FieldNotFoundError:
|
||||
logging.debug(
|
||||
f"WIGLE: Could not extract all information. Skip {pcap_filename}"
|
||||
)
|
||||
return None
|
||||
except Scapy_Exception as sc_e:
|
||||
logging.debug(f"WIGLE: {sc_e}")
|
||||
return None
|
||||
return pcap_data
|
||||
|
||||
def upload(self, reported, csv_entries, no_err_entries):
|
||||
try:
|
||||
logging.info("Uploading to Wigle")
|
||||
_send_to_wigle(
|
||||
csv_entries, self.api_name, self.api_key, self.__version__, donate=self.donate
|
||||
)
|
||||
reported += no_err_entries
|
||||
self.report.update(data={"reported": reported})
|
||||
logging.info("WIGLE: Successfully uploaded %d files", len(no_err_entries))
|
||||
except requests.exceptions.RequestException as re_e:
|
||||
self.skip += no_err_entries
|
||||
logging.debug("WIGLE: Got an exception while uploading %s", re_e)
|
||||
except OSError as os_e:
|
||||
self.skip += no_err_entries
|
||||
logging.debug("WIGLE: Got the following error: %s", os_e)
|
||||
|
||||
def on_internet_available(self, agent):
|
||||
"""
|
||||
Called when there's internet connectivity
|
||||
"""
|
||||
if not self.ready or self.lock.locked():
|
||||
if not self.ready:
|
||||
return
|
||||
|
||||
from scapy.all import Scapy_Exception
|
||||
|
||||
config = agent.config()
|
||||
display = agent.view()
|
||||
reported = self.report.data_field_or('reported', default=list())
|
||||
handshake_dir = config['bettercap']['handshakes']
|
||||
all_files = os.listdir(handshake_dir)
|
||||
all_gps_files = [os.path.join(handshake_dir, filename)
|
||||
for filename in all_files
|
||||
if filename.endswith('.gps.json') or filename.endswith('.geo.json')]
|
||||
|
||||
all_gps_files = remove_whitelisted(all_gps_files, config['main']['whitelist'])
|
||||
new_gps_files = set(all_gps_files) - set(reported) - set(self.skip)
|
||||
if new_gps_files:
|
||||
logging.info("WIGLE: Internet connectivity detected. Uploading new handshakes to wigle.net")
|
||||
csv_entries = list()
|
||||
no_err_entries = list()
|
||||
for gps_file in new_gps_files:
|
||||
if gps_file.endswith('.gps.json'):
|
||||
pcap_filename = gps_file.replace('.gps.json', '.pcap')
|
||||
if gps_file.endswith('.geo.json'):
|
||||
pcap_filename = gps_file.replace('.geo.json', '.pcap')
|
||||
if not os.path.exists(pcap_filename):
|
||||
logging.debug("WIGLE: Can't find pcap for %s", gps_file)
|
||||
self.skip.append(gps_file)
|
||||
continue
|
||||
try:
|
||||
gps_data = _extract_gps_data(gps_file)
|
||||
except OSError as os_err:
|
||||
logging.debug("WIGLE: %s", os_err)
|
||||
self.skip.append(gps_file)
|
||||
continue
|
||||
except json.JSONDecodeError as json_err:
|
||||
logging.debug("WIGLE: %s", json_err)
|
||||
self.skip.append(gps_file)
|
||||
continue
|
||||
if gps_data['Latitude'] == 0 and gps_data['Longitude'] == 0:
|
||||
logging.debug("WIGLE: Not enough gps-information for %s. Trying again next time.", gps_file)
|
||||
self.skip.append(gps_file)
|
||||
continue
|
||||
try:
|
||||
pcap_data = extract_from_pcap(pcap_filename, [WifiInfo.BSSID,
|
||||
WifiInfo.ESSID,
|
||||
WifiInfo.ENCRYPTION,
|
||||
WifiInfo.CHANNEL,
|
||||
WifiInfo.RSSI])
|
||||
except FieldNotFoundError:
|
||||
logging.debug("WIGLE: Could not extract all information. Skip %s", gps_file)
|
||||
self.skip.append(gps_file)
|
||||
continue
|
||||
except Scapy_Exception as sc_e:
|
||||
logging.debug("WIGLE: %s", sc_e)
|
||||
self.skip.append(gps_file)
|
||||
continue
|
||||
new_entry = _transform_wigle_entry(gps_data, pcap_data, self.__version__)
|
||||
csv_entries.append(new_entry)
|
||||
no_err_entries.append(gps_file)
|
||||
if csv_entries:
|
||||
display.on_uploading('wigle.net')
|
||||
|
||||
try:
|
||||
_send_to_wigle(csv_entries, self.options['api_key'], donate=self.options['donate'])
|
||||
reported += no_err_entries
|
||||
self.report.update(data={'reported': reported})
|
||||
logging.info("WIGLE: Successfully uploaded %d files", len(no_err_entries))
|
||||
except requests.exceptions.RequestException as re_e:
|
||||
self.skip += no_err_entries
|
||||
logging.debug("WIGLE: Got an exception while uploading %s", re_e)
|
||||
except OSError as os_e:
|
||||
self.skip += no_err_entries
|
||||
logging.debug("WIGLE: Got the following error: %s", os_e)
|
||||
|
||||
display.on_normal()
|
||||
with self.lock:
|
||||
reported = self.report.data_field_or("reported", default=list())
|
||||
if new_gps_files := self.get_new_gps_files(reported):
|
||||
logging.info(
|
||||
"WIGLE: Internet connectivity detected. Uploading new handshakes to wigle.net"
|
||||
)
|
||||
csv_entries = list()
|
||||
no_err_entries = list()
|
||||
for gps_file in new_gps_files:
|
||||
logging.info(f"WIGLE: handeling {gps_file}")
|
||||
if not (pcap_filename := self.get_pcap_filename(gps_file)):
|
||||
self.skip.append(gps_file)
|
||||
continue
|
||||
if not (gps_data := self.get_gps_data(gps_file)):
|
||||
self.skip.append(gps_file)
|
||||
continue
|
||||
if not (pcap_data := self.get_pcap_data(pcap_filename)):
|
||||
self.skip.append(gps_file)
|
||||
continue
|
||||
new_entry = _transform_wigle_entry(gps_data, pcap_data)
|
||||
csv_entries.append(new_entry)
|
||||
no_err_entries.append(gps_file)
|
||||
if csv_entries:
|
||||
display = agent.view()
|
||||
display.on_uploading("wigle.net")
|
||||
self.upload(reported, csv_entries, no_err_entries)
|
||||
display.on_normal()
|
||||
|
Reference in New Issue
Block a user