Merge branch 'noai' into noai

Signed-off-by: 铲屎将军 <37292630+wlmh110@users.noreply.github.com>
This commit is contained in:
铲屎将军
2025-02-17 16:26:59 +08:00
committed by GitHub
12 changed files with 615 additions and 289 deletions

View File

@ -33,7 +33,8 @@ body:
label: Version label: Version
description: What version of our software are you running? description: What version of our software are you running?
options: options:
- 2.9.4-2 - 2.9.5.2
- 2.9.5.3
default: 0 default: 0
validations: validations:
required: true required: true

View File

@ -33,7 +33,8 @@ body:
label: Version label: Version
description: What version of our software are you running? description: What version of our software are you running?
options: options:
- 2.9.4-2 - 2.9.5.2
- 2.9.5.3
default: 0 default: 0
validations: validations:
required: true required: true

View File

@ -1 +1 @@
__version__ = '2.9.5.2' __version__ = '2.9.5.4'

View File

@ -12,29 +12,31 @@ main.custom_plugin_repos = [
"https://github.com/Sniffleupagus/pwnagotchi_plugins/archive/master.zip", "https://github.com/Sniffleupagus/pwnagotchi_plugins/archive/master.zip",
"https://github.com/NeonLightning/pwny/archive/master.zip", "https://github.com/NeonLightning/pwny/archive/master.zip",
"https://github.com/marbasec/UPSLite_Plugin_1_3/archive/master.zip", "https://github.com/marbasec/UPSLite_Plugin_1_3/archive/master.zip",
"https://github.com/wpa-2/Pwnagotchi-Plugins/archive/master.zip" "https://github.com/wpa-2/Pwnagotchi-Plugins/archive/master.zip",
"https://github.com/cyberartemio/wardriver-pwnagotchi-plugin/archive/main.zip",
] ]
main.custom_plugins = "/usr/local/share/pwnagotchi/custom-plugins/" main.custom_plugins = "/usr/local/share/pwnagotchi/custom-plugins/"
main.plugins.auto-tune.enabled = true main.plugins.auto-tune.enabled = true
main.plugins.auto-update.enabled = false main.plugins.auto-update.enabled = true
main.plugins.auto-update.install = false main.plugins.auto-update.install = true
main.plugins.auto-update.interval = 1 main.plugins.auto-update.interval = 1
main.plugins.bt-tether.enabled = false main.plugins.bt-tether.enabled = false
main.plugins.bt-tether.phone-name = "" # name as shown on the phone i.e. "Pwnagotchi's Phone" main.plugins.bt-tether.phone-name = "" # name as shown on the phone i.e. "Pwnagotchi's Phone"
main.plugins.bt-tether.mac = "" main.plugins.bt-tether.mac = ""
main.plugins.bt-tether.phone = "" # android or ios main.plugins.bt-tether.phone = "" # android or ios
main.plugins.bt-tether.ip = "" # 192.168.44.2 android / 172.20.10.2 ios main.plugins.bt-tether.ip = "" # optional, default : 192.168.44.2 if android or 172.20.10.2 if ios
main.plugins.bt-tether.dns = "" # optional, default (google): "8.8.8.8 1.1.1.1". Consider using anonymous DNS like OpenNic :-)
main.plugins.fix_services.enabled = true main.plugins.fix_services.enabled = true
main.plugins.gdrivesync.enabled = false main.plugins.gdrivesync.enabled = false
main.plugins.gdrivesync.backupfiles = [''] main.plugins.gdrivesync.backupfiles = ['']
main.plugins.gdrivesync.backup_folder = "PwnagotchiBackups" main.plugins.gdrivesync.backup_folder = "PwnagotchiBackups"
main.plugin.gdrivesync.interval = 1 main.plugins.gdrivesync.interval = 1
main.plugins.gpio_buttons.enabled = false main.plugins.gpio_buttons.enabled = false
@ -60,7 +62,7 @@ main.plugins.ohcapi.receive_email = "yes"
main.plugins.pwndroid.enabled = false main.plugins.pwndroid.enabled = false
main.plugins.pwndroid.display = false # show coords on display main.plugins.pwndroid.display = false # show coords on display
main.plugins.pwndroid.display_alitude = false # show altitude on display main.plugins.pwndroid.display_altitude = false # show altitude on display
main.plugins.pisugarx.enabled = false main.plugins.pisugarx.enabled = false
main.plugins.pisugarx.rotation = false main.plugins.pisugarx.rotation = false
@ -86,8 +88,11 @@ main.plugins.webcfg.enabled = true
main.plugins.webgpsmap.enabled = false main.plugins.webgpsmap.enabled = false
main.plugins.wigle.enabled = false main.plugins.wigle.enabled = false
main.plugins.wigle.api_key = "" main.plugins.wigle.api_key = "" # mandatory
main.plugins.wigle.donate = false main.plugins.wigle.cvs_dir = "/tmp" # optionnal, is set, the CVS is written to this directory
main.plugins.wigle.donate = false # default: off
main.plugins.wigle.timeout = 30 # default: 30
main.plugins.wigle.position = [7, 85] # optionnal
main.plugins.wpa-sec.enabled = false main.plugins.wpa-sec.enabled = false
main.plugins.wpa-sec.api_key = "" main.plugins.wpa-sec.api_key = ""
@ -179,10 +184,11 @@ bettercap.handshakes = "/home/pi/handshakes"
bettercap.silence = [ bettercap.silence = [
"ble.device.new", "ble.device.new",
"ble.device.lost", "ble.device.lost",
"ble.device.disconnected",
"ble.device.connected",
"ble.device.service.discovered", "ble.device.service.discovered",
"ble.device.characteristic.discovered", "ble.device.characteristic.discovered",
"ble.device.disconnected",
"ble.device.connected",
"ble.connection.timeout",
"wifi.client.new", "wifi.client.new",
"wifi.client.lost", "wifi.client.lost",
"wifi.client.probe", "wifi.client.probe",

View File

@ -1,10 +1,9 @@
# Handles the commandline stuff
import os import os
import logging import logging
import glob import glob
import re import re
import shutil import shutil
import socket # <-- Added for DNS check
from fnmatch import fnmatch from fnmatch import fnmatch
from pwnagotchi.utils import download_file, unzip, save_config, parse_version, md5 from pwnagotchi.utils import download_file, unzip, save_config, parse_version, md5
from pwnagotchi.plugins import default_path from pwnagotchi.plugins import default_path
@ -164,8 +163,8 @@ def upgrade(args, config, pattern='*'):
installed_version = _extract_version(filename) installed_version = _extract_version(filename)
if installed_version and available_version: if installed_version and available_version:
if available_version <= installed_version: if available_version <= installed_version:
continue continue
else: else:
continue continue
@ -348,12 +347,34 @@ def _analyse_dir(path):
return results return results
def _check_internet():
"""
Simple DNS check to verify that we can resolve a common hostname.
Returns True if DNS resolution succeeds, False otherwise.
"""
try:
socket.gethostbyname('google.com')
return True
except:
return False
def update(config): def update(config):
""" """
Updates the database Updates the database
""" """
global SAVE_DIR global SAVE_DIR
if not _check_internet():
logging.error("No internet connection or DNS not working. Please follow these instructions:")
logging.error("https://github.com/jayofelony/pwnagotchi/wiki/Step-2-Connecting")
print("No internet/DNS. Please follow these instructions:")
print("https://github.com/jayofelony/pwnagotchi/wiki/Step-2-Connecting")
return 1
else:
logging.info("Internet detected - Please run sudo pwnagotchi plugins list")
print("Internet detected - Please run sudo pwnagotchi plugins list")
urls = config['main']['custom_plugin_repos'] urls = config['main']['custom_plugin_repos']
if not urls: if not urls:
logging.info('No plugin repositories configured.') logging.info('No plugin repositories configured.')
@ -393,3 +414,4 @@ def update(config):
logging.error('Error while updating plugins: %s', ex) logging.error('Error while updating plugins: %s', ex)
rc = 1 rc = 1
return rc return rc

View File

@ -1,93 +1,294 @@
import logging import logging
import subprocess import subprocess
import re
import time
from flask import abort, render_template_string
import pwnagotchi.plugins as plugins import pwnagotchi.plugins as plugins
import pwnagotchi.ui.fonts as fonts import pwnagotchi.ui.fonts as fonts
from pwnagotchi.ui.components import LabeledValue from pwnagotchi.ui.components import LabeledValue
from pwnagotchi.ui.view import BLACK from pwnagotchi.ui.view import BLACK
TEMPLATE = """
{% extends "base.html" %}
{% set active_page = "bt-tether" %}
{% block title %}
{{ title }}
{% endblock %}
{% block meta %}
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, user-scalable=0" />
{% endblock %}
{% block styles %}
{{ super() }}
<style>
#searchText {
width: 100%;
}
table {
table-layout: auto;
width: 100%;
}
table, th, td {
border: 1px solid;
border-collapse: collapse;
}
th, td {
padding: 15px;
text-align: left;
}
@media screen and (max-width:700px) {
table, tr, td {
padding:0;
border:1px solid;
}
table {
border:none;
}
tr:first-child, thead, th {
display:none;
border:none;
}
tr {
float: left;
width: 100%;
margin-bottom: 2em;
}
td {
float: left;
width: 100%;
padding:1em;
}
td::before {
content:attr(data-label);
word-wrap: break-word;
color: white;
border-right:2px solid;
width: 20%;
float:left;
padding:1em;
font-weight: bold;
margin:-1em 1em -1em -1em;
}
}
</style>
{% endblock %}
{% block script %}
var searchInput = document.getElementById("searchText");
searchInput.onkeyup = function() {
var filter, table, tr, td, i, txtValue;
filter = searchInput.value.toUpperCase();
table = document.getElementById("tableOptions");
if (table) {
tr = table.getElementsByTagName("tr");
for (i = 0; i < tr.length; i++) {
td = tr[i].getElementsByTagName("td")[0];
if (td) {
txtValue = td.textContent || td.innerText;
if (txtValue.toUpperCase().indexOf(filter) > -1) {
tr[i].style.display = "";
}else{
tr[i].style.display = "none";
}
}
}
}
}
{% endblock %}
{% block content %}
<input type="text" id="searchText" placeholder="Search for ..." title="Type in a filter">
<table id="tableOptions">
<tr>
<th>Item</th>
<th>Configuration</th>
</tr>
<tr>
<td data-label="bluetooth">Bluetooth</td>
<td>{{bluetooth|safe}}</td>
</tr>
<tr>
<td data-label="device">Device</td>
<td>{{device|safe}}</td>
</tr>
<tr>
<td data-label="connection">Connection</td>
<td>{{connection|safe}}</td>
</tr>
</table>
{% endblock %}
"""
# We all love crazy regex patterns
MAC_PTTRN = r"^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})$"
IP_PTTRN = r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$"
DNS_PTTRN = r"^\s*((\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\s*[ ,;]\s*)+((\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\s*[ ,;]?\s*)$"
class BTTether(plugins.Plugin): class BTTether(plugins.Plugin):
__author__ = 'Jayofelony' __author__ = "Jayofelony, modified my fmatray"
__version__ = '1.2' __version__ = "1.4"
__license__ = 'GPL3' __license__ = "GPL3"
__description__ = 'A new BT-Tether plugin' __description__ = "A new BT-Tether plugin"
def __init__(self): def __init__(self):
self.ready = False self.ready = False
self.options = dict() self.options = dict()
self.status = '-' self.phone_name = None
self.mac = None
@staticmethod
def exec_cmd(cmd, args, pattern=None):
try:
result = subprocess.run([cmd] + args,
check=True, capture_output=True, text=True)
if pattern:
return result.stdout.find(pattern)
return result
except Exception as exp:
logging.error(f"[BT-Tether] Error with {cmd} : {exp}")
raise exp
def bluetoothctl(self, args, pattern=None):
return self.exec_cmd("bluetoothctl", args, pattern)
def nmcli(self, args, pattern=None):
return self.exec_cmd("nmcli", args, pattern)
def on_loaded(self): def on_loaded(self):
logging.info("[BT-Tether] plugin loaded.") logging.info("[BT-Tether] plugin loaded.")
def on_config_changed(self, config): def on_config_changed(self, config):
if any(self.options[key] == '' for key in ['phone', 'phone-name', 'ip', 'mac']): if "phone-name" not in self.options:
self.ready = False logging.error("[BT-Tether] Phone name not provided")
ip = self.options['ip'] return
mac = self.options['mac'] if not ("mac" in self.options and re.match(MAC_PTTRN, self.options["mac"])):
phone_name = self.options['phone-name'] + ' Network' logging.error("[BT-Tether] Error with mac address")
if self.options['phone'].lower() == 'android': return
address = f'{ip}'
gateway = '192.168.44.1' if not ("phone" in self.options and self.options["phone"].lower() in ["android", "ios"]):
elif self.options['phone'].lower() == 'ios': logging.error("[BT-Tether] Phone type not supported")
address = f'{ip}' return
gateway = '172.20.10.1' if self.options["phone"].lower() == "android":
else: address = self.options.get("ip", "192.168.44.2")
logging.error("[BT-Tether] Phone type not supported.") gateway = "192.168.44.1"
elif self.options["phone"].lower() == "ios":
address = self.options.get("ip", "172.20.10.2")
gateway = "172.20.10.1"
if not re.match(IP_PTTRN, address):
logging.error(f"[BT-Tether] IP error: {address}")
return
self.phone_name = self.options["phone-name"] + " Network"
self.mac = self.options["mac"]
dns = self.options.get("dns", "8.8.8.8 1.1.1.1")
if not re.match(DNS_PTTRN, dns):
logging.error(f"[BT-Tether] DNS error: {dns}")
return
dns = re.sub("[\s,;]+", " ", dns).strip() # DNS cleaning
try:
# Configure connection. Metric is set to 200 to prefer connection over USB
self.nmcli(["connection", "modify", f"{self.phone_name}",
"connection.type", "bluetooth",
"bluetooth.type", "panu",
"bluetooth.bdaddr", f"{self.mac}",
"connection.autoconnect", "yes",
"connection.autoconnect-retries", "0",
"ipv4.method", "manual",
"ipv4.dns", f"{dns}",
"ipv4.addresses", f"{address}/24",
"ipv4.gateway", f"{gateway}",
"ipv4.route-metric", "200" ])
self.nmcli(["connection", "reload"])
self.ready = True
logging.info(f"[BT-Tether] Connection {self.phone_name} configured")
except Exception as e:
logging.error(f"[BT-Tether] Error while configuring: {e}")
return return
try: try:
subprocess.run([ time.sleep(5) # Give some delay to configure before going up
'nmcli', 'connection', 'modify', f'{phone_name}', self.nmcli(["connection", "up", f"{self.phone_name}"])
'connection.type', 'bluetooth',
'bluetooth.type', 'panu',
'bluetooth.bdaddr', f'{mac}',
'ipv4.method', 'manual',
'ipv4.dns', '8.8.8.8;1.1.1.1;',
'ipv4.addresses', f'{address}',
'ipv4.gateway', f'{gateway}',
'ipv4.route-metric', '100'
], check=True)
subprocess.run(['nmcli', 'connection', 'reload'], check=True)
subprocess.run(['nmcli', 'connection', 'up', f'{phone_name}'], check=True)
except Exception as e: except Exception as e:
logging.debug(f"[BT-Tether] Failed to connect to device: {e}") logging.error(f"[BT-Tether] Failed to connect to device: {e}")
logging.error(f"[BT-Tether] Failed to connect to device: have you enabled bluetooth tethering on your phone?") logging.error(
self.ready = True f"[BT-Tether] Failed to connect to device: have you enabled bluetooth tethering on your phone?"
)
def on_ready(self, agent): def on_ready(self, agent):
if any(self.options[key] == '' for key in ['phone', 'phone-name', 'ip', 'mac']): try:
self.ready = False logging.info(f"[BT-Tether] Disabling bettercap's BLE module")
self.ready = True agent.run("ble.recon off", verbose_errors=False)
except Exception as e:
logging.info(f"[BT-Tether] Bettercap BLE was already off.")
def on_unload(self, ui):
with ui._lock:
ui.remove_element("bluetooth")
try:
self.nmcli(["connection", "down", f"{self.phone_name}"])
except Exception as e:
logging.error(f"[BT-Tether] Failed to disconnect from device: {e}")
def on_ui_setup(self, ui): def on_ui_setup(self, ui):
with ui._lock: with ui._lock:
ui.add_element('bluetooth', LabeledValue(color=BLACK, label='BT', value='-', ui.add_element('bluetooth', LabeledValue(color=BLACK, label='BT', value='-',
position=(ui.width() / 2 - 10, 0), position=(ui.width() / 2 - 10, 0),
label_font=fonts.Bold, text_font=fonts.Medium)) label_font=fonts.Bold, text_font=fonts.Medium))
def on_ui_update(self, ui): def on_ui_update(self, ui):
if self.ready: if not self.ready:
phone_name = self.options['phone-name'] + ' Network' return
if (subprocess.run(['bluetoothctl', 'info'], capture_output=True, text=True)).stdout.find('Connected: yes') != -1:
self.status = 'C'
else:
self.status = '-'
try:
subprocess.run(['nmcli', 'connection', 'up', f'{phone_name}'], check=True)
except Exception as e:
logging.debug(f"[BT-Tether] Failed to connect to device: {e}")
logging.error(f"[BT-Tether] Failed to connect to device: have you enabled bluetooth tethering on your phone?")
ui.set('bluetooth', self.status)
return
def on_unload(self, ui):
phone_name = self.options['phone-name'] + ' Network'
with ui._lock: with ui._lock:
ui.remove_element('bluetooth') status = ""
try: try:
if (subprocess.run(['bluetoothctl', 'info'], capture_output=True, text=True)).stdout.find('Connected: yes') != -1: # Checking connection
subprocess.run(['nmcli', 'connection', 'down', f'{phone_name}'], check=True) if self.nmcli(["-w", "0", "-g", "GENERAL.STATE", "connection", "show", self.phone_name],
logging.info(f"[BT-Tether] Disconnected from device with name: {phone_name}") "activated") != -1:
else: ui.set("bluetooth", "U")
logging.info(f"[BT-Tether] Device with name {phone_name} is not connected, not disconnecting") return
except Exception as e: else:
logging.error(f"[BT-Tether] Failed to disconnect from device: {e}") ui.set("bluetooth", "D")
status = "BT Conn. down"
# Checking device
if self.nmcli(["-w", "0", "-g", "GENERAL.STATE", "device", "show", self.mac],
"(connected)") != -1:
ui.set("bluetooth", "C")
status += "\nBT dev conn."
else:
ui.set("bluetooth", "-")
status += "\nBT dev disconn."
ui.set("status", status)
except Exception as e:
logging.error(f"[BT-Tether] Error on update: {e}")
def on_webhook(self, path, request):
if not self.ready:
return """<html>
<head><title>BT-tether: Error</title></head>
<body><code>Plugin not ready</code></body>
</html>"""
if path == "/" or not path:
try:
bluetooth = self.bluetoothctl(["info", self.mac])
bluetooth = bluetooth.stdout.replace('\n', '<br>')
except Exception as e:
bluetooth = "Error while checking bluetoothctl"
try:
device = self.nmcli(["-w", "0","device", "show", self.mac])
device = device.stdout.replace('\n', '<br>')
except Exception as e:
device = "Error while checking nmcli device"
try:
connection = self.nmcli(["-w", "0","connection", "show", self.phone_name])
connection = connection.stdout.replace('\n', '<br>')
except Exception as e:
connection = "Error while checking nmcli connection"
logging.debug(device)
return render_template_string(TEMPLATE,
title="BT-Tether",
bluetooth=bluetooth,
device=device,
connection=connection)
abort(404)

View File

@ -13,8 +13,8 @@ import zipfile
class GdriveSync(plugins.Plugin): class GdriveSync(plugins.Plugin):
__author__ = '@jayofelony' __author__ = '@jayofelony & Moist'
__version__ = '1.2' __version__ = '1.4'
__license__ = 'GPL3' __license__ = 'GPL3'
__description__ = 'A plugin to backup various pwnagotchi files and folders to Google Drive. Once every hour from loading plugin.' __description__ = 'A plugin to backup various pwnagotchi files and folders to Google Drive. Once every hour from loading plugin.'
@ -26,12 +26,15 @@ class GdriveSync(plugins.Plugin):
self.status = StatusFile('/root/.gdrive-backup') self.status = StatusFile('/root/.gdrive-backup')
self.backup = True self.backup = True
self.backupfiles = [ self.backupfiles = [
'/root/brain.nn',
'/root/brain.json', '/root/brain.json',
'/root/.api-report.json', '/root/.api-report.json',
'/root/handshakes', '/home/pi/handshakes',
'/root/peers', '/root/peers',
'/etc/pwnagotchi' '/etc/pwnagotchi',
'.etc/profile/',
'/usr/local/share/pwnagotchi/custom-plugins',
'/boot/firmware/config.txt',
'/boot/firmware/cmdline.txt'
] ]
def on_loaded(self): def on_loaded(self):
@ -168,7 +171,7 @@ class GdriveSync(plugins.Plugin):
""" """
self.internet = True self.internet = True
def on_handshake(self, agent): def on_handshake(self, agent, filename, access_point, client_station):
display = agent.view() display = agent.view()
if not self.ready and not self.internet: if not self.ready and not self.internet:
return return

View File

@ -75,20 +75,16 @@ class ohcapi(plugins.Plugin):
return return
# Check if the internet is still available by pinging Google # Check if the internet is still available by pinging Google
self.internet_active = False
try: try:
response = requests.get('https://www.google.com', timeout=5) response = requests.get('https://www.google.com', timeout=5)
if response.status_code == 200:
self.internet_active = True
except requests.ConnectionError: except requests.ConnectionError:
self.internet_active = False
return
if response.status_code == 200:
self.internet_active = True
else:
self.internet_active = False
return return
current_time = time.time() current_time = time.time()
if current_time - self.last_run >= self.options['sleep']: if self.internet_active and current_time - self.last_run >= self.options['sleep']:
self._run_tasks(agent) self._run_tasks(agent)
self.last_run = current_time self.last_run = current_time

View File

@ -277,9 +277,9 @@ class PiSugarServer:
""" """
if (self.modle == "PiSugar2Plus") | (self.modle == "PiSugar3Plus"): if (self.modle == "PiSugar2Plus") | (self.modle == "PiSugar3Plus"):
curve = curve5000 curve = curve5000
elif (self.modle == "PiSugar2"): elif self.modle == "PiSugar2":
curve = curve1200 curve = curve1200
elif (self.modle == "PiSugar3"): elif self.modle == "PiSugar3":
curve = curve1200_3 curve = curve1200_3
# 将当前电压加入历史记录 # 将当前电压加入历史记录
@ -617,6 +617,7 @@ class PiSugar(plugins.Plugin):
# Log at debug to avoid clutter since it might be a false positive # Log at debug to avoid clutter since it might be a false positive
logging.warning(f"[PiSugarX] {e}") logging.warning(f"[PiSugarX] {e}")
def on_internet_available(self, agent): def on_internet_available(self, agent):
self._agent = agent self._agent = agent
self.safe_get(self.ps.rtc_web) self.safe_get(self.ps.rtc_web)

View File

@ -4,212 +4,302 @@ import json
import csv import csv
import requests import requests
import pwnagotchi import pwnagotchi
import re
from io import StringIO from glob import glob
from datetime import datetime
from pwnagotchi.utils import WifiInfo, FieldNotFoundError, extract_from_pcap, StatusFile, remove_whitelisted
from threading import Lock from threading import Lock
from io import StringIO
from datetime import datetime, UTC
from flask import make_response, redirect
from pwnagotchi.utils import (
WifiInfo,
FieldNotFoundError,
extract_from_pcap,
StatusFile,
remove_whitelisted,
)
from pwnagotchi import plugins from pwnagotchi import plugins
from pwnagotchi._version import __version__ as __pwnagotchi_version__ from pwnagotchi._version import __version__ as __pwnagotchi_version__
import pwnagotchi.ui.fonts as fonts
from pwnagotchi.ui.components import Text
from pwnagotchi.ui.view import BLACK
def _extract_gps_data(path): from scapy.all import Scapy_Exception
"""
Extract data from gps-file
return json-obj
"""
try:
if path.endswith('.geo.json'):
with open(path, 'r') as json_file:
tempJson = json.load(json_file)
d = datetime.utcfromtimestamp(int(tempJson["ts"]))
return {"Latitude": tempJson["location"]["lat"],
"Longitude": tempJson["location"]["lng"],
"Altitude": 10,
"Accuracy": tempJson["accuracy"],
"Updated": d.strftime('%Y-%m-%dT%H:%M:%S.%f')}
else:
with open(path, 'r') as json_file:
return json.load(json_file)
except OSError as os_err:
raise os_err
except json.JSONDecodeError as json_err:
raise json_err
def _format_auth(data):
out = ""
for auth in data:
out = f"{out}[{auth}]"
return [f"{auth}" for auth in data]
def _transform_wigle_entry(gps_data, pcap_data, plugin_version):
"""
Transform to wigle entry in file
"""
dummy = StringIO()
# write kismet header
dummy.write(f"WigleWifi-1.6,appRelease={plugin_version},model=pwnagotchi,release={__pwnagotchi_version__},"
f"device={pwnagotchi.name()},display=kismet,board=RaspberryPi,brand=pwnagotchi,star=Sol,body=3,subBody=0\n")
dummy.write(
"MAC,SSID,AuthMode,FirstSeen,Channel,RSSI,CurrentLatitude,CurrentLongitude,AltitudeMeters,AccuracyMeters,Type\n")
writer = csv.writer(dummy, delimiter=",", quoting=csv.QUOTE_NONE, escapechar="\\")
writer.writerow([
pcap_data[WifiInfo.BSSID],
pcap_data[WifiInfo.ESSID],
_format_auth(pcap_data[WifiInfo.ENCRYPTION]),
datetime.strptime(gps_data['Updated'].rsplit('.')[0],
"%Y-%m-%dT%H:%M:%S").strftime('%Y-%m-%d %H:%M:%S'),
pcap_data[WifiInfo.CHANNEL],
pcap_data[WifiInfo.RSSI],
gps_data['Latitude'],
gps_data['Longitude'],
gps_data['Altitude'],
gps_data['Accuracy'],
'WIFI'])
return dummy.getvalue()
def _send_to_wigle(lines, api_key, donate=True, timeout=30):
"""
Uploads the file to wigle-net
"""
dummy = StringIO()
for line in lines:
dummy.write(f"{line}")
dummy.seek(0)
headers = {"Authorization": f"Basic {api_key}",
"Accept": "application/json",
"HTTP_USER_AGENT": "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:15.0) Gecko/20100101 Firefox/15.0.1"}
data = {"donate": "on" if donate else "false"}
payload = {"file": (pwnagotchi.name() + ".csv", dummy, "multipart/form-data", {"Expires": "0"})}
try:
res = requests.post('https://api.wigle.net/api/v2/file/upload',
data=data,
headers=headers,
files=payload,
timeout=timeout)
json_res = res.json()
if not json_res['success']:
raise requests.exceptions.RequestException(json_res['message'])
except requests.exceptions.RequestException as re_e:
raise re_e
class Wigle(plugins.Plugin): class Wigle(plugins.Plugin):
__author__ = "Dadav and updated by Jayofelony" __author__ = "Dadav and updated by Jayofelony and fmatray"
__version__ = "3.1.0" __version__ = "4.0.0"
__license__ = "GPL3" __license__ = "GPL3"
__description__ = "This plugin automatically uploads collected WiFi to wigle.net" __description__ = "This plugin automatically uploads collected WiFi to wigle.net"
LABEL_SPACING = 0
def __init__(self): def __init__(self):
self.ready = False self.ready = False
self.report = StatusFile('/root/.wigle_uploads', data_format='json') self.report = None
self.skip = list() self.skip = list()
self.lock = Lock() self.lock = Lock()
self.options = dict() self.options = dict()
self.statistics = dict(
ready=False,
username=None,
rank=None,
monthrank=None,
discoveredwiFi=None,
last=None,
)
self.last_stat = datetime.now(tz=UTC)
self.ui_counter = 0
def on_loaded(self): def on_config_changed(self, config):
if 'api_key' not in self.options or ('api_key' in self.options and self.options['api_key'] is None): self.api_key = self.options.get("api_key", None)
logging.debug("WIGLE: api_key isn't set. Can't upload to wigle.net") if not self.api_key:
logging.info("[WIGLE] api_key must be set.")
return return
self.donate = self.options.get("donate", False)
if 'donate' not in self.options: self.handshake_dir = config["bettercap"].get("handshakes")
self.options['donate'] = False report_filename = os.path.join(self.handshake_dir, ".wigle_uploads")
self.report = StatusFile(report_filename, data_format="json")
self.cvs_dir = self.options.get("cvs_dir", None)
self.whitelist = config["main"].get("whitelist", [])
self.timeout = self.options.get("timeout", 30)
self.position = self.options.get("position", (10, 10))
self.ready = True self.ready = True
logging.info("WIGLE: ready") logging.info("[WIGLE] Ready for wardriving!!!")
self.get_statistics(force=True)
def on_webhook(self, path, request): def on_webhook(self, path, request):
from flask import make_response, redirect return make_response(redirect("https://www.wigle.net/", code=302))
response = make_response(redirect("https://www.wigle.net/", code=302))
return response def get_new_gps_files(self, reported):
all_gps_files = glob(os.path.join(self.handshake_dir, "*.gps.json"))
all_gps_files += glob(os.path.join(self.handshake_dir, "*.geo.json"))
all_gps_files = remove_whitelisted(all_gps_files, self.whitelist)
return set(all_gps_files) - set(reported) - set(self.skip)
@staticmethod
def get_pcap_filename(gps_file):
pcap_filename = re.sub(r"\.(geo|gps)\.json$", ".pcap", gps_file)
if not os.path.exists(pcap_filename):
logging.debug("[WIGLE] Can't find pcap for %s", gps_file)
return None
return pcap_filename
@staticmethod
def extract_gps_data(path):
"""
Extract data from gps-file
return json-obj
"""
try:
if path.endswith(".geo.json"):
with open(path, "r") as json_file:
tempJson = json.load(json_file)
d = datetime.fromtimestamp(int(tempJson["ts"]), tz=UTC)
return {
"Latitude": tempJson["location"]["lat"],
"Longitude": tempJson["location"]["lng"],
"Altitude": 10,
"Accuracy": tempJson["accuracy"],
"Updated": d.strftime("%Y-%m-%dT%H:%M:%S.%f"),
}
with open(path, "r") as json_file:
return json.load(json_file)
except (OSError, json.JSONDecodeError) as exp:
raise exp
def get_gps_data(self, gps_file):
try:
gps_data = self.extract_gps_data(gps_file)
except (OSError, json.JSONDecodeError) as exp:
logging.debug(f"[WIGLE] Error while extracting GPS data: {exp}")
return None
if gps_data["Latitude"] == 0 and gps_data["Longitude"] == 0:
logging.debug(f"[WIGLE] Not enough gps data for {gps_file}. Next time.")
return None
return gps_data
@staticmethod
def get_pcap_data(pcap_filename):
try:
pcap_data = extract_from_pcap(
pcap_filename,
[
WifiInfo.BSSID,
WifiInfo.ESSID,
WifiInfo.ENCRYPTION,
WifiInfo.CHANNEL,
WifiInfo.FREQUENCY,
WifiInfo.RSSI,
],
)
logging.debug(f"[WIGLE] PCAP data for {pcap_filename}: {pcap_data}")
except FieldNotFoundError:
logging.debug(f"[WIGLE] Cannot extract all data: {pcap_filename} (skipped)")
return None
except Scapy_Exception as sc_e:
logging.debug(f"[WIGLE] {sc_e}")
return None
return pcap_data
def generate_csv(self, data):
date = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"{pwnagotchi.name()}_{date}.csv"
content = StringIO()
# write kismet header + header
content.write(
f"WigleWifi-1.6,appRelease={self.__version__},model=pwnagotchi,release={__pwnagotchi_version__},"
f"device={pwnagotchi.name()},display=kismet,board=RaspberryPi,brand=pwnagotchi,star=Sol,body=3,subBody=0\n"
f"MAC,SSID,AuthMode,FirstSeen,Channel,Frequency,RSSI,CurrentLatitude,CurrentLongitude,AltitudeMeters,AccuracyMeters,RCOIs,MfgrId,Type\n"
)
writer = csv.writer(
content, delimiter=",", quoting=csv.QUOTE_NONE, escapechar="\\"
)
for gps_data, pcap_data in data: # write WIFIs
try:
timestamp = datetime.strptime(gps_data["Updated"].rsplit(".")[0], "%Y-%m-%dT%H:%M:%S").strftime("%Y-%m-%d %H:%M:%S")
except ValueError:
timestamp = datetime.strptime(gps_data["Updated"].rsplit(".")[0], "%Y-%m-%d %H:%M:%S").strftime("%Y-%m-%d %H:%M:%S")
writer.writerow(
[
pcap_data[WifiInfo.BSSID],
pcap_data[WifiInfo.ESSID],
f"[{']['.join(pcap_data[WifiInfo.ENCRYPTION])}]",
timestamp,
pcap_data[WifiInfo.CHANNEL],
pcap_data[WifiInfo.FREQUENCY],
pcap_data[WifiInfo.RSSI],
gps_data["Latitude"],
gps_data["Longitude"],
gps_data["Altitude"],
gps_data["Accuracy"],
"", # RCOIs to populate
"", # MfgrId always empty
"WIFI",
]
)
content.seek(0)
return filename, content
def save_to_file(self, cvs_filename, cvs_content):
if not self.cvs_dir:
return
filename = os.path.join(self.cvs_dir, cvs_filename)
logging.info(f"[WIGLE] Saving to file {filename}")
try:
with open(filename, mode="w") as f:
f.write(cvs_content.getvalue())
except Exception as exp:
logging.error(f"[WIGLE] Error while writing CSV file(skipping): {exp}")
def post_wigle(self, reported, cvs_filename, cvs_content, no_err_entries):
try:
json_res = requests.post(
"https://api.wigle.net/api/v2/file/upload",
headers={
"Authorization": f"Basic {self.api_key}",
"Accept": "application/json",
},
data={"donate": "on" if self.donate else "false"},
files=dict(file=(cvs_filename, cvs_content, "text/csv")),
timeout=self.timeout,
).json()
if not json_res["success"]:
raise requests.exceptions.RequestException(json_res["message"])
reported += no_err_entries
self.report.update(data={"reported": reported})
logging.info(f"[WIGLE] Successfully uploaded {len(no_err_entries)} wifis")
except (requests.exceptions.RequestException, OSError) as exp:
self.skip += no_err_entries
logging.debug(f"[WIGLE] Exception while uploading: {exp}")
def upload_new_handshakes(self, reported, new_gps_files, agent):
logging.info("[WIGLE] Uploading new handshakes to wigle.net")
csv_entries, no_err_entries = list(), list()
for gps_file in new_gps_files:
logging.info(f"[WIGLE] Processing {os.path.basename(gps_file)}")
if (
(pcap_filename := self.get_pcap_filename(gps_file))
and (gps_data := self.get_gps_data(gps_file))
and (pcap_data := self.get_pcap_data(pcap_filename))
):
csv_entries.append((gps_data, pcap_data))
no_err_entries.append(gps_file)
else:
self.skip.append(gps_file)
logging.info(f"[WIGLE] Wifi to upload: {len(csv_entries)}")
if csv_entries:
cvs_filename, cvs_content = self.generate_csv(csv_entries)
self.save_to_file(cvs_filename, cvs_content)
display = agent.view()
display.on_uploading("wigle.net")
self.post_wigle(reported, cvs_filename, cvs_content, no_err_entries)
display.on_normal()
def get_statistics(self, force=False):
if not force and (datetime.now(tz=UTC) - self.last_stat).total_seconds() < 30:
return
self.last_stat = datetime.now(tz=UTC)
try:
self.statistics["ready"] = False
json_res = requests.get(
"https://api.wigle.net/api/v2/stats/user",
headers={
"Authorization": f"Basic {self.api_key}",
"Accept": "application/json",
},
timeout=self.timeout,
).json()
if not json_res["success"]:
return
self.statistics["ready"] = True
self.statistics["username"] = json_res["user"]
self.statistics["rank"] = json_res["rank"]
self.statistics["monthrank"] = json_res["monthRank"]
self.statistics["discoveredwiFi"] = json_res["statistics"]["discoveredWiFi"]
last = json_res["statistics"]["last"]
self.statistics["last"] = f"{last[6:8]}/{last[4:6]}/{last[0:4]}"
except (requests.exceptions.RequestException, OSError) as exp:
pass
def on_internet_available(self, agent): def on_internet_available(self, agent):
""" if not self.ready:
Called when there's internet connectivity
"""
if not self.ready or self.lock.locked():
return return
with self.lock:
reported = self.report.data_field_or("reported", default=list())
if new_gps_files := self.get_new_gps_files(reported):
self.upload_new_handshakes(reported, new_gps_files, agent)
else:
self.get_statistics()
from scapy.all import Scapy_Exception def on_ui_setup(self, ui):
with ui._lock:
ui.add_element(
"wigle",
Text(value="-", position=self.position, font=fonts.Small, color=BLACK),
)
config = agent.config() def on_unload(self, ui):
display = agent.view() with ui._lock:
reported = self.report.data_field_or('reported', default=list()) ui.remove_element("wigle")
handshake_dir = config['bettercap']['handshakes']
all_files = os.listdir(handshake_dir)
all_gps_files = [os.path.join(handshake_dir, filename)
for filename in all_files
if filename.endswith('.gps.json') or filename.endswith('.geo.json')]
all_gps_files = remove_whitelisted(all_gps_files, config['main']['whitelist']) def on_ui_update(self, ui):
new_gps_files = set(all_gps_files) - set(reported) - set(self.skip) if not self.ready:
if new_gps_files: return
logging.info("WIGLE: Internet connectivity detected. Uploading new handshakes to wigle.net") with ui._lock:
csv_entries = list() if not self.statistics["ready"]:
no_err_entries = list() ui.set("wigle", "We Will Wait Wigle")
for gps_file in new_gps_files: return
if gps_file.endswith('.gps.json'): msg = "-"
pcap_filename = gps_file.replace('.gps.json', '.pcap') self.ui_counter = (self.ui_counter + 1) % 4
if gps_file.endswith('.geo.json'): if self.ui_counter == 0:
pcap_filename = gps_file.replace('.geo.json', '.pcap') msg = f"User:{self.statistics['username']}"
if not os.path.exists(pcap_filename): if self.ui_counter == 1:
logging.debug("WIGLE: Can't find pcap for %s", gps_file) msg = f"Rank:{self.statistics['rank']} Month:{self.statistics['monthrank']}"
self.skip.append(gps_file) elif self.ui_counter == 2:
continue msg = f"{self.statistics['discoveredwiFi']} discovered WiFis"
try: elif self.ui_counter == 3:
gps_data = _extract_gps_data(gps_file) msg = f"Last upl.:{self.statistics['last']}"
except OSError as os_err: ui.set("wigle", msg)
logging.debug("WIGLE: %s", os_err)
self.skip.append(gps_file)
continue
except json.JSONDecodeError as json_err:
logging.debug("WIGLE: %s", json_err)
self.skip.append(gps_file)
continue
if gps_data['Latitude'] == 0 and gps_data['Longitude'] == 0:
logging.debug("WIGLE: Not enough gps-information for %s. Trying again next time.", gps_file)
self.skip.append(gps_file)
continue
try:
pcap_data = extract_from_pcap(pcap_filename, [WifiInfo.BSSID,
WifiInfo.ESSID,
WifiInfo.ENCRYPTION,
WifiInfo.CHANNEL,
WifiInfo.RSSI])
except FieldNotFoundError:
logging.debug("WIGLE: Could not extract all information. Skip %s", gps_file)
self.skip.append(gps_file)
continue
except Scapy_Exception as sc_e:
logging.debug("WIGLE: %s", sc_e)
self.skip.append(gps_file)
continue
new_entry = _transform_wigle_entry(gps_data, pcap_data, self.__version__)
csv_entries.append(new_entry)
no_err_entries.append(gps_file)
if csv_entries:
display.on_uploading('wigle.net')
try:
_send_to_wigle(csv_entries, self.options['api_key'], donate=self.options['donate'])
reported += no_err_entries
self.report.update(data={'reported': reported})
logging.info("WIGLE: Successfully uploaded %d files", len(no_err_entries))
except requests.exceptions.RequestException as re_e:
self.skip += no_err_entries
logging.debug("WIGLE: Got an exception while uploading %s", re_e)
except OSError as os_e:
self.skip += no_err_entries
logging.debug("WIGLE: Got the following error: %s", os_e)
display.on_normal()

View File

@ -1,7 +1,6 @@
import os import os
import logging import logging
import requests import requests
import subprocess
from datetime import datetime from datetime import datetime
from threading import Lock from threading import Lock
from pwnagotchi.utils import StatusFile, remove_whitelisted from pwnagotchi.utils import StatusFile, remove_whitelisted

View File

@ -564,7 +564,8 @@ class WifiInfo(Enum):
ESSID = 1 ESSID = 1
ENCRYPTION = 2 ENCRYPTION = 2
CHANNEL = 3 CHANNEL = 3
RSSI = 4 FREQUENCY = 4
RSSI = 5
class FieldNotFoundError(Exception): class FieldNotFoundError(Exception):
@ -594,9 +595,6 @@ def extract_from_pcap(path, fields):
""" """
results = dict() results = dict()
for field in fields: for field in fields:
if not isinstance(field, WifiInfo):
raise TypeError("Invalid field")
subtypes = set() subtypes = set()
if field == WifiInfo.BSSID: if field == WifiInfo.BSSID:
@ -606,10 +604,9 @@ def extract_from_pcap(path, fields):
packets = sniff(offline=path, filter=bpf_filter) packets = sniff(offline=path, filter=bpf_filter)
try: try:
for packet in packets: for packet in packets:
if packet.haslayer(Dot11Beacon): if packet.haslayer(Dot11Beacon) and hasattr(packet[Dot11], 'addr3'):
if hasattr(packet[Dot11], 'addr3'): results[field] = packet[Dot11].addr3
results[field] = packet[Dot11].addr3 break
break
else: # magic else: # magic
raise FieldNotFoundError("Could not find field [BSSID]") raise FieldNotFoundError("Could not find field [BSSID]")
except Exception: except Exception:
@ -654,6 +651,14 @@ def extract_from_pcap(path, fields):
results[field] = freq_to_channel(packets[0][RadioTap].ChannelFrequency) results[field] = freq_to_channel(packets[0][RadioTap].ChannelFrequency)
except Exception: except Exception:
raise FieldNotFoundError("Could not find field [CHANNEL]") raise FieldNotFoundError("Could not find field [CHANNEL]")
elif field == WifiInfo.FREQUENCY:
from scapy.layers.dot11 import sniff, RadioTap
from pwnagotchi.mesh.wifi import freq_to_channel
packets = sniff(offline=path, count=1)
try:
results[field] = packets[0][RadioTap].ChannelFrequency
except Exception:
raise FieldNotFoundError("Could not find field [FREQUENCY]")
elif field == WifiInfo.RSSI: elif field == WifiInfo.RSSI:
from scapy.layers.dot11 import sniff, RadioTap from scapy.layers.dot11 import sniff, RadioTap
from pwnagotchi.mesh.wifi import freq_to_channel from pwnagotchi.mesh.wifi import freq_to_channel
@ -662,7 +667,8 @@ def extract_from_pcap(path, fields):
results[field] = packets[0][RadioTap].dBm_AntSignal results[field] = packets[0][RadioTap].dBm_AntSignal
except Exception: except Exception:
raise FieldNotFoundError("Could not find field [RSSI]") raise FieldNotFoundError("Could not find field [RSSI]")
else:
raise TypeError("Invalid field")
return results return results