mirror of
https://github.com/jayofelony/pwnagotchi.git
synced 2025-07-01 18:37:27 -04:00
Merge pull request #333 from fmatray/noai
Updates for BT-Tether, Wigle and OHCAPI
This commit is contained in:
@ -28,7 +28,8 @@ main.plugins.bt-tether.enabled = false
|
||||
main.plugins.bt-tether.phone-name = "" # name as shown on the phone i.e. "Pwnagotchi's Phone"
|
||||
main.plugins.bt-tether.mac = ""
|
||||
main.plugins.bt-tether.phone = "" # android or ios
|
||||
main.plugins.bt-tether.ip = "" # 192.168.44.2 android / 172.20.10.2 ios
|
||||
main.plugins.bt-tether.ip = "" # optional, default : 192.168.44.2 if android or 172.20.10.2 if ios
|
||||
main.plugins.bt-tether.dns = "" # optional, default (google): "8.8.8.8 1.1.1.1". Consider using anonymous DNS like OpenNic :-)
|
||||
|
||||
main.plugins.fix_services.enabled = true
|
||||
|
||||
@ -98,6 +99,13 @@ main.plugins.wardriver.whitelist = [
|
||||
"network-2"
|
||||
]
|
||||
|
||||
ain.plugins.wigle.enabled = false
|
||||
main.plugins.wigle.api_key = "" # mandatory
|
||||
main.plugins.wigle.cvs_dir = "/tmp" # optionnal, is set, the CVS is written to this directory
|
||||
main.plugins.wigle.donate = false # default: off
|
||||
main.plugins.wigle.timeout = 30 # default: 30
|
||||
main.plugins.wigle.position = (7, 85) # optionnal
|
||||
|
||||
main.plugins.wpa-sec.enabled = false
|
||||
main.plugins.wpa-sec.api_key = ""
|
||||
main.plugins.wpa-sec.api_url = "https://wpa-sec.stanev.org"
|
||||
@ -188,10 +196,11 @@ bettercap.handshakes = "/home/pi/handshakes"
|
||||
bettercap.silence = [
|
||||
"ble.device.new",
|
||||
"ble.device.lost",
|
||||
"ble.device.disconnected",
|
||||
"ble.device.connected",
|
||||
"ble.device.service.discovered",
|
||||
"ble.device.characteristic.discovered",
|
||||
"ble.device.disconnected",
|
||||
"ble.device.connected",
|
||||
"ble.connection.timeout",
|
||||
"wifi.client.new",
|
||||
"wifi.client.lost",
|
||||
"wifi.client.probe",
|
||||
|
@ -1,93 +1,294 @@
|
||||
import logging
|
||||
import subprocess
|
||||
import re
|
||||
import time
|
||||
from flask import abort, render_template_string
|
||||
import pwnagotchi.plugins as plugins
|
||||
import pwnagotchi.ui.fonts as fonts
|
||||
from pwnagotchi.ui.components import LabeledValue
|
||||
from pwnagotchi.ui.view import BLACK
|
||||
|
||||
TEMPLATE = """
|
||||
{% extends "base.html" %}
|
||||
{% set active_page = "bt-tether" %}
|
||||
{% block title %}
|
||||
{{ title }}
|
||||
{% endblock %}
|
||||
{% block meta %}
|
||||
<meta charset="utf-8">
|
||||
<meta name="viewport" content="width=device-width, user-scalable=0" />
|
||||
{% endblock %}
|
||||
{% block styles %}
|
||||
{{ super() }}
|
||||
<style>
|
||||
#searchText {
|
||||
width: 100%;
|
||||
}
|
||||
table {
|
||||
table-layout: auto;
|
||||
width: 100%;
|
||||
}
|
||||
table, th, td {
|
||||
border: 1px solid;
|
||||
border-collapse: collapse;
|
||||
}
|
||||
th, td {
|
||||
padding: 15px;
|
||||
text-align: left;
|
||||
}
|
||||
@media screen and (max-width:700px) {
|
||||
table, tr, td {
|
||||
padding:0;
|
||||
border:1px solid;
|
||||
}
|
||||
table {
|
||||
border:none;
|
||||
}
|
||||
tr:first-child, thead, th {
|
||||
display:none;
|
||||
border:none;
|
||||
}
|
||||
tr {
|
||||
float: left;
|
||||
width: 100%;
|
||||
margin-bottom: 2em;
|
||||
}
|
||||
td {
|
||||
float: left;
|
||||
width: 100%;
|
||||
padding:1em;
|
||||
}
|
||||
td::before {
|
||||
content:attr(data-label);
|
||||
word-wrap: break-word;
|
||||
color: white;
|
||||
border-right:2px solid;
|
||||
width: 20%;
|
||||
float:left;
|
||||
padding:1em;
|
||||
font-weight: bold;
|
||||
margin:-1em 1em -1em -1em;
|
||||
}
|
||||
}
|
||||
</style>
|
||||
{% endblock %}
|
||||
{% block script %}
|
||||
var searchInput = document.getElementById("searchText");
|
||||
searchInput.onkeyup = function() {
|
||||
var filter, table, tr, td, i, txtValue;
|
||||
filter = searchInput.value.toUpperCase();
|
||||
table = document.getElementById("tableOptions");
|
||||
if (table) {
|
||||
tr = table.getElementsByTagName("tr");
|
||||
|
||||
for (i = 0; i < tr.length; i++) {
|
||||
td = tr[i].getElementsByTagName("td")[0];
|
||||
if (td) {
|
||||
txtValue = td.textContent || td.innerText;
|
||||
if (txtValue.toUpperCase().indexOf(filter) > -1) {
|
||||
tr[i].style.display = "";
|
||||
}else{
|
||||
tr[i].style.display = "none";
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
{% endblock %}
|
||||
{% block content %}
|
||||
<input type="text" id="searchText" placeholder="Search for ..." title="Type in a filter">
|
||||
<table id="tableOptions">
|
||||
<tr>
|
||||
<th>Item</th>
|
||||
<th>Configuration</th>
|
||||
</tr>
|
||||
<tr>
|
||||
<td data-label="bluetooth">Bluetooth</td>
|
||||
<td>{{bluetooth|safe}}</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td data-label="device">Device</td>
|
||||
<td>{{device|safe}}</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td data-label="connection">Connection</td>
|
||||
<td>{{connection|safe}}</td>
|
||||
</tr>
|
||||
</table>
|
||||
{% endblock %}
|
||||
"""
|
||||
|
||||
# We all love crazy regex patterns
|
||||
MAC_PTTRN = r"^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})$"
|
||||
IP_PTTRN = r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$"
|
||||
DNS_PTTRN = r"^\s*((\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\s*[ ,;]\s*)+((\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\s*[ ,;]?\s*)$"
|
||||
|
||||
class BTTether(plugins.Plugin):
|
||||
__author__ = 'Jayofelony'
|
||||
__version__ = '1.2'
|
||||
__license__ = 'GPL3'
|
||||
__description__ = 'A new BT-Tether plugin'
|
||||
__author__ = "Jayofelony, modified my fmatray"
|
||||
__version__ = "1.4"
|
||||
__license__ = "GPL3"
|
||||
__description__ = "A new BT-Tether plugin"
|
||||
|
||||
def __init__(self):
|
||||
self.ready = False
|
||||
self.options = dict()
|
||||
self.status = '-'
|
||||
self.phone_name = None
|
||||
self.mac = None
|
||||
|
||||
@staticmethod
|
||||
def exec_cmd(cmd, args, pattern=None):
|
||||
try:
|
||||
result = subprocess.run([cmd] + args,
|
||||
check=True, capture_output=True, text=True)
|
||||
if pattern:
|
||||
return result.stdout.find(pattern)
|
||||
return result
|
||||
except Exception as exp:
|
||||
logging.error(f"[BT-Tether] Error with {cmd} : {exp}")
|
||||
raise exp
|
||||
|
||||
def bluetoothctl(self, args, pattern=None):
|
||||
return self.exec_cmd("bluetoothctl", args, pattern)
|
||||
|
||||
def nmcli(self, args, pattern=None):
|
||||
return self.exec_cmd("nmcli", args, pattern)
|
||||
|
||||
def on_loaded(self):
|
||||
logging.info("[BT-Tether] plugin loaded.")
|
||||
|
||||
def on_config_changed(self, config):
|
||||
if any(self.options[key] == '' for key in ['phone', 'phone-name', 'ip', 'mac']):
|
||||
self.ready = False
|
||||
ip = self.options['ip']
|
||||
mac = self.options['mac']
|
||||
phone_name = self.options['phone-name'] + ' Network'
|
||||
if self.options['phone'].lower() == 'android':
|
||||
address = f'{ip}'
|
||||
gateway = '192.168.44.1'
|
||||
elif self.options['phone'].lower() == 'ios':
|
||||
address = f'{ip}'
|
||||
gateway = '172.20.10.1'
|
||||
else:
|
||||
logging.error("[BT-Tether] Phone type not supported.")
|
||||
if "phone-name" not in self.options:
|
||||
logging.error("[BT-Tether] Phone name not provided")
|
||||
return
|
||||
if not ("mac" in self.options and re.match(MAC_PTTRN, self.options["mac"])):
|
||||
logging.error("[BT-Tether] Error with mac adresse")
|
||||
return
|
||||
|
||||
if not ("phone" in self.options and self.options["phone"].lower() in ["android", "ios"]):
|
||||
logging.error("[BT-Tether] Phone type not supported")
|
||||
return
|
||||
if self.options["phone"].lower() == "android":
|
||||
address = self.options.get("ip", "192.168.44.2")
|
||||
gateway = "192.168.44.1"
|
||||
elif self.options["phone"].lower() == "ios":
|
||||
address = self.options.get("ip", "172.20.10.2")
|
||||
gateway = "172.20.10.1"
|
||||
if not re.match(IP_PTTRN, address):
|
||||
logging.error(f"[BT-Tether] IP error: {address}")
|
||||
return
|
||||
|
||||
self.phone_name = self.options["phone-name"] + " Network"
|
||||
self.mac = self.options["mac"]
|
||||
dns = self.options.get("dns", "8.8.8.8 1.1.1.1")
|
||||
if not re.match(DNS_PTTRN, dns):
|
||||
logging.error(f"[BT-Tether] DNS error: {dns}")
|
||||
return
|
||||
dns = re.sub("[\s,;]+", " ", dns).strip() # DNS cleaning
|
||||
|
||||
try:
|
||||
# Configure connection. Metric is set to 200 to prefer connection over USB
|
||||
self.nmcli(["connection", "modify", f"{self.phone_name}",
|
||||
"connection.type", "bluetooth",
|
||||
"bluetooth.type", "panu",
|
||||
"bluetooth.bdaddr", f"{self.mac}",
|
||||
"connection.autoconnect", "yes",
|
||||
"connection.autoconnect-retries", "0",
|
||||
"ipv4.method", "manual",
|
||||
"ipv4.dns", f"{dns}",
|
||||
"ipv4.addresses", f"{address}/24",
|
||||
"ipv4.gateway", f"{gateway}",
|
||||
"ipv4.route-metric", "200" ])
|
||||
self.nmcli(["connection", "reload"])
|
||||
self.ready = True
|
||||
logging.info(f"[BT-Tether] Connection {self.phone_name} configured")
|
||||
except Exception as e:
|
||||
logging.error(f"[BT-Tether] Error while configuring: {e}")
|
||||
return
|
||||
try:
|
||||
subprocess.run([
|
||||
'nmcli', 'connection', 'modify', f'{phone_name}',
|
||||
'connection.type', 'bluetooth',
|
||||
'bluetooth.type', 'panu',
|
||||
'bluetooth.bdaddr', f'{mac}',
|
||||
'ipv4.method', 'manual',
|
||||
'ipv4.dns', '8.8.8.8 1.1.1.1',
|
||||
'ipv4.addresses', f'{address}/24',
|
||||
'ipv4.gateway', f'{gateway}',
|
||||
'ipv4.route-metric', '100'
|
||||
], check=True)
|
||||
subprocess.run(['nmcli', 'connection', 'reload'], check=True)
|
||||
subprocess.run(['nmcli', 'connection', 'up', f'{phone_name}'], check=True)
|
||||
time.sleep(5) # Give some delay to configure before going up
|
||||
self.nmcli(["connection", "up", f"{self.phone_name}"])
|
||||
except Exception as e:
|
||||
logging.error(f"[BT-Tether] Failed to connect to device: {e}")
|
||||
logging.error(f"[BT-Tether] Failed to connect to device: have you enabled bluetooth tethering on your phone?")
|
||||
self.ready = True
|
||||
logging.error(
|
||||
f"[BT-Tether] Failed to connect to device: have you enabled bluetooth tethering on your phone?"
|
||||
)
|
||||
|
||||
def on_ready(self, agent):
|
||||
if any(self.options[key] == '' for key in ['phone', 'phone-name', 'ip', 'mac']):
|
||||
self.ready = False
|
||||
self.ready = True
|
||||
try:
|
||||
logging.info(f"[BT-Tether] Disabling bettercap's BLE module")
|
||||
agent.run("ble.recon off", verbose_errors=False)
|
||||
except Exception as e:
|
||||
logging.info(f"[BT-Tether] Bettercap BLE was already off.")
|
||||
|
||||
def on_unload(self, ui):
|
||||
with ui._lock:
|
||||
ui.remove_element("bluetooth")
|
||||
try:
|
||||
self.nmcli(["connection", "down", f"{self.phone_name}"])
|
||||
except Exception as e:
|
||||
logging.error(f"[BT-Tether] Failed to disconnect from device: {e}")
|
||||
|
||||
def on_ui_setup(self, ui):
|
||||
with ui._lock:
|
||||
ui.add_element('bluetooth', LabeledValue(color=BLACK, label='BT', value='-',
|
||||
position=(ui.width() / 2 - 10, 0),
|
||||
label_font=fonts.Bold, text_font=fonts.Medium))
|
||||
|
||||
def on_ui_update(self, ui):
|
||||
if self.ready:
|
||||
phone_name = self.options['phone-name'] + ' Network'
|
||||
if (subprocess.run(['bluetoothctl', 'info'], capture_output=True, text=True)).stdout.find('Connected: yes') != -1:
|
||||
self.status = 'C'
|
||||
else:
|
||||
self.status = '-'
|
||||
try:
|
||||
subprocess.run(['nmcli', 'connection', 'up', f'{phone_name}'], check=True)
|
||||
except Exception as e:
|
||||
logging.debug(f"[BT-Tether] Failed to connect to device: {e}")
|
||||
logging.error(f"[BT-Tether] Failed to connect to device: have you enabled bluetooth tethering on your phone?")
|
||||
ui.set('bluetooth', self.status)
|
||||
return
|
||||
|
||||
def on_unload(self, ui):
|
||||
phone_name = self.options['phone-name'] + ' Network'
|
||||
if not self.ready:
|
||||
return
|
||||
with ui._lock:
|
||||
ui.remove_element('bluetooth')
|
||||
try:
|
||||
if (subprocess.run(['bluetoothctl', 'info'], capture_output=True, text=True)).stdout.find('Connected: yes') != -1:
|
||||
subprocess.run(['nmcli', 'connection', 'down', f'{phone_name}'], check=True)
|
||||
logging.info(f"[BT-Tether] Disconnected from device with name: {phone_name}")
|
||||
else:
|
||||
logging.info(f"[BT-Tether] Device with name {phone_name} is not connected, not disconnecting")
|
||||
except Exception as e:
|
||||
logging.error(f"[BT-Tether] Failed to disconnect from device: {e}")
|
||||
status = ""
|
||||
try:
|
||||
# Checking connection
|
||||
if self.nmcli(["-w", "0", "-g", "GENERAL.STATE", "connection", "show", self.phone_name],
|
||||
"activated") != -1:
|
||||
ui.set("bluetooth", "U")
|
||||
return
|
||||
else:
|
||||
ui.set("bluetooth", "D")
|
||||
status = "BT Conn. down"
|
||||
|
||||
# Checking device
|
||||
if self.nmcli(["-w", "0", "-g", "GENERAL.STATE", "device", "show", self.mac],
|
||||
"(connected)") != -1:
|
||||
ui.set("bluetooth", "C")
|
||||
status += "\nBT dev conn."
|
||||
else:
|
||||
ui.set("bluetooth", "-")
|
||||
status += "\nBT dev disconn."
|
||||
ui.set("status", status)
|
||||
except Exception as e:
|
||||
logging.error(f"[BT-Tether] Error on update: {e}")
|
||||
|
||||
def on_webhook(self, path, request):
|
||||
if not self.ready:
|
||||
return """<html>
|
||||
<head><title>BT-tether: Error</title></head>
|
||||
<body><code>Plugin not ready</code></body>
|
||||
</html>"""
|
||||
if path == "/" or not path:
|
||||
try:
|
||||
bluetooth = self.bluetoothctl(["info", self.mac])
|
||||
bluetooth = bluetooth.stdout.replace('\n', '<br>')
|
||||
except Exception as e:
|
||||
bluetooth = "Error while checking bluetoothctl"
|
||||
|
||||
try:
|
||||
device =self.nmcli(["-w", "0","device", "show", self.mac])
|
||||
device = device.stdout.replace('\n', '<br>')
|
||||
except Exception as e:
|
||||
device = "Error while checking nmcli device"
|
||||
|
||||
try:
|
||||
connection = self.nmcli(["-w", "0","connection", "show", self.phone_name])
|
||||
connection = connection.stdout.replace('\n', '<br>')
|
||||
except Exception as e:
|
||||
connection = "Error while checking nmcli connection"
|
||||
|
||||
logging.debug(device)
|
||||
return render_template_string(TEMPLATE,
|
||||
title="BT-Tether",
|
||||
bluetooth=bluetooth,
|
||||
device=device,
|
||||
connection=connection)
|
||||
abort(404)
|
@ -75,20 +75,16 @@ class ohcapi(plugins.Plugin):
|
||||
return
|
||||
|
||||
# Check if the internet is still available by pinging Google
|
||||
self.internet_active = False
|
||||
try:
|
||||
response = requests.get('https://www.google.com', timeout=5)
|
||||
if response.status_code == 200:
|
||||
self.internet_active = True
|
||||
except requests.ConnectionError:
|
||||
self.internet_active = False
|
||||
return
|
||||
|
||||
if response.status_code == 200:
|
||||
self.internet_active = True
|
||||
else:
|
||||
self.internet_active = False
|
||||
return
|
||||
|
||||
current_time = time.time()
|
||||
if current_time - self.last_run >= self.options['sleep']:
|
||||
if self.internet_active and current_time - self.last_run >= self.options['sleep']:
|
||||
self._run_tasks(agent)
|
||||
self.last_run = current_time
|
||||
|
||||
|
@ -4,212 +4,300 @@ import json
|
||||
import csv
|
||||
import requests
|
||||
import pwnagotchi
|
||||
|
||||
from io import StringIO
|
||||
from datetime import datetime
|
||||
from pwnagotchi.utils import WifiInfo, FieldNotFoundError, extract_from_pcap, StatusFile, remove_whitelisted
|
||||
import re
|
||||
from glob import glob
|
||||
from threading import Lock
|
||||
from io import StringIO
|
||||
from datetime import datetime, UTC
|
||||
|
||||
from flask import make_response, redirect
|
||||
from pwnagotchi.utils import (
|
||||
WifiInfo,
|
||||
FieldNotFoundError,
|
||||
extract_from_pcap,
|
||||
StatusFile,
|
||||
remove_whitelisted,
|
||||
)
|
||||
from pwnagotchi import plugins
|
||||
from pwnagotchi._version import __version__ as __pwnagotchi_version__
|
||||
|
||||
import pwnagotchi.ui.fonts as fonts
|
||||
from pwnagotchi.ui.components import Text
|
||||
from pwnagotchi.ui.view import BLACK
|
||||
|
||||
def _extract_gps_data(path):
|
||||
"""
|
||||
Extract data from gps-file
|
||||
|
||||
return json-obj
|
||||
"""
|
||||
|
||||
try:
|
||||
if path.endswith('.geo.json'):
|
||||
with open(path, 'r') as json_file:
|
||||
tempJson = json.load(json_file)
|
||||
d = datetime.utcfromtimestamp(int(tempJson["ts"]))
|
||||
return {"Latitude": tempJson["location"]["lat"],
|
||||
"Longitude": tempJson["location"]["lng"],
|
||||
"Altitude": 10,
|
||||
"Accuracy": tempJson["accuracy"],
|
||||
"Updated": d.strftime('%Y-%m-%dT%H:%M:%S.%f')}
|
||||
else:
|
||||
with open(path, 'r') as json_file:
|
||||
return json.load(json_file)
|
||||
except OSError as os_err:
|
||||
raise os_err
|
||||
except json.JSONDecodeError as json_err:
|
||||
raise json_err
|
||||
|
||||
|
||||
def _format_auth(data):
|
||||
out = ""
|
||||
for auth in data:
|
||||
out = f"{out}[{auth}]"
|
||||
return [f"{auth}" for auth in data]
|
||||
|
||||
|
||||
def _transform_wigle_entry(gps_data, pcap_data, plugin_version):
|
||||
"""
|
||||
Transform to wigle entry in file
|
||||
"""
|
||||
dummy = StringIO()
|
||||
# write kismet header
|
||||
dummy.write(f"WigleWifi-1.6,appRelease={plugin_version},model=pwnagotchi,release={__pwnagotchi_version__},"
|
||||
f"device={pwnagotchi.name()},display=kismet,board=RaspberryPi,brand=pwnagotchi,star=Sol,body=3,subBody=0\n")
|
||||
dummy.write(
|
||||
"MAC,SSID,AuthMode,FirstSeen,Channel,RSSI,CurrentLatitude,CurrentLongitude,AltitudeMeters,AccuracyMeters,Type\n")
|
||||
|
||||
writer = csv.writer(dummy, delimiter=",", quoting=csv.QUOTE_NONE, escapechar="\\")
|
||||
writer.writerow([
|
||||
pcap_data[WifiInfo.BSSID],
|
||||
pcap_data[WifiInfo.ESSID],
|
||||
_format_auth(pcap_data[WifiInfo.ENCRYPTION]),
|
||||
datetime.strptime(gps_data['Updated'].rsplit('.')[0],
|
||||
"%Y-%m-%dT%H:%M:%S").strftime('%Y-%m-%d %H:%M:%S'),
|
||||
pcap_data[WifiInfo.CHANNEL],
|
||||
pcap_data[WifiInfo.RSSI],
|
||||
gps_data['Latitude'],
|
||||
gps_data['Longitude'],
|
||||
gps_data['Altitude'],
|
||||
gps_data['Accuracy'],
|
||||
'WIFI'])
|
||||
return dummy.getvalue()
|
||||
|
||||
|
||||
def _send_to_wigle(lines, api_key, donate=True, timeout=30):
|
||||
"""
|
||||
Uploads the file to wigle-net
|
||||
"""
|
||||
|
||||
dummy = StringIO()
|
||||
|
||||
for line in lines:
|
||||
dummy.write(f"{line}")
|
||||
|
||||
dummy.seek(0)
|
||||
|
||||
headers = {"Authorization": f"Basic {api_key}",
|
||||
"Accept": "application/json",
|
||||
"HTTP_USER_AGENT": "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:15.0) Gecko/20100101 Firefox/15.0.1"}
|
||||
data = {"donate": "on" if donate else "false"}
|
||||
payload = {"file": (pwnagotchi.name() + ".csv", dummy, "multipart/form-data", {"Expires": "0"})}
|
||||
try:
|
||||
res = requests.post('https://api.wigle.net/api/v2/file/upload',
|
||||
data=data,
|
||||
headers=headers,
|
||||
files=payload,
|
||||
timeout=timeout)
|
||||
json_res = res.json()
|
||||
if not json_res['success']:
|
||||
raise requests.exceptions.RequestException(json_res['message'])
|
||||
except requests.exceptions.RequestException as re_e:
|
||||
raise re_e
|
||||
from scapy.all import Scapy_Exception
|
||||
|
||||
|
||||
class Wigle(plugins.Plugin):
|
||||
__author__ = "Dadav and updated by Jayofelony"
|
||||
__version__ = "3.1.0"
|
||||
__author__ = "Dadav and updated by Jayofelony and fmatray"
|
||||
__version__ = "4.0.0"
|
||||
__license__ = "GPL3"
|
||||
__description__ = "This plugin automatically uploads collected WiFi to wigle.net"
|
||||
LABEL_SPACING = 0
|
||||
|
||||
def __init__(self):
|
||||
self.ready = False
|
||||
self.report = StatusFile('/root/.wigle_uploads', data_format='json')
|
||||
self.report = None
|
||||
self.skip = list()
|
||||
self.lock = Lock()
|
||||
self.options = dict()
|
||||
self.statistics = dict(
|
||||
ready=False,
|
||||
username=None,
|
||||
rank=None,
|
||||
monthrank=None,
|
||||
discoveredwiFi=None,
|
||||
last=None,
|
||||
)
|
||||
self.last_stat = datetime.now(tz=UTC)
|
||||
self.ui_counter = 0
|
||||
|
||||
def on_loaded(self):
|
||||
if 'api_key' not in self.options or ('api_key' in self.options and self.options['api_key'] is None):
|
||||
logging.debug("WIGLE: api_key isn't set. Can't upload to wigle.net")
|
||||
def on_config_changed(self, config):
|
||||
self.api_key = self.options.get("api_key", None)
|
||||
if not self.api_key:
|
||||
logging.info("[WIGLE] api_key must be set.")
|
||||
return
|
||||
|
||||
if 'donate' not in self.options:
|
||||
self.options['donate'] = False
|
||||
|
||||
self.donate = self.options.get("donate", False)
|
||||
self.handshake_dir = config["bettercap"].get("handshakes")
|
||||
report_filename = os.path.join(self.handshake_dir, ".wigle_uploads")
|
||||
self.report = StatusFile(report_filename, data_format="json")
|
||||
self.cvs_dir = self.options.get("cvs_dir", None)
|
||||
self.whitelist = config["main"].get("whitelist", [])
|
||||
self.timeout = self.options.get("timeout", 30)
|
||||
self.position = self.options.get("position", (10, 10))
|
||||
self.ready = True
|
||||
logging.info("WIGLE: ready")
|
||||
|
||||
logging.info("[WIGLE] Ready for wardriving!!!")
|
||||
self.get_statistics(force=True)
|
||||
|
||||
def on_webhook(self, path, request):
|
||||
from flask import make_response, redirect
|
||||
response = make_response(redirect("https://www.wigle.net/", code=302))
|
||||
return response
|
||||
return make_response(redirect("https://www.wigle.net/", code=302))
|
||||
|
||||
def get_new_gps_files(self, reported):
|
||||
all_gps_files = glob(os.path.join(self.handshake_dir, "*.gps.json"))
|
||||
all_gps_files += glob(os.path.join(self.handshake_dir, "*.geo.json"))
|
||||
all_gps_files = remove_whitelisted(all_gps_files, self.whitelist)
|
||||
return set(all_gps_files) - set(reported) - set(self.skip)
|
||||
|
||||
@staticmethod
|
||||
def get_pcap_filename(gps_file):
|
||||
pcap_filename = re.sub(r"\.(geo|gps)\.json$", ".pcap", gps_file)
|
||||
if not os.path.exists(pcap_filename):
|
||||
logging.debug("[WIGLE] Can't find pcap for %s", gps_file)
|
||||
return None
|
||||
return pcap_filename
|
||||
|
||||
@staticmethod
|
||||
def extract_gps_data(path):
|
||||
"""
|
||||
Extract data from gps-file
|
||||
return json-obj
|
||||
"""
|
||||
try:
|
||||
if path.endswith(".geo.json"):
|
||||
with open(path, "r") as json_file:
|
||||
tempJson = json.load(json_file)
|
||||
d = datetime.fromtimestamp(int(tempJson["ts"]), tz=UTC)
|
||||
return {
|
||||
"Latitude": tempJson["location"]["lat"],
|
||||
"Longitude": tempJson["location"]["lng"],
|
||||
"Altitude": 10,
|
||||
"Accuracy": tempJson["accuracy"],
|
||||
"Updated": d.strftime("%Y-%m-%dT%H:%M:%S.%f"),
|
||||
}
|
||||
with open(path, "r") as json_file:
|
||||
return json.load(json_file)
|
||||
except (OSError, json.JSONDecodeError) as exp:
|
||||
raise exp
|
||||
|
||||
def get_gps_data(self, gps_file):
|
||||
try:
|
||||
gps_data = self.extract_gps_data(gps_file)
|
||||
except (OSError, json.JSONDecodeError) as exp:
|
||||
logging.debug(f"[WIGLE] Error while extracting GPS data: {exp}")
|
||||
return None
|
||||
if gps_data["Latitude"] == 0 and gps_data["Longitude"] == 0:
|
||||
logging.debug(f"[WIGLE] Not enough gps data for {gps_file}. Next time.")
|
||||
return None
|
||||
return gps_data
|
||||
|
||||
@staticmethod
|
||||
def get_pcap_data(pcap_filename):
|
||||
try:
|
||||
pcap_data = extract_from_pcap(
|
||||
pcap_filename,
|
||||
[
|
||||
WifiInfo.BSSID,
|
||||
WifiInfo.ESSID,
|
||||
WifiInfo.ENCRYPTION,
|
||||
WifiInfo.CHANNEL,
|
||||
WifiInfo.FREQUENCY,
|
||||
WifiInfo.RSSI,
|
||||
],
|
||||
)
|
||||
logging.debug(f"[WIGLE] PCAP data for {pcap_filename}: {pcap_data}")
|
||||
except FieldNotFoundError:
|
||||
logging.debug(f"[WIGLE] Cannot extract all data: {pcap_filename} (skipped)")
|
||||
return None
|
||||
except Scapy_Exception as sc_e:
|
||||
logging.debug(f"[WIGLE] {sc_e}")
|
||||
return None
|
||||
return pcap_data
|
||||
|
||||
def generate_csv(self, data):
|
||||
date = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
filename = f"{pwnagotchi.name()}_{date}.csv"
|
||||
|
||||
content = StringIO()
|
||||
# write kismet header + header
|
||||
content.write(
|
||||
f"WigleWifi-1.6,appRelease={self.__version__},model=pwnagotchi,release={__pwnagotchi_version__},"
|
||||
f"device={pwnagotchi.name()},display=kismet,board=RaspberryPi,brand=pwnagotchi,star=Sol,body=3,subBody=0\n"
|
||||
f"MAC,SSID,AuthMode,FirstSeen,Channel,Frequency,RSSI,CurrentLatitude,CurrentLongitude,AltitudeMeters,AccuracyMeters,RCOIs,MfgrId,Type\n"
|
||||
)
|
||||
writer = csv.writer(
|
||||
content, delimiter=",", quoting=csv.QUOTE_NONE, escapechar="\\"
|
||||
)
|
||||
for gps_data, pcap_data in data: # write WIFIs
|
||||
writer.writerow(
|
||||
[
|
||||
pcap_data[WifiInfo.BSSID],
|
||||
pcap_data[WifiInfo.ESSID],
|
||||
f"[{']['.join(pcap_data[WifiInfo.ENCRYPTION])}]",
|
||||
datetime.strptime(
|
||||
gps_data["Updated"].rsplit(".")[0], "%Y-%m-%dT%H:%M:%S"
|
||||
).strftime("%Y-%m-%d %H:%M:%S"),
|
||||
pcap_data[WifiInfo.CHANNEL],
|
||||
pcap_data[WifiInfo.FREQUENCY],
|
||||
pcap_data[WifiInfo.RSSI],
|
||||
gps_data["Latitude"],
|
||||
gps_data["Longitude"],
|
||||
gps_data["Altitude"],
|
||||
gps_data["Accuracy"],
|
||||
"", # RCOIs to populate
|
||||
"", # MfgrId always empty
|
||||
"WIFI",
|
||||
]
|
||||
)
|
||||
content.seek(0)
|
||||
return filename, content
|
||||
|
||||
def save_to_file(self, cvs_filename, cvs_content):
|
||||
if not self.cvs_dir:
|
||||
return
|
||||
filename = os.path.join(self.cvs_dir, cvs_filename)
|
||||
logging.info(f"[WIGLE] Saving to file {filename}")
|
||||
try:
|
||||
with open(filename, mode="w") as f:
|
||||
f.write(cvs_content.getvalue())
|
||||
except Exception as exp:
|
||||
logging.error(f"[WIGLE] Error while writing CSV file(skipping): {exp}")
|
||||
|
||||
def post_wigle(self, reported, cvs_filename, cvs_content, no_err_entries):
|
||||
try:
|
||||
json_res = requests.post(
|
||||
"https://api.wigle.net/api/v2/file/upload",
|
||||
headers={
|
||||
"Authorization": f"Basic {self.api_key}",
|
||||
"Accept": "application/json",
|
||||
},
|
||||
data={"donate": "on" if self.donate else "false"},
|
||||
files=dict(file=(cvs_filename, cvs_content, "text/csv")),
|
||||
timeout=self.timeout,
|
||||
).json()
|
||||
if not json_res["success"]:
|
||||
raise requests.exceptions.RequestException(json_res["message"])
|
||||
reported += no_err_entries
|
||||
self.report.update(data={"reported": reported})
|
||||
logging.info(f"[WIGLE] Successfully uploaded {len(no_err_entries)} wifis")
|
||||
except (requests.exceptions.RequestException, OSError) as exp:
|
||||
self.skip += no_err_entries
|
||||
logging.debug(f"[WIGLE] Exception while uploading: {exp}")
|
||||
|
||||
def upload_new_handshakes(self, reported, new_gps_files, agent):
|
||||
logging.info("[WIGLE] Uploading new handshakes to wigle.net")
|
||||
csv_entries, no_err_entries = list(), list()
|
||||
for gps_file in new_gps_files:
|
||||
logging.info(f"[WIGLE] Processing {os.path.basename(gps_file)}")
|
||||
if (
|
||||
(pcap_filename := self.get_pcap_filename(gps_file))
|
||||
and (gps_data := self.get_gps_data(gps_file))
|
||||
and (pcap_data := self.get_pcap_data(pcap_filename))
|
||||
):
|
||||
csv_entries.append((gps_data, pcap_data))
|
||||
no_err_entries.append(gps_file)
|
||||
else:
|
||||
self.skip.append(gps_file)
|
||||
logging.info(f"[WIGLE] Wifi to upload: {len(csv_entries)}")
|
||||
if csv_entries:
|
||||
cvs_filename, cvs_content = self.generate_csv(csv_entries)
|
||||
self.save_to_file(cvs_filename, cvs_content)
|
||||
display = agent.view()
|
||||
display.on_uploading("wigle.net")
|
||||
self.post_wigle(reported, cvs_filename, cvs_content, no_err_entries)
|
||||
display.on_normal()
|
||||
|
||||
def get_statistics(self, force=False):
|
||||
if not force and (datetime.now(tz=UTC) - self.last_stat).total_seconds() < 30:
|
||||
return
|
||||
self.last_stat = datetime.now(tz=UTC)
|
||||
try:
|
||||
self.statistics["ready"] = False
|
||||
json_res = requests.get(
|
||||
"https://api.wigle.net/api/v2/stats/user",
|
||||
headers={
|
||||
"Authorization": f"Basic {self.api_key}",
|
||||
"Accept": "application/json",
|
||||
},
|
||||
timeout=self.timeout,
|
||||
).json()
|
||||
if not json_res["success"]:
|
||||
return
|
||||
self.statistics["ready"] = True
|
||||
self.statistics["username"] = json_res["user"]
|
||||
self.statistics["rank"] = json_res["rank"]
|
||||
self.statistics["monthrank"] = json_res["monthRank"]
|
||||
self.statistics["discoveredwiFi"] = json_res["statistics"]["discoveredWiFi"]
|
||||
last = json_res["statistics"]["last"]
|
||||
self.statistics["last"] = f"{last[6:8]}/{last[4:6]}/{last[0:4]}"
|
||||
except (requests.exceptions.RequestException, OSError) as exp:
|
||||
pass
|
||||
|
||||
def on_internet_available(self, agent):
|
||||
"""
|
||||
Called when there's internet connectivity
|
||||
"""
|
||||
if not self.ready or self.lock.locked():
|
||||
if not self.ready:
|
||||
return
|
||||
with self.lock:
|
||||
reported = self.report.data_field_or("reported", default=list())
|
||||
if new_gps_files := self.get_new_gps_files(reported):
|
||||
self.upload_new_handshakes(reported, new_gps_files, agent)
|
||||
else:
|
||||
self.get_statistics()
|
||||
|
||||
from scapy.all import Scapy_Exception
|
||||
def on_ui_setup(self, ui):
|
||||
with ui._lock:
|
||||
ui.add_element(
|
||||
"wigle",
|
||||
Text(value="-", position=self.position, font=fonts.Small, color=BLACK),
|
||||
)
|
||||
|
||||
config = agent.config()
|
||||
display = agent.view()
|
||||
reported = self.report.data_field_or('reported', default=list())
|
||||
handshake_dir = config['bettercap']['handshakes']
|
||||
all_files = os.listdir(handshake_dir)
|
||||
all_gps_files = [os.path.join(handshake_dir, filename)
|
||||
for filename in all_files
|
||||
if filename.endswith('.gps.json') or filename.endswith('.geo.json')]
|
||||
def on_unload(self, ui):
|
||||
with ui._lock:
|
||||
ui.remove_element("wigle")
|
||||
|
||||
all_gps_files = remove_whitelisted(all_gps_files, config['main']['whitelist'])
|
||||
new_gps_files = set(all_gps_files) - set(reported) - set(self.skip)
|
||||
if new_gps_files:
|
||||
logging.info("WIGLE: Internet connectivity detected. Uploading new handshakes to wigle.net")
|
||||
csv_entries = list()
|
||||
no_err_entries = list()
|
||||
for gps_file in new_gps_files:
|
||||
if gps_file.endswith('.gps.json'):
|
||||
pcap_filename = gps_file.replace('.gps.json', '.pcap')
|
||||
if gps_file.endswith('.geo.json'):
|
||||
pcap_filename = gps_file.replace('.geo.json', '.pcap')
|
||||
if not os.path.exists(pcap_filename):
|
||||
logging.debug("WIGLE: Can't find pcap for %s", gps_file)
|
||||
self.skip.append(gps_file)
|
||||
continue
|
||||
try:
|
||||
gps_data = _extract_gps_data(gps_file)
|
||||
except OSError as os_err:
|
||||
logging.debug("WIGLE: %s", os_err)
|
||||
self.skip.append(gps_file)
|
||||
continue
|
||||
except json.JSONDecodeError as json_err:
|
||||
logging.debug("WIGLE: %s", json_err)
|
||||
self.skip.append(gps_file)
|
||||
continue
|
||||
if gps_data['Latitude'] == 0 and gps_data['Longitude'] == 0:
|
||||
logging.debug("WIGLE: Not enough gps-information for %s. Trying again next time.", gps_file)
|
||||
self.skip.append(gps_file)
|
||||
continue
|
||||
try:
|
||||
pcap_data = extract_from_pcap(pcap_filename, [WifiInfo.BSSID,
|
||||
WifiInfo.ESSID,
|
||||
WifiInfo.ENCRYPTION,
|
||||
WifiInfo.CHANNEL,
|
||||
WifiInfo.RSSI])
|
||||
except FieldNotFoundError:
|
||||
logging.debug("WIGLE: Could not extract all information. Skip %s", gps_file)
|
||||
self.skip.append(gps_file)
|
||||
continue
|
||||
except Scapy_Exception as sc_e:
|
||||
logging.debug("WIGLE: %s", sc_e)
|
||||
self.skip.append(gps_file)
|
||||
continue
|
||||
new_entry = _transform_wigle_entry(gps_data, pcap_data, self.__version__)
|
||||
csv_entries.append(new_entry)
|
||||
no_err_entries.append(gps_file)
|
||||
if csv_entries:
|
||||
display.on_uploading('wigle.net')
|
||||
|
||||
try:
|
||||
_send_to_wigle(csv_entries, self.options['api_key'], donate=self.options['donate'])
|
||||
reported += no_err_entries
|
||||
self.report.update(data={'reported': reported})
|
||||
logging.info("WIGLE: Successfully uploaded %d files", len(no_err_entries))
|
||||
except requests.exceptions.RequestException as re_e:
|
||||
self.skip += no_err_entries
|
||||
logging.debug("WIGLE: Got an exception while uploading %s", re_e)
|
||||
except OSError as os_e:
|
||||
self.skip += no_err_entries
|
||||
logging.debug("WIGLE: Got the following error: %s", os_e)
|
||||
|
||||
display.on_normal()
|
||||
def on_ui_update(self, ui):
|
||||
if not self.ready:
|
||||
return
|
||||
with ui._lock:
|
||||
if not self.statistics["ready"]:
|
||||
ui.set("wigle", "We Will Wait Wigle")
|
||||
return
|
||||
msg = "-"
|
||||
self.ui_counter = (self.ui_counter + 1) % 4
|
||||
if self.ui_counter == 0:
|
||||
msg = f"User:{self.statistics['username']}"
|
||||
if self.ui_counter == 1:
|
||||
msg = f"Rank:{self.statistics['rank']} Month:{self.statistics['monthrank']}"
|
||||
elif self.ui_counter == 2:
|
||||
msg = f"{self.statistics['discoveredwiFi']} discovered WiFis"
|
||||
elif self.ui_counter == 3:
|
||||
msg = f"Last upl.:{self.statistics['last']}"
|
||||
ui.set("wigle", msg)
|
||||
|
@ -564,7 +564,8 @@ class WifiInfo(Enum):
|
||||
ESSID = 1
|
||||
ENCRYPTION = 2
|
||||
CHANNEL = 3
|
||||
RSSI = 4
|
||||
FREQUENCY = 4
|
||||
RSSI = 5
|
||||
|
||||
|
||||
class FieldNotFoundError(Exception):
|
||||
@ -594,9 +595,6 @@ def extract_from_pcap(path, fields):
|
||||
"""
|
||||
results = dict()
|
||||
for field in fields:
|
||||
if not isinstance(field, WifiInfo):
|
||||
raise TypeError("Invalid field")
|
||||
|
||||
subtypes = set()
|
||||
|
||||
if field == WifiInfo.BSSID:
|
||||
@ -606,10 +604,9 @@ def extract_from_pcap(path, fields):
|
||||
packets = sniff(offline=path, filter=bpf_filter)
|
||||
try:
|
||||
for packet in packets:
|
||||
if packet.haslayer(Dot11Beacon):
|
||||
if hasattr(packet[Dot11], 'addr3'):
|
||||
results[field] = packet[Dot11].addr3
|
||||
break
|
||||
if packet.haslayer(Dot11Beacon) and hasattr(packet[Dot11], 'addr3'):
|
||||
results[field] = packet[Dot11].addr3
|
||||
break
|
||||
else: # magic
|
||||
raise FieldNotFoundError("Could not find field [BSSID]")
|
||||
except Exception:
|
||||
@ -654,6 +651,14 @@ def extract_from_pcap(path, fields):
|
||||
results[field] = freq_to_channel(packets[0][RadioTap].ChannelFrequency)
|
||||
except Exception:
|
||||
raise FieldNotFoundError("Could not find field [CHANNEL]")
|
||||
elif field == WifiInfo.FREQUENCY:
|
||||
from scapy.layers.dot11 import sniff, RadioTap
|
||||
from pwnagotchi.mesh.wifi import freq_to_channel
|
||||
packets = sniff(offline=path, count=1)
|
||||
try:
|
||||
results[field] = packets[0][RadioTap].ChannelFrequency
|
||||
except Exception:
|
||||
raise FieldNotFoundError("Could not find field [FREQUENCY]")
|
||||
elif field == WifiInfo.RSSI:
|
||||
from scapy.layers.dot11 import sniff, RadioTap
|
||||
from pwnagotchi.mesh.wifi import freq_to_channel
|
||||
@ -662,7 +667,8 @@ def extract_from_pcap(path, fields):
|
||||
results[field] = packets[0][RadioTap].dBm_AntSignal
|
||||
except Exception:
|
||||
raise FieldNotFoundError("Could not find field [RSSI]")
|
||||
|
||||
else:
|
||||
raise TypeError("Invalid field")
|
||||
return results
|
||||
|
||||
|
||||
|
Reference in New Issue
Block a user