diff --git a/.gitignore b/.gitignore
index 7e99e367..2a45ed72 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1 +1,2 @@
-*.pyc
\ No newline at end of file
+*.pyc
+.vscode
diff --git a/pwnagotchi/_version.py b/pwnagotchi/_version.py
index 2f7fd932..5fb05872 100644
--- a/pwnagotchi/_version.py
+++ b/pwnagotchi/_version.py
@@ -1 +1 @@
-__version__ = '2.9.5.3'
+__version__ = '2.9.5.4'
diff --git a/pwnagotchi/defaults.toml b/pwnagotchi/defaults.toml
index 6cf1fd6e..15820132 100644
--- a/pwnagotchi/defaults.toml
+++ b/pwnagotchi/defaults.toml
@@ -12,7 +12,8 @@ main.custom_plugin_repos = [
"https://github.com/Sniffleupagus/pwnagotchi_plugins/archive/master.zip",
"https://github.com/NeonLightning/pwny/archive/master.zip",
"https://github.com/marbasec/UPSLite_Plugin_1_3/archive/master.zip",
- "https://github.com/wpa-2/Pwnagotchi-Plugins/archive/master.zip"
+ "https://github.com/wpa-2/Pwnagotchi-Plugins/archive/master.zip",
+ "https://github.com/cyberartemio/wardriver-pwnagotchi-plugin/archive/main.zip",
]
main.custom_plugins = "/usr/local/share/pwnagotchi/custom-plugins/"
@@ -27,14 +28,17 @@ main.plugins.bt-tether.enabled = false
main.plugins.bt-tether.phone-name = "" # name as shown on the phone i.e. "Pwnagotchi's Phone"
main.plugins.bt-tether.mac = ""
main.plugins.bt-tether.phone = "" # android or ios
-main.plugins.bt-tether.ip = "" # 192.168.44.2 android / 172.20.10.2 ios
+main.plugins.bt-tether.ip = "" # optional, default : 192.168.44.2 if android or 172.20.10.2 if ios
+main.plugins.bt-tether.dns = "" # optional, default (google): "8.8.8.8 1.1.1.1". Consider using anonymous DNS like OpenNic :-)
main.plugins.fix_services.enabled = true
+main.plugins.cache.enabled = true
+
main.plugins.gdrivesync.enabled = false
main.plugins.gdrivesync.backupfiles = ['']
main.plugins.gdrivesync.backup_folder = "PwnagotchiBackups"
-main.plugin.gdrivesync.interval = 1
+main.plugins.gdrivesync.interval = 1
main.plugins.gpio_buttons.enabled = false
@@ -65,6 +69,9 @@ main.plugins.pwndroid.display_altitude = false # show altitude on display
main.plugins.pisugarx.enabled = false
main.plugins.pisugarx.rotation = false
main.plugins.pisugarx.default_display = "percentage"
+main.plugins.pisugarx.lowpower_shutdown = true
+main.plugins.pisugarx.lowpower_shutdown_level = 10 # battery percent at which the device will turn off
+main.plugins.pisugarx.max_charge_voltage_protection = false #It will limit the battery voltage to about 80% to extend battery life
main.plugins.session-stats.enabled = false
main.plugins.session-stats.save_directory = "/var/tmp/pwnagotchi/sessions/"
@@ -83,8 +90,11 @@ main.plugins.webcfg.enabled = true
main.plugins.webgpsmap.enabled = false
main.plugins.wigle.enabled = false
-main.plugins.wigle.api_key = ""
-main.plugins.wigle.donate = false
+main.plugins.wigle.api_key = "" # mandatory
+main.plugins.wigle.cvs_dir = "/tmp" # optionnal, is set, the CVS is written to this directory
+main.plugins.wigle.donate = false # default: off
+main.plugins.wigle.timeout = 30 # default: 30
+main.plugins.wigle.position = [7, 85] # optionnal
main.plugins.wpa-sec.enabled = false
main.plugins.wpa-sec.api_key = ""
@@ -176,10 +186,11 @@ bettercap.handshakes = "/home/pi/handshakes"
bettercap.silence = [
"ble.device.new",
"ble.device.lost",
- "ble.device.disconnected",
- "ble.device.connected",
"ble.device.service.discovered",
"ble.device.characteristic.discovered",
+ "ble.device.disconnected",
+ "ble.device.connected",
+ "ble.connection.timeout",
"wifi.client.new",
"wifi.client.lost",
"wifi.client.probe",
diff --git a/pwnagotchi/plugins/default/bt-tether.py b/pwnagotchi/plugins/default/bt-tether.py
index 9048492c..dc12e71d 100644
--- a/pwnagotchi/plugins/default/bt-tether.py
+++ b/pwnagotchi/plugins/default/bt-tether.py
@@ -1,93 +1,328 @@
import logging
import subprocess
+import re
+import time
+from flask import abort, render_template_string
import pwnagotchi.plugins as plugins
import pwnagotchi.ui.fonts as fonts
from pwnagotchi.ui.components import LabeledValue
from pwnagotchi.ui.view import BLACK
+TEMPLATE = """
+{% extends "base.html" %}
+{% set active_page = "bt-tether" %}
+{% block title %}
+ {{ title }}
+{% endblock %}
+{% block meta %}
+
+
+{% endblock %}
+{% block styles %}
+{{ super() }}
+
+{% endblock %}
+{% block script %}
+ var searchInput = document.getElementById("searchText");
+ searchInput.onkeyup = function() {
+ var filter, table, tr, td, i, txtValue;
+ filter = searchInput.value.toUpperCase();
+ table = document.getElementById("tableOptions");
+ if (table) {
+ tr = table.getElementsByTagName("tr");
+
+ for (i = 0; i < tr.length; i++) {
+ td = tr[i].getElementsByTagName("td")[0];
+ if (td) {
+ txtValue = td.textContent || td.innerText;
+ if (txtValue.toUpperCase().indexOf(filter) > -1) {
+ tr[i].style.display = "";
+ }else{
+ tr[i].style.display = "none";
+ }
+ }
+ }
+ }
+ }
+{% endblock %}
+{% block content %}
+
+
+
+ Item |
+ Configuration |
+
+
+ Bluetooth |
+ {{bluetooth|safe}} |
+
+
+ Device |
+ {{device|safe}} |
+
+
+ Connection |
+ {{connection|safe}} |
+
+
+{% endblock %}
+"""
+
+# We all love crazy regex patterns
+MAC_PTTRN = r"^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})$"
+IP_PTTRN = r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$"
+DNS_PTTRN = r"^\s*((\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\s*[ ,;]\s*)+((\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\s*[ ,;]?\s*)$"
+
+
class BTTether(plugins.Plugin):
- __author__ = 'Jayofelony'
- __version__ = '1.2'
- __license__ = 'GPL3'
- __description__ = 'A new BT-Tether plugin'
+ __author__ = "Jayofelony, modified my fmatray"
+ __version__ = "1.4"
+ __license__ = "GPL3"
+ __description__ = "A new BT-Tether plugin"
def __init__(self):
self.ready = False
self.options = dict()
- self.status = '-'
+ self.phone_name = None
+ self.mac = None
+
+ @staticmethod
+ def exec_cmd(cmd, args, pattern=None):
+ try:
+ result = subprocess.run([cmd] + args, check=True, capture_output=True, text=True)
+ if pattern:
+ return result.stdout.find(pattern)
+ return result
+ except Exception as exp:
+ logging.error(f"[BT-Tether] Error with {cmd}")
+ logging.error(f"[BT-Tether] Exception : {exp}")
+ raise exp
+
+ def bluetoothctl(self, args, pattern=None):
+ return self.exec_cmd("bluetoothctl", args, pattern)
+
+ def nmcli(self, args, pattern=None):
+ return self.exec_cmd("nmcli", args, pattern)
def on_loaded(self):
logging.info("[BT-Tether] plugin loaded.")
def on_config_changed(self, config):
- if any(self.options[key] == '' for key in ['phone', 'phone-name', 'ip', 'mac']):
- self.ready = False
- ip = self.options['ip']
- mac = self.options['mac']
- phone_name = self.options['phone-name'] + ' Network'
- if self.options['phone'].lower() == 'android':
- address = f'{ip}'
- gateway = '192.168.44.1'
- elif self.options['phone'].lower() == 'ios':
- address = f'{ip}'
- gateway = '172.20.10.1'
- else:
- logging.error("[BT-Tether] Phone type not supported.")
+ if "phone-name" not in self.options:
+ logging.error("[BT-Tether] Phone name not provided")
+ return
+ if not ("mac" in self.options and re.match(MAC_PTTRN, self.options["mac"])):
+ logging.error("[BT-Tether] Error with mac address")
+ return
+
+ if not ("phone" in self.options and self.options["phone"].lower() in ["android", "ios"]):
+ logging.error("[BT-Tether] Phone type not supported")
+ return
+ if self.options["phone"].lower() == "android":
+ address = self.options.get("ip", "192.168.44.2")
+ gateway = "192.168.44.1"
+ elif self.options["phone"].lower() == "ios":
+ address = self.options.get("ip", "172.20.10.2")
+ gateway = "172.20.10.1"
+ if not re.match(IP_PTTRN, address):
+ logging.error(f"[BT-Tether] IP error: {address}")
+ return
+
+ self.phone_name = self.options["phone-name"] + " Network"
+ self.mac = self.options["mac"]
+ dns = self.options.get("dns", "8.8.8.8 1.1.1.1")
+ if not re.match(DNS_PTTRN, dns):
+ if dns == "":
+ logging.error(f"[BT-Tether] Empty DNS setting")
+ else:
+ logging.error(f"[BT-Tether] Wrong DNS setting: '{dns}'")
+ return
+ dns = re.sub("[\s,;]+", " ", dns).strip() # DNS cleaning
+
+ try:
+ # Configure connection. Metric is set to 200 to prefer connection over USB
+ self.nmcli(
+ [
+ "connection", "modify", f"{self.phone_name}",
+ "connection.type", "bluetooth",
+ "bluetooth.type", "panu",
+ "bluetooth.bdaddr", f"{self.mac}",
+ "connection.autoconnect", "yes",
+ "connection.autoconnect-retries", "0",
+ "ipv4.method", "manual",
+ "ipv4.dns", f"{dns}",
+ "ipv4.addresses", f"{address}/24",
+ "ipv4.gateway", f"{gateway}",
+ "ipv4.route-metric", "200",
+ ]
+ )
+ # Configure Device to autoconnect
+ self.nmcli([
+ "device", "set", f"{self.mac}",
+ "autoconnect", "yes",
+ "managed", "yes"
+ ])
+ self.nmcli(["connection", "reload"])
+ self.ready = True
+ logging.info(f"[BT-Tether] Connection {self.phone_name} configured")
+ except Exception as e:
+ logging.error(f"[BT-Tether] Error while configuring: {e}")
return
try:
- subprocess.run([
- 'nmcli', 'connection', 'modify', f'{phone_name}',
- 'connection.type', 'bluetooth',
- 'bluetooth.type', 'panu',
- 'bluetooth.bdaddr', f'{mac}',
- 'ipv4.method', 'manual',
- 'ipv4.dns', '8.8.8.8 1.1.1.1',
- 'ipv4.addresses', f'{address}/24',
- 'ipv4.gateway', f'{gateway}',
- 'ipv4.route-metric', '100'
- ], check=True)
- subprocess.run(['nmcli', 'connection', 'reload'], check=True)
- subprocess.run(['nmcli', 'connection', 'up', f'{phone_name}'], check=True)
+ time.sleep(5) # Give some delay to configure before going up
+ self.nmcli(["connection", "up", f"{self.phone_name}"])
except Exception as e:
logging.error(f"[BT-Tether] Failed to connect to device: {e}")
- logging.error(f"[BT-Tether] Failed to connect to device: have you enabled bluetooth tethering on your phone?")
- self.ready = True
+ logging.error(
+ f"[BT-Tether] Failed to connect to device: have you enabled bluetooth tethering on your phone?"
+ )
def on_ready(self, agent):
- if any(self.options[key] == '' for key in ['phone', 'phone-name', 'ip', 'mac']):
- self.ready = False
- self.ready = True
+ try:
+ logging.info(f"[BT-Tether] Disabling bettercap's BLE module")
+ agent.run("ble.recon off", verbose_errors=False)
+ except Exception as e:
+ logging.info(f"[BT-Tether] Bettercap BLE was already off.")
+
+ def on_unload(self, ui):
+ with ui._lock:
+ ui.remove_element("bluetooth")
+ try:
+ self.nmcli(["connection", "down", f"{self.phone_name}"])
+ except Exception as e:
+ logging.error(f"[BT-Tether] Failed to disconnect from device: {e}")
def on_ui_setup(self, ui):
with ui._lock:
- ui.add_element('bluetooth', LabeledValue(color=BLACK, label='BT', value='-',
- position=(ui.width() / 2 - 10, 0),
- label_font=fonts.Bold, text_font=fonts.Medium))
+ ui.add_element(
+ "bluetooth",
+ LabeledValue(
+ color=BLACK,
+ label="BT",
+ value="-",
+ position=(ui.width() / 2 - 10, 0),
+ label_font=fonts.Bold,
+ text_font=fonts.Medium,
+ ),
+ )
def on_ui_update(self, ui):
- if self.ready:
- phone_name = self.options['phone-name'] + ' Network'
- if (subprocess.run(['bluetoothctl', 'info'], capture_output=True, text=True)).stdout.find('Connected: yes') != -1:
- self.status = 'C'
- else:
- self.status = '-'
- try:
- subprocess.run(['nmcli', 'connection', 'up', f'{phone_name}'], check=True)
- except Exception as e:
- logging.debug(f"[BT-Tether] Failed to connect to device: {e}")
- logging.error(f"[BT-Tether] Failed to connect to device: have you enabled bluetooth tethering on your phone?")
- ui.set('bluetooth', self.status)
- return
-
- def on_unload(self, ui):
- phone_name = self.options['phone-name'] + ' Network'
+ if not self.ready:
+ return
with ui._lock:
- ui.remove_element('bluetooth')
- try:
- if (subprocess.run(['bluetoothctl', 'info'], capture_output=True, text=True)).stdout.find('Connected: yes') != -1:
- subprocess.run(['nmcli', 'connection', 'down', f'{phone_name}'], check=True)
- logging.info(f"[BT-Tether] Disconnected from device with name: {phone_name}")
- else:
- logging.info(f"[BT-Tether] Device with name {phone_name} is not connected, not disconnecting")
- except Exception as e:
- logging.error(f"[BT-Tether] Failed to disconnect from device: {e}")
\ No newline at end of file
+ status = ""
+ try:
+ # Checking connection
+ if (
+ self.nmcli(["-w", "0", "-g", "GENERAL.STATE", "connection", "show", self.phone_name],
+ "activated",
+ )
+ != -1
+ ):
+ ui.set("bluetooth", "U")
+ return
+ else:
+ ui.set("bluetooth", "D")
+ status = "BT Conn. down"
+
+ # Checking device
+ if (
+ self.nmcli(
+ ["-w", "0", "-g", "GENERAL.STATE", "device", "show", self.mac],
+ "(connected)",
+ )
+ != -1
+ ):
+ ui.set("bluetooth", "C")
+ status += "\nBT dev conn."
+ else:
+ ui.set("bluetooth", "-")
+ status += "\nBT dev disconn."
+ ui.set("status", status)
+ except Exception as e:
+ logging.error(f"[BT-Tether] Error on update: {e}")
+
+ def on_webhook(self, path, request):
+ if not self.ready:
+ return """
+ BT-tether: Error
+ Plugin not ready
+ """
+ if path == "/" or not path:
+ try:
+ bluetooth = self.bluetoothctl(["info", self.mac])
+ bluetooth = bluetooth.stdout.replace("\n", "
")
+ except Exception as e:
+ bluetooth = "Error while checking bluetoothctl"
+
+ try:
+ device = self.nmcli(["-w", "0", "device", "show", self.mac])
+ device = device.stdout.replace("\n", "
")
+ except Exception as e:
+ device = "Error while checking nmcli device"
+
+ try:
+ connection = self.nmcli(["-w", "0", "connection", "show", self.phone_name])
+ connection = connection.stdout.replace("\n", "
")
+ except Exception as e:
+ connection = "Error while checking nmcli connection"
+
+ logging.debug(device)
+ return render_template_string(
+ TEMPLATE,
+ title="BT-Tether",
+ bluetooth=bluetooth,
+ device=device,
+ connection=connection,
+ )
+ abort(404)
diff --git a/pwnagotchi/plugins/default/cache.py b/pwnagotchi/plugins/default/cache.py
new file mode 100644
index 00000000..f3b826ed
--- /dev/null
+++ b/pwnagotchi/plugins/default/cache.py
@@ -0,0 +1,119 @@
+import logging
+import json
+import os
+import re
+import pathlib
+import pwnagotchi.plugins as plugins
+from datetime import datetime, UTC
+from threading import Lock
+
+
+def read_ap_cache(cache_dir, file):
+ cache_filename = os.path.basename(re.sub(r"\.(pcap|gps\.json|geo\.json)$", ".cache", file))
+ cache_filename = os.path.join(cache_dir, cache_filename)
+ if not os.path.exists(cache_filename):
+ logging.info("Cache not exist")
+ return None
+ try:
+ with open(cache_filename, "r") as f:
+ return json.load(f)
+ except Exception as e:
+ logging.info(f"Exception {e}")
+ return None
+
+
+class Cache(plugins.Plugin):
+ __author__ = "fmatray"
+ __version__ = "1.0.0"
+ __license__ = "GPL3"
+ __description__ = "A simple plugin to cache AP informations"
+
+ def __init__(self):
+ self.options = dict()
+ self.ready = False
+ self.lock = Lock()
+
+ def on_loaded(self):
+ logging.info("[CACHE] plugin loaded.")
+
+ def on_config_changed(self, config):
+ try:
+ handshake_dir = config["bettercap"].get("handshakes")
+ self.cache_dir = os.path.join(handshake_dir, "cache")
+ os.makedirs(self.cache_dir, exist_ok=True)
+ except Exception:
+ logging.info(f"[CACHE] Cannot access to the cache directory")
+ return
+ self.last_clean = datetime.now(tz=UTC)
+ self.ready = True
+ logging.info(f"[CACHE] Cache plugin configured")
+ self.clean_ap_cache()
+
+ def on_unload(self, ui):
+ self.clean_ap_cache()
+
+ def clean_ap_cache(self):
+ if not self.ready:
+ return
+ with self.lock:
+ ctime = datetime.now(tz=UTC)
+ cache_to_delete = list()
+ for cache_file in pathlib.Path(self.cache_dir).glob("*.apcache"):
+ try:
+ mtime = datetime.fromtimestamp(cache_file.lstat().st_mtime, tz=UTC)
+ if (ctime - mtime).total_seconds() > 60 * 5:
+ cache_to_delete.append(cache_file)
+ except FileNotFoundError:
+ pass
+ if cache_to_delete:
+ logging.info(f"[CACHE] Cleaning {len(cache_to_delete)} files")
+ for cache_file in cache_to_delete:
+ try:
+ cache_file.unlink()
+ except FileNotFoundError as e:
+ pass
+
+ def write_ap_cache(self, access_point):
+ with self.lock:
+ try:
+ mac = access_point["mac"].replace(":", "")
+ hostname = re.sub(r"[^a-zA-Z0-9]", "", access_point["hostname"])
+ except KeyError:
+ return
+ cache_file = os.path.join(self.cache_dir, f"{hostname}_{mac}.apcache")
+ try:
+ with open(cache_file, "w") as f:
+ json.dump(access_point, f)
+ except Exception as e:
+ logging.error(f"[CACHE] Cannot write {cache_file}: {e}")
+ pass
+
+ def on_wifi_update(self, agent, access_points):
+ if self.ready:
+ for ap in filter(lambda ap: ap["hostname"] not in ["", ""], access_points):
+ self.write_ap_cache(ap)
+
+ def on_unfiltered_ap_list(self, agent, aps):
+ if self.ready:
+ for ap in filter(lambda ap: ap["hostname"] not in ["", ""], aps):
+ self.write_ap_cache(ap)
+
+ def on_association(self, agent, access_point):
+ if self.ready:
+ self.write_ap_cache(access_point)
+
+ def on_deauthentication(self, agent, access_point, client_station):
+ if self.ready:
+ self.write_ap_cache(access_point)
+
+ def on_handshake(self, agent, filename, access_point, client_station):
+ if self.ready:
+ self.write_ap_cache(access_point)
+
+ def on_ui_update(self, ui):
+ if not self.ready:
+ return
+ current_time = datetime.now(tz=UTC)
+ if (current_time - self.last_clean).total_seconds() > 60:
+ self.clean_ap_cache()
+ self.last_clean = current_time
diff --git a/pwnagotchi/plugins/default/ohcapi.py b/pwnagotchi/plugins/default/ohcapi.py
index b400c572..5aa6ac5d 100644
--- a/pwnagotchi/plugins/default/ohcapi.py
+++ b/pwnagotchi/plugins/default/ohcapi.py
@@ -75,20 +75,16 @@ class ohcapi(plugins.Plugin):
return
# Check if the internet is still available by pinging Google
+ self.internet_active = False
try:
response = requests.get('https://www.google.com', timeout=5)
+ if response.status_code == 200:
+ self.internet_active = True
except requests.ConnectionError:
- self.internet_active = False
- return
-
- if response.status_code == 200:
- self.internet_active = True
- else:
- self.internet_active = False
return
current_time = time.time()
- if current_time - self.last_run >= self.options['sleep']:
+ if self.internet_active and current_time - self.last_run >= self.options['sleep']:
self._run_tasks(agent)
self.last_run = current_time
diff --git a/pwnagotchi/plugins/default/pisugarx.py b/pwnagotchi/plugins/default/pisugarx.py
index 8d7daca6..c8c33a21 100644
--- a/pwnagotchi/plugins/default/pisugarx.py
+++ b/pwnagotchi/plugins/default/pisugarx.py
@@ -29,7 +29,7 @@ curve1200 = [
(3.49, 3.2),
(3.1, 0.0),
]
-curve1200_3= [
+curve1200_3 = [
(4.2, 100.0), # 高电量阶段 (100%)
(4.0, 80.0), # 中电量阶段 (80%)
(3.7, 60.0), # 中电量阶段 (60%)
@@ -50,26 +50,42 @@ curve5000 = [
]
+
+
class PiSugarServer:
def __init__(self):
"""
PiSugar initialization, if unable to connect to any version of PiSugar, return false
"""
self._bus = smbus.SMBus(1)
+ self.ready = False
self.modle = None
self.i2creg = []
self.address = 0
- self.battery_voltage = 0
- self.voltage_history = deque(maxlen=10)
+ self.battery_voltage = 0.00
+ self.voltage_history = deque(maxlen=10)
self.battery_level = 0
self.battery_charging = 0
self.temperature = 0
self.power_plugged = False
self.allow_charging = True
+ self.lowpower_shutdown = False
+ self.lowpower_shutdown_level = 10
+ self.max_charge_voltage_protection = False
+ self.max_protection_level=80
+ # Start the device connection in a background thread
+ self.connection_thread = threading.Thread(
+ target=self._connect_device, daemon=True)
+ self.connection_thread.start()
+
+ def _connect_device(self):
+ """
+ Attempt to connect to the PiSugar device in a background thread.
+ """
while self.modle is None:
if self.check_device(PiSugar_addresses["PiSugar2"]) is not None:
self.address = PiSugar_addresses["PiSugar2"]
- if self.check_device(PiSugar_addresses["PiSugar2"], 0Xc2) != 0:
+ if self.check_device(PiSugar_addresses["PiSugar2"], 0xC2) != 0:
self.modle = "PiSugar2Plus"
else:
self.modle = "PiSugar2"
@@ -77,16 +93,20 @@ class PiSugarServer:
elif self.check_device(PiSugar_addresses["PiSugar3"]) is not None:
self.modle = 'PiSugar3'
self.address = PiSugar_addresses["PiSugar3"]
+ self.device_init()
else:
self.modle = None
- logging.error(
- "No PiSugar device was found. Please check if the PiSugar device is powered on.")
+ logging.info(
+ "No PiSugar device was found. Please check if the PiSugar device is powered on."
+ )
time.sleep(5)
-
- # self.update_value()
+ logging.info(f"{self.modle} is connected")
+ # Once connected, start the timer
self.start_timer()
while len(self.i2creg) < 256:
time.sleep(1)
+ self.ready = True
+ logging.info(f"{self.modle} is ready")
def start_timer(self):
@@ -99,6 +119,9 @@ class PiSugarServer:
"""每三秒更新pisugar状态,包括触发自动关机"""
while True:
try:
+ if( self.modle == 'PiSugar2') | (self.modle == 'PiSugar2Plus'):
+ self.set_battery_notallow_charging() #短暂关闭充电以获取准确电池电压
+ time.sleep(0.05)
self.i2creg = []
for i in range(0, 256, 32):
# 计算当前读取的起始寄存器地址
@@ -121,6 +144,20 @@ class PiSugarServer:
ctr1 = self.i2creg[0x02] # 读取控制寄存器 1
self.power_plugged = (ctr1 & (1 << 7)) != 0 # 检查电源是否插入
self.allow_charging = (ctr1 & (1 << 6)) != 0 # 检查是否允许充电
+ if self.max_charge_voltage_protection:
+ self._bus.write_byte_data(
+ self.address, 0x0B, 0x29) # 关闭写保护
+ self._bus.write_byte_data(self.address, 0x20, self._bus.read_byte_data(
+ self.address, 0x20) | 0b10000000)
+ self._bus.write_byte_data(
+ self.address, 0x0B, 0x00) # 开启写保护
+ else:
+ self._bus.write_byte_data(
+ self.address, 0x0B, 0x29) # 关闭写保护
+ self._bus.write_byte_data(self.address, 0x20, self._bus.read_byte_data(
+ self.address, 0x20) & 0b01111111)
+ self._bus.write_byte_data(
+ self.address, 0x0B, 0x00) # 开启写保护
elif self.modle == 'PiSugar2':
high = self.i2creg[0xa3]
low = self.i2creg[0xa2]
@@ -129,20 +166,60 @@ class PiSugarServer:
2600.0 + (((high & 0x1f) << 8) + low) * 0.26855) / 1000.0
self.power_plugged = (self.i2creg[0x55] & 0b00010000) != 0
+ if self.max_charge_voltage_protection:
+ self.voltage_history.append(self.battery_voltage)
+ self.battery_level = self.convert_battery_voltage_to_level()
+ if (self.battery_level) > self.max_protection_level:
+ self.set_battery_notallow_charging()
+ else:
+ self.set_battery_allow_charging()
+ else:
+ self.set_battery_allow_charging()
+
elif self.modle == 'PiSugar2Plus':
low = self.i2creg[0xd0]
high = self.i2creg[0xd1]
self.battery_voltage = (
(((high & 0b00111111) << 8) + low) * 0.26855 + 2600.0)/1000
self.power_plugged = self.i2creg[0xdd] == 0x1f
-
+ if self.max_charge_voltage_protection:
+ self.voltage_history.append(self.battery_voltage)
+ self.battery_level = self.convert_battery_voltage_to_level()
+ if (self.battery_level) > self.max_protection_level:
+ self.set_battery_notallow_charging()
+ else:
+ self.set_battery_allow_charging()
+ else:
+ self.set_battery_allow_charging()
+
self.voltage_history.append(self.battery_voltage)
- self.battery_level=self.convert_battery_voltage_to_level()
+ self.battery_level = self.convert_battery_voltage_to_level()
+
+ if self.lowpower_shutdown:
+ if self.battery_level < self.lowpower_shutdown_level:
+ logging.info("[PiSugarX] low power shutdown now.")
+ self.shutdown()
+ pwnagotchi.shutdown()
time.sleep(3)
- except:
- logging.error(f"read error")
+ except Exception as e:
+ logging.error(f"read error{e}")
time.sleep(3)
+ def shutdown(self):
+ # logging.info("[PiSugarX] PiSugar set shutdown .")
+ if self.modle == 'PiSugar3':
+ # 10秒后关闭电源
+ self._bus.write_byte_data(self.address, 0x0B, 0x29) # 关闭写保护
+ self._bus.write_byte_data(self.address, 0x09, 10)
+ self._bus.write_byte_data(self.address, 0x02, self._bus.read_byte_data(
+ self.address, 0x02) & 0b11011111)
+ self._bus.write_byte_data(self.address, 0x0B, 0x00) # 开启写保护
+ logging.info("[PiSugarX] PiSugar shutdown in 10s.")
+ elif self.modle == 'PiSugar2':
+ pass
+ elif self.modle == 'PiSugar2Plus':
+ pass
+
def check_device(self, address, reg=0):
"""Check if a device is present at the specified address"""
try:
@@ -278,6 +355,58 @@ class PiSugarServer:
"""
return self.allow_charging
+ def set_battery_allow_charging(self):
+ if self.modle == 'PiSugar3':
+ pass
+ elif self.modle == 'PiSugar2':
+ # 禁止 gpio2 输出
+ self._bus.write_byte_data(self.address, 0x54, self._bus.read_byte_data(
+ self.address, 0x54) & 0b11111011)
+ # 开启充电
+ self._bus.write_byte_data(self.address, 0x55, self._bus.read_byte_data(
+ self.address, 0x55) & 0b11111011)
+ # 开启 gpio2 输出
+ self._bus.write_byte_data(self.address, 0x54, self._bus.read_byte_data(
+ self.address, 0x54) | 0b00000100)
+ elif self.modle == 'PiSugar2Plus':
+ # 禁止 gpio2 输出
+ self._bus.write_byte_data(self.address, 0x56, self._bus.read_byte_data(
+ self.address, 0x56) & 0b11111011)
+ # 开启充电
+ self._bus.write_byte_data(self.address, 0x58, self._bus.read_byte_data(
+ self.address, 0x58) & 0b11111011)
+ # 开启 gpio2 输出
+ self._bus.write_byte_data(self.address, 0x56, self._bus.read_byte_data(
+ self.address, 0x56) | 0b00000100)
+
+ return
+
+ def set_battery_notallow_charging(self):
+ if self.modle == 'PiSugar3':
+ pass
+ elif self.modle == 'PiSugar2':
+ # 禁止 gpio2 输出
+ print(self._bus.write_byte_data(self.address, 0x54, self._bus.read_byte_data(
+ self.address, 0x54) & 0b11111011))
+ # 关闭充电
+ self._bus.write_byte_data(self.address, 0x55, self._bus.read_byte_data(
+ self.address, 0x55) | 0b00000100)
+ # 开启 gpio2 输出
+ self._bus.write_byte_data(self.address, 0x54, self._bus.read_byte_data(
+ self.address, 0x54) | 0b00000100)
+ elif self.modle == 'PiSugar2Plus':
+ # 禁止 gpio2 输出
+ self._bus.write_byte_data(self.address, 0x56, self._bus.read_byte_data(
+ self.address, 0x56) & 0b11111011)
+ # 关闭充电
+ self._bus.write_byte_data(self.address, 0x58, self._bus.read_byte_data(
+ self.address, 0x58) | 0b00000100)
+ # 开启 gpio2 输出
+ self._bus.write_byte_data(self.address, 0x56, self._bus.read_byte_data(
+ self.address, 0x56) | 0b00000100)
+
+ return
+
def get_battery_charging_range(self):
"""
Get the battery charging range.
@@ -406,6 +535,8 @@ class PiSugarServer:
"""
pass
+
+
class PiSugar(plugins.Plugin):
__author__ = "jayofelony"
__version__ = "1.2"
@@ -418,14 +549,25 @@ class PiSugar(plugins.Plugin):
def __init__(self):
self._agent = None
- self.is_new_model = False
self.options = dict()
+ """
+ self.options = {
+ 'enabled': True,
+ 'rotation': False,
+ 'default_display': 'percentage',
+ 'lowpower_shutdown': True,
+ 'lowpower_shutdown_level': 10,
+ 'max_charge_voltage_protection': True
+ }
+ """
self.ps = None
+ # logging.debug(f"[PiSugarX] {self.options}")
try:
self.ps = PiSugarServer()
except Exception as e:
# Log at debug to avoid clutter since it might be a false positive
- logging.debug("[PiSugarX] Unable to establish connection: %s", repr(e))
+ logging.debug(
+ "[PiSugarX] Unable to establish connection: %s", repr(e))
self.ready = False
self.lasttemp = 69
@@ -444,7 +586,8 @@ class PiSugar(plugins.Plugin):
try:
return func()
except Exception as e:
- logging.debug("[PiSugarX] Failed to get data using %s: %s", func.__name__, e)
+ logging.debug(
+ "[PiSugarX] Failed to get data using %s: %s", func.__name__, e)
return default
def on_loaded(self):
@@ -455,15 +598,25 @@ class PiSugar(plugins.Plugin):
valid_displays = ['voltage', 'percentage', 'temp']
if self.default_display not in valid_displays:
- logging.warning(f"[PiSugarX] Invalid default_display '{self.default_display}'. Using 'voltage'.")
+ logging.warning(
+ f"[PiSugarX] Invalid default_display '{self.default_display}'. Using 'voltage'.")
self.default_display = 'voltage'
- logging.info(f"[PiSugarX] Rotation is {'enabled' if self.rotation_enabled else 'disabled'}.")
- logging.info(f"[PiSugarX] Default display (when rotation disabled): {self.default_display}")
+ logging.info(
+ f"[PiSugarX] Rotation is {'enabled' if self.rotation_enabled else 'disabled'}.")
+ logging.info(
+ f"[PiSugarX] Default display (when rotation disabled): {self.default_display}")
+ self.ps.lowpower_shutdown = self.options['lowpower_shutdown']
+ self.ps.lowpower_shutdown_level = self.options['lowpower_shutdown_level']
+ self.ps.max_charge_voltage_protection = self.options['max_charge_voltage_protection']
def on_ready(self, agent):
- self.ready = True
- self._agent = agent
+ try:
+ self.ready = self.ps.ready
+ except Exception as e:
+ # Log at debug to avoid clutter since it might be a false positive
+ logging.warning(f"[PiSugarX] {e}")
+
def on_internet_available(self, agent):
self._agent = agent
@@ -477,30 +630,52 @@ class PiSugar(plugins.Plugin):
try:
if request.method == "GET":
if path == "/" or not path:
- version = self.safe_get(self.ps.get_version, default='Unknown')
+ version = self.safe_get(
+ self.ps.get_version, default='Unknown')
model = self.safe_get(self.ps.get_model, default='Unknown')
- battery_level = self.safe_get(self.ps.get_battery_level, default='N/A')
- battery_voltage = self.safe_get(self.ps.get_battery_voltage, default='N/A')
- battery_current = self.safe_get(self.ps.get_battery_current, default='N/A')
- battery_allow_charging = self.safe_get(self.ps.get_battery_allow_charging, default=False)
- battery_charging_range = self.safe_get(self.ps.get_battery_charging_range, default='N/A') if self.is_new_model or model == 'Pisugar 3' else 'Not supported'
- battery_full_charge_duration = getattr(self.ps, 'get_battery_full_charge_duration', lambda: 'N/A')()
- safe_shutdown_level = self.safe_get(self.ps.get_battery_safe_shutdown_level, default=None)
+ battery_level = self.safe_get(
+ self.ps.get_battery_level, default='N/A')
+ battery_voltage = self.safe_get(
+ self.ps.get_battery_voltage, default='N/A')
+ battery_current = self.safe_get(
+ self.ps.get_battery_current, default='N/A')
+ battery_allow_charging = self.safe_get(
+ self.ps.get_battery_allow_charging, default=False)
+ battery_charging_range = self.safe_get(
+ self.ps.get_battery_charging_range, default='N/A')
+ battery_full_charge_duration = getattr(
+ self.ps, 'get_battery_full_charge_duration', lambda: 'N/A')()
+ safe_shutdown_level = self.safe_get(
+ self.ps.get_battery_safe_shutdown_level, default=None)
battery_safe_shutdown_level = f"{safe_shutdown_level}%" if safe_shutdown_level is not None else 'Not set'
- battery_safe_shutdown_delay = self.safe_get(self.ps.get_battery_safe_shutdown_delay, default='N/A')
- battery_auto_power_on = self.safe_get(self.ps.get_battery_auto_power_on, default=False)
- battery_soft_poweroff = self.safe_get(self.ps.get_battery_soft_poweroff, default=False) if model == 'Pisugar 3' else False
- system_time = self.safe_get(self.ps.get_system_time, default='N/A')
- rtc_adjust_ppm = self.safe_get(self.ps.get_rtc_adjust_ppm, default='Not supported') if model == 'Pisugar 3' else 'Not supported'
- rtc_alarm_repeat = self.safe_get(self.ps.get_rtc_alarm_repeat, default='N/A')
- single_tap_enabled = self.safe_get(lambda: self.ps.get_tap_enable(tap='single'), default=False)
- double_tap_enabled = self.safe_get(lambda: self.ps.get_tap_enable(tap='double'), default=False)
- long_tap_enabled = self.safe_get(lambda: self.ps.get_tap_enable(tap='long'), default=False)
- single_tap_shell = self.safe_get(lambda: self.ps.get_tap_shell(tap='single'), default='N/A')
- double_tap_shell = self.safe_get(lambda: self.ps.get_tap_shell(tap='double'), default='N/A')
- long_tap_shell = self.safe_get(lambda: self.ps.get_tap_shell(tap='long'), default='N/A')
- anti_mistouch = self.safe_get(self.ps.get_anti_mistouch, default=False) if model == 'Pisugar 3' else False
- temperature = self.safe_get(self.ps.get_temperature, default='N/A')
+ battery_safe_shutdown_delay = self.safe_get(
+ self.ps.get_battery_safe_shutdown_delay, default='N/A')
+ battery_auto_power_on = self.safe_get(
+ self.ps.get_battery_auto_power_on, default=False)
+ battery_soft_poweroff = self.safe_get(
+ self.ps.get_battery_soft_poweroff, default=False) if model == 'Pisugar 3' else False
+ system_time = self.safe_get(
+ self.ps.get_system_time, default='N/A')
+ rtc_adjust_ppm = self.safe_get(
+ self.ps.get_rtc_adjust_ppm, default='Not supported') if model == 'Pisugar 3' else 'Not supported'
+ rtc_alarm_repeat = self.safe_get(
+ self.ps.get_rtc_alarm_repeat, default='N/A')
+ single_tap_enabled = self.safe_get(
+ lambda: self.ps.get_tap_enable(tap='single'), default=False)
+ double_tap_enabled = self.safe_get(
+ lambda: self.ps.get_tap_enable(tap='double'), default=False)
+ long_tap_enabled = self.safe_get(
+ lambda: self.ps.get_tap_enable(tap='long'), default=False)
+ single_tap_shell = self.safe_get(
+ lambda: self.ps.get_tap_shell(tap='single'), default='N/A')
+ double_tap_shell = self.safe_get(
+ lambda: self.ps.get_tap_shell(tap='double'), default='N/A')
+ long_tap_shell = self.safe_get(
+ lambda: self.ps.get_tap_shell(tap='long'), default='N/A')
+ anti_mistouch = self.safe_get(
+ self.ps.get_anti_mistouch, default=False) if model == 'Pisugar 3' else False
+ temperature = self.safe_get(
+ self.ps.get_temperature, default='N/A')
ret = '''
@@ -561,7 +736,7 @@ class PiSugar(plugins.Plugin):
Battery Level | {battery_level}% |
Battery Voltage | {battery_voltage}V |
Battery Current | {battery_current}A |
- Battery Allow Charging | {"Yes" if battery_allow_charging and self.is_new_model else "No"} |
+ Battery Allow Charging | {"Yes" if battery_allow_charging else "No"} |
Battery Charging Range | {battery_charging_range} |
Duration of Keep Charging When Full | {battery_full_charge_duration} seconds |
Battery Safe Shutdown Level | {battery_safe_shutdown_level} |
@@ -628,13 +803,25 @@ class PiSugar(plugins.Plugin):
# Make sure "bat" is in the UI state (guard to prevent KeyError)
if 'bat' not in ui._state._state:
return
+ try:
+ self.ready = self.ps.ready
+ except Exception as e:
+ # Log at debug to avoid clutter since it might be a false positive
+ logging.warning(f"[PiSugarX] {e}")
+ if self.ready:
+ capacity = self.safe_get(self.ps.get_battery_level, default=0)
+ voltage = self.safe_get(self.ps.get_battery_voltage, default=0.00)
+ temp = self.safe_get(self.ps.get_temperature, default=0)
- capacity = self.safe_get(self.ps.get_battery_level, default=0)
- voltage = self.safe_get(self.ps.get_battery_voltage, default=0.00)
- temp = self.safe_get(self.ps.get_temperature, default=0)
+ else:
+ capacity = 0
+ voltage = 0.00
+ temp = 0
+ logging.info(f"[PiSugarX] PiSugar is not ready")
# Check if battery is plugged in
- battery_plugged = self.safe_get(self.ps.get_battery_power_plugged, default=False)
+ battery_plugged = self.safe_get(
+ self.ps.get_battery_power_plugged, default=False)
if battery_plugged:
# If plugged in, display "CHG"
@@ -663,13 +850,3 @@ class PiSugar(plugins.Plugin):
ui.set('bat', f"{capacity:.0f}%")
elif self.default_display == 'temp':
ui.set('bat', f"{temp}°C")
-
- charging = self.safe_get(self.ps.get_battery_charging, default=None)
- safe_shutdown_level = self.safe_get(self.ps.get_battery_safe_shutdown_level, default=0)
- if charging is not None:
- if capacity <= safe_shutdown_level:
- logging.info(
- f"[PiSugarX] Empty battery (<= {safe_shutdown_level}%): shutting down"
- )
- ui.update(force=True, new_data={"status": "Battery exhausted, bye ..."})
-
diff --git a/pwnagotchi/plugins/default/wigle.py b/pwnagotchi/plugins/default/wigle.py
index 5577ebee..5000629d 100644
--- a/pwnagotchi/plugins/default/wigle.py
+++ b/pwnagotchi/plugins/default/wigle.py
@@ -4,212 +4,368 @@ import json
import csv
import requests
import pwnagotchi
-
-from io import StringIO
-from datetime import datetime
-from pwnagotchi.utils import WifiInfo, FieldNotFoundError, extract_from_pcap, StatusFile, remove_whitelisted
+import re
+from glob import glob
from threading import Lock
+from io import StringIO
+from datetime import datetime, UTC
+from dataclasses import dataclass
+
+from flask import make_response, redirect
+from pwnagotchi.utils import (
+ WifiInfo,
+ FieldNotFoundError,
+ extract_from_pcap,
+ StatusFile,
+ remove_whitelisted,
+)
from pwnagotchi import plugins
+from pwnagotchi.plugins.default.cache import read_ap_cache
from pwnagotchi._version import __version__ as __pwnagotchi_version__
+import pwnagotchi.ui.fonts as fonts
+from pwnagotchi.ui.components import Text
+from pwnagotchi.ui.view import BLACK
-def _extract_gps_data(path):
- """
- Extract data from gps-file
-
- return json-obj
- """
-
- try:
- if path.endswith('.geo.json'):
- with open(path, 'r') as json_file:
- tempJson = json.load(json_file)
- d = datetime.utcfromtimestamp(int(tempJson["ts"]))
- return {"Latitude": tempJson["location"]["lat"],
- "Longitude": tempJson["location"]["lng"],
- "Altitude": 10,
- "Accuracy": tempJson["accuracy"],
- "Updated": d.strftime('%Y-%m-%dT%H:%M:%S.%f')}
- else:
- with open(path, 'r') as json_file:
- return json.load(json_file)
- except OSError as os_err:
- raise os_err
- except json.JSONDecodeError as json_err:
- raise json_err
+from scapy.all import Scapy_Exception
-def _format_auth(data):
- out = ""
- for auth in data:
- out = f"{out}[{auth}]"
- return [f"{auth}" for auth in data]
+@dataclass
+class WigleStatistics:
+ ready: bool = False
+ username: str = None
+ rank: int = None
+ monthrank: int = None
+ discoveredwiFi: int = None
+ last: str = None
+ groupID: str = None
+ groupname: str = None
+ grouprank: int = None
+ def update_user(self, json_res):
+ self.ready = True
+ self.username = json_res["user"]
+ self.rank = json_res["rank"]
+ self.monthrank = json_res["monthRank"]
+ self.discoveredwiFi = json_res["statistics"]["discoveredWiFi"]
+ last = json_res["statistics"]["last"]
+ self.last = f"{last[6:8]}/{last[4:6]}/{last[0:4]}"
-def _transform_wigle_entry(gps_data, pcap_data, plugin_version):
- """
- Transform to wigle entry in file
- """
- dummy = StringIO()
- # write kismet header
- dummy.write(f"WigleWifi-1.6,appRelease={plugin_version},model=pwnagotchi,release={__pwnagotchi_version__},"
- f"device={pwnagotchi.name()},display=kismet,board=RaspberryPi,brand=pwnagotchi,star=Sol,body=3,subBody=0\n")
- dummy.write(
- "MAC,SSID,AuthMode,FirstSeen,Channel,RSSI,CurrentLatitude,CurrentLongitude,AltitudeMeters,AccuracyMeters,Type\n")
+ def update_user_group(self, json_res):
+ self.groupID = json_res["groupId"]
+ self.groupname = json_res["groupName"]
- writer = csv.writer(dummy, delimiter=",", quoting=csv.QUOTE_NONE, escapechar="\\")
- writer.writerow([
- pcap_data[WifiInfo.BSSID],
- pcap_data[WifiInfo.ESSID],
- _format_auth(pcap_data[WifiInfo.ENCRYPTION]),
- datetime.strptime(gps_data['Updated'].rsplit('.')[0],
- "%Y-%m-%dT%H:%M:%S").strftime('%Y-%m-%d %H:%M:%S'),
- pcap_data[WifiInfo.CHANNEL],
- pcap_data[WifiInfo.RSSI],
- gps_data['Latitude'],
- gps_data['Longitude'],
- gps_data['Altitude'],
- gps_data['Accuracy'],
- 'WIFI'])
- return dummy.getvalue()
-
-
-def _send_to_wigle(lines, api_key, donate=True, timeout=30):
- """
- Uploads the file to wigle-net
- """
-
- dummy = StringIO()
-
- for line in lines:
- dummy.write(f"{line}")
-
- dummy.seek(0)
-
- headers = {"Authorization": f"Basic {api_key}",
- "Accept": "application/json",
- "HTTP_USER_AGENT": "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:15.0) Gecko/20100101 Firefox/15.0.1"}
- data = {"donate": "on" if donate else "false"}
- payload = {"file": (pwnagotchi.name() + ".csv", dummy, "multipart/form-data", {"Expires": "0"})}
- try:
- res = requests.post('https://api.wigle.net/api/v2/file/upload',
- data=data,
- headers=headers,
- files=payload,
- timeout=timeout)
- json_res = res.json()
- if not json_res['success']:
- raise requests.exceptions.RequestException(json_res['message'])
- except requests.exceptions.RequestException as re_e:
- raise re_e
+ def update_group(self, json_res):
+ rank = 1
+ for group in json_res["groups"]:
+ if group["groupId"] == self.groupID:
+ self.grouprank = rank
+ rank += 1
class Wigle(plugins.Plugin):
- __author__ = "Dadav and updated by Jayofelony"
- __version__ = "3.1.0"
+ __author__ = "Dadav and updated by Jayofelony and fmatray"
+ __version__ = "4.1.0"
__license__ = "GPL3"
__description__ = "This plugin automatically uploads collected WiFi to wigle.net"
+ LABEL_SPACING = 0
def __init__(self):
self.ready = False
- self.report = StatusFile('/root/.wigle_uploads', data_format='json')
+ self.report = None
self.skip = list()
self.lock = Lock()
self.options = dict()
+ self.statistics = WigleStatistics()
+ self.last_stat = datetime.now(tz=UTC)
+ self.ui_counter = 0
def on_loaded(self):
- if 'api_key' not in self.options or ('api_key' in self.options and self.options['api_key'] is None):
- logging.debug("WIGLE: api_key isn't set. Can't upload to wigle.net")
+ logging.info("[WIGLE] plugin loaded.")
+
+ def on_config_changed(self, config):
+ self.api_key = self.options.get("api_key", None)
+ if not self.api_key:
+ logging.info("[WIGLE] api_key must be set.")
return
-
- if 'donate' not in self.options:
- self.options['donate'] = False
-
+ self.donate = self.options.get("donate", False)
+ self.handshake_dir = config["bettercap"].get("handshakes")
+ report_filename = os.path.join(self.handshake_dir, ".wigle_uploads")
+ self.report = StatusFile(report_filename, data_format="json")
+ self.cache_dir = os.path.join(self.handshake_dir, "cache")
+ self.cvs_dir = self.options.get("cvs_dir", None)
+ self.whitelist = config["main"].get("whitelist", [])
+ self.timeout = self.options.get("timeout", 30)
+ self.position = self.options.get("position", (10, 10))
self.ready = True
- logging.info("WIGLE: ready")
-
+ logging.info("[WIGLE] Ready for wardriving!!!")
+ self.get_statistics(force=True)
+
def on_webhook(self, path, request):
- from flask import make_response, redirect
- response = make_response(redirect("https://www.wigle.net/", code=302))
- return response
+ return make_response(redirect("https://www.wigle.net/", code=302))
+
+ def get_new_gps_files(self, reported):
+ all_gps_files = glob(os.path.join(self.handshake_dir, "*.gps.json"))
+ all_gps_files += glob(os.path.join(self.handshake_dir, "*.geo.json"))
+ all_gps_files = remove_whitelisted(all_gps_files, self.whitelist)
+ return set(all_gps_files) - set(reported) - set(self.skip)
+
+ @staticmethod
+ def get_pcap_filename(gps_file):
+ pcap_filename = re.sub(r"\.(geo|gps)\.json$", ".pcap", gps_file)
+ if not os.path.exists(pcap_filename):
+ logging.debug("[WIGLE] Can't find pcap for %s", gps_file)
+ return None
+ return pcap_filename
+
+ @staticmethod
+ def extract_gps_data(path):
+ """
+ Extract data from gps-file
+ return json-obj
+ """
+ try:
+ if path.endswith(".geo.json"):
+ with open(path, "r") as json_file:
+ tempJson = json.load(json_file)
+ d = datetime.fromtimestamp(int(tempJson["ts"]), tz=UTC)
+ return {
+ "Latitude": tempJson["location"]["lat"],
+ "Longitude": tempJson["location"]["lng"],
+ "Altitude": 10,
+ "Accuracy": tempJson["accuracy"],
+ "Updated": d.strftime("%Y-%m-%dT%H:%M:%S.%f"),
+ }
+ with open(path, "r") as json_file:
+ return json.load(json_file)
+ except (OSError, json.JSONDecodeError) as exp:
+ raise exp
+
+ def get_gps_data(self, gps_file):
+ try:
+ gps_data = self.extract_gps_data(gps_file)
+ except (OSError, json.JSONDecodeError) as exp:
+ logging.debug(f"[WIGLE] Error while extracting GPS data: {exp}")
+ return None
+ if gps_data["Latitude"] == 0 and gps_data["Longitude"] == 0:
+ logging.debug(f"[WIGLE] Not enough gps data for {gps_file}. Next time.")
+ return None
+ return gps_data
+
+ def get_pcap_data(self, pcap_filename):
+ try:
+ if cache := read_ap_cache(self.cache_dir, self.pcap_filename):
+ logging.info(f"[WIGLE] Using cache for {pcap_filename}")
+ return {
+ WifiInfo.BSSID: cache["mac"],
+ WifiInfo.ESSID: cache["hostname"],
+ WifiInfo.ENCRYPTION: cache["encryption"],
+ WifiInfo.CHANNEL: cache["channel"],
+ WifiInfo.FREQUENCY: cache["frequency"],
+ WifiInfo.RSSI: cache["rssi"],
+ }
+ except (AttributeError, KeyError):
+ pass
+ try:
+ pcap_data = extract_from_pcap(
+ pcap_filename,
+ [
+ WifiInfo.BSSID,
+ WifiInfo.ESSID,
+ WifiInfo.ENCRYPTION,
+ WifiInfo.CHANNEL,
+ WifiInfo.FREQUENCY,
+ WifiInfo.RSSI,
+ ],
+ )
+ logging.debug(f"[WIGLE] PCAP data for {pcap_filename}: {pcap_data}")
+ except FieldNotFoundError:
+ logging.debug(f"[WIGLE] Cannot extract all data: {pcap_filename} (skipped)")
+ return None
+ except Scapy_Exception as sc_e:
+ logging.debug(f"[WIGLE] {sc_e}")
+ return None
+ return pcap_data
+
+ def generate_csv(self, data):
+ date = datetime.now().strftime("%Y%m%d_%H%M%S")
+ filename = f"{pwnagotchi.name()}_{date}.csv"
+
+ content = StringIO()
+ # write kismet header + header
+ content.write(
+ f"WigleWifi-1.6,appRelease={self.__version__},model=pwnagotchi,release={__pwnagotchi_version__},"
+ f"device={pwnagotchi.name()},display=kismet,board=RaspberryPi,brand=pwnagotchi,star=Sol,body=3,subBody=0\n"
+ f"MAC,SSID,AuthMode,FirstSeen,Channel,Frequency,RSSI,CurrentLatitude,CurrentLongitude,AltitudeMeters,AccuracyMeters,RCOIs,MfgrId,Type\n"
+ )
+ writer = csv.writer(content, delimiter=",", quoting=csv.QUOTE_NONE, escapechar="\\")
+ for gps_data, pcap_data in data: # write WIFIs
+ try:
+ timestamp = datetime.strptime(
+ gps_data["Updated"].rsplit(".")[0], "%Y-%m-%dT%H:%M:%S"
+ ).strftime("%Y-%m-%d %H:%M:%S")
+ except ValueError:
+ timestamp = datetime.strptime(
+ gps_data["Updated"].rsplit(".")[0], "%Y-%m-%d %H:%M:%S"
+ ).strftime("%Y-%m-%d %H:%M:%S")
+ writer.writerow(
+ [
+ pcap_data[WifiInfo.BSSID],
+ pcap_data[WifiInfo.ESSID],
+ f"[{']['.join(pcap_data[WifiInfo.ENCRYPTION])}]",
+ timestamp,
+ pcap_data[WifiInfo.CHANNEL],
+ pcap_data[WifiInfo.FREQUENCY],
+ pcap_data[WifiInfo.RSSI],
+ gps_data["Latitude"],
+ gps_data["Longitude"],
+ gps_data["Altitude"],
+ gps_data["Accuracy"],
+ "", # RCOIs to populate
+ "", # MfgrId always empty
+ "WIFI",
+ ]
+ )
+ content.seek(0)
+ return filename, content
+
+ def save_to_file(self, cvs_filename, cvs_content):
+ if not self.cvs_dir:
+ return
+ filename = os.path.join(self.cvs_dir, cvs_filename)
+ logging.info(f"[WIGLE] Saving to file {filename}")
+ try:
+ with open(filename, mode="w") as f:
+ f.write(cvs_content.getvalue())
+ except Exception as exp:
+ logging.error(f"[WIGLE] Error while writing CSV file(skipping): {exp}")
+
+ def post_wigle(self, reported, cvs_filename, cvs_content, no_err_entries):
+ try:
+ json_res = requests.post(
+ "https://api.wigle.net/api/v2/file/upload",
+ headers={
+ "Authorization": f"Basic {self.api_key}",
+ "Accept": "application/json",
+ },
+ data={"donate": "on" if self.donate else "false"},
+ files=dict(file=(cvs_filename, cvs_content, "text/csv")),
+ timeout=self.timeout,
+ ).json()
+ if not json_res["success"]:
+ raise requests.exceptions.RequestException(json_res["message"])
+ reported += no_err_entries
+ self.report.update(data={"reported": reported})
+ logging.info(f"[WIGLE] Successfully uploaded {len(no_err_entries)} wifis")
+ except (requests.exceptions.RequestException, OSError) as exp:
+ self.skip += no_err_entries
+ logging.debug(f"[WIGLE] Exception while uploading: {exp}")
+
+ def upload_new_handshakes(self, reported, new_gps_files, agent):
+ logging.info("[WIGLE] Uploading new handshakes to wigle.net")
+ csv_entries, no_err_entries = list(), list()
+ for gps_file in new_gps_files:
+ logging.info(f"[WIGLE] Processing {os.path.basename(gps_file)}")
+ if (
+ (pcap_filename := self.get_pcap_filename(gps_file))
+ and (gps_data := self.get_gps_data(gps_file))
+ and (pcap_data := self.get_pcap_data(pcap_filename))
+ ):
+ csv_entries.append((gps_data, pcap_data))
+ no_err_entries.append(gps_file)
+ else:
+ self.skip.append(gps_file)
+ logging.info(f"[WIGLE] Wifi to upload: {len(csv_entries)}")
+ if csv_entries:
+ cvs_filename, cvs_content = self.generate_csv(csv_entries)
+ self.save_to_file(cvs_filename, cvs_content)
+ display = agent.view()
+ display.on_uploading("wigle.net")
+ self.post_wigle(reported, cvs_filename, cvs_content, no_err_entries)
+ display.on_normal()
+
+ def request_statistics(self, url):
+ try:
+ return requests.get(
+ url,
+ headers={
+ "Authorization": f"Basic {self.api_key}",
+ "Accept": "application/json",
+ },
+ timeout=self.timeout,
+ ).json()
+ except (requests.exceptions.RequestException, OSError) as exp:
+ return None
+
+ def get_user_statistics(self):
+ json_res = self.request_statistics(
+ "https://api.wigle.net/api/v2/stats/user",
+ )
+ if json_res and json_res["success"]:
+ self.statistics.update_user(json_res)
+
+ def get_usergroup_statistics(self):
+ if not self.statistics.username or self.statistics.groupID:
+ return
+ url = f"https://api.wigle.net/api/v2/group/groupForUser/{self.statistics.username}"
+ if json_res := self.request_statistics(url):
+ self.statistics.update_user_group(json_res)
+
+ def get_group_statistics(self):
+ if not self.statistics.groupID:
+ return
+ json_res = self.request_statistics("https://api.wigle.net/api/v2/stats/group")
+ if json_res and json_res["success"]:
+ self.statistics.update_group(json_res)
+
+ def get_statistics(self, force=False):
+ if force or (datetime.now(tz=UTC) - self.last_stat).total_seconds() > 30:
+ self.last_stat = datetime.now(tz=UTC)
+ self.get_user_statistics()
+ self.get_usergroup_statistics()
+ self.get_group_statistics()
def on_internet_available(self, agent):
- """
- Called when there's internet connectivity
- """
- if not self.ready or self.lock.locked():
+ if not self.ready:
return
+ with self.lock:
+ reported = self.report.data_field_or("reported", default=list())
+ if new_gps_files := self.get_new_gps_files(reported):
+ self.upload_new_handshakes(reported, new_gps_files, agent)
+ else:
+ self.get_statistics()
- from scapy.all import Scapy_Exception
+ def on_ui_setup(self, ui):
+ with ui._lock:
+ ui.add_element(
+ "wigle",
+ Text(value="-", position=self.position, font=fonts.Small, color=BLACK),
+ )
- config = agent.config()
- display = agent.view()
- reported = self.report.data_field_or('reported', default=list())
- handshake_dir = config['bettercap']['handshakes']
- all_files = os.listdir(handshake_dir)
- all_gps_files = [os.path.join(handshake_dir, filename)
- for filename in all_files
- if filename.endswith('.gps.json') or filename.endswith('.geo.json')]
+ def on_unload(self, ui):
+ with ui._lock:
+ try:
+ ui.remove_element("wigle")
+ except KeyError:
+ pass
- all_gps_files = remove_whitelisted(all_gps_files, config['main']['whitelist'])
- new_gps_files = set(all_gps_files) - set(reported) - set(self.skip)
- if new_gps_files:
- logging.info("WIGLE: Internet connectivity detected. Uploading new handshakes to wigle.net")
- csv_entries = list()
- no_err_entries = list()
- for gps_file in new_gps_files:
- if gps_file.endswith('.gps.json'):
- pcap_filename = gps_file.replace('.gps.json', '.pcap')
- if gps_file.endswith('.geo.json'):
- pcap_filename = gps_file.replace('.geo.json', '.pcap')
- if not os.path.exists(pcap_filename):
- logging.debug("WIGLE: Can't find pcap for %s", gps_file)
- self.skip.append(gps_file)
- continue
- try:
- gps_data = _extract_gps_data(gps_file)
- except OSError as os_err:
- logging.debug("WIGLE: %s", os_err)
- self.skip.append(gps_file)
- continue
- except json.JSONDecodeError as json_err:
- logging.debug("WIGLE: %s", json_err)
- self.skip.append(gps_file)
- continue
- if gps_data['Latitude'] == 0 and gps_data['Longitude'] == 0:
- logging.debug("WIGLE: Not enough gps-information for %s. Trying again next time.", gps_file)
- self.skip.append(gps_file)
- continue
- try:
- pcap_data = extract_from_pcap(pcap_filename, [WifiInfo.BSSID,
- WifiInfo.ESSID,
- WifiInfo.ENCRYPTION,
- WifiInfo.CHANNEL,
- WifiInfo.RSSI])
- except FieldNotFoundError:
- logging.debug("WIGLE: Could not extract all information. Skip %s", gps_file)
- self.skip.append(gps_file)
- continue
- except Scapy_Exception as sc_e:
- logging.debug("WIGLE: %s", sc_e)
- self.skip.append(gps_file)
- continue
- new_entry = _transform_wigle_entry(gps_data, pcap_data, self.__version__)
- csv_entries.append(new_entry)
- no_err_entries.append(gps_file)
- if csv_entries:
- display.on_uploading('wigle.net')
-
- try:
- _send_to_wigle(csv_entries, self.options['api_key'], donate=self.options['donate'])
- reported += no_err_entries
- self.report.update(data={'reported': reported})
- logging.info("WIGLE: Successfully uploaded %d files", len(no_err_entries))
- except requests.exceptions.RequestException as re_e:
- self.skip += no_err_entries
- logging.debug("WIGLE: Got an exception while uploading %s", re_e)
- except OSError as os_e:
- self.skip += no_err_entries
- logging.debug("WIGLE: Got the following error: %s", os_e)
-
- display.on_normal()
+ def on_ui_update(self, ui):
+ with ui._lock:
+ if not (self.ready and self.statistics.ready):
+ ui.set("wigle", "We Will Wait Wigle")
+ return
+ msg = "-"
+ self.ui_counter = (self.ui_counter + 1) % 6
+ if self.ui_counter == 0:
+ msg = f"User:{self.statistics.username}"
+ if self.ui_counter == 1:
+ msg = f"Rank:{self.statistics.rank} Month:{self.statistics.monthrank}"
+ elif self.ui_counter == 2:
+ msg = f"{self.statistics.discoveredwiFi} discovered WiFis"
+ elif self.ui_counter == 3:
+ msg = f"Last upl.:{self.statistics.last}"
+ elif self.ui_counter == 4:
+ msg = f"Grp:{self.statistics.groupname}"
+ elif self.ui_counter == 5:
+ msg = f"Grp rank:{self.statistics.grouprank}"
+ ui.set("wigle", msg)
diff --git a/pwnagotchi/utils.py b/pwnagotchi/utils.py
index 3fcf7cc7..26f507a8 100644
--- a/pwnagotchi/utils.py
+++ b/pwnagotchi/utils.py
@@ -564,7 +564,8 @@ class WifiInfo(Enum):
ESSID = 1
ENCRYPTION = 2
CHANNEL = 3
- RSSI = 4
+ FREQUENCY = 4
+ RSSI = 5
class FieldNotFoundError(Exception):
@@ -594,9 +595,6 @@ def extract_from_pcap(path, fields):
"""
results = dict()
for field in fields:
- if not isinstance(field, WifiInfo):
- raise TypeError("Invalid field")
-
subtypes = set()
if field == WifiInfo.BSSID:
@@ -606,10 +604,9 @@ def extract_from_pcap(path, fields):
packets = sniff(offline=path, filter=bpf_filter)
try:
for packet in packets:
- if packet.haslayer(Dot11Beacon):
- if hasattr(packet[Dot11], 'addr3'):
- results[field] = packet[Dot11].addr3
- break
+ if packet.haslayer(Dot11Beacon) and hasattr(packet[Dot11], 'addr3'):
+ results[field] = packet[Dot11].addr3
+ break
else: # magic
raise FieldNotFoundError("Could not find field [BSSID]")
except Exception:
@@ -654,6 +651,14 @@ def extract_from_pcap(path, fields):
results[field] = freq_to_channel(packets[0][RadioTap].ChannelFrequency)
except Exception:
raise FieldNotFoundError("Could not find field [CHANNEL]")
+ elif field == WifiInfo.FREQUENCY:
+ from scapy.layers.dot11 import sniff, RadioTap
+ from pwnagotchi.mesh.wifi import freq_to_channel
+ packets = sniff(offline=path, count=1)
+ try:
+ results[field] = packets[0][RadioTap].ChannelFrequency
+ except Exception:
+ raise FieldNotFoundError("Could not find field [FREQUENCY]")
elif field == WifiInfo.RSSI:
from scapy.layers.dot11 import sniff, RadioTap
from pwnagotchi.mesh.wifi import freq_to_channel
@@ -662,7 +667,8 @@ def extract_from_pcap(path, fields):
results[field] = packets[0][RadioTap].dBm_AntSignal
except Exception:
raise FieldNotFoundError("Could not find field [RSSI]")
-
+ else:
+ raise TypeError("Invalid field")
return results
diff --git a/pwnagotchi/voice.py b/pwnagotchi/voice.py
index fad1eea8..38dbaefd 100644
--- a/pwnagotchi/voice.py
+++ b/pwnagotchi/voice.py
@@ -27,11 +27,19 @@ class Voice:
self._('Hack the Planet!'),
self._('No more mister Wi-Fi!!'),
self._('Pretty fly 4 a Wi-Fi!'),
+ self._('Good Pwning!'), # Battlestar Galactica
+ self._('Ensign, Engage!'), # Star trek
+ self._('Free your Wi-Fi!'), # Matrix
+ self._('Chevron Seven, locked.'), # Stargate
+ self._('May the Wi-fi be with you'), # Star wars
])
def on_keys_generation(self):
return random.choice([
- self._('Generating keys, do not turn off ...')])
+ self._('Generating keys, do not turn off ...'),
+ self._('Are you the keymaster?'), # Ghostbusters
+ self._('I am the keymaster!'), # Ghostbusters
+ ])
def on_normal(self):
return random.choice([
@@ -44,8 +52,7 @@ class Voice:
def on_reading_logs(self, lines_so_far=0):
if lines_so_far == 0:
return self._('Reading last session logs ...')
- else:
- return self._('Read {lines_so_far} log lines so far ...').format(lines_so_far=lines_so_far)
+ return self._('Read {lines_so_far} log lines so far ...').format(lines_so_far=lines_so_far)
def on_bored(self):
return random.choice([
@@ -53,7 +60,11 @@ class Voice:
self._('Let\'s go for a walk!')])
def on_motivated(self, reward):
- return self._('This is the best day of my life!')
+ return random.choice([
+ self._('This is the best day of my life!'),
+ self._('All your base are belong to us'),
+ self._('Fascinating!'), # Star trek
+ ])
def on_demotivated(self, reward):
return self._('Shitty day :/')
@@ -63,6 +74,8 @@ class Voice:
self._('I\'m extremely bored ...'),
self._('I\'m very sad ...'),
self._('I\'m sad'),
+ self._('I\'m so happy ...'), # Marvin in H2G2
+ self._('Life? Don\'t talk to me about life.'), # Also Marvin in H2G2
'...'])
def on_angry(self):
@@ -78,17 +91,17 @@ class Voice:
self._('I pwn therefore I am.'),
self._('So many networks!!!'),
self._('I\'m having so much fun!'),
+ self._('It\'s a Wi-Fi system! I know this!'), # Jurassic park
self._('My crime is that of curiosity ...')])
def on_new_peer(self, peer):
if peer.first_encounter():
return random.choice([
self._('Hello {name}! Nice to meet you.').format(name=peer.name())])
- else:
- return random.choice([
- self._('Yo {name}! Sup?').format(name=peer.name()),
- self._('Hey {name} how are you doing?').format(name=peer.name()),
- self._('Unit {name} is nearby!').format(name=peer.name())])
+ return random.choice([
+ self._('Yo {name}! Sup?').format(name=peer.name()),
+ self._('Hey {name} how are you doing?').format(name=peer.name()),
+ self._('Unit {name} is nearby!').format(name=peer.name())])
def on_lost_peer(self, peer):
return random.choice([
@@ -104,19 +117,23 @@ class Voice:
def on_grateful(self):
return random.choice([
self._('Good friends are a blessing!'),
- self._('I love my friends!')])
+ self._('I love my friends!')
+ ])
def on_lonely(self):
return random.choice([
self._('Nobody wants to play with me ...'),
self._('I feel so alone ...'),
+ self._('Let\'s find friends'),
self._('Where\'s everybody?!')])
def on_napping(self, secs):
return random.choice([
self._('Napping for {secs}s ...').format(secs=secs),
self._('Zzzzz'),
- self._('ZzzZzzz ({secs}s)').format(secs=secs)])
+ self._('Snoring ...'),
+ self._('ZzzZzzz ({secs}s)').format(secs=secs),
+ ])
def on_shutdown(self):
return random.choice([
@@ -124,12 +141,17 @@ class Voice:
self._('Zzz')])
def on_awakening(self):
- return random.choice(['...', '!'])
+ return random.choice([
+ '...',
+ '!',
+ 'Hello World!',
+ self._('I dreamed of electric sheep'),
+ ])
def on_waiting(self, secs):
return random.choice([
- self._('Waiting for {secs}s ...').format(secs=secs),
'...',
+ self._('Waiting for {secs}s ...').format(secs=secs),
self._('Looking around ({secs}s)').format(secs=secs)])
def on_assoc(self, ap):
@@ -138,12 +160,16 @@ class Voice:
return random.choice([
self._('Hey {what} let\'s be friends!').format(what=what),
self._('Associating to {what}').format(what=what),
- self._('Yo {what}!').format(what=what)])
+ self._('Yo {what}!').format(what=what),
+ self._('Rise and Shine Mr. {what}!').format(what=what), # Half Life
+ ])
def on_deauth(self, sta):
return random.choice([
- self._('Just decided that {mac} needs no WiFi!').format(mac=sta['mac']),
+ self._('Just decided that {mac} needs no Wi-Fi!').format(mac=sta['mac']),
self._('Deauthenticating {mac}').format(mac=sta['mac']),
+ self._('No more Wi-Fi for {mac}').format(mac=sta['mac']),
+ self._('It\'s a trap! {mac}').format(mac=sta['mac']), # Star wars
self._('Kickbanning {mac}!').format(mac=sta['mac'])])
def on_handshakes(self, new_shakes):
@@ -155,10 +181,19 @@ class Voice:
return self._('You have {count} new message{plural}!').format(count=count, plural=s)
def on_rebooting(self):
- return self._("Oops, something went wrong ... Rebooting ...")
+ return random.choice([
+ self._("Oops, something went wrong ... Rebooting ..."),
+ self._("Have you tried turning it off and on again?"), # The IT crew
+ self._("I\'m afraid Dave"), # 2001 Space Odyssey
+ self._("I\'m dead, Jim!"), # Star Trek
+ self._("I have a bad feeling about this"), # Star wars
+ ])
def on_uploading(self, to):
- return self._("Uploading data to {to} ...").format(to=to)
+ return random.choice([
+ self._("Uploading data to {to} ...").format(to=to),
+ self._("Beam me up to {to}").format(to=to),
+ ])
def on_downloading(self, name):
return self._("Downloading from {name} ...").format(name=name)