Files
pwnagotchi/pwnagotchi/plugins/default/wigle.py

210 lines
8.1 KiB
Python
Raw Normal View History

2019-10-05 15:39:14 +02:00
import os
import logging
import json
import csv
import requests
2024-01-22 12:06:20 +01:00
import pwnagotchi
2020-04-03 19:01:40 +02:00
from io import StringIO
from datetime import datetime
from pwnagotchi.utils import WifiInfo, FieldNotFoundError, extract_from_pcap, StatusFile, remove_whitelisted
2020-04-03 19:01:40 +02:00
from threading import Lock
from pwnagotchi import plugins
from pwnagotchi._version import __version__ as __pwnagotchi_version__
2019-10-05 15:39:14 +02:00
def _extract_gps_data(path):
    """
    Load the GPS information stored in a gps/geo json file.

    A file whose name ends in '.geo.json' is converted into a dict with the
    keys 'Latitude', 'Longitude', 'Altitude', 'Accuracy' and 'Updated'.
    Any other file is returned exactly as it was parsed.

    :param path: path of the json file to read
    :return: the parsed json object (a freshly built dict for '.geo.json')
    :raises OSError: if the file cannot be opened or read
    :raises json.JSONDecodeError: if the file does not contain valid json
    :raises KeyError: if a '.geo.json' file lacks 'ts', 'location' or 'accuracy'
    """
    # Local import: only this function needs timezone, so the file-level
    # import list stays untouched.
    from datetime import timezone

    # OSError / JSONDecodeError simply propagate; catching them only to
    # re-raise the same object added nothing.
    with open(path, 'r') as json_file:
        parsed = json.load(json_file)

    if not path.endswith('.geo.json'):
        return parsed

    # Aware UTC conversion; formats to the same text as the deprecated
    # datetime.utcfromtimestamp().
    fix_time = datetime.fromtimestamp(int(parsed["ts"]), tz=timezone.utc)
    return {"Latitude": parsed["location"]["lat"],
            "Longitude": parsed["location"]["lng"],
            "Altitude": 10,  # hard-coded: no altitude is read from the geo file
            "Accuracy": parsed["accuracy"],
            "Updated": fix_time.strftime('%Y-%m-%dT%H:%M:%S.%f')}
2019-10-05 15:39:14 +02:00
def _format_auth(data):
    """
    Turn the encryption labels of a network into a list of strings.

    :param data: iterable of encryption/auth labels
    :return: list holding the formatted text of every label, in iteration order
    """
    # The former body also concatenated the labels into a "[a][b]" string that
    # was never returned; that loop was dead code and, for one-shot iterators,
    # consumed the input before the list below could be built.
    # NOTE(review): the discarded "[a][b]" form looks like what the AuthMode
    # csv column may have been meant to carry - confirm before changing the
    # return type.
    return [f"{auth}" for auth in data]
2019-10-05 15:39:14 +02:00
def _transform_wigle_entry(gps_data, pcap_data, plugin_version):
    """
    Render one network as WigleWifi csv text.

    The returned text holds three lines: the WigleWifi-1.6 pre-header, the
    column names and a single data row built from pcap_data and gps_data.

    :param gps_data: mapping with 'Updated', 'Latitude', 'Longitude', 'Altitude' and 'Accuracy'
    :param pcap_data: mapping indexed by WifiInfo.BSSID, ESSID, ENCRYPTION, CHANNEL and RSSI
    :param plugin_version: value written as appRelease in the pre-header
    :return: the csv text as one string
    """
    def first_seen(updated):
        # Keep the text before the first '.', then reformat with a space
        # instead of the 'T' separator.
        whole_seconds = updated.rsplit('.')[0]
        return datetime.strptime(whole_seconds, "%Y-%m-%dT%H:%M:%S").strftime('%Y-%m-%d %H:%M:%S')

    # kismet-style pre-header followed by the column names
    pre_header = (f"WigleWifi-1.6,appRelease={plugin_version},model=pwnagotchi,release={__pwnagotchi_version__},"
                  f"device={pwnagotchi.name()},display=kismet,board=RaspberryPi,brand=pwnagotchi,star=Sol,body=3,subBody=0\n")
    column_names = "MAC,SSID,AuthMode,FirstSeen,Channel,RSSI,CurrentLatitude,CurrentLongitude,AltitudeMeters,AccuracyMeters,Type\n"

    buffer = StringIO()
    buffer.write(pre_header)
    buffer.write(column_names)

    row = [
        pcap_data[WifiInfo.BSSID],
        pcap_data[WifiInfo.ESSID],
        _format_auth(pcap_data[WifiInfo.ENCRYPTION]),
        first_seen(gps_data['Updated']),
        pcap_data[WifiInfo.CHANNEL],
        pcap_data[WifiInfo.RSSI],
        gps_data['Latitude'],
        gps_data['Longitude'],
        gps_data['Altitude'],
        gps_data['Accuracy'],
        'WIFI',
    ]

    # QUOTE_NONE: fields are never quoted, special characters are escaped
    # with a backslash instead.
    row_writer = csv.writer(buffer, delimiter=",", quoting=csv.QUOTE_NONE, escapechar="\\")
    row_writer.writerow(row)
    return buffer.getvalue()
def _send_to_wigle(lines, api_key, donate=True, timeout=30):
    """
    Upload the collected csv entries to wigle.net as a single file.

    :param lines: iterable of csv text chunks; they are concatenated in order
    :param api_key: value sent after "Basic " in the Authorization header
    :param donate: sent as form field 'donate' ('on' when true, 'false' otherwise)
    :param timeout: timeout handed to requests.post
    :raises requests.exceptions.RequestException: if the request fails, the
        answer is not a json object, or the answer does not report success
    """
    # A StringIO created from text starts at position 0, ready to be read.
    upload = StringIO("".join(f"{line}" for line in lines))

    headers = {'Authorization': f"Basic {api_key}",
               'Accept': 'application/json'}
    data = {'donate': 'on' if donate else 'false'}
    payload = {'file': (pwnagotchi.name() + ".csv", upload, 'multipart/form-data', {'Expires': '0'})}

    # Request errors propagate unchanged as RequestException subclasses.
    res = requests.post('https://api.wigle.net/api/v2/file/upload',
                        data=data,
                        headers=headers,
                        files=payload,
                        timeout=timeout)

    try:
        json_res = res.json()
    except ValueError as json_err:
        # Depending on the requests version a non-json body surfaces as a
        # plain ValueError; convert it to the exception type this function
        # already raises for every other failure.
        raise requests.exceptions.RequestException(
            f"Unexpected response from wigle.net (HTTP {res.status_code})") from json_err

    if not isinstance(json_res, dict):
        raise requests.exceptions.RequestException(f"Unexpected response from wigle.net: {json_res!r}")

    if not json_res.get('success'):
        # .get() keeps a missing 'message' from turning into a KeyError
        raise requests.exceptions.RequestException(json_res.get('message'))
class Wigle(plugins.Plugin):
    """
    Plugin that uploads networks with a stored gps position to wigle.net.

    For every '.gps.json' / '.geo.json' file in the handshake directory that
    has a matching '.pcap', one csv entry is built and all entries are sent
    in a single upload whenever internet connectivity is reported.
    """
    __author__ = "Dadav and updated by Jayofelony"
    __version__ = "3.0.1"
    __license__ = "GPL3"
    __description__ = "This plugin automatically uploads collected WiFi to wigle.net"

    # file name endings that mark a gps position file
    _GPS_SUFFIXES = ('.gps.json', '.geo.json')

    def __init__(self):
        self.ready = False
        # status file holding the list of gps files that were uploaded already
        self.report = StatusFile('/root/.wigle_uploads', data_format='json')
        # gps files that failed; kept in memory only and ignored on later calls
        self.skip = list()
        self.lock = Lock()
        self.options = dict()

    def on_loaded(self):
        """
        Validate the options; the plugin only becomes ready with an api_key.
        """
        if self.options.get('api_key') is None:
            logging.debug("WIGLE: api_key isn't set. Can't upload to wigle.net")
            return

        self.options.setdefault('donate', False)
        self.ready = True
        logging.info("WIGLE: ready")

    def on_internet_available(self, agent):
        """
        Called when there's internet connectivity
        """
        if not self.ready:
            return
        # The lock used to be tested with locked() but was never acquired, so
        # overlapping calls were not kept apart. A non-blocking acquire keeps
        # the "return immediately when busy" behaviour and makes it effective.
        if not self.lock.acquire(blocking=False):
            return
        try:
            self._upload_new_files(agent)
        finally:
            self.lock.release()

    @staticmethod
    def _pcap_path_for(gps_file):
        """
        Return the pcap path belonging to a gps file, or None for other names.
        """
        for suffix in Wigle._GPS_SUFFIXES:
            if gps_file.endswith(suffix):
                # swap the trailing suffix only; the rest of the path is kept
                return gps_file[:-len(suffix)] + '.pcap'
        return None

    def _upload_new_files(self, agent):
        """
        Collect all gps files that were not handled yet and upload them.
        """
        from scapy.all import Scapy_Exception

        config = agent.config()
        display = agent.view()
        reported = self.report.data_field_or('reported', default=list())
        handshake_dir = config['bettercap']['handshakes']

        all_gps_files = [os.path.join(handshake_dir, filename)
                         for filename in os.listdir(handshake_dir)
                         if filename.endswith(self._GPS_SUFFIXES)]
        all_gps_files = remove_whitelisted(all_gps_files, config['main']['whitelist'])

        new_gps_files = set(all_gps_files) - set(reported) - set(self.skip)
        if not new_gps_files:
            return

        logging.info("WIGLE: Internet connectivity detected. Uploading new handshakes to wigle.net")
        csv_entries = list()
        no_err_entries = list()

        for gps_file in new_gps_files:
            pcap_filename = self._pcap_path_for(gps_file)
            if pcap_filename is None or not os.path.exists(pcap_filename):
                logging.debug("WIGLE: Can't find pcap for %s", gps_file)
                self.skip.append(gps_file)
                continue

            try:
                gps_data = _extract_gps_data(gps_file)
                no_position = gps_data['Latitude'] == 0 and gps_data['Longitude'] == 0
            except (OSError, json.JSONDecodeError) as read_err:
                logging.debug("WIGLE: %s", read_err)
                self.skip.append(gps_file)
                continue
            except (KeyError, TypeError, ValueError) as data_err:
                # Readable json with missing or unusable fields: skip this
                # file instead of aborting the upload of all the others.
                logging.debug("WIGLE: Malformed gps data in %s: %r", gps_file, data_err)
                self.skip.append(gps_file)
                continue

            if no_position:
                logging.debug("WIGLE: Not enough gps-information for %s. Trying again next time.", gps_file)
                self.skip.append(gps_file)
                continue

            try:
                pcap_data = extract_from_pcap(pcap_filename, [WifiInfo.BSSID,
                                                              WifiInfo.ESSID,
                                                              WifiInfo.ENCRYPTION,
                                                              WifiInfo.CHANNEL,
                                                              WifiInfo.RSSI])
            except FieldNotFoundError:
                logging.debug("WIGLE: Could not extract all information. Skip %s", gps_file)
                self.skip.append(gps_file)
                continue
            except Scapy_Exception as sc_e:
                logging.debug("WIGLE: %s", sc_e)
                self.skip.append(gps_file)
                continue

            try:
                new_entry = _transform_wigle_entry(gps_data, pcap_data, self.__version__)
            except (KeyError, TypeError, ValueError) as data_err:
                # e.g. no 'Updated'/'Altitude'/'Accuracy' key or an
                # unparsable timestamp in the gps data
                logging.debug("WIGLE: Incomplete data for %s: %r", gps_file, data_err)
                self.skip.append(gps_file)
                continue

            csv_entries.append(new_entry)
            no_err_entries.append(gps_file)

        if not csv_entries:
            return

        display.on_uploading('wigle.net')
        try:
            _send_to_wigle(csv_entries, self.options['api_key'], donate=self.options['donate'])
            reported += no_err_entries
            self.report.update(data={'reported': reported})
            logging.info("WIGLE: Successfully uploaded %d files", len(no_err_entries))
        except requests.exceptions.RequestException as re_e:
            self.skip += no_err_entries
            logging.debug("WIGLE: Got an exception while uploading %s", re_e)
        except OSError as os_e:
            self.skip += no_err_entries
            logging.debug("WIGLE: Got the following error: %s", os_e)
        finally:
            # always leave the uploading screen, even on an unexpected error
            display.on_normal()