diff --git a/pwnagotchi/defaults.toml b/pwnagotchi/defaults.toml
index 2d9165e3..5edaf9da 100644
--- a/pwnagotchi/defaults.toml
+++ b/pwnagotchi/defaults.toml
@@ -28,7 +28,8 @@ main.plugins.bt-tether.enabled = false
main.plugins.bt-tether.phone-name = "" # name as shown on the phone i.e. "Pwnagotchi's Phone"
main.plugins.bt-tether.mac = ""
main.plugins.bt-tether.phone = "" # android or ios
-main.plugins.bt-tether.ip = "" # 192.168.44.2 android / 172.20.10.2 ios
+main.plugins.bt-tether.ip = "" # optional, default : 192.168.44.2 if android or 172.20.10.2 if ios
+main.plugins.bt-tether.dns = "" # optional, default (google): "8.8.8.8 1.1.1.1". Consider using anonymous DNS like OpenNic :-)
main.plugins.fix_services.enabled = true
@@ -98,6 +99,13 @@ main.plugins.wardriver.whitelist = [
"network-2"
]
+ain.plugins.wigle.enabled = false
+main.plugins.wigle.api_key = "" # mandatory
+main.plugins.wigle.cvs_dir = "/tmp" # optionnal, is set, the CVS is written to this directory
+main.plugins.wigle.donate = false # default: off
+main.plugins.wigle.timeout = 30 # default: 30
+main.plugins.wigle.position = (7, 85) # optionnal
+
main.plugins.wpa-sec.enabled = false
main.plugins.wpa-sec.api_key = ""
main.plugins.wpa-sec.api_url = "https://wpa-sec.stanev.org"
@@ -188,10 +196,11 @@ bettercap.handshakes = "/home/pi/handshakes"
bettercap.silence = [
"ble.device.new",
"ble.device.lost",
- "ble.device.disconnected",
- "ble.device.connected",
"ble.device.service.discovered",
"ble.device.characteristic.discovered",
+ "ble.device.disconnected",
+ "ble.device.connected",
+ "ble.connection.timeout",
"wifi.client.new",
"wifi.client.lost",
"wifi.client.probe",
diff --git a/pwnagotchi/plugins/default/bt-tether.py b/pwnagotchi/plugins/default/bt-tether.py
index 9048492c..1f253f13 100644
--- a/pwnagotchi/plugins/default/bt-tether.py
+++ b/pwnagotchi/plugins/default/bt-tether.py
@@ -1,93 +1,294 @@
import logging
import subprocess
+import re
+import time
+from flask import abort, render_template_string
import pwnagotchi.plugins as plugins
import pwnagotchi.ui.fonts as fonts
from pwnagotchi.ui.components import LabeledValue
from pwnagotchi.ui.view import BLACK
+TEMPLATE = """
+{% extends "base.html" %}
+{% set active_page = "bt-tether" %}
+{% block title %}
+ {{ title }}
+{% endblock %}
+{% block meta %}
+
+
+{% endblock %}
+{% block styles %}
+{{ super() }}
+
+{% endblock %}
+{% block script %}
+ var searchInput = document.getElementById("searchText");
+ searchInput.onkeyup = function() {
+ var filter, table, tr, td, i, txtValue;
+ filter = searchInput.value.toUpperCase();
+ table = document.getElementById("tableOptions");
+ if (table) {
+ tr = table.getElementsByTagName("tr");
+
+ for (i = 0; i < tr.length; i++) {
+ td = tr[i].getElementsByTagName("td")[0];
+ if (td) {
+ txtValue = td.textContent || td.innerText;
+ if (txtValue.toUpperCase().indexOf(filter) > -1) {
+ tr[i].style.display = "";
+ }else{
+ tr[i].style.display = "none";
+ }
+ }
+ }
+ }
+ }
+{% endblock %}
+{% block content %}
+
+
+
+ Item |
+ Configuration |
+
+
+ Bluetooth |
+ {{bluetooth|safe}} |
+
+
+ Device |
+ {{device|safe}} |
+
+
+ Connection |
+ {{connection|safe}} |
+
+
+{% endblock %}
+"""
+
+# We all love crazy regex patterns
+MAC_PTTRN = r"^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})$"
+IP_PTTRN = r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$"
+DNS_PTTRN = r"^\s*((\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\s*[ ,;]\s*)+((\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\s*[ ,;]?\s*)$"
+
class BTTether(plugins.Plugin):
- __author__ = 'Jayofelony'
- __version__ = '1.2'
- __license__ = 'GPL3'
- __description__ = 'A new BT-Tether plugin'
+ __author__ = "Jayofelony, modified my fmatray"
+ __version__ = "1.4"
+ __license__ = "GPL3"
+ __description__ = "A new BT-Tether plugin"
def __init__(self):
self.ready = False
self.options = dict()
- self.status = '-'
+ self.phone_name = None
+ self.mac = None
+ @staticmethod
+ def exec_cmd(cmd, args, pattern=None):
+ try:
+ result = subprocess.run([cmd] + args,
+ check=True, capture_output=True, text=True)
+ if pattern:
+ return result.stdout.find(pattern)
+ return result
+ except Exception as exp:
+ logging.error(f"[BT-Tether] Error with {cmd} : {exp}")
+ raise exp
+
+ def bluetoothctl(self, args, pattern=None):
+ return self.exec_cmd("bluetoothctl", args, pattern)
+
+ def nmcli(self, args, pattern=None):
+ return self.exec_cmd("nmcli", args, pattern)
+
def on_loaded(self):
logging.info("[BT-Tether] plugin loaded.")
def on_config_changed(self, config):
- if any(self.options[key] == '' for key in ['phone', 'phone-name', 'ip', 'mac']):
- self.ready = False
- ip = self.options['ip']
- mac = self.options['mac']
- phone_name = self.options['phone-name'] + ' Network'
- if self.options['phone'].lower() == 'android':
- address = f'{ip}'
- gateway = '192.168.44.1'
- elif self.options['phone'].lower() == 'ios':
- address = f'{ip}'
- gateway = '172.20.10.1'
- else:
- logging.error("[BT-Tether] Phone type not supported.")
+ if "phone-name" not in self.options:
+ logging.error("[BT-Tether] Phone name not provided")
+ return
+ if not ("mac" in self.options and re.match(MAC_PTTRN, self.options["mac"])):
+ logging.error("[BT-Tether] Error with mac adresse")
+ return
+
+ if not ("phone" in self.options and self.options["phone"].lower() in ["android", "ios"]):
+ logging.error("[BT-Tether] Phone type not supported")
+ return
+ if self.options["phone"].lower() == "android":
+ address = self.options.get("ip", "192.168.44.2")
+ gateway = "192.168.44.1"
+ elif self.options["phone"].lower() == "ios":
+ address = self.options.get("ip", "172.20.10.2")
+ gateway = "172.20.10.1"
+ if not re.match(IP_PTTRN, address):
+ logging.error(f"[BT-Tether] IP error: {address}")
+ return
+
+ self.phone_name = self.options["phone-name"] + " Network"
+ self.mac = self.options["mac"]
+ dns = self.options.get("dns", "8.8.8.8 1.1.1.1")
+ if not re.match(DNS_PTTRN, dns):
+ logging.error(f"[BT-Tether] DNS error: {dns}")
+ return
+ dns = re.sub("[\s,;]+", " ", dns).strip() # DNS cleaning
+
+ try:
+ # Configure connection. Metric is set to 200 to prefer connection over USB
+ self.nmcli(["connection", "modify", f"{self.phone_name}",
+ "connection.type", "bluetooth",
+ "bluetooth.type", "panu",
+ "bluetooth.bdaddr", f"{self.mac}",
+ "connection.autoconnect", "yes",
+ "connection.autoconnect-retries", "0",
+ "ipv4.method", "manual",
+ "ipv4.dns", f"{dns}",
+ "ipv4.addresses", f"{address}/24",
+ "ipv4.gateway", f"{gateway}",
+ "ipv4.route-metric", "200" ])
+ self.nmcli(["connection", "reload"])
+ self.ready = True
+ logging.info(f"[BT-Tether] Connection {self.phone_name} configured")
+ except Exception as e:
+ logging.error(f"[BT-Tether] Error while configuring: {e}")
return
try:
- subprocess.run([
- 'nmcli', 'connection', 'modify', f'{phone_name}',
- 'connection.type', 'bluetooth',
- 'bluetooth.type', 'panu',
- 'bluetooth.bdaddr', f'{mac}',
- 'ipv4.method', 'manual',
- 'ipv4.dns', '8.8.8.8 1.1.1.1',
- 'ipv4.addresses', f'{address}/24',
- 'ipv4.gateway', f'{gateway}',
- 'ipv4.route-metric', '100'
- ], check=True)
- subprocess.run(['nmcli', 'connection', 'reload'], check=True)
- subprocess.run(['nmcli', 'connection', 'up', f'{phone_name}'], check=True)
+ time.sleep(5) # Give some delay to configure before going up
+ self.nmcli(["connection", "up", f"{self.phone_name}"])
except Exception as e:
logging.error(f"[BT-Tether] Failed to connect to device: {e}")
- logging.error(f"[BT-Tether] Failed to connect to device: have you enabled bluetooth tethering on your phone?")
- self.ready = True
+ logging.error(
+ f"[BT-Tether] Failed to connect to device: have you enabled bluetooth tethering on your phone?"
+ )
def on_ready(self, agent):
- if any(self.options[key] == '' for key in ['phone', 'phone-name', 'ip', 'mac']):
- self.ready = False
- self.ready = True
+ try:
+ logging.info(f"[BT-Tether] Disabling bettercap's BLE module")
+ agent.run("ble.recon off", verbose_errors=False)
+ except Exception as e:
+ logging.info(f"[BT-Tether] Bettercap BLE was already off.")
+
+ def on_unload(self, ui):
+ with ui._lock:
+ ui.remove_element("bluetooth")
+ try:
+ self.nmcli(["connection", "down", f"{self.phone_name}"])
+ except Exception as e:
+ logging.error(f"[BT-Tether] Failed to disconnect from device: {e}")
def on_ui_setup(self, ui):
with ui._lock:
ui.add_element('bluetooth', LabeledValue(color=BLACK, label='BT', value='-',
position=(ui.width() / 2 - 10, 0),
label_font=fonts.Bold, text_font=fonts.Medium))
-
def on_ui_update(self, ui):
- if self.ready:
- phone_name = self.options['phone-name'] + ' Network'
- if (subprocess.run(['bluetoothctl', 'info'], capture_output=True, text=True)).stdout.find('Connected: yes') != -1:
- self.status = 'C'
- else:
- self.status = '-'
- try:
- subprocess.run(['nmcli', 'connection', 'up', f'{phone_name}'], check=True)
- except Exception as e:
- logging.debug(f"[BT-Tether] Failed to connect to device: {e}")
- logging.error(f"[BT-Tether] Failed to connect to device: have you enabled bluetooth tethering on your phone?")
- ui.set('bluetooth', self.status)
- return
-
- def on_unload(self, ui):
- phone_name = self.options['phone-name'] + ' Network'
+ if not self.ready:
+ return
with ui._lock:
- ui.remove_element('bluetooth')
- try:
- if (subprocess.run(['bluetoothctl', 'info'], capture_output=True, text=True)).stdout.find('Connected: yes') != -1:
- subprocess.run(['nmcli', 'connection', 'down', f'{phone_name}'], check=True)
- logging.info(f"[BT-Tether] Disconnected from device with name: {phone_name}")
- else:
- logging.info(f"[BT-Tether] Device with name {phone_name} is not connected, not disconnecting")
- except Exception as e:
- logging.error(f"[BT-Tether] Failed to disconnect from device: {e}")
\ No newline at end of file
+ status = ""
+ try:
+ # Checking connection
+ if self.nmcli(["-w", "0", "-g", "GENERAL.STATE", "connection", "show", self.phone_name],
+ "activated") != -1:
+ ui.set("bluetooth", "U")
+ return
+ else:
+ ui.set("bluetooth", "D")
+ status = "BT Conn. down"
+
+ # Checking device
+ if self.nmcli(["-w", "0", "-g", "GENERAL.STATE", "device", "show", self.mac],
+ "(connected)") != -1:
+ ui.set("bluetooth", "C")
+ status += "\nBT dev conn."
+ else:
+ ui.set("bluetooth", "-")
+ status += "\nBT dev disconn."
+ ui.set("status", status)
+ except Exception as e:
+ logging.error(f"[BT-Tether] Error on update: {e}")
+
+ def on_webhook(self, path, request):
+ if not self.ready:
+ return """
+ BT-tether: Error
+ Plugin not ready
+ """
+ if path == "/" or not path:
+ try:
+ bluetooth = self.bluetoothctl(["info", self.mac])
+ bluetooth = bluetooth.stdout.replace('\n', '
')
+ except Exception as e:
+ bluetooth = "Error while checking bluetoothctl"
+
+ try:
+ device =self.nmcli(["-w", "0","device", "show", self.mac])
+ device = device.stdout.replace('\n', '
')
+ except Exception as e:
+ device = "Error while checking nmcli device"
+
+ try:
+ connection = self.nmcli(["-w", "0","connection", "show", self.phone_name])
+ connection = connection.stdout.replace('\n', '
')
+ except Exception as e:
+ connection = "Error while checking nmcli connection"
+
+ logging.debug(device)
+ return render_template_string(TEMPLATE,
+ title="BT-Tether",
+ bluetooth=bluetooth,
+ device=device,
+ connection=connection)
+ abort(404)
\ No newline at end of file
diff --git a/pwnagotchi/plugins/default/ohcapi.py b/pwnagotchi/plugins/default/ohcapi.py
index b400c572..5aa6ac5d 100644
--- a/pwnagotchi/plugins/default/ohcapi.py
+++ b/pwnagotchi/plugins/default/ohcapi.py
@@ -75,20 +75,16 @@ class ohcapi(plugins.Plugin):
return
# Check if the internet is still available by pinging Google
+ self.internet_active = False
try:
response = requests.get('https://www.google.com', timeout=5)
+ if response.status_code == 200:
+ self.internet_active = True
except requests.ConnectionError:
- self.internet_active = False
- return
-
- if response.status_code == 200:
- self.internet_active = True
- else:
- self.internet_active = False
return
current_time = time.time()
- if current_time - self.last_run >= self.options['sleep']:
+ if self.internet_active and current_time - self.last_run >= self.options['sleep']:
self._run_tasks(agent)
self.last_run = current_time
diff --git a/pwnagotchi/plugins/default/wigle.py b/pwnagotchi/plugins/default/wigle.py
index 5577ebee..5f872dc5 100644
--- a/pwnagotchi/plugins/default/wigle.py
+++ b/pwnagotchi/plugins/default/wigle.py
@@ -4,212 +4,300 @@ import json
import csv
import requests
import pwnagotchi
-
-from io import StringIO
-from datetime import datetime
-from pwnagotchi.utils import WifiInfo, FieldNotFoundError, extract_from_pcap, StatusFile, remove_whitelisted
+import re
+from glob import glob
from threading import Lock
+from io import StringIO
+from datetime import datetime, UTC
+
+from flask import make_response, redirect
+from pwnagotchi.utils import (
+ WifiInfo,
+ FieldNotFoundError,
+ extract_from_pcap,
+ StatusFile,
+ remove_whitelisted,
+)
from pwnagotchi import plugins
from pwnagotchi._version import __version__ as __pwnagotchi_version__
+import pwnagotchi.ui.fonts as fonts
+from pwnagotchi.ui.components import Text
+from pwnagotchi.ui.view import BLACK
-def _extract_gps_data(path):
- """
- Extract data from gps-file
-
- return json-obj
- """
-
- try:
- if path.endswith('.geo.json'):
- with open(path, 'r') as json_file:
- tempJson = json.load(json_file)
- d = datetime.utcfromtimestamp(int(tempJson["ts"]))
- return {"Latitude": tempJson["location"]["lat"],
- "Longitude": tempJson["location"]["lng"],
- "Altitude": 10,
- "Accuracy": tempJson["accuracy"],
- "Updated": d.strftime('%Y-%m-%dT%H:%M:%S.%f')}
- else:
- with open(path, 'r') as json_file:
- return json.load(json_file)
- except OSError as os_err:
- raise os_err
- except json.JSONDecodeError as json_err:
- raise json_err
-
-
-def _format_auth(data):
- out = ""
- for auth in data:
- out = f"{out}[{auth}]"
- return [f"{auth}" for auth in data]
-
-
-def _transform_wigle_entry(gps_data, pcap_data, plugin_version):
- """
- Transform to wigle entry in file
- """
- dummy = StringIO()
- # write kismet header
- dummy.write(f"WigleWifi-1.6,appRelease={plugin_version},model=pwnagotchi,release={__pwnagotchi_version__},"
- f"device={pwnagotchi.name()},display=kismet,board=RaspberryPi,brand=pwnagotchi,star=Sol,body=3,subBody=0\n")
- dummy.write(
- "MAC,SSID,AuthMode,FirstSeen,Channel,RSSI,CurrentLatitude,CurrentLongitude,AltitudeMeters,AccuracyMeters,Type\n")
-
- writer = csv.writer(dummy, delimiter=",", quoting=csv.QUOTE_NONE, escapechar="\\")
- writer.writerow([
- pcap_data[WifiInfo.BSSID],
- pcap_data[WifiInfo.ESSID],
- _format_auth(pcap_data[WifiInfo.ENCRYPTION]),
- datetime.strptime(gps_data['Updated'].rsplit('.')[0],
- "%Y-%m-%dT%H:%M:%S").strftime('%Y-%m-%d %H:%M:%S'),
- pcap_data[WifiInfo.CHANNEL],
- pcap_data[WifiInfo.RSSI],
- gps_data['Latitude'],
- gps_data['Longitude'],
- gps_data['Altitude'],
- gps_data['Accuracy'],
- 'WIFI'])
- return dummy.getvalue()
-
-
-def _send_to_wigle(lines, api_key, donate=True, timeout=30):
- """
- Uploads the file to wigle-net
- """
-
- dummy = StringIO()
-
- for line in lines:
- dummy.write(f"{line}")
-
- dummy.seek(0)
-
- headers = {"Authorization": f"Basic {api_key}",
- "Accept": "application/json",
- "HTTP_USER_AGENT": "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:15.0) Gecko/20100101 Firefox/15.0.1"}
- data = {"donate": "on" if donate else "false"}
- payload = {"file": (pwnagotchi.name() + ".csv", dummy, "multipart/form-data", {"Expires": "0"})}
- try:
- res = requests.post('https://api.wigle.net/api/v2/file/upload',
- data=data,
- headers=headers,
- files=payload,
- timeout=timeout)
- json_res = res.json()
- if not json_res['success']:
- raise requests.exceptions.RequestException(json_res['message'])
- except requests.exceptions.RequestException as re_e:
- raise re_e
+from scapy.all import Scapy_Exception
class Wigle(plugins.Plugin):
- __author__ = "Dadav and updated by Jayofelony"
- __version__ = "3.1.0"
+ __author__ = "Dadav and updated by Jayofelony and fmatray"
+ __version__ = "4.0.0"
__license__ = "GPL3"
__description__ = "This plugin automatically uploads collected WiFi to wigle.net"
+ LABEL_SPACING = 0
def __init__(self):
self.ready = False
- self.report = StatusFile('/root/.wigle_uploads', data_format='json')
+ self.report = None
self.skip = list()
self.lock = Lock()
self.options = dict()
+ self.statistics = dict(
+ ready=False,
+ username=None,
+ rank=None,
+ monthrank=None,
+ discoveredwiFi=None,
+ last=None,
+ )
+ self.last_stat = datetime.now(tz=UTC)
+ self.ui_counter = 0
- def on_loaded(self):
- if 'api_key' not in self.options or ('api_key' in self.options and self.options['api_key'] is None):
- logging.debug("WIGLE: api_key isn't set. Can't upload to wigle.net")
+ def on_config_changed(self, config):
+ self.api_key = self.options.get("api_key", None)
+ if not self.api_key:
+ logging.info("[WIGLE] api_key must be set.")
return
-
- if 'donate' not in self.options:
- self.options['donate'] = False
-
+ self.donate = self.options.get("donate", False)
+ self.handshake_dir = config["bettercap"].get("handshakes")
+ report_filename = os.path.join(self.handshake_dir, ".wigle_uploads")
+ self.report = StatusFile(report_filename, data_format="json")
+ self.cvs_dir = self.options.get("cvs_dir", None)
+ self.whitelist = config["main"].get("whitelist", [])
+ self.timeout = self.options.get("timeout", 30)
+ self.position = self.options.get("position", (10, 10))
self.ready = True
- logging.info("WIGLE: ready")
-
+ logging.info("[WIGLE] Ready for wardriving!!!")
+ self.get_statistics(force=True)
+
def on_webhook(self, path, request):
- from flask import make_response, redirect
- response = make_response(redirect("https://www.wigle.net/", code=302))
- return response
+ return make_response(redirect("https://www.wigle.net/", code=302))
+
+ def get_new_gps_files(self, reported):
+ all_gps_files = glob(os.path.join(self.handshake_dir, "*.gps.json"))
+ all_gps_files += glob(os.path.join(self.handshake_dir, "*.geo.json"))
+ all_gps_files = remove_whitelisted(all_gps_files, self.whitelist)
+ return set(all_gps_files) - set(reported) - set(self.skip)
+
+ @staticmethod
+ def get_pcap_filename(gps_file):
+ pcap_filename = re.sub(r"\.(geo|gps)\.json$", ".pcap", gps_file)
+ if not os.path.exists(pcap_filename):
+ logging.debug("[WIGLE] Can't find pcap for %s", gps_file)
+ return None
+ return pcap_filename
+
+ @staticmethod
+ def extract_gps_data(path):
+ """
+ Extract data from gps-file
+ return json-obj
+ """
+ try:
+ if path.endswith(".geo.json"):
+ with open(path, "r") as json_file:
+ tempJson = json.load(json_file)
+ d = datetime.fromtimestamp(int(tempJson["ts"]), tz=UTC)
+ return {
+ "Latitude": tempJson["location"]["lat"],
+ "Longitude": tempJson["location"]["lng"],
+ "Altitude": 10,
+ "Accuracy": tempJson["accuracy"],
+ "Updated": d.strftime("%Y-%m-%dT%H:%M:%S.%f"),
+ }
+ with open(path, "r") as json_file:
+ return json.load(json_file)
+ except (OSError, json.JSONDecodeError) as exp:
+ raise exp
+
+ def get_gps_data(self, gps_file):
+ try:
+ gps_data = self.extract_gps_data(gps_file)
+ except (OSError, json.JSONDecodeError) as exp:
+ logging.debug(f"[WIGLE] Error while extracting GPS data: {exp}")
+ return None
+ if gps_data["Latitude"] == 0 and gps_data["Longitude"] == 0:
+ logging.debug(f"[WIGLE] Not enough gps data for {gps_file}. Next time.")
+ return None
+ return gps_data
+
+ @staticmethod
+ def get_pcap_data(pcap_filename):
+ try:
+ pcap_data = extract_from_pcap(
+ pcap_filename,
+ [
+ WifiInfo.BSSID,
+ WifiInfo.ESSID,
+ WifiInfo.ENCRYPTION,
+ WifiInfo.CHANNEL,
+ WifiInfo.FREQUENCY,
+ WifiInfo.RSSI,
+ ],
+ )
+ logging.debug(f"[WIGLE] PCAP data for {pcap_filename}: {pcap_data}")
+ except FieldNotFoundError:
+ logging.debug(f"[WIGLE] Cannot extract all data: {pcap_filename} (skipped)")
+ return None
+ except Scapy_Exception as sc_e:
+ logging.debug(f"[WIGLE] {sc_e}")
+ return None
+ return pcap_data
+
+ def generate_csv(self, data):
+ date = datetime.now().strftime("%Y%m%d_%H%M%S")
+ filename = f"{pwnagotchi.name()}_{date}.csv"
+
+ content = StringIO()
+ # write kismet header + header
+ content.write(
+ f"WigleWifi-1.6,appRelease={self.__version__},model=pwnagotchi,release={__pwnagotchi_version__},"
+ f"device={pwnagotchi.name()},display=kismet,board=RaspberryPi,brand=pwnagotchi,star=Sol,body=3,subBody=0\n"
+ f"MAC,SSID,AuthMode,FirstSeen,Channel,Frequency,RSSI,CurrentLatitude,CurrentLongitude,AltitudeMeters,AccuracyMeters,RCOIs,MfgrId,Type\n"
+ )
+ writer = csv.writer(
+ content, delimiter=",", quoting=csv.QUOTE_NONE, escapechar="\\"
+ )
+ for gps_data, pcap_data in data: # write WIFIs
+ writer.writerow(
+ [
+ pcap_data[WifiInfo.BSSID],
+ pcap_data[WifiInfo.ESSID],
+ f"[{']['.join(pcap_data[WifiInfo.ENCRYPTION])}]",
+ datetime.strptime(
+ gps_data["Updated"].rsplit(".")[0], "%Y-%m-%dT%H:%M:%S"
+ ).strftime("%Y-%m-%d %H:%M:%S"),
+ pcap_data[WifiInfo.CHANNEL],
+ pcap_data[WifiInfo.FREQUENCY],
+ pcap_data[WifiInfo.RSSI],
+ gps_data["Latitude"],
+ gps_data["Longitude"],
+ gps_data["Altitude"],
+ gps_data["Accuracy"],
+ "", # RCOIs to populate
+ "", # MfgrId always empty
+ "WIFI",
+ ]
+ )
+ content.seek(0)
+ return filename, content
+
+ def save_to_file(self, cvs_filename, cvs_content):
+ if not self.cvs_dir:
+ return
+ filename = os.path.join(self.cvs_dir, cvs_filename)
+ logging.info(f"[WIGLE] Saving to file {filename}")
+ try:
+ with open(filename, mode="w") as f:
+ f.write(cvs_content.getvalue())
+ except Exception as exp:
+ logging.error(f"[WIGLE] Error while writing CSV file(skipping): {exp}")
+
+ def post_wigle(self, reported, cvs_filename, cvs_content, no_err_entries):
+ try:
+ json_res = requests.post(
+ "https://api.wigle.net/api/v2/file/upload",
+ headers={
+ "Authorization": f"Basic {self.api_key}",
+ "Accept": "application/json",
+ },
+ data={"donate": "on" if self.donate else "false"},
+ files=dict(file=(cvs_filename, cvs_content, "text/csv")),
+ timeout=self.timeout,
+ ).json()
+ if not json_res["success"]:
+ raise requests.exceptions.RequestException(json_res["message"])
+ reported += no_err_entries
+ self.report.update(data={"reported": reported})
+ logging.info(f"[WIGLE] Successfully uploaded {len(no_err_entries)} wifis")
+ except (requests.exceptions.RequestException, OSError) as exp:
+ self.skip += no_err_entries
+ logging.debug(f"[WIGLE] Exception while uploading: {exp}")
+
+ def upload_new_handshakes(self, reported, new_gps_files, agent):
+ logging.info("[WIGLE] Uploading new handshakes to wigle.net")
+ csv_entries, no_err_entries = list(), list()
+ for gps_file in new_gps_files:
+ logging.info(f"[WIGLE] Processing {os.path.basename(gps_file)}")
+ if (
+ (pcap_filename := self.get_pcap_filename(gps_file))
+ and (gps_data := self.get_gps_data(gps_file))
+ and (pcap_data := self.get_pcap_data(pcap_filename))
+ ):
+ csv_entries.append((gps_data, pcap_data))
+ no_err_entries.append(gps_file)
+ else:
+ self.skip.append(gps_file)
+ logging.info(f"[WIGLE] Wifi to upload: {len(csv_entries)}")
+ if csv_entries:
+ cvs_filename, cvs_content = self.generate_csv(csv_entries)
+ self.save_to_file(cvs_filename, cvs_content)
+ display = agent.view()
+ display.on_uploading("wigle.net")
+ self.post_wigle(reported, cvs_filename, cvs_content, no_err_entries)
+ display.on_normal()
+
+ def get_statistics(self, force=False):
+ if not force and (datetime.now(tz=UTC) - self.last_stat).total_seconds() < 30:
+ return
+ self.last_stat = datetime.now(tz=UTC)
+ try:
+ self.statistics["ready"] = False
+ json_res = requests.get(
+ "https://api.wigle.net/api/v2/stats/user",
+ headers={
+ "Authorization": f"Basic {self.api_key}",
+ "Accept": "application/json",
+ },
+ timeout=self.timeout,
+ ).json()
+ if not json_res["success"]:
+ return
+ self.statistics["ready"] = True
+ self.statistics["username"] = json_res["user"]
+ self.statistics["rank"] = json_res["rank"]
+ self.statistics["monthrank"] = json_res["monthRank"]
+ self.statistics["discoveredwiFi"] = json_res["statistics"]["discoveredWiFi"]
+ last = json_res["statistics"]["last"]
+ self.statistics["last"] = f"{last[6:8]}/{last[4:6]}/{last[0:4]}"
+ except (requests.exceptions.RequestException, OSError) as exp:
+ pass
def on_internet_available(self, agent):
- """
- Called when there's internet connectivity
- """
- if not self.ready or self.lock.locked():
+ if not self.ready:
return
+ with self.lock:
+ reported = self.report.data_field_or("reported", default=list())
+ if new_gps_files := self.get_new_gps_files(reported):
+ self.upload_new_handshakes(reported, new_gps_files, agent)
+ else:
+ self.get_statistics()
- from scapy.all import Scapy_Exception
+ def on_ui_setup(self, ui):
+ with ui._lock:
+ ui.add_element(
+ "wigle",
+ Text(value="-", position=self.position, font=fonts.Small, color=BLACK),
+ )
- config = agent.config()
- display = agent.view()
- reported = self.report.data_field_or('reported', default=list())
- handshake_dir = config['bettercap']['handshakes']
- all_files = os.listdir(handshake_dir)
- all_gps_files = [os.path.join(handshake_dir, filename)
- for filename in all_files
- if filename.endswith('.gps.json') or filename.endswith('.geo.json')]
+ def on_unload(self, ui):
+ with ui._lock:
+ ui.remove_element("wigle")
- all_gps_files = remove_whitelisted(all_gps_files, config['main']['whitelist'])
- new_gps_files = set(all_gps_files) - set(reported) - set(self.skip)
- if new_gps_files:
- logging.info("WIGLE: Internet connectivity detected. Uploading new handshakes to wigle.net")
- csv_entries = list()
- no_err_entries = list()
- for gps_file in new_gps_files:
- if gps_file.endswith('.gps.json'):
- pcap_filename = gps_file.replace('.gps.json', '.pcap')
- if gps_file.endswith('.geo.json'):
- pcap_filename = gps_file.replace('.geo.json', '.pcap')
- if not os.path.exists(pcap_filename):
- logging.debug("WIGLE: Can't find pcap for %s", gps_file)
- self.skip.append(gps_file)
- continue
- try:
- gps_data = _extract_gps_data(gps_file)
- except OSError as os_err:
- logging.debug("WIGLE: %s", os_err)
- self.skip.append(gps_file)
- continue
- except json.JSONDecodeError as json_err:
- logging.debug("WIGLE: %s", json_err)
- self.skip.append(gps_file)
- continue
- if gps_data['Latitude'] == 0 and gps_data['Longitude'] == 0:
- logging.debug("WIGLE: Not enough gps-information for %s. Trying again next time.", gps_file)
- self.skip.append(gps_file)
- continue
- try:
- pcap_data = extract_from_pcap(pcap_filename, [WifiInfo.BSSID,
- WifiInfo.ESSID,
- WifiInfo.ENCRYPTION,
- WifiInfo.CHANNEL,
- WifiInfo.RSSI])
- except FieldNotFoundError:
- logging.debug("WIGLE: Could not extract all information. Skip %s", gps_file)
- self.skip.append(gps_file)
- continue
- except Scapy_Exception as sc_e:
- logging.debug("WIGLE: %s", sc_e)
- self.skip.append(gps_file)
- continue
- new_entry = _transform_wigle_entry(gps_data, pcap_data, self.__version__)
- csv_entries.append(new_entry)
- no_err_entries.append(gps_file)
- if csv_entries:
- display.on_uploading('wigle.net')
-
- try:
- _send_to_wigle(csv_entries, self.options['api_key'], donate=self.options['donate'])
- reported += no_err_entries
- self.report.update(data={'reported': reported})
- logging.info("WIGLE: Successfully uploaded %d files", len(no_err_entries))
- except requests.exceptions.RequestException as re_e:
- self.skip += no_err_entries
- logging.debug("WIGLE: Got an exception while uploading %s", re_e)
- except OSError as os_e:
- self.skip += no_err_entries
- logging.debug("WIGLE: Got the following error: %s", os_e)
-
- display.on_normal()
+ def on_ui_update(self, ui):
+ if not self.ready:
+ return
+ with ui._lock:
+ if not self.statistics["ready"]:
+ ui.set("wigle", "We Will Wait Wigle")
+ return
+ msg = "-"
+ self.ui_counter = (self.ui_counter + 1) % 4
+ if self.ui_counter == 0:
+ msg = f"User:{self.statistics['username']}"
+ if self.ui_counter == 1:
+ msg = f"Rank:{self.statistics['rank']} Month:{self.statistics['monthrank']}"
+ elif self.ui_counter == 2:
+ msg = f"{self.statistics['discoveredwiFi']} discovered WiFis"
+ elif self.ui_counter == 3:
+ msg = f"Last upl.:{self.statistics['last']}"
+ ui.set("wigle", msg)
diff --git a/pwnagotchi/utils.py b/pwnagotchi/utils.py
index 3fcf7cc7..26f507a8 100644
--- a/pwnagotchi/utils.py
+++ b/pwnagotchi/utils.py
@@ -564,7 +564,8 @@ class WifiInfo(Enum):
ESSID = 1
ENCRYPTION = 2
CHANNEL = 3
- RSSI = 4
+ FREQUENCY = 4
+ RSSI = 5
class FieldNotFoundError(Exception):
@@ -594,9 +595,6 @@ def extract_from_pcap(path, fields):
"""
results = dict()
for field in fields:
- if not isinstance(field, WifiInfo):
- raise TypeError("Invalid field")
-
subtypes = set()
if field == WifiInfo.BSSID:
@@ -606,10 +604,9 @@ def extract_from_pcap(path, fields):
packets = sniff(offline=path, filter=bpf_filter)
try:
for packet in packets:
- if packet.haslayer(Dot11Beacon):
- if hasattr(packet[Dot11], 'addr3'):
- results[field] = packet[Dot11].addr3
- break
+ if packet.haslayer(Dot11Beacon) and hasattr(packet[Dot11], 'addr3'):
+ results[field] = packet[Dot11].addr3
+ break
else: # magic
raise FieldNotFoundError("Could not find field [BSSID]")
except Exception:
@@ -654,6 +651,14 @@ def extract_from_pcap(path, fields):
results[field] = freq_to_channel(packets[0][RadioTap].ChannelFrequency)
except Exception:
raise FieldNotFoundError("Could not find field [CHANNEL]")
+ elif field == WifiInfo.FREQUENCY:
+ from scapy.layers.dot11 import sniff, RadioTap
+ from pwnagotchi.mesh.wifi import freq_to_channel
+ packets = sniff(offline=path, count=1)
+ try:
+ results[field] = packets[0][RadioTap].ChannelFrequency
+ except Exception:
+ raise FieldNotFoundError("Could not find field [FREQUENCY]")
elif field == WifiInfo.RSSI:
from scapy.layers.dot11 import sniff, RadioTap
from pwnagotchi.mesh.wifi import freq_to_channel
@@ -662,7 +667,8 @@ def extract_from_pcap(path, fields):
results[field] = packets[0][RadioTap].dBm_AntSignal
except Exception:
raise FieldNotFoundError("Could not find field [RSSI]")
-
+ else:
+ raise TypeError("Invalid field")
return results