From 89af46c6fce447ba8fab9aab938ab4abdaed57c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric?= Date: Mon, 20 Jan 2025 22:01:10 +0100 Subject: [PATCH 01/33] Update ohcapi.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A more elegant code to test the internet connection Signed-off-by: Frédéric --- pwnagotchi/plugins/default/ohcapi.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/pwnagotchi/plugins/default/ohcapi.py b/pwnagotchi/plugins/default/ohcapi.py index b400c572..5aa6ac5d 100644 --- a/pwnagotchi/plugins/default/ohcapi.py +++ b/pwnagotchi/plugins/default/ohcapi.py @@ -75,20 +75,16 @@ class ohcapi(plugins.Plugin): return # Check if the internet is still available by pinging Google + self.internet_active = False try: response = requests.get('https://www.google.com', timeout=5) + if response.status_code == 200: + self.internet_active = True except requests.ConnectionError: - self.internet_active = False - return - - if response.status_code == 200: - self.internet_active = True - else: - self.internet_active = False return current_time = time.time() - if current_time - self.last_run >= self.options['sleep']: + if self.internet_active and current_time - self.last_run >= self.options['sleep']: self._run_tasks(agent) self.last_run = current_time From 83b9077e09e9a4eeecc0ac314d277fd924e1b622 Mon Sep 17 00:00:00 2001 From: Jeroen Oudshoorn Date: Sun, 2 Feb 2025 22:22:27 +0100 Subject: [PATCH 02/33] Fix gdrivesync defaults Signed-off-by: Jeroen Oudshoorn --- pwnagotchi/defaults.toml | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/pwnagotchi/defaults.toml b/pwnagotchi/defaults.toml index 6cf1fd6e..2d9165e3 100644 --- a/pwnagotchi/defaults.toml +++ b/pwnagotchi/defaults.toml @@ -12,7 +12,8 @@ main.custom_plugin_repos = [ "https://github.com/Sniffleupagus/pwnagotchi_plugins/archive/master.zip", "https://github.com/NeonLightning/pwny/archive/master.zip", "https://github.com/marbasec/UPSLite_Plugin_1_3/archive/master.zip", - "https://github.com/wpa-2/Pwnagotchi-Plugins/archive/master.zip" + "https://github.com/wpa-2/Pwnagotchi-Plugins/archive/master.zip", + "https://github.com/cyberartemio/wardriver-pwnagotchi-plugin/archive/main.zip", ] main.custom_plugins = "/usr/local/share/pwnagotchi/custom-plugins/" @@ -34,7 +35,7 @@ main.plugins.fix_services.enabled = true main.plugins.gdrivesync.enabled = false main.plugins.gdrivesync.backupfiles = [''] main.plugins.gdrivesync.backup_folder = "PwnagotchiBackups" -main.plugin.gdrivesync.interval = 1 +main.plugins.gdrivesync.interval = 1 main.plugins.gpio_buttons.enabled = false @@ -82,9 +83,20 @@ main.plugins.webcfg.enabled = true main.plugins.webgpsmap.enabled = false -main.plugins.wigle.enabled = false -main.plugins.wigle.api_key = "" -main.plugins.wigle.donate = false +main.plugins.wardriver.enabled = false +main.plugins.wardriver.path = "/root/wardriver" +main.plugins.wardriver.ui.enabled = true +main.plugins.wardriver.ui.icon = false +main.plugins.wardriver.ui.icon_reverse = false +main.plugins.wardriver.ui.position.x = 7 +main.plugins.wardriver.ui.position.y = 85 +main.plugins.wardriver.wigle.enabled = true +main.plugins.wardriver.wigle.api_key = "" +main.plugins.wardriver.wigle.donate = false +main.plugins.wardriver.whitelist = [ + "network-1", + "network-2" +] main.plugins.wpa-sec.enabled = false main.plugins.wpa-sec.api_key = "" From 7a0301b57f88ee1c0e966731c16b4f87ead45ebc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric?= Date: Fri, 7 Feb 2025 22:34:52 +0100 Subject: [PATCH 03/33] bt-tether rework MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Move connexion to NetworkManager autoconnect for more reliability - Check options formats for several values - Add a dns option to be able to choose another DNS provider (ex: OpenNIC for no traceability) - The "BT" show more information (Device Connected/disconnected, Connexion Up/Down) - Status now show a message on error - nmcli calls are non-blocking - Less logs when disconnected (ex: phone's bluetooth is off) Signed-off-by: Frédéric --- pwnagotchi/plugins/default/bt-tether.py | 157 +++++++++++++++--------- 1 file changed, 97 insertions(+), 60 deletions(-) diff --git a/pwnagotchi/plugins/default/bt-tether.py b/pwnagotchi/plugins/default/bt-tether.py index 9048492c..a59cc1f0 100644 --- a/pwnagotchi/plugins/default/bt-tether.py +++ b/pwnagotchi/plugins/default/bt-tether.py @@ -1,93 +1,130 @@ import logging import subprocess +import re + import pwnagotchi.plugins as plugins import pwnagotchi.ui.fonts as fonts from pwnagotchi.ui.components import LabeledValue from pwnagotchi.ui.view import BLACK +MAC_PTTRN = "^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})$" +IP_PTTRN = "^[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}$" + class BTTether(plugins.Plugin): - __author__ = 'Jayofelony' - __version__ = '1.2' - __license__ = 'GPL3' - __description__ = 'A new BT-Tether plugin' + __author__ = "Jayofelony, modified my fmatray" + __version__ = "1.3" + __license__ = "GPL3" + __description__ = "A new BT-Tether plugin" def __init__(self): self.ready = False self.options = dict() - self.status = '-' + self.phone_name = None + self.mac = None + + @staticmethod + def nmcli(args, pattern=None): + try: + result = subprocess.run(["nmcli"] + args, + check=True, capture_output=True, text=True) + if pattern: + return result.stdout.find(pattern) + return result + except Exception as exp: + logging.error(f"[BT-Tether] Error with nmcli: {exp}") + raise exp def on_loaded(self): logging.info("[BT-Tether] plugin loaded.") def on_config_changed(self, config): - if any(self.options[key] == '' for key in ['phone', 'phone-name', 'ip', 'mac']): - self.ready = False - ip = self.options['ip'] - mac = self.options['mac'] - phone_name = self.options['phone-name'] + ' Network' - if self.options['phone'].lower() == 'android': - address = f'{ip}' - gateway = '192.168.44.1' - elif self.options['phone'].lower() == 'ios': - address = f'{ip}' - gateway = '172.20.10.1' - else: - logging.error("[BT-Tether] Phone type not supported.") + if "phone-name" not in self.options: + logging.error("[BT-Tether] Phone name not provided") + return + if not ("mac" in self.options and re.match(MAC_PTTRN, self.options["mac"])): + logging.error("[BT-Tether] Error with mac adresse") + return + + if not ("phone" in self.options and self.options["phone"].lower() in ["android", "ios"]): + logging.error("[BT-Tether] Phone type not supported") + return + if self.options["phone"].lower() == "android": + address = self.options.get("ip", "192.168.44.2") + gateway = "192.168.44.1" + elif self.options["phone"].lower() == "ios": + address = self.options.get("ip", "172.20.10.2") + gateway = "172.20.10.1" + if not re.match(IP_PTTRN, address): + logging.error(f"[BT-Tether] IP error: {address}") + return + + self.phone_name = self.options["phone-name"] + " Network" + self.mac = self.options["mac"] + dns = self.options.get("dns", "8.8.8.8 1.1.1.1").replace(",", " ").replace(";", " ") + + try: + # Configure connection. Metric is set to 200 to prefer connection over USB + self.nmcli(["connection", "modify", f"{self.phone_name}", + "connection.type", "bluetooth", + "bluetooth.type", "panu", + "bluetooth.bdaddr", f"{self.mac}", + "connection.autoconnect", "yes", + "connection.autoconnect-retries", "0", + "ipv4.method", "manual", + "ipv4.dns", f"{dns}", + "ipv4.addresses", f"{address}/24", + "ipv4.gateway", f"{gateway}", + "ipv4.route-metric", "200" ]) + self.nmcli(["connection", "reload"]) + self.ready = True + logging.info(f"[BT-Tether] Connection {self.phone_name} configured") + except Exception as e: + logging.error(f"[BT-Tether] Error while configuring: {e}") return try: - subprocess.run([ - 'nmcli', 'connection', 'modify', f'{phone_name}', - 'connection.type', 'bluetooth', - 'bluetooth.type', 'panu', - 'bluetooth.bdaddr', f'{mac}', - 'ipv4.method', 'manual', - 'ipv4.dns', '8.8.8.8 1.1.1.1', - 'ipv4.addresses', f'{address}/24', - 'ipv4.gateway', f'{gateway}', - 'ipv4.route-metric', '100' - ], check=True) - subprocess.run(['nmcli', 'connection', 'reload'], check=True) - subprocess.run(['nmcli', 'connection', 'up', f'{phone_name}'], check=True) + self.nmcli(["connection", "up", f"{self.phone_name}"]) except Exception as e: logging.error(f"[BT-Tether] Failed to connect to device: {e}") - logging.error(f"[BT-Tether] Failed to connect to device: have you enabled bluetooth tethering on your phone?") - self.ready = True - - def on_ready(self, agent): - if any(self.options[key] == '' for key in ['phone', 'phone-name', 'ip', 'mac']): - self.ready = False - self.ready = True + logging.error( + f"[BT-Tether] Failed to connect to device: have you enabled bluetooth tethering on your phone?" + ) def on_ui_setup(self, ui): with ui._lock: ui.add_element('bluetooth', LabeledValue(color=BLACK, label='BT', value='-', position=(ui.width() / 2 - 10, 0), label_font=fonts.Bold, text_font=fonts.Medium)) - def on_ui_update(self, ui): - if self.ready: - phone_name = self.options['phone-name'] + ' Network' - if (subprocess.run(['bluetoothctl', 'info'], capture_output=True, text=True)).stdout.find('Connected: yes') != -1: - self.status = 'C' - else: - self.status = '-' - try: - subprocess.run(['nmcli', 'connection', 'up', f'{phone_name}'], check=True) - except Exception as e: - logging.debug(f"[BT-Tether] Failed to connect to device: {e}") - logging.error(f"[BT-Tether] Failed to connect to device: have you enabled bluetooth tethering on your phone?") - ui.set('bluetooth', self.status) - return + if not self.ready: + return + with ui._lock: + status = "" + try: + # Checking connection + if self.nmcli(["-w", "0", "-g", "GENERAL.STATE", "connection", "show", self.phone_name], + "activated") != -1: + ui.set("bluetooth", "U") + return + else: + ui.set("bluetooth", "D") + status = "BT Conn. down" + + # Checking device + if self.nmcli(["-w", "0", "-g", "GENERAL.STATE", "device", "show", self.mac], + "(connected)") != -1: + ui.set("bluetooth", "C") + status += "\nBT dev conn." + else: + ui.set("bluetooth", "-") + status += "\nBT dev disconn." + ui.set("status", status) + except Exception as e: + logging.error(f"[BT-Tether] Error on update: {e}") def on_unload(self, ui): - phone_name = self.options['phone-name'] + ' Network' with ui._lock: - ui.remove_element('bluetooth') + ui.remove_element("bluetooth") try: - if (subprocess.run(['bluetoothctl', 'info'], capture_output=True, text=True)).stdout.find('Connected: yes') != -1: - subprocess.run(['nmcli', 'connection', 'down', f'{phone_name}'], check=True) - logging.info(f"[BT-Tether] Disconnected from device with name: {phone_name}") - else: - logging.info(f"[BT-Tether] Device with name {phone_name} is not connected, not disconnecting") + self.nmcli(["connection", "down", f"{self.phone_name}"]) except Exception as e: - logging.error(f"[BT-Tether] Failed to disconnect from device: {e}") \ No newline at end of file + logging.error(f"[BT-Tether] Failed to disconnect from device: {e}") From 675e275f34bc7f7bf1201e1298899cd574c159f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric?= Date: Sat, 8 Feb 2025 00:07:41 +0100 Subject: [PATCH 04/33] Add web hook to bt-tether.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Webhook now show Bluetooth, Device and Connection configuration Signed-off-by: Frédéric --- pwnagotchi/plugins/default/bt-tether.py | 175 ++++++++++++++++++++++-- 1 file changed, 162 insertions(+), 13 deletions(-) diff --git a/pwnagotchi/plugins/default/bt-tether.py b/pwnagotchi/plugins/default/bt-tether.py index a59cc1f0..470ba919 100644 --- a/pwnagotchi/plugins/default/bt-tether.py +++ b/pwnagotchi/plugins/default/bt-tether.py @@ -1,14 +1,124 @@ import logging import subprocess import re - +from flask import abort, render_template_string import pwnagotchi.plugins as plugins import pwnagotchi.ui.fonts as fonts from pwnagotchi.ui.components import LabeledValue from pwnagotchi.ui.view import BLACK -MAC_PTTRN = "^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})$" -IP_PTTRN = "^[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}$" +TEMPLATE = """ +{% extends "base.html" %} +{% set active_page = "bt-tether" %} +{% block title %} + {{ title }} +{% endblock %} +{% block meta %} + + +{% endblock %} +{% block styles %} +{{ super() }} + +{% endblock %} +{% block script %} + var searchInput = document.getElementById("searchText"); + searchInput.onkeyup = function() { + var filter, table, tr, td, i, txtValue; + filter = searchInput.value.toUpperCase(); + table = document.getElementById("tableOptions"); + if (table) { + tr = table.getElementsByTagName("tr"); + + for (i = 0; i < tr.length; i++) { + td = tr[i].getElementsByTagName("td")[0]; + if (td) { + txtValue = td.textContent || td.innerText; + if (txtValue.toUpperCase().indexOf(filter) > -1) { + tr[i].style.display = ""; + }else{ + tr[i].style.display = "none"; + } + } + } + } + } +{% endblock %} +{% block content %} + + + + + + + + + + + + + + + + + + +
ItemConfiguration
Bluetooth{{bluetooth|safe}}
Device{{device|safe}}
Connection{{connection|safe}}
+{% endblock %} +""" + +MAC_PTTRN = r"^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})$" +IP_PTTRN = r"^[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}$" class BTTether(plugins.Plugin): __author__ = "Jayofelony, modified my fmatray" @@ -23,17 +133,23 @@ class BTTether(plugins.Plugin): self.mac = None @staticmethod - def nmcli(args, pattern=None): + def exec_cmd(cmd, args, pattern=None): try: - result = subprocess.run(["nmcli"] + args, + result = subprocess.run([cmd] + args, check=True, capture_output=True, text=True) if pattern: return result.stdout.find(pattern) return result except Exception as exp: - logging.error(f"[BT-Tether] Error with nmcli: {exp}") + logging.error(f"[BT-Tether] Error with {cmd} : {exp}") raise exp + def bluetoothctl(self, args, pattern=None): + return self.exec_cmd("bluetoothctl", args, pattern) + + def nmcli(self, args, pattern=None): + return self.exec_cmd("nmcli", args, pattern) + def on_loaded(self): logging.info("[BT-Tether] plugin loaded.") @@ -89,6 +205,14 @@ class BTTether(plugins.Plugin): f"[BT-Tether] Failed to connect to device: have you enabled bluetooth tethering on your phone?" ) + def on_unload(self, ui): + with ui._lock: + ui.remove_element("bluetooth") + try: + self.nmcli(["connection", "down", f"{self.phone_name}"]) + except Exception as e: + logging.error(f"[BT-Tether] Failed to disconnect from device: {e}") + def on_ui_setup(self, ui): with ui._lock: ui.add_element('bluetooth', LabeledValue(color=BLACK, label='BT', value='-', @@ -121,10 +245,35 @@ class BTTether(plugins.Plugin): except Exception as e: logging.error(f"[BT-Tether] Error on update: {e}") - def on_unload(self, ui): - with ui._lock: - ui.remove_element("bluetooth") - try: - self.nmcli(["connection", "down", f"{self.phone_name}"]) - except Exception as e: - logging.error(f"[BT-Tether] Failed to disconnect from device: {e}") + def on_webhook(self, path, request): + if not self.ready: + return """ + BT-tether: Error + Plugin not ready + """ + if path == "/" or not path: + try: + bluetooth = self.bluetoothctl(["info", self.mac]) + bluetooth = bluetooth.stdout.replace('\n', '
') + except Exception as e: + bluetooth = "Error while checking bluetoothctl" + + try: + device =self.nmcli(["-w", "0","device", "show", self.mac]) + device = device.stdout.replace('\n', '
') + except Exception as e: + device = "Error while checking nmcli device" + + try: + connection = self.nmcli(["-w", "0","connection", "show", self.phone_name]) + connection = connection.stdout.replace('\n', '
') + except Exception as e: + connection = "Error while checking nmcli connection" + + logging.debug(device) + return render_template_string(TEMPLATE, + title="BT-Tether", + bluetooth=bluetooth, + device=device, + connection=connection) + abort(404) \ No newline at end of file From 1337494e74c201558e3bbad22f8dc5aca652d68a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric?= Date: Sat, 8 Feb 2025 00:52:57 +0100 Subject: [PATCH 05/33] bt-tether.py update MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add a delay before going up to give a change to the NetworkManager to get ready - now check DNS format Signed-off-by: Frédéric --- pwnagotchi/plugins/default/bt-tether.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/pwnagotchi/plugins/default/bt-tether.py b/pwnagotchi/plugins/default/bt-tether.py index 470ba919..045563ed 100644 --- a/pwnagotchi/plugins/default/bt-tether.py +++ b/pwnagotchi/plugins/default/bt-tether.py @@ -1,6 +1,7 @@ import logging import subprocess import re +import time from flask import abort, render_template_string import pwnagotchi.plugins as plugins import pwnagotchi.ui.fonts as fonts @@ -117,8 +118,10 @@ TEMPLATE = """ {% endblock %} """ +# We all love crazy regex patterns MAC_PTTRN = r"^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})$" -IP_PTTRN = r"^[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}$" +IP_PTTRN = r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$" +DNS_PTTRN = r"^\s*((\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\s*[ ,;]\s*)+((\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\s*[ ,;]?\s*)$" class BTTether(plugins.Plugin): __author__ = "Jayofelony, modified my fmatray" @@ -176,7 +179,11 @@ class BTTether(plugins.Plugin): self.phone_name = self.options["phone-name"] + " Network" self.mac = self.options["mac"] - dns = self.options.get("dns", "8.8.8.8 1.1.1.1").replace(",", " ").replace(";", " ") + dns = self.options.get("dns", "8.8.8.8 1.1.1.1") + if not re.match(DNS_PTTRN, dns): + logging.error(f"[BT-Tether] DNS error: {dns}") + return + dns = re.sub("[\s,;]+", " ", dns).strip() # DNS cleaning try: # Configure connection. Metric is set to 200 to prefer connection over USB @@ -198,6 +205,7 @@ class BTTether(plugins.Plugin): logging.error(f"[BT-Tether] Error while configuring: {e}") return try: + time.sleep(5) # Give some delay to configure before going up self.nmcli(["connection", "up", f"{self.phone_name}"]) except Exception as e: logging.error(f"[BT-Tether] Failed to connect to device: {e}") From e82621c80bb8c910f3874297404298ef1f9d9fca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=93=B2=E5=B1=8E=E5=B0=86=E5=86=9B?= <37292630+wlmh110@users.noreply.github.com> Date: Sat, 8 Feb 2025 17:52:54 +0800 Subject: [PATCH 06/33] update New Features: Low Battery Shutdown Functionality Bug Fixes: Resolved an issue where the main process would become blocked when unable to connect to a device. --- pwnagotchi/defaults.toml | 3 + pwnagotchi/plugins/default/pisugarx.py | 212 +++++++++++++++++-------- 2 files changed, 148 insertions(+), 67 deletions(-) diff --git a/pwnagotchi/defaults.toml b/pwnagotchi/defaults.toml index 880bc2cf..3882e75e 100644 --- a/pwnagotchi/defaults.toml +++ b/pwnagotchi/defaults.toml @@ -65,6 +65,9 @@ main.plugins.pwndroid.display_alitude = false # show altitude on display main.plugins.pisugarx.enabled = false main.plugins.pisugarx.rotation = false main.plugins.pisugarx.default_display = "percentage" +main.plugins.pisugarx.lowpower_shutdown = true +main.plugins.pisugarx.lowpower_shutdown_level = 10 # battery percent at which the device will turn off +main.plugins.pisugarx.max_charge_voltage_protection = true #It will limit the battery voltage to about 80% to extend battery life main.plugins.session-stats.enabled = false main.plugins.session-stats.save_directory = "/var/tmp/pwnagotchi/sessions/" diff --git a/pwnagotchi/plugins/default/pisugarx.py b/pwnagotchi/plugins/default/pisugarx.py index 4106eadc..984225a5 100644 --- a/pwnagotchi/plugins/default/pisugarx.py +++ b/pwnagotchi/plugins/default/pisugarx.py @@ -29,7 +29,7 @@ curve1200 = [ (3.49, 3.2), (3.1, 0.0), ] -curve1200_3= [ +curve1200_3 = [ (4.2, 100.0), # 高电量阶段 (100%) (4.0, 80.0), # 中电量阶段 (80%) (3.7, 60.0), # 中电量阶段 (60%) @@ -56,37 +56,54 @@ class PiSugarServer: PiSugar initialization, if unable to connect to any version of PiSugar, return false """ self._bus = smbus.SMBus(1) + self.ready = False self.modle = None self.i2creg = [] self.address = 0 - self.battery_voltage = 0 - self.voltage_history = deque(maxlen=10) + self.battery_voltage = 0.00 + self.voltage_history = deque(maxlen=10) self.battery_level = 0 self.battery_charging = 0 self.temperature = 0 self.power_plugged = False self.allow_charging = True - while self.modle == None: - if self.check_device(PiSugar_addresses["PiSugar2"]) != None: + self.lowpower_shutdown = False + self.lowpower_shutdown_level = 10 + self.max_charge_voltage_protection = False + # Start the device connection in a background thread + self.connection_thread = threading.Thread( + target=self._connect_device, daemon=True) + self.connection_thread.start() + + def _connect_device(self): + """ + Attempt to connect to the PiSugar device in a background thread. + """ + while self.modle is None: + if self.check_device(PiSugar_addresses["PiSugar2"]) is not None: self.address = PiSugar_addresses["PiSugar2"] - if self.check_device(PiSugar_addresses["PiSugar2"], 0Xc2) != 0: + if self.check_device(PiSugar_addresses["PiSugar2"], 0xC2) != 0: self.modle = "PiSugar2Plus" else: self.modle = "PiSugar2" self.device_init() - elif self.check_device(PiSugar_addresses["PiSugar3"]) != None: + elif self.check_device(PiSugar_addresses["PiSugar3"]) is not None: self.modle = 'PiSugar3' self.address = PiSugar_addresses["PiSugar3"] + self.device_init() else: self.modle = None - logging.error( - "No PiSugar device was found. Please check if the PiSugar device is powered on.") + logging.info( + "No PiSugar device was found. Please check if the PiSugar device is powered on." + ) time.sleep(5) - - # self.update_value() + logging.info(f"{self.modle} is connected") + # Once connected, start the timer self.start_timer() while len(self.i2creg) < 256: time.sleep(1) + self.ready = True + logging.info(f"{self.modle} is ready") def start_timer(self): @@ -135,14 +152,36 @@ class PiSugarServer: self.battery_voltage = ( (((high & 0b00111111) << 8) + low) * 0.26855 + 2600.0)/1000 self.power_plugged = self.i2creg[0xdd] == 0x1f - + self.voltage_history.append(self.battery_voltage) - self.battery_level=self.convert_battery_voltage_to_level() + self.battery_level = self.convert_battery_voltage_to_level() + + if self.lowpower_shutdown: + if self.battery_level < self.lowpower_shutdown_level: + logging.info("[PiSugarX] low power shutdown now.") + self.shutdown() + pwnagotchi.shutdown() + time.sleep(3) - except: - logging.error(f"read error") + except Exception as e: + logging.error(f"read error{e}") time.sleep(3) + def shutdown(self): + # logging.info("[PiSugarX] PiSugar set shutdown .") + if self.modle == 'PiSugar3': + # 10秒后关闭电源 + self._bus.write_byte_data(self.address, 0x0B, 0x29) #关闭写保护 + self._bus.write_byte_data(self.address, 0x09, 10) + self._bus.write_byte_data(self.address, 0x02, self._bus.read_byte_data( + self.address, 0x02) & 0b11011111) + self._bus.write_byte_data(self.address, 0x0B, 0x00) #开启写保护 + logging.info("[PiSugarX] PiSugar shutdown in 10s.") + elif self.modle == 'PiSugar2': + pass + elif self.modle == 'PiSugar2Plus': + pass + def check_device(self, address, reg=0): """Check if a device is present at the specified address""" try: @@ -406,6 +445,7 @@ class PiSugarServer: """ pass + class PiSugar(plugins.Plugin): __author__ = "jayofelony" __version__ = "1.2" @@ -418,14 +458,25 @@ class PiSugar(plugins.Plugin): def __init__(self): self._agent = None - self.is_new_model = False self.options = dict() + """ + self.options = { + 'enabled': True, + 'rotation': False, + 'default_display': 'percentage', + 'lowpower_shutdown': True, + 'lowpower_shutdown_level': 10, + 'max_charge_voltage_protection': True + } + """ self.ps = None + # logging.debug(f"[PiSugarX] {self.options}") try: self.ps = PiSugarServer() except Exception as e: # Log at debug to avoid clutter since it might be a false positive - logging.debug("[PiSugarX] Unable to establish connection: %s", repr(e)) + logging.debug( + "[PiSugarX] Unable to establish connection: %s", repr(e)) self.ready = False self.lasttemp = 69 @@ -444,7 +495,8 @@ class PiSugar(plugins.Plugin): try: return func() except Exception as e: - logging.debug("[PiSugarX] Failed to get data using %s: %s", func.__name__, e) + logging.debug( + "[PiSugarX] Failed to get data using %s: %s", func.__name__, e) return default def on_loaded(self): @@ -455,20 +507,24 @@ class PiSugar(plugins.Plugin): valid_displays = ['voltage', 'percentage', 'temp'] if self.default_display not in valid_displays: - logging.warning(f"[PiSugarX] Invalid default_display '{self.default_display}'. Using 'voltage'.") + logging.warning( + f"[PiSugarX] Invalid default_display '{self.default_display}'. Using 'voltage'.") self.default_display = 'voltage' - logging.info(f"[PiSugarX] Rotation is {'enabled' if self.rotation_enabled else 'disabled'}.") - logging.info(f"[PiSugarX] Default display (when rotation disabled): {self.default_display}") + logging.info( + f"[PiSugarX] Rotation is {'enabled' if self.rotation_enabled else 'disabled'}.") + logging.info( + f"[PiSugarX] Default display (when rotation disabled): {self.default_display}") + self.ps.lowpower_shutdown = self.options['lowpower_shutdown'] + self.ps.lowpower_shutdown_level = self.options['lowpower_shutdown_level'] + self.ps.max_charge_voltage_protection = self.options['max_charge_voltage_protection'] def on_ready(self, agent): - self.ready = True - self._agent = agent - led_amount = self.safe_get(self.ps.get_battery_led_amount, default=0) - if led_amount == 2: - self.is_new_model = True - else: - self.is_new_model = False + try: + self.ready = self.ps.ready + except Exception as e: + # Log at debug to avoid clutter since it might be a false positive + logging.warning(f"[PiSugarX] {e}") def on_internet_available(self, agent): self._agent = agent @@ -482,31 +538,52 @@ class PiSugar(plugins.Plugin): try: if request.method == "GET": if path == "/" or not path: - version = self.safe_get(self.ps.get_version, default='Unknown') + version = self.safe_get( + self.ps.get_version, default='Unknown') model = self.safe_get(self.ps.get_model, default='Unknown') - battery_level = self.safe_get(self.ps.get_battery_level, default='N/A') - battery_voltage = self.safe_get(self.ps.get_battery_voltage, default='N/A') - battery_current = self.safe_get(self.ps.get_battery_current, default='N/A') - battery_led_amount = self.safe_get(self.ps.get_battery_led_amount, default='N/A') if model == 'Pisugar 2' else 'Not supported' - battery_allow_charging = self.safe_get(self.ps.get_battery_allow_charging, default=False) - battery_charging_range = self.safe_get(self.ps.get_battery_charging_range, default='N/A') if self.is_new_model or model == 'Pisugar 3' else 'Not supported' - battery_full_charge_duration = getattr(self.ps, 'get_battery_full_charge_duration', lambda: 'N/A')() - safe_shutdown_level = self.safe_get(self.ps.get_battery_safe_shutdown_level, default=None) + battery_level = self.safe_get( + self.ps.get_battery_level, default='N/A') + battery_voltage = self.safe_get( + self.ps.get_battery_voltage, default='N/A') + battery_current = self.safe_get( + self.ps.get_battery_current, default='N/A') + battery_allow_charging = self.safe_get( + self.ps.get_battery_allow_charging, default=False) + battery_charging_range = self.safe_get( + self.ps.get_battery_charging_range, default='N/A') + battery_full_charge_duration = getattr( + self.ps, 'get_battery_full_charge_duration', lambda: 'N/A')() + safe_shutdown_level = self.safe_get( + self.ps.get_battery_safe_shutdown_level, default=None) battery_safe_shutdown_level = f"{safe_shutdown_level}%" if safe_shutdown_level is not None else 'Not set' - battery_safe_shutdown_delay = self.safe_get(self.ps.get_battery_safe_shutdown_delay, default='N/A') - battery_auto_power_on = self.safe_get(self.ps.get_battery_auto_power_on, default=False) - battery_soft_poweroff = self.safe_get(self.ps.get_battery_soft_poweroff, default=False) if model == 'Pisugar 3' else False - system_time = self.safe_get(self.ps.get_system_time, default='N/A') - rtc_adjust_ppm = self.safe_get(self.ps.get_rtc_adjust_ppm, default='Not supported') if model == 'Pisugar 3' else 'Not supported' - rtc_alarm_repeat = self.safe_get(self.ps.get_rtc_alarm_repeat, default='N/A') - single_tap_enabled = self.safe_get(lambda: self.ps.get_tap_enable(tap='single'), default=False) - double_tap_enabled = self.safe_get(lambda: self.ps.get_tap_enable(tap='double'), default=False) - long_tap_enabled = self.safe_get(lambda: self.ps.get_tap_enable(tap='long'), default=False) - single_tap_shell = self.safe_get(lambda: self.ps.get_tap_shell(tap='single'), default='N/A') - double_tap_shell = self.safe_get(lambda: self.ps.get_tap_shell(tap='double'), default='N/A') - long_tap_shell = self.safe_get(lambda: self.ps.get_tap_shell(tap='long'), default='N/A') - anti_mistouch = self.safe_get(self.ps.get_anti_mistouch, default=False) if model == 'Pisugar 3' else False - temperature = self.safe_get(self.ps.get_temperature, default='N/A') + battery_safe_shutdown_delay = self.safe_get( + self.ps.get_battery_safe_shutdown_delay, default='N/A') + battery_auto_power_on = self.safe_get( + self.ps.get_battery_auto_power_on, default=False) + battery_soft_poweroff = self.safe_get( + self.ps.get_battery_soft_poweroff, default=False) if model == 'Pisugar 3' else False + system_time = self.safe_get( + self.ps.get_system_time, default='N/A') + rtc_adjust_ppm = self.safe_get( + self.ps.get_rtc_adjust_ppm, default='Not supported') if model == 'Pisugar 3' else 'Not supported' + rtc_alarm_repeat = self.safe_get( + self.ps.get_rtc_alarm_repeat, default='N/A') + single_tap_enabled = self.safe_get( + lambda: self.ps.get_tap_enable(tap='single'), default=False) + double_tap_enabled = self.safe_get( + lambda: self.ps.get_tap_enable(tap='double'), default=False) + long_tap_enabled = self.safe_get( + lambda: self.ps.get_tap_enable(tap='long'), default=False) + single_tap_shell = self.safe_get( + lambda: self.ps.get_tap_shell(tap='single'), default='N/A') + double_tap_shell = self.safe_get( + lambda: self.ps.get_tap_shell(tap='double'), default='N/A') + long_tap_shell = self.safe_get( + lambda: self.ps.get_tap_shell(tap='long'), default='N/A') + anti_mistouch = self.safe_get( + self.ps.get_anti_mistouch, default=False) if model == 'Pisugar 3' else False + temperature = self.safe_get( + self.ps.get_temperature, default='N/A') ret = ''' @@ -567,8 +644,7 @@ class PiSugar(plugins.Plugin): Battery Level{battery_level}% Battery Voltage{battery_voltage}V Battery Current{battery_current}A - Battery LED Amount{battery_led_amount} - Battery Allow Charging{"Yes" if battery_allow_charging and self.is_new_model else "No"} + Battery Allow Charging{"Yes" if battery_allow_charging else "No"} Battery Charging Range{battery_charging_range} Duration of Keep Charging When Full{battery_full_charge_duration} seconds Battery Safe Shutdown Level{battery_safe_shutdown_level} @@ -635,13 +711,25 @@ class PiSugar(plugins.Plugin): # Make sure "bat" is in the UI state (guard to prevent KeyError) if 'bat' not in ui._state._state: return + try: + self.ready = self.ps.ready + except Exception as e: + # Log at debug to avoid clutter since it might be a false positive + logging.warning(f"[PiSugarX] {e}") + if self.ready: + capacity = self.safe_get(self.ps.get_battery_level, default=0) + voltage = self.safe_get(self.ps.get_battery_voltage, default=0.00) + temp = self.safe_get(self.ps.get_temperature, default=0) - capacity = self.safe_get(self.ps.get_battery_level, default=0) - voltage = self.safe_get(self.ps.get_battery_voltage, default=0.00) - temp = self.safe_get(self.ps.get_temperature, default=0) + else: + capacity = 0 + voltage = 0.00 + temp = 0 + logging.info(f"[PiSugarX] PiSugar is not ready") # Check if battery is plugged in - battery_plugged = self.safe_get(self.ps.get_battery_power_plugged, default=False) + battery_plugged = self.safe_get( + self.ps.get_battery_power_plugged, default=False) if battery_plugged: # If plugged in, display "CHG" @@ -670,13 +758,3 @@ class PiSugar(plugins.Plugin): ui.set('bat', f"{capacity:.0f}%") elif self.default_display == 'temp': ui.set('bat', f"{temp}°C") - - charging = self.safe_get(self.ps.get_battery_charging, default=None) - safe_shutdown_level = self.safe_get(self.ps.get_battery_safe_shutdown_level, default=0) - if charging is not None: - if capacity <= safe_shutdown_level: - logging.info( - f"[PiSugarX] Empty battery (<= {safe_shutdown_level}%): shutting down" - ) - ui.update(force=True, new_data={"status": "Battery exhausted, bye ..."}) - From 5a113a2163c7db4bebcc1ead2253d23879603d20 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric?= Date: Sat, 8 Feb 2025 17:29:33 +0100 Subject: [PATCH 07/33] wigle.py update MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - CSV format debug - Update auth method to API Name+key - Add logs - Code refactor and cleaning stil debugging Signed-off-by: Frédéric --- pwnagotchi/plugins/default/wigle.py | 372 ++++++++++++++++------------ 1 file changed, 218 insertions(+), 154 deletions(-) diff --git a/pwnagotchi/plugins/default/wigle.py b/pwnagotchi/plugins/default/wigle.py index 5577ebee..0e498d94 100644 --- a/pwnagotchi/plugins/default/wigle.py +++ b/pwnagotchi/plugins/default/wigle.py @@ -4,13 +4,22 @@ import json import csv import requests import pwnagotchi - -from io import StringIO -from datetime import datetime -from pwnagotchi.utils import WifiInfo, FieldNotFoundError, extract_from_pcap, StatusFile, remove_whitelisted +import re from threading import Lock +from io import StringIO +from datetime import datetime, UTC + +from flask import make_response, redirect +from pwnagotchi.utils import ( + WifiInfo, + FieldNotFoundError, + extract_from_pcap, + StatusFile, + remove_whitelisted, +) from pwnagotchi import plugins from pwnagotchi._version import __version__ as __pwnagotchi_version__ +from scapy.all import Scapy_Exception def _extract_gps_data(path): @@ -19,197 +28,252 @@ def _extract_gps_data(path): return json-obj """ - try: - if path.endswith('.geo.json'): - with open(path, 'r') as json_file: + if path.endswith(".geo.json"): + with open(path, "r") as json_file: tempJson = json.load(json_file) - d = datetime.utcfromtimestamp(int(tempJson["ts"])) - return {"Latitude": tempJson["location"]["lat"], - "Longitude": tempJson["location"]["lng"], - "Altitude": 10, - "Accuracy": tempJson["accuracy"], - "Updated": d.strftime('%Y-%m-%dT%H:%M:%S.%f')} - else: - with open(path, 'r') as json_file: - return json.load(json_file) - except OSError as os_err: - raise os_err - except json.JSONDecodeError as json_err: - raise json_err + d = datetime.fromtimestamp(int(tempJson["ts"]), tz=UTC) + return { + "Latitude": tempJson["location"]["lat"], + "Longitude": tempJson["location"]["lng"], + "Altitude": 10, + "Accuracy": tempJson["accuracy"], + "Updated": d.strftime("%Y-%m-%dT%H:%M:%S.%f"), + } + with open(path, "r") as json_file: + return json.load(json_file) + except (OSError, json.JSONDecodeError) as exp: + raise exp - -def _format_auth(data): - out = "" - for auth in data: - out = f"{out}[{auth}]" - return [f"{auth}" for auth in data] - - -def _transform_wigle_entry(gps_data, pcap_data, plugin_version): +def _transform_wigle_entry(gps_data, pcap_data): """ Transform to wigle entry in file """ + logging.info(f"transform to wigle") dummy = StringIO() - # write kismet header - dummy.write(f"WigleWifi-1.6,appRelease={plugin_version},model=pwnagotchi,release={__pwnagotchi_version__}," - f"device={pwnagotchi.name()},display=kismet,board=RaspberryPi,brand=pwnagotchi,star=Sol,body=3,subBody=0\n") - dummy.write( - "MAC,SSID,AuthMode,FirstSeen,Channel,RSSI,CurrentLatitude,CurrentLongitude,AltitudeMeters,AccuracyMeters,Type\n") - writer = csv.writer(dummy, delimiter=",", quoting=csv.QUOTE_NONE, escapechar="\\") - writer.writerow([ - pcap_data[WifiInfo.BSSID], - pcap_data[WifiInfo.ESSID], - _format_auth(pcap_data[WifiInfo.ENCRYPTION]), - datetime.strptime(gps_data['Updated'].rsplit('.')[0], - "%Y-%m-%dT%H:%M:%S").strftime('%Y-%m-%d %H:%M:%S'), - pcap_data[WifiInfo.CHANNEL], - pcap_data[WifiInfo.RSSI], - gps_data['Latitude'], - gps_data['Longitude'], - gps_data['Altitude'], - gps_data['Accuracy'], - 'WIFI']) + writer.writerow( + [ + pcap_data[WifiInfo.BSSID], + pcap_data[WifiInfo.ESSID], + list(pcap_data[WifiInfo.ENCRYPTION]), + datetime.strptime( + gps_data["Updated"].rsplit(".")[0], "%Y-%m-%dT%H:%M:%S" + ).strftime("%Y-%m-%d %H:%M:%S"), + pcap_data[WifiInfo.CHANNEL], + pcap_data[WifiInfo.RSSI], + gps_data["Latitude"], + gps_data["Longitude"], + gps_data["Altitude"], + gps_data["Accuracy"], + "WIFI", + ] + ) return dummy.getvalue() -def _send_to_wigle(lines, api_key, donate=True, timeout=30): +def _generate_csv(lines, plugin_version): + """ + Generates the csv file + """ + dummy = StringIO() + # write kismet header + dummy.write( + f"WigleWifi-1.6,appRelease={plugin_version},model=pwnagotchi,release={__pwnagotchi_version__}," + f"device={pwnagotchi.name()},display=kismet,board=RaspberryPi,brand=pwnagotchi,star=Sol,body=3,subBody=0\n" + ) + # write header + dummy.write( + "MAC,SSID,AuthMode,FirstSeen,Channel,RSSI,CurrentLatitude,CurrentLongitude,AltitudeMeters,AccuracyMeters,Type\n" + ) + # write WIFIs + for line in lines: + dummy.write(f"{line}") + dummy.seek(0) + return dummy + +def to_file(filename, content): + try: + with open(f"/tmp/{filename}", mode="w") as f: + f.write(content) + except Exception as exp: + logging.debug(f"WIGLE: {exp}") + pass + +def _send_to_wigle(lines, api_name, api_key, plugin_version, donate=False, timeout=30): """ Uploads the file to wigle-net """ + dummy = _generate_csv(lines, plugin_version) - dummy = StringIO() - - for line in lines: - dummy.write(f"{line}") - - dummy.seek(0) - - headers = {"Authorization": f"Basic {api_key}", - "Accept": "application/json", - "HTTP_USER_AGENT": "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:15.0) Gecko/20100101 Firefox/15.0.1"} - data = {"donate": "on" if donate else "false"} - payload = {"file": (pwnagotchi.name() + ".csv", dummy, "multipart/form-data", {"Expires": "0"})} + date = datetime.now().strftime("%Y%m%d_%H%M%S") + filename = f"{pwnagotchi.name()}_{date}.csv" + payload = { + "file": ( + filename, + dummy, + "text/csv", + ) + } + to_file(filename, dummy.getvalue()) try: - res = requests.post('https://api.wigle.net/api/v2/file/upload', - data=data, - headers=headers, - files=payload, - timeout=timeout) + res = requests.post( + "https://api.wigle.net/api/v2/file/upload", + headers={"Accept": "application/json"}, + auth=(api_name, api_key), + data={"donate": "on" if donate else "false"}, + files=payload, + timeout=timeout, + ) json_res = res.json() - if not json_res['success']: - raise requests.exceptions.RequestException(json_res['message']) + logging.info(f"Request result: {json_res}") + if not json_res["success"]: + raise requests.exceptions.RequestException(json_res["message"]) except requests.exceptions.RequestException as re_e: raise re_e class Wigle(plugins.Plugin): __author__ = "Dadav and updated by Jayofelony" - __version__ = "3.1.0" + __version__ = "4.0.0" __license__ = "GPL3" __description__ = "This plugin automatically uploads collected WiFi to wigle.net" def __init__(self): self.ready = False - self.report = StatusFile('/root/.wigle_uploads', data_format='json') + self.report = StatusFile("/root/.wigle_uploads", data_format="json") self.skip = list() self.lock = Lock() self.options = dict() + self.api_name = None + self.api_key = None + self.donate = False + self.handshake_dir = None + self.whitelist = None - def on_loaded(self): - if 'api_key' not in self.options or ('api_key' in self.options and self.options['api_key'] is None): - logging.debug("WIGLE: api_key isn't set. Can't upload to wigle.net") + def on_config_changed(self, config): + self.api_name = self.options.get("api_name", None) + self.api_key = self.options.get("api_key", None) + if not (self.api_name and self.api_key): + logging.debug( + "WIGLE: api_name and/or api_key isn't set. Can't upload to wigle.net" + ) return - - if 'donate' not in self.options: - self.options['donate'] = False - + self.donate = self.options.get("donate", False) + self.handshake_dir = config["bettercap"].get("handshakes", None) + self.whitelist = config["main"].get("whitelist", None) self.ready = True logging.info("WIGLE: ready") - + def on_webhook(self, path, request): - from flask import make_response, redirect response = make_response(redirect("https://www.wigle.net/", code=302)) return response + def get_new_gps_files(self, reported): + all_files = os.listdir(self.handshake_dir) + all_gps_files = [ + os.path.join(self.handshake_dir, filename) + for filename in all_files + if filename.endswith(".gps.json") or filename.endswith(".geo.json") + ] + + all_gps_files = remove_whitelisted(all_gps_files, self.whitelist) + return set(all_gps_files) - set(reported) - set(self.skip) + + @staticmethod + def get_pcap_filename(gps_file): + pcap_filename = re.sub(r"\.(geo|gps)\.json$", ".pcap", gps_file) + if not os.path.exists(pcap_filename): + logging.debug("WIGLE: Can't find pcap for %s", gps_file) + return None + return pcap_filename + + @staticmethod + def get_gps_data(gps_file): + try: + gps_data = _extract_gps_data(gps_file) + except (OSError, json.JSONDecodeError) as exp: + logging.debug(f"WIGLE: {exp}") + return None + if gps_data["Latitude"] == 0 and gps_data["Longitude"] == 0: + logging.debug( + f"WIGLE: Not enough gps-information for {gps_file}. Trying again next time." + ) + return None + return gps_data + + @staticmethod + def get_pcap_data(pcap_filename): + try: + logging.info(f"Extracting PCAP for {pcap_filename}") + pcap_data = extract_from_pcap( + pcap_filename, + [ + WifiInfo.BSSID, + WifiInfo.ESSID, + WifiInfo.ENCRYPTION, + WifiInfo.CHANNEL, + WifiInfo.RSSI, + ], + ) + logging.info(f"PCAP DATA for {pcap_data}") + logging.info(f"Extracting PCAP for {pcap_filename} DONE: {pcap_data}") + except FieldNotFoundError: + logging.debug( + f"WIGLE: Could not extract all information. Skip {pcap_filename}" + ) + return None + except Scapy_Exception as sc_e: + logging.debug(f"WIGLE: {sc_e}") + return None + return pcap_data + + def upload(self, reported, csv_entries, no_err_entries): + try: + logging.info("Uploading to Wigle") + _send_to_wigle( + csv_entries, self.api_name, self.api_key, self.__version__, donate=self.donate + ) + reported += no_err_entries + self.report.update(data={"reported": reported}) + logging.info("WIGLE: Successfully uploaded %d files", len(no_err_entries)) + except requests.exceptions.RequestException as re_e: + self.skip += no_err_entries + logging.debug("WIGLE: Got an exception while uploading %s", re_e) + except OSError as os_e: + self.skip += no_err_entries + logging.debug("WIGLE: Got the following error: %s", os_e) + def on_internet_available(self, agent): """ Called when there's internet connectivity """ - if not self.ready or self.lock.locked(): + if not self.ready: return - - from scapy.all import Scapy_Exception - - config = agent.config() - display = agent.view() - reported = self.report.data_field_or('reported', default=list()) - handshake_dir = config['bettercap']['handshakes'] - all_files = os.listdir(handshake_dir) - all_gps_files = [os.path.join(handshake_dir, filename) - for filename in all_files - if filename.endswith('.gps.json') or filename.endswith('.geo.json')] - - all_gps_files = remove_whitelisted(all_gps_files, config['main']['whitelist']) - new_gps_files = set(all_gps_files) - set(reported) - set(self.skip) - if new_gps_files: - logging.info("WIGLE: Internet connectivity detected. Uploading new handshakes to wigle.net") - csv_entries = list() - no_err_entries = list() - for gps_file in new_gps_files: - if gps_file.endswith('.gps.json'): - pcap_filename = gps_file.replace('.gps.json', '.pcap') - if gps_file.endswith('.geo.json'): - pcap_filename = gps_file.replace('.geo.json', '.pcap') - if not os.path.exists(pcap_filename): - logging.debug("WIGLE: Can't find pcap for %s", gps_file) - self.skip.append(gps_file) - continue - try: - gps_data = _extract_gps_data(gps_file) - except OSError as os_err: - logging.debug("WIGLE: %s", os_err) - self.skip.append(gps_file) - continue - except json.JSONDecodeError as json_err: - logging.debug("WIGLE: %s", json_err) - self.skip.append(gps_file) - continue - if gps_data['Latitude'] == 0 and gps_data['Longitude'] == 0: - logging.debug("WIGLE: Not enough gps-information for %s. Trying again next time.", gps_file) - self.skip.append(gps_file) - continue - try: - pcap_data = extract_from_pcap(pcap_filename, [WifiInfo.BSSID, - WifiInfo.ESSID, - WifiInfo.ENCRYPTION, - WifiInfo.CHANNEL, - WifiInfo.RSSI]) - except FieldNotFoundError: - logging.debug("WIGLE: Could not extract all information. Skip %s", gps_file) - self.skip.append(gps_file) - continue - except Scapy_Exception as sc_e: - logging.debug("WIGLE: %s", sc_e) - self.skip.append(gps_file) - continue - new_entry = _transform_wigle_entry(gps_data, pcap_data, self.__version__) - csv_entries.append(new_entry) - no_err_entries.append(gps_file) - if csv_entries: - display.on_uploading('wigle.net') - - try: - _send_to_wigle(csv_entries, self.options['api_key'], donate=self.options['donate']) - reported += no_err_entries - self.report.update(data={'reported': reported}) - logging.info("WIGLE: Successfully uploaded %d files", len(no_err_entries)) - except requests.exceptions.RequestException as re_e: - self.skip += no_err_entries - logging.debug("WIGLE: Got an exception while uploading %s", re_e) - except OSError as os_e: - self.skip += no_err_entries - logging.debug("WIGLE: Got the following error: %s", os_e) - - display.on_normal() + with self.lock: + reported = self.report.data_field_or("reported", default=list()) + if new_gps_files := self.get_new_gps_files(reported): + logging.info( + "WIGLE: Internet connectivity detected. Uploading new handshakes to wigle.net" + ) + csv_entries = list() + no_err_entries = list() + for gps_file in new_gps_files: + logging.info(f"WIGLE: handeling {gps_file}") + if not (pcap_filename := self.get_pcap_filename(gps_file)): + self.skip.append(gps_file) + continue + if not (gps_data := self.get_gps_data(gps_file)): + self.skip.append(gps_file) + continue + if not (pcap_data := self.get_pcap_data(pcap_filename)): + self.skip.append(gps_file) + continue + new_entry = _transform_wigle_entry(gps_data, pcap_data) + csv_entries.append(new_entry) + no_err_entries.append(gps_file) + if csv_entries: + display = agent.view() + display.on_uploading("wigle.net") + self.upload(reported, csv_entries, no_err_entries) + display.on_normal() From d4874a18e1039f827ef32aa32228d207a9949db3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric?= Date: Sun, 9 Feb 2025 01:32:59 +0100 Subject: [PATCH 08/33] utils.py update MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add WifiInfo.FREQUENCY to extract_from_pcap() - Move the "Invalid field" to the the "else" as a default Signed-off-by: Frédéric --- pwnagotchi/utils.py | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/pwnagotchi/utils.py b/pwnagotchi/utils.py index 3fcf7cc7..26f507a8 100644 --- a/pwnagotchi/utils.py +++ b/pwnagotchi/utils.py @@ -564,7 +564,8 @@ class WifiInfo(Enum): ESSID = 1 ENCRYPTION = 2 CHANNEL = 3 - RSSI = 4 + FREQUENCY = 4 + RSSI = 5 class FieldNotFoundError(Exception): @@ -594,9 +595,6 @@ def extract_from_pcap(path, fields): """ results = dict() for field in fields: - if not isinstance(field, WifiInfo): - raise TypeError("Invalid field") - subtypes = set() if field == WifiInfo.BSSID: @@ -606,10 +604,9 @@ def extract_from_pcap(path, fields): packets = sniff(offline=path, filter=bpf_filter) try: for packet in packets: - if packet.haslayer(Dot11Beacon): - if hasattr(packet[Dot11], 'addr3'): - results[field] = packet[Dot11].addr3 - break + if packet.haslayer(Dot11Beacon) and hasattr(packet[Dot11], 'addr3'): + results[field] = packet[Dot11].addr3 + break else: # magic raise FieldNotFoundError("Could not find field [BSSID]") except Exception: @@ -654,6 +651,14 @@ def extract_from_pcap(path, fields): results[field] = freq_to_channel(packets[0][RadioTap].ChannelFrequency) except Exception: raise FieldNotFoundError("Could not find field [CHANNEL]") + elif field == WifiInfo.FREQUENCY: + from scapy.layers.dot11 import sniff, RadioTap + from pwnagotchi.mesh.wifi import freq_to_channel + packets = sniff(offline=path, count=1) + try: + results[field] = packets[0][RadioTap].ChannelFrequency + except Exception: + raise FieldNotFoundError("Could not find field [FREQUENCY]") elif field == WifiInfo.RSSI: from scapy.layers.dot11 import sniff, RadioTap from pwnagotchi.mesh.wifi import freq_to_channel @@ -662,7 +667,8 @@ def extract_from_pcap(path, fields): results[field] = packets[0][RadioTap].dBm_AntSignal except Exception: raise FieldNotFoundError("Could not find field [RSSI]") - + else: + raise TypeError("Invalid field") return results From 0b3b38bb44ee9a356cf3b4270cac0522d967de72 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric?= Date: Sun, 9 Feb 2025 01:50:57 +0100 Subject: [PATCH 09/33] Wigle.py update MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Correct the CSV format - Add 2 missing columns: frequency(populated) and RCOI(not populated) - Before sending to wigle, if cvs_dir is set, the CVS data ti written to that directory - Update auth method to match the api: api_name + api_key - Refactoring into only one plugin class Signed-off-by: Frédéric --- pwnagotchi/plugins/default/wigle.py | 303 ++++++++++++---------------- 1 file changed, 124 insertions(+), 179 deletions(-) diff --git a/pwnagotchi/plugins/default/wigle.py b/pwnagotchi/plugins/default/wigle.py index 0e498d94..a4b79970 100644 --- a/pwnagotchi/plugins/default/wigle.py +++ b/pwnagotchi/plugins/default/wigle.py @@ -5,6 +5,7 @@ import csv import requests import pwnagotchi import re +from glob import glob from threading import Lock from io import StringIO from datetime import datetime, UTC @@ -22,119 +23,8 @@ from pwnagotchi._version import __version__ as __pwnagotchi_version__ from scapy.all import Scapy_Exception -def _extract_gps_data(path): - """ - Extract data from gps-file - - return json-obj - """ - try: - if path.endswith(".geo.json"): - with open(path, "r") as json_file: - tempJson = json.load(json_file) - d = datetime.fromtimestamp(int(tempJson["ts"]), tz=UTC) - return { - "Latitude": tempJson["location"]["lat"], - "Longitude": tempJson["location"]["lng"], - "Altitude": 10, - "Accuracy": tempJson["accuracy"], - "Updated": d.strftime("%Y-%m-%dT%H:%M:%S.%f"), - } - with open(path, "r") as json_file: - return json.load(json_file) - except (OSError, json.JSONDecodeError) as exp: - raise exp - -def _transform_wigle_entry(gps_data, pcap_data): - """ - Transform to wigle entry in file - """ - logging.info(f"transform to wigle") - dummy = StringIO() - writer = csv.writer(dummy, delimiter=",", quoting=csv.QUOTE_NONE, escapechar="\\") - writer.writerow( - [ - pcap_data[WifiInfo.BSSID], - pcap_data[WifiInfo.ESSID], - list(pcap_data[WifiInfo.ENCRYPTION]), - datetime.strptime( - gps_data["Updated"].rsplit(".")[0], "%Y-%m-%dT%H:%M:%S" - ).strftime("%Y-%m-%d %H:%M:%S"), - pcap_data[WifiInfo.CHANNEL], - pcap_data[WifiInfo.RSSI], - gps_data["Latitude"], - gps_data["Longitude"], - gps_data["Altitude"], - gps_data["Accuracy"], - "WIFI", - ] - ) - return dummy.getvalue() - - -def _generate_csv(lines, plugin_version): - """ - Generates the csv file - """ - dummy = StringIO() - # write kismet header - dummy.write( - f"WigleWifi-1.6,appRelease={plugin_version},model=pwnagotchi,release={__pwnagotchi_version__}," - f"device={pwnagotchi.name()},display=kismet,board=RaspberryPi,brand=pwnagotchi,star=Sol,body=3,subBody=0\n" - ) - # write header - dummy.write( - "MAC,SSID,AuthMode,FirstSeen,Channel,RSSI,CurrentLatitude,CurrentLongitude,AltitudeMeters,AccuracyMeters,Type\n" - ) - # write WIFIs - for line in lines: - dummy.write(f"{line}") - dummy.seek(0) - return dummy - -def to_file(filename, content): - try: - with open(f"/tmp/{filename}", mode="w") as f: - f.write(content) - except Exception as exp: - logging.debug(f"WIGLE: {exp}") - pass - -def _send_to_wigle(lines, api_name, api_key, plugin_version, donate=False, timeout=30): - """ - Uploads the file to wigle-net - """ - dummy = _generate_csv(lines, plugin_version) - - date = datetime.now().strftime("%Y%m%d_%H%M%S") - filename = f"{pwnagotchi.name()}_{date}.csv" - payload = { - "file": ( - filename, - dummy, - "text/csv", - ) - } - to_file(filename, dummy.getvalue()) - try: - res = requests.post( - "https://api.wigle.net/api/v2/file/upload", - headers={"Accept": "application/json"}, - auth=(api_name, api_key), - data={"donate": "on" if donate else "false"}, - files=payload, - timeout=timeout, - ) - json_res = res.json() - logging.info(f"Request result: {json_res}") - if not json_res["success"]: - raise requests.exceptions.RequestException(json_res["message"]) - except requests.exceptions.RequestException as re_e: - raise re_e - - class Wigle(plugins.Plugin): - __author__ = "Dadav and updated by Jayofelony" + __author__ = "Dadav and updated by Jayofelony and fmatray" __version__ = "4.0.0" __license__ = "GPL3" __description__ = "This plugin automatically uploads collected WiFi to wigle.net" @@ -145,38 +35,28 @@ class Wigle(plugins.Plugin): self.skip = list() self.lock = Lock() self.options = dict() - self.api_name = None - self.api_key = None - self.donate = False - self.handshake_dir = None - self.whitelist = None def on_config_changed(self, config): - self.api_name = self.options.get("api_name", None) - self.api_key = self.options.get("api_key", None) - if not (self.api_name and self.api_key): - logging.debug( - "WIGLE: api_name and/or api_key isn't set. Can't upload to wigle.net" - ) + api_name = self.options.get("api_name", None) + api_key = self.options.get("api_key", None) + if not (api_name and api_key): + logging.info("[WIGLE] api_name and api_key must be set.") return + self.auth = (api_name, api_key) self.donate = self.options.get("donate", False) self.handshake_dir = config["bettercap"].get("handshakes", None) + self.cvs_dir = self.options.get("cvs_dir", None) self.whitelist = config["main"].get("whitelist", None) + self.timeout = config["main"].get("timeout", 30) self.ready = True - logging.info("WIGLE: ready") + logging.info("[WIGLE] Ready for wardriving!!!") def on_webhook(self, path, request): - response = make_response(redirect("https://www.wigle.net/", code=302)) - return response + return make_response(redirect("https://www.wigle.net/", code=302)) def get_new_gps_files(self, reported): - all_files = os.listdir(self.handshake_dir) - all_gps_files = [ - os.path.join(self.handshake_dir, filename) - for filename in all_files - if filename.endswith(".gps.json") or filename.endswith(".geo.json") - ] - + all_gps_files = glob(os.path.join(self.handshake_dir, "*.gps.json")) + all_gps_files += glob(os.path.join(self.handshake_dir, "*.geo.json")) all_gps_files = remove_whitelisted(all_gps_files, self.whitelist) return set(all_gps_files) - set(reported) - set(self.skip) @@ -184,28 +64,47 @@ class Wigle(plugins.Plugin): def get_pcap_filename(gps_file): pcap_filename = re.sub(r"\.(geo|gps)\.json$", ".pcap", gps_file) if not os.path.exists(pcap_filename): - logging.debug("WIGLE: Can't find pcap for %s", gps_file) + logging.debug("[WIGLE] Can't find pcap for %s", gps_file) return None return pcap_filename @staticmethod - def get_gps_data(gps_file): + def extract_gps_data(path): + """ + Extract data from gps-file + return json-obj + """ try: - gps_data = _extract_gps_data(gps_file) + if path.endswith(".geo.json"): + with open(path, "r") as json_file: + tempJson = json.load(json_file) + d = datetime.fromtimestamp(int(tempJson["ts"]), tz=UTC) + return { + "Latitude": tempJson["location"]["lat"], + "Longitude": tempJson["location"]["lng"], + "Altitude": 10, + "Accuracy": tempJson["accuracy"], + "Updated": d.strftime("%Y-%m-%dT%H:%M:%S.%f"), + } + with open(path, "r") as json_file: + return json.load(json_file) except (OSError, json.JSONDecodeError) as exp: - logging.debug(f"WIGLE: {exp}") + raise exp + + def get_gps_data(self, gps_file): + try: + gps_data = self.extract_gps_data(gps_file) + except (OSError, json.JSONDecodeError) as exp: + logging.debug(f"[WIGLE] Error while extracting GPS data: {exp}") return None if gps_data["Latitude"] == 0 and gps_data["Longitude"] == 0: - logging.debug( - f"WIGLE: Not enough gps-information for {gps_file}. Trying again next time." - ) + logging.debug(f"[WIGLE] Not enough gps data for {gps_file}. Next time.") return None return gps_data @staticmethod def get_pcap_data(pcap_filename): try: - logging.info(f"Extracting PCAP for {pcap_filename}") pcap_data = extract_from_pcap( pcap_filename, [ @@ -213,67 +112,113 @@ class Wigle(plugins.Plugin): WifiInfo.ESSID, WifiInfo.ENCRYPTION, WifiInfo.CHANNEL, + WifiInfo.FREQUENCY, WifiInfo.RSSI, ], ) - logging.info(f"PCAP DATA for {pcap_data}") - logging.info(f"Extracting PCAP for {pcap_filename} DONE: {pcap_data}") + logging.debug(f"[WIGLE] PCAP data for {pcap_filename}: {pcap_data}") except FieldNotFoundError: - logging.debug( - f"WIGLE: Could not extract all information. Skip {pcap_filename}" - ) + logging.debug(f"[WIGLE] Cannot extract all data: {pcap_filename} (skipped)") return None except Scapy_Exception as sc_e: - logging.debug(f"WIGLE: {sc_e}") + logging.debug(f"[WIGLE] {sc_e}") return None return pcap_data - def upload(self, reported, csv_entries, no_err_entries): - try: - logging.info("Uploading to Wigle") - _send_to_wigle( - csv_entries, self.api_name, self.api_key, self.__version__, donate=self.donate + def generate_csv(self, data): + date = datetime.now().strftime("%Y%m%d_%H%M%S") + filename = f"{pwnagotchi.name()}_{date}.csv" + + content = StringIO() + # write kismet header + header + content.write( + f"WigleWifi-1.6,appRelease={self.__version__},model=pwnagotchi,release={__pwnagotchi_version__}," + f"device={pwnagotchi.name()},display=kismet,board=RaspberryPi,brand=pwnagotchi,star=Sol,body=3,subBody=0\n" + f"MAC,SSID,AuthMode,FirstSeen,Channel,Frequency,RSSI,CurrentLatitude,CurrentLongitude,AltitudeMeters,AccuracyMeters,RCOIs,MfgrId,Type\n" + ) + writer = csv.writer( + content, delimiter=",", quoting=csv.QUOTE_NONE, escapechar="\\" + ) + for gps_data, pcap_data in data: # write WIFIs + writer.writerow( + [ + pcap_data[WifiInfo.BSSID], + pcap_data[WifiInfo.ESSID], + f"[{']['.join(pcap_data[WifiInfo.ENCRYPTION])}]", + datetime.strptime( + gps_data["Updated"].rsplit(".")[0], "%Y-%m-%dT%H:%M:%S" + ).strftime("%Y-%m-%d %H:%M:%S"), + pcap_data[WifiInfo.CHANNEL], + pcap_data[WifiInfo.FREQUENCY], + pcap_data[WifiInfo.RSSI], + gps_data["Latitude"], + gps_data["Longitude"], + gps_data["Altitude"], + gps_data["Accuracy"], + "", # RCOIs to populate + "", # MfgrId always empty + "WIFI", + ] ) + content.seek(0) + return filename, content + + def save_to_file(self, cvs_filename, cvs_content): + if not self.cvs_dir: + return + filename = os.path.join(self.cvs_dir, cvs_filename) + logging.info(f"[WIGLE] Saving to file {filename}") + try: + with open(filename, mode="w") as f: + f.write(cvs_content.getvalue()) + except Exception as exp: + logging.error(f"[WIGLE] Error while writing CSV file(skipping): {exp}") + + def upload_to_wigle(self, reported, cvs_filename, cvs_content, no_err_entries): + try: + json_res = requests.post( + "https://api.wigle.net/api/v2/file/upload", + headers={"Accept": "application/json"}, + auth=self.auth, + data={"donate": "on" if self.donate else "false"}, + files=dict(file=(cvs_filename, cvs_content, "text/csv")), + timeout=self.timeout, + ).json() + if not json_res["success"]: + raise requests.exceptions.RequestException(json_res["message"]) reported += no_err_entries self.report.update(data={"reported": reported}) - logging.info("WIGLE: Successfully uploaded %d files", len(no_err_entries)) - except requests.exceptions.RequestException as re_e: + logging.info("[WIGLE] Successfully uploaded %d wifis", len(no_err_entries)) + except (requests.exceptions.RequestException, OSError) as exp: self.skip += no_err_entries - logging.debug("WIGLE: Got an exception while uploading %s", re_e) - except OSError as os_e: - self.skip += no_err_entries - logging.debug("WIGLE: Got the following error: %s", os_e) + logging.debug(f"[WIGLE] Exception while uploading: {exp}") def on_internet_available(self, agent): - """ - Called when there's internet connectivity - """ if not self.ready: return with self.lock: reported = self.report.data_field_or("reported", default=list()) if new_gps_files := self.get_new_gps_files(reported): - logging.info( - "WIGLE: Internet connectivity detected. Uploading new handshakes to wigle.net" - ) - csv_entries = list() - no_err_entries = list() + logging.info("[WIGLE] Uploading new handshakes to wigle.net") + csv_entries, no_err_entries = list(), list() for gps_file in new_gps_files: - logging.info(f"WIGLE: handeling {gps_file}") - if not (pcap_filename := self.get_pcap_filename(gps_file)): + logging.info(f"[WIGLE] Processing {os.path.basename(gps_file)}") + if ( + (pcap_filename := self.get_pcap_filename(gps_file)) + and (gps_data := self.get_gps_data(gps_file)) + and (pcap_data := self.get_pcap_data(pcap_filename)) + ): + csv_entries.append((gps_data, pcap_data)) + no_err_entries.append(gps_file) + else: self.skip.append(gps_file) - continue - if not (gps_data := self.get_gps_data(gps_file)): - self.skip.append(gps_file) - continue - if not (pcap_data := self.get_pcap_data(pcap_filename)): - self.skip.append(gps_file) - continue - new_entry = _transform_wigle_entry(gps_data, pcap_data) - csv_entries.append(new_entry) - no_err_entries.append(gps_file) + logging.info(f"[WIGLE] Wifi to upload: {len(csv_entries)}") if csv_entries: + cvs_filename, cvs_content = self.generate_csv(csv_entries) + self.save_to_file(cvs_filename, cvs_content) display = agent.view() display.on_uploading("wigle.net") - self.upload(reported, csv_entries, no_err_entries) + self.upload_to_wigle( + reported, cvs_filename, cvs_content, no_err_entries + ) display.on_normal() From 2893632ee50fe6bbfdc8a952b9cbaf3c55fee60d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric?= Date: Sun, 9 Feb 2025 18:44:02 +0100 Subject: [PATCH 10/33] Add display MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Now the plugins display some statistics from WIGLE Signed-off-by: Frédéric --- pwnagotchi/plugins/default/wigle.py | 130 ++++++++++++++++++++++------ 1 file changed, 102 insertions(+), 28 deletions(-) diff --git a/pwnagotchi/plugins/default/wigle.py b/pwnagotchi/plugins/default/wigle.py index a4b79970..d446973d 100644 --- a/pwnagotchi/plugins/default/wigle.py +++ b/pwnagotchi/plugins/default/wigle.py @@ -20,6 +20,11 @@ from pwnagotchi.utils import ( ) from pwnagotchi import plugins from pwnagotchi._version import __version__ as __pwnagotchi_version__ + +import pwnagotchi.ui.fonts as fonts +from pwnagotchi.ui.components import Text +from pwnagotchi.ui.view import BLACK + from scapy.all import Scapy_Exception @@ -28,6 +33,7 @@ class Wigle(plugins.Plugin): __version__ = "4.0.0" __license__ = "GPL3" __description__ = "This plugin automatically uploads collected WiFi to wigle.net" + LABEL_SPACING = 0 def __init__(self): self.ready = False @@ -35,6 +41,16 @@ class Wigle(plugins.Plugin): self.skip = list() self.lock = Lock() self.options = dict() + self.statistics = dict( + ready=False, + username=None, + rank=None, + monthrank=None, + discoveredwiFi=None, + last=None, + ) + self.last_stat = datetime.now(tz=UTC) + self.ui_counter = 0 def on_config_changed(self, config): api_name = self.options.get("api_name", None) @@ -44,10 +60,11 @@ class Wigle(plugins.Plugin): return self.auth = (api_name, api_key) self.donate = self.options.get("donate", False) - self.handshake_dir = config["bettercap"].get("handshakes", None) + self.handshake_dir = config["bettercap"].get("handshakes") self.cvs_dir = self.options.get("cvs_dir", None) - self.whitelist = config["main"].get("whitelist", None) - self.timeout = config["main"].get("timeout", 30) + self.whitelist = config["main"].get("whitelist", []) + self.timeout = self.options.get("timeout", 30) + self.position = self.options.get("position", (10, 10)) self.ready = True logging.info("[WIGLE] Ready for wardriving!!!") @@ -174,7 +191,7 @@ class Wigle(plugins.Plugin): except Exception as exp: logging.error(f"[WIGLE] Error while writing CSV file(skipping): {exp}") - def upload_to_wigle(self, reported, cvs_filename, cvs_content, no_err_entries): + def post_wigle(self, reported, cvs_filename, cvs_content, no_err_entries): try: json_res = requests.post( "https://api.wigle.net/api/v2/file/upload", @@ -188,37 +205,94 @@ class Wigle(plugins.Plugin): raise requests.exceptions.RequestException(json_res["message"]) reported += no_err_entries self.report.update(data={"reported": reported}) - logging.info("[WIGLE] Successfully uploaded %d wifis", len(no_err_entries)) + logging.info(f"[WIGLE] Successfully uploaded {len(no_err_entries)} wifis") except (requests.exceptions.RequestException, OSError) as exp: self.skip += no_err_entries logging.debug(f"[WIGLE] Exception while uploading: {exp}") + def upload_new_handshakes(self, reported, new_gps_files, agent): + logging.info("[WIGLE] Uploading new handshakes to wigle.net") + csv_entries, no_err_entries = list(), list() + for gps_file in new_gps_files: + logging.info(f"[WIGLE] Processing {os.path.basename(gps_file)}") + if ( + (pcap_filename := self.get_pcap_filename(gps_file)) + and (gps_data := self.get_gps_data(gps_file)) + and (pcap_data := self.get_pcap_data(pcap_filename)) + ): + csv_entries.append((gps_data, pcap_data)) + no_err_entries.append(gps_file) + else: + self.skip.append(gps_file) + logging.info(f"[WIGLE] Wifi to upload: {len(csv_entries)}") + if csv_entries: + cvs_filename, cvs_content = self.generate_csv(csv_entries) + self.save_to_file(cvs_filename, cvs_content) + display = agent.view() + display.on_uploading("wigle.net") + self.post_wigle(reported, cvs_filename, cvs_content, no_err_entries) + display.on_normal() + + def get_statistics(self): + if (datetime.now(tz=UTC) - self.last_stat).total_seconds() < 30: + return + self.last_stat = datetime.now(tz=UTC) + try: + self.statistics["ready"] = False + json_res = requests.get( + "https://api.wigle.net/api/v2/stats/user", + headers={"Accept": "application/json"}, + auth=self.auth, + timeout=self.timeout, + ).json() + if not json_res["success"]: + return + self.statistics["ready"] = True + self.statistics["username"] = json_res["user"] + self.statistics["rank"] = json_res["rank"] + self.statistics["monthrank"] = json_res["monthRank"] + self.statistics["discoveredwiFi"] = json_res["statistics"]["discoveredWiFi"] + last = json_res["statistics"]["last"] + self.statistics["last"] = f"{last[6:8]}/{last[4:6]}/{last[0:4]}" + except (requests.exceptions.RequestException, OSError) as exp: + pass + def on_internet_available(self, agent): if not self.ready: return with self.lock: reported = self.report.data_field_or("reported", default=list()) if new_gps_files := self.get_new_gps_files(reported): - logging.info("[WIGLE] Uploading new handshakes to wigle.net") - csv_entries, no_err_entries = list(), list() - for gps_file in new_gps_files: - logging.info(f"[WIGLE] Processing {os.path.basename(gps_file)}") - if ( - (pcap_filename := self.get_pcap_filename(gps_file)) - and (gps_data := self.get_gps_data(gps_file)) - and (pcap_data := self.get_pcap_data(pcap_filename)) - ): - csv_entries.append((gps_data, pcap_data)) - no_err_entries.append(gps_file) - else: - self.skip.append(gps_file) - logging.info(f"[WIGLE] Wifi to upload: {len(csv_entries)}") - if csv_entries: - cvs_filename, cvs_content = self.generate_csv(csv_entries) - self.save_to_file(cvs_filename, cvs_content) - display = agent.view() - display.on_uploading("wigle.net") - self.upload_to_wigle( - reported, cvs_filename, cvs_content, no_err_entries - ) - display.on_normal() + self.upload_new_handshakes(reported, new_gps_files, agent) + else: + self.get_statistics() + + def on_ui_setup(self, ui): + with ui._lock: + ui.add_element( + "wigle", + Text(value="-", position=self.position, font=fonts.Small, color=BLACK), + ) + + def on_unload(self, ui): + with ui._lock: + ui.remove_element("wigle") + + def on_ui_update(self, ui): + if not self.ready: + return + with ui._lock: + if not self.statistics["ready"]: + ui.set("wigle", "We Will Wait Wigle") + return + msg = "-" + self.ui_counter = (self.ui_counter + 1) % 4 + if self.ui_counter == 0: + msg = f"User:{self.statistics['username']}" + if self.ui_counter == 1: + msg = f"Rank:{self.statistics['rank']} Monthly:{self.statistics['monthrank']}" + elif self.ui_counter == 2: + msg = f"{self.statistics['discoveredwiFi']} discovered WiFis" + elif self.ui_counter == 3: + msg = f"Last report:{self.statistics['last']}" + ui.set("wigle", msg) From beb8308969e1ce998f1c370443ed7cb3ea7038d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=93=B2=E5=B1=8E=E5=B0=86=E5=86=9B?= <37292630+wlmh110@users.noreply.github.com> Date: Mon, 10 Feb 2025 18:23:26 +0800 Subject: [PATCH 11/33] Update pisugarx.py Added charging voltage protection function --- pwnagotchi/plugins/default/pisugarx.py | 92 +++++++++++++++++++++++++- 1 file changed, 89 insertions(+), 3 deletions(-) diff --git a/pwnagotchi/plugins/default/pisugarx.py b/pwnagotchi/plugins/default/pisugarx.py index 984225a5..9d1a797f 100644 --- a/pwnagotchi/plugins/default/pisugarx.py +++ b/pwnagotchi/plugins/default/pisugarx.py @@ -50,6 +50,7 @@ curve5000 = [ ] + class PiSugarServer: def __init__(self): """ @@ -70,6 +71,7 @@ class PiSugarServer: self.lowpower_shutdown = False self.lowpower_shutdown_level = 10 self.max_charge_voltage_protection = False + self.max_protection_voltage=4 # Start the device connection in a background thread self.connection_thread = threading.Thread( target=self._connect_device, daemon=True) @@ -138,6 +140,20 @@ class PiSugarServer: ctr1 = self.i2creg[0x02] # 读取控制寄存器 1 self.power_plugged = (ctr1 & (1 << 7)) != 0 # 检查电源是否插入 self.allow_charging = (ctr1 & (1 << 6)) != 0 # 检查是否允许充电 + if self.max_charge_voltage_protection: + self._bus.write_byte_data( + self.address, 0x0B, 0x29) # 关闭写保护 + self._bus.write_byte_data(self.address, 0x20, self._bus.read_byte_data( + self.address, 0x20) | 0b10000000) + self._bus.write_byte_data( + self.address, 0x0B, 0x00) # 开启写保护 + else: + self._bus.write_byte_data( + self.address, 0x0B, 0x29) # 关闭写保护 + self._bus.write_byte_data(self.address, 0x20, self._bus.read_byte_data( + self.address, 0x20) & 0b01111111) + self._bus.write_byte_data( + self.address, 0x0B, 0x00) # 开启写保护 elif self.modle == 'PiSugar2': high = self.i2creg[0xa3] low = self.i2creg[0xa2] @@ -146,12 +162,29 @@ class PiSugarServer: 2600.0 + (((high & 0x1f) << 8) + low) * 0.26855) / 1000.0 self.power_plugged = (self.i2creg[0x55] & 0b00010000) != 0 + if self.max_charge_voltage_protection: + battery_voltage = self.battery_voltage + if (battery_voltage) > self.max_protection_voltage: + self.set_battery_notallow_charging() + else: + self.set_battery_allow_charging() + else: + self.set_battery_allow_charging() + elif self.modle == 'PiSugar2Plus': low = self.i2creg[0xd0] high = self.i2creg[0xd1] self.battery_voltage = ( (((high & 0b00111111) << 8) + low) * 0.26855 + 2600.0)/1000 self.power_plugged = self.i2creg[0xdd] == 0x1f + if self.max_charge_voltage_protection: + battery_voltage = self.battery_voltage + if (battery_voltage) > self.max_protection_voltage: + self.set_battery_notallow_charging() + else: + self.set_battery_allow_charging() + else: + self.set_battery_allow_charging() self.voltage_history.append(self.battery_voltage) self.battery_level = self.convert_battery_voltage_to_level() @@ -161,7 +194,6 @@ class PiSugarServer: logging.info("[PiSugarX] low power shutdown now.") self.shutdown() pwnagotchi.shutdown() - time.sleep(3) except Exception as e: logging.error(f"read error{e}") @@ -171,11 +203,11 @@ class PiSugarServer: # logging.info("[PiSugarX] PiSugar set shutdown .") if self.modle == 'PiSugar3': # 10秒后关闭电源 - self._bus.write_byte_data(self.address, 0x0B, 0x29) #关闭写保护 + self._bus.write_byte_data(self.address, 0x0B, 0x29) # 关闭写保护 self._bus.write_byte_data(self.address, 0x09, 10) self._bus.write_byte_data(self.address, 0x02, self._bus.read_byte_data( self.address, 0x02) & 0b11011111) - self._bus.write_byte_data(self.address, 0x0B, 0x00) #开启写保护 + self._bus.write_byte_data(self.address, 0x0B, 0x00) # 开启写保护 logging.info("[PiSugarX] PiSugar shutdown in 10s.") elif self.modle == 'PiSugar2': pass @@ -317,6 +349,60 @@ class PiSugarServer: """ return self.allow_charging + def set_battery_allow_charging(self): + print("开启充电") + if self.modle == 'PiSugar3': + pass + elif self.modle == 'PiSugar2': + # 禁止 gpio2 输出 + self._bus.write_byte_data(self.address, 0x54, self._bus.read_byte_data( + self.address, 0x54) & 0b11111011) + # 开启充电 + self._bus.write_byte_data(self.address, 0x55, self._bus.read_byte_data( + self.address, 0x55) & 0b11111011) + # 开启 gpio2 输出 + self._bus.write_byte_data(self.address, 0x54, self._bus.read_byte_data( + self.address, 0x54) | 0b00000100) + elif self.modle == 'PiSugar2Plus': + # 禁止 gpio2 输出 + self._bus.write_byte_data(self.address, 0x56, self._bus.read_byte_data( + self.address, 0x56) & 0b11111011) + # 开启充电 + self._bus.write_byte_data(self.address, 0x55, self._bus.read_byte_data( + self.address, 0x55) & 0b11111011) + # 开启 gpio2 输出 + self._bus.write_byte_data(self.address, 0x56, self._bus.read_byte_data( + self.address, 0x56) | 0b00000100) + + return + + def set_battery_notallow_charging(self): + print("关闭充电") + if self.modle == 'PiSugar3': + pass + elif self.modle == 'PiSugar2': + # 禁止 gpio2 输出 + print(self._bus.write_byte_data(self.address, 0x54, self._bus.read_byte_data( + self.address, 0x54) & 0b11111011)) + # 关闭充电 + self._bus.write_byte_data(self.address, 0x55, self._bus.read_byte_data( + self.address, 0x55) | 0b00000100) + # 开启 gpio2 输出 + self._bus.write_byte_data(self.address, 0x54, self._bus.read_byte_data( + self.address, 0x54) | 0b00000100) + elif self.modle == 'PiSugar2Plus': + # 禁止 gpio2 输出 + self._bus.write_byte_data(self.address, 0x56, self._bus.read_byte_data( + self.address, 0x56) & 0b11111011) + # 关闭充电 + self._bus.write_byte_data(self.address, 0x55, self._bus.read_byte_data( + self.address, 0x55) | 0b00000100) + # 开启 gpio2 输出 + self._bus.write_byte_data(self.address, 0x56, self._bus.read_byte_data( + self.address, 0x56) | 0b00000100) + + return + def get_battery_charging_range(self): """ Get the battery charging range. From a00d94b5205a36d4a12c1c0531c6c6392febbf5b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=93=B2=E5=B1=8E=E5=B0=86=E5=86=9B?= <37292630+wlmh110@users.noreply.github.com> Date: Mon, 10 Feb 2025 18:26:52 +0800 Subject: [PATCH 12/33] Update pisugarx.py --- pwnagotchi/plugins/default/pisugarx.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/pwnagotchi/plugins/default/pisugarx.py b/pwnagotchi/plugins/default/pisugarx.py index 9d1a797f..6f972fda 100644 --- a/pwnagotchi/plugins/default/pisugarx.py +++ b/pwnagotchi/plugins/default/pisugarx.py @@ -71,7 +71,7 @@ class PiSugarServer: self.lowpower_shutdown = False self.lowpower_shutdown_level = 10 self.max_charge_voltage_protection = False - self.max_protection_voltage=4 + self.max_protection_level=80 # Start the device connection in a background thread self.connection_thread = threading.Thread( target=self._connect_device, daemon=True) @@ -163,8 +163,9 @@ class PiSugarServer: self.power_plugged = (self.i2creg[0x55] & 0b00010000) != 0 if self.max_charge_voltage_protection: - battery_voltage = self.battery_voltage - if (battery_voltage) > self.max_protection_voltage: + self.voltage_history.append(self.battery_voltage) + self.battery_level = self.convert_battery_voltage_to_level() + if (self.battery_level) > self.max_protection_level: self.set_battery_notallow_charging() else: self.set_battery_allow_charging() @@ -178,8 +179,9 @@ class PiSugarServer: (((high & 0b00111111) << 8) + low) * 0.26855 + 2600.0)/1000 self.power_plugged = self.i2creg[0xdd] == 0x1f if self.max_charge_voltage_protection: - battery_voltage = self.battery_voltage - if (battery_voltage) > self.max_protection_voltage: + self.voltage_history.append(self.battery_voltage) + self.battery_level = self.convert_battery_voltage_to_level() + if (self.battery_level) > self.max_protection_level: self.set_battery_notallow_charging() else: self.set_battery_allow_charging() From d3231a11ce5ee164de36150ec3b4c1181b4bf808 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=93=B2=E5=B1=8E=E5=B0=86=E5=86=9B?= <37292630+wlmh110@users.noreply.github.com> Date: Mon, 10 Feb 2025 19:08:19 +0800 Subject: [PATCH 13/33] Update pisugarx.py bug fix --- pwnagotchi/plugins/default/pisugarx.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pwnagotchi/plugins/default/pisugarx.py b/pwnagotchi/plugins/default/pisugarx.py index 6f972fda..c4e07a48 100644 --- a/pwnagotchi/plugins/default/pisugarx.py +++ b/pwnagotchi/plugins/default/pisugarx.py @@ -370,8 +370,8 @@ class PiSugarServer: self._bus.write_byte_data(self.address, 0x56, self._bus.read_byte_data( self.address, 0x56) & 0b11111011) # 开启充电 - self._bus.write_byte_data(self.address, 0x55, self._bus.read_byte_data( - self.address, 0x55) & 0b11111011) + self._bus.write_byte_data(self.address, 0x58, self._bus.read_byte_data( + self.address, 0x58) & 0b11111011) # 开启 gpio2 输出 self._bus.write_byte_data(self.address, 0x56, self._bus.read_byte_data( self.address, 0x56) | 0b00000100) @@ -397,8 +397,8 @@ class PiSugarServer: self._bus.write_byte_data(self.address, 0x56, self._bus.read_byte_data( self.address, 0x56) & 0b11111011) # 关闭充电 - self._bus.write_byte_data(self.address, 0x55, self._bus.read_byte_data( - self.address, 0x55) | 0b00000100) + self._bus.write_byte_data(self.address, 0x58, self._bus.read_byte_data( + self.address, 0x58) | 0b00000100) # 开启 gpio2 输出 self._bus.write_byte_data(self.address, 0x56, self._bus.read_byte_data( self.address, 0x56) | 0b00000100) From 1135ec3df1f166006104a1404a4883defb5327c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=93=B2=E5=B1=8E=E5=B0=86=E5=86=9B?= <37292630+wlmh110@users.noreply.github.com> Date: Mon, 10 Feb 2025 19:11:49 +0800 Subject: [PATCH 14/33] Update pisugarx.py bug fix --- pwnagotchi/plugins/default/pisugarx.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/pwnagotchi/plugins/default/pisugarx.py b/pwnagotchi/plugins/default/pisugarx.py index c4e07a48..540bae8c 100644 --- a/pwnagotchi/plugins/default/pisugarx.py +++ b/pwnagotchi/plugins/default/pisugarx.py @@ -51,6 +51,7 @@ curve5000 = [ + class PiSugarServer: def __init__(self): """ @@ -118,6 +119,9 @@ class PiSugarServer: """每三秒更新pisugar状态,包括触发自动关机""" while True: try: + if( self.modle == 'PiSugar2') | (self.modle == 'PiSugar2Plus'): + self.set_battery_notallow_charging() #短暂关闭充电以获取准确电池电压 + time.sleep(0.05) self.i2creg = [] for i in range(0, 256, 32): # 计算当前读取的起始寄存器地址 @@ -352,7 +356,6 @@ class PiSugarServer: return self.allow_charging def set_battery_allow_charging(self): - print("开启充电") if self.modle == 'PiSugar3': pass elif self.modle == 'PiSugar2': @@ -379,7 +382,6 @@ class PiSugarServer: return def set_battery_notallow_charging(self): - print("关闭充电") if self.modle == 'PiSugar3': pass elif self.modle == 'PiSugar2': @@ -534,6 +536,7 @@ class PiSugarServer: pass + class PiSugar(plugins.Plugin): __author__ = "jayofelony" __version__ = "1.2" From 81d377f491a823f16f0bd39062a33868d00500d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric?= Date: Mon, 10 Feb 2025 19:34:19 +0100 Subject: [PATCH 15/33] Set bettercap "ble.recon off" MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Frédéric --- pwnagotchi/plugins/default/bt-tether.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/pwnagotchi/plugins/default/bt-tether.py b/pwnagotchi/plugins/default/bt-tether.py index 045563ed..1f253f13 100644 --- a/pwnagotchi/plugins/default/bt-tether.py +++ b/pwnagotchi/plugins/default/bt-tether.py @@ -125,7 +125,7 @@ DNS_PTTRN = r"^\s*((\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\s*[ ,;]\s*)+((\d{1,3}\.\ class BTTether(plugins.Plugin): __author__ = "Jayofelony, modified my fmatray" - __version__ = "1.3" + __version__ = "1.4" __license__ = "GPL3" __description__ = "A new BT-Tether plugin" @@ -213,6 +213,13 @@ class BTTether(plugins.Plugin): f"[BT-Tether] Failed to connect to device: have you enabled bluetooth tethering on your phone?" ) + def on_ready(self, agent): + try: + logging.info(f"[BT-Tether] Disabling bettercap's BLE module") + agent.run("ble.recon off", verbose_errors=False) + except Exception as e: + logging.info(f"[BT-Tether] Bettercap BLE was already off.") + def on_unload(self, ui): with ui._lock: ui.remove_element("bluetooth") From 5a398c70bb67dd440da4570d3a4272d354a64133 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric?= Date: Mon, 10 Feb 2025 19:42:51 +0100 Subject: [PATCH 16/33] wigle.py update MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - '.wigle_uploads' is no longer hardcoded but moved to the handshakes directory Signed-off-by: Frédéric --- pwnagotchi/plugins/default/wigle.py | 33 +++++++++++++++++------------ 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/pwnagotchi/plugins/default/wigle.py b/pwnagotchi/plugins/default/wigle.py index d446973d..5f872dc5 100644 --- a/pwnagotchi/plugins/default/wigle.py +++ b/pwnagotchi/plugins/default/wigle.py @@ -37,7 +37,7 @@ class Wigle(plugins.Plugin): def __init__(self): self.ready = False - self.report = StatusFile("/root/.wigle_uploads", data_format="json") + self.report = None self.skip = list() self.lock = Lock() self.options = dict() @@ -53,20 +53,21 @@ class Wigle(plugins.Plugin): self.ui_counter = 0 def on_config_changed(self, config): - api_name = self.options.get("api_name", None) - api_key = self.options.get("api_key", None) - if not (api_name and api_key): - logging.info("[WIGLE] api_name and api_key must be set.") + self.api_key = self.options.get("api_key", None) + if not self.api_key: + logging.info("[WIGLE] api_key must be set.") return - self.auth = (api_name, api_key) self.donate = self.options.get("donate", False) self.handshake_dir = config["bettercap"].get("handshakes") + report_filename = os.path.join(self.handshake_dir, ".wigle_uploads") + self.report = StatusFile(report_filename, data_format="json") self.cvs_dir = self.options.get("cvs_dir", None) self.whitelist = config["main"].get("whitelist", []) self.timeout = self.options.get("timeout", 30) self.position = self.options.get("position", (10, 10)) self.ready = True logging.info("[WIGLE] Ready for wardriving!!!") + self.get_statistics(force=True) def on_webhook(self, path, request): return make_response(redirect("https://www.wigle.net/", code=302)) @@ -195,8 +196,10 @@ class Wigle(plugins.Plugin): try: json_res = requests.post( "https://api.wigle.net/api/v2/file/upload", - headers={"Accept": "application/json"}, - auth=self.auth, + headers={ + "Authorization": f"Basic {self.api_key}", + "Accept": "application/json", + }, data={"donate": "on" if self.donate else "false"}, files=dict(file=(cvs_filename, cvs_content, "text/csv")), timeout=self.timeout, @@ -233,16 +236,18 @@ class Wigle(plugins.Plugin): self.post_wigle(reported, cvs_filename, cvs_content, no_err_entries) display.on_normal() - def get_statistics(self): - if (datetime.now(tz=UTC) - self.last_stat).total_seconds() < 30: + def get_statistics(self, force=False): + if not force and (datetime.now(tz=UTC) - self.last_stat).total_seconds() < 30: return self.last_stat = datetime.now(tz=UTC) try: self.statistics["ready"] = False json_res = requests.get( "https://api.wigle.net/api/v2/stats/user", - headers={"Accept": "application/json"}, - auth=self.auth, + headers={ + "Authorization": f"Basic {self.api_key}", + "Accept": "application/json", + }, timeout=self.timeout, ).json() if not json_res["success"]: @@ -290,9 +295,9 @@ class Wigle(plugins.Plugin): if self.ui_counter == 0: msg = f"User:{self.statistics['username']}" if self.ui_counter == 1: - msg = f"Rank:{self.statistics['rank']} Monthly:{self.statistics['monthrank']}" + msg = f"Rank:{self.statistics['rank']} Month:{self.statistics['monthrank']}" elif self.ui_counter == 2: msg = f"{self.statistics['discoveredwiFi']} discovered WiFis" elif self.ui_counter == 3: - msg = f"Last report:{self.statistics['last']}" + msg = f"Last upl.:{self.statistics['last']}" ui.set("wigle", msg) From 1400e8aac86d04b7b6654fdddf08fe865b222f4d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric?= Date: Mon, 10 Feb 2025 20:29:13 +0100 Subject: [PATCH 17/33] Update defaults.toml MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add options for bt-tether and Wigle Signed-off-by: Frédéric --- pwnagotchi/defaults.toml | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/pwnagotchi/defaults.toml b/pwnagotchi/defaults.toml index 2d9165e3..5edaf9da 100644 --- a/pwnagotchi/defaults.toml +++ b/pwnagotchi/defaults.toml @@ -28,7 +28,8 @@ main.plugins.bt-tether.enabled = false main.plugins.bt-tether.phone-name = "" # name as shown on the phone i.e. "Pwnagotchi's Phone" main.plugins.bt-tether.mac = "" main.plugins.bt-tether.phone = "" # android or ios -main.plugins.bt-tether.ip = "" # 192.168.44.2 android / 172.20.10.2 ios +main.plugins.bt-tether.ip = "" # optional, default : 192.168.44.2 if android or 172.20.10.2 if ios +main.plugins.bt-tether.dns = "" # optional, default (google): "8.8.8.8 1.1.1.1". Consider using anonymous DNS like OpenNic :-) main.plugins.fix_services.enabled = true @@ -98,6 +99,13 @@ main.plugins.wardriver.whitelist = [ "network-2" ] +ain.plugins.wigle.enabled = false +main.plugins.wigle.api_key = "" # mandatory +main.plugins.wigle.cvs_dir = "/tmp" # optionnal, is set, the CVS is written to this directory +main.plugins.wigle.donate = false # default: off +main.plugins.wigle.timeout = 30 # default: 30 +main.plugins.wigle.position = (7, 85) # optionnal + main.plugins.wpa-sec.enabled = false main.plugins.wpa-sec.api_key = "" main.plugins.wpa-sec.api_url = "https://wpa-sec.stanev.org" @@ -188,10 +196,11 @@ bettercap.handshakes = "/home/pi/handshakes" bettercap.silence = [ "ble.device.new", "ble.device.lost", - "ble.device.disconnected", - "ble.device.connected", "ble.device.service.discovered", "ble.device.characteristic.discovered", + "ble.device.disconnected", + "ble.device.connected", + "ble.connection.timeout", "wifi.client.new", "wifi.client.lost", "wifi.client.probe", From 40918029e74964dc4c86e75c783ebf1f21aa1e27 Mon Sep 17 00:00:00 2001 From: Jeroen Oudshoorn Date: Tue, 11 Feb 2025 10:28:20 +0100 Subject: [PATCH 18/33] Small changes Signed-off-by: Jeroen Oudshoorn --- pwnagotchi/plugins/default/bt-tether.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pwnagotchi/plugins/default/bt-tether.py b/pwnagotchi/plugins/default/bt-tether.py index 1f253f13..7d797c1f 100644 --- a/pwnagotchi/plugins/default/bt-tether.py +++ b/pwnagotchi/plugins/default/bt-tether.py @@ -161,7 +161,7 @@ class BTTether(plugins.Plugin): logging.error("[BT-Tether] Phone name not provided") return if not ("mac" in self.options and re.match(MAC_PTTRN, self.options["mac"])): - logging.error("[BT-Tether] Error with mac adresse") + logging.error("[BT-Tether] Error with mac address") return if not ("phone" in self.options and self.options["phone"].lower() in ["android", "ios"]): @@ -274,7 +274,7 @@ class BTTether(plugins.Plugin): bluetooth = "Error while checking bluetoothctl" try: - device =self.nmcli(["-w", "0","device", "show", self.mac]) + device = self.nmcli(["-w", "0","device", "show", self.mac]) device = device.stdout.replace('\n', '
') except Exception as e: device = "Error while checking nmcli device" From 43d58b77c9cd58c1c409fa57f908a27cf8d588cc Mon Sep 17 00:00:00 2001 From: Jeroen Oudshoorn Date: Tue, 11 Feb 2025 10:31:57 +0100 Subject: [PATCH 19/33] Remove wardriver config from defaults.toml Signed-off-by: Jeroen Oudshoorn --- pwnagotchi/defaults.toml | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/pwnagotchi/defaults.toml b/pwnagotchi/defaults.toml index 5edaf9da..64d53f67 100644 --- a/pwnagotchi/defaults.toml +++ b/pwnagotchi/defaults.toml @@ -84,21 +84,6 @@ main.plugins.webcfg.enabled = true main.plugins.webgpsmap.enabled = false -main.plugins.wardriver.enabled = false -main.plugins.wardriver.path = "/root/wardriver" -main.plugins.wardriver.ui.enabled = true -main.plugins.wardriver.ui.icon = false -main.plugins.wardriver.ui.icon_reverse = false -main.plugins.wardriver.ui.position.x = 7 -main.plugins.wardriver.ui.position.y = 85 -main.plugins.wardriver.wigle.enabled = true -main.plugins.wardriver.wigle.api_key = "" -main.plugins.wardriver.wigle.donate = false -main.plugins.wardriver.whitelist = [ - "network-1", - "network-2" -] - ain.plugins.wigle.enabled = false main.plugins.wigle.api_key = "" # mandatory main.plugins.wigle.cvs_dir = "/tmp" # optionnal, is set, the CVS is written to this directory From e1bcea681ad2a2384482de95485561a499b5fd1a Mon Sep 17 00:00:00 2001 From: Jeroen Oudshoorn Date: Tue, 11 Feb 2025 10:33:23 +0100 Subject: [PATCH 20/33] Next version will be 2.9.5.4 Signed-off-by: Jeroen Oudshoorn --- pwnagotchi/_version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pwnagotchi/_version.py b/pwnagotchi/_version.py index 2f7fd932..5fb05872 100644 --- a/pwnagotchi/_version.py +++ b/pwnagotchi/_version.py @@ -1 +1 @@ -__version__ = '2.9.5.3' +__version__ = '2.9.5.4' From 20ddc9425cea082a54b90ac2e1a41c50bfd4d004 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=93=B2=E5=B1=8E=E5=B0=86=E5=86=9B?= <37292630+wlmh110@users.noreply.github.com> Date: Wed, 12 Feb 2025 11:32:14 +0800 Subject: [PATCH 21/33] Update defaults.toml The charging protection feature is disabled by default. --- pwnagotchi/defaults.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pwnagotchi/defaults.toml b/pwnagotchi/defaults.toml index 3882e75e..3d9051ca 100644 --- a/pwnagotchi/defaults.toml +++ b/pwnagotchi/defaults.toml @@ -67,7 +67,7 @@ main.plugins.pisugarx.rotation = false main.plugins.pisugarx.default_display = "percentage" main.plugins.pisugarx.lowpower_shutdown = true main.plugins.pisugarx.lowpower_shutdown_level = 10 # battery percent at which the device will turn off -main.plugins.pisugarx.max_charge_voltage_protection = true #It will limit the battery voltage to about 80% to extend battery life +main.plugins.pisugarx.max_charge_voltage_protection = false #It will limit the battery voltage to about 80% to extend battery life main.plugins.session-stats.enabled = false main.plugins.session-stats.save_directory = "/var/tmp/pwnagotchi/sessions/" From c66728f03e198ec3b3e8533c6b72b5cd7151e5a7 Mon Sep 17 00:00:00 2001 From: Jeroen Oudshoorn Date: Wed, 12 Feb 2025 10:42:13 +0100 Subject: [PATCH 22/33] Edits for defaults.toml Signed-off-by: Jeroen Oudshoorn --- pwnagotchi/defaults.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pwnagotchi/defaults.toml b/pwnagotchi/defaults.toml index 64d53f67..532e9d6a 100644 --- a/pwnagotchi/defaults.toml +++ b/pwnagotchi/defaults.toml @@ -84,12 +84,12 @@ main.plugins.webcfg.enabled = true main.plugins.webgpsmap.enabled = false -ain.plugins.wigle.enabled = false +main.plugins.wigle.enabled = false main.plugins.wigle.api_key = "" # mandatory main.plugins.wigle.cvs_dir = "/tmp" # optionnal, is set, the CVS is written to this directory main.plugins.wigle.donate = false # default: off main.plugins.wigle.timeout = 30 # default: 30 -main.plugins.wigle.position = (7, 85) # optionnal +main.plugins.wigle.position = [7, 85] # optionnal main.plugins.wpa-sec.enabled = false main.plugins.wpa-sec.api_key = "" From f4a7588a48b50712296e804de0cc2bf45bc90e1b Mon Sep 17 00:00:00 2001 From: Jeroen Oudshoorn Date: Wed, 12 Feb 2025 12:59:28 +0100 Subject: [PATCH 23/33] Update for wigle Signed-off-by: Jeroen Oudshoorn --- pwnagotchi/plugins/default/wigle.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pwnagotchi/plugins/default/wigle.py b/pwnagotchi/plugins/default/wigle.py index 5f872dc5..e181d950 100644 --- a/pwnagotchi/plugins/default/wigle.py +++ b/pwnagotchi/plugins/default/wigle.py @@ -158,14 +158,16 @@ class Wigle(plugins.Plugin): content, delimiter=",", quoting=csv.QUOTE_NONE, escapechar="\\" ) for gps_data, pcap_data in data: # write WIFIs + try: + timestamp = datetime.strptime(gps_data["Updated"].rsplit(".")[0], "%Y-%m-%dT%H:%M:%S").strftime("%Y-%m-%d %H:%M:%S") + except ValueError: + timestamp = datetime.strptime(gps_data["Updated"].rsplit(".")[0], "%Y-%m-%d %H:%M:%S").strftime("%Y-%m-%d %H:%M:%S") writer.writerow( [ pcap_data[WifiInfo.BSSID], pcap_data[WifiInfo.ESSID], f"[{']['.join(pcap_data[WifiInfo.ENCRYPTION])}]", - datetime.strptime( - gps_data["Updated"].rsplit(".")[0], "%Y-%m-%dT%H:%M:%S" - ).strftime("%Y-%m-%d %H:%M:%S"), + timestamp, pcap_data[WifiInfo.CHANNEL], pcap_data[WifiInfo.FREQUENCY], pcap_data[WifiInfo.RSSI], From e1f22cd6a04fcf0b4f0e09489ab4ce9f857ded95 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?fr=C3=A9d=C3=A9ric?= Date: Wed, 12 Feb 2025 13:02:22 +0100 Subject: [PATCH 24/33] modified: .gitignore new file: pwnagotchi/plugins/default/cache.py modified: pwnagotchi/plugins/default/wigle.py - Add a cache plugin - Add statistics to wigle --- .gitignore | 3 +- pwnagotchi/plugins/default/cache.py | 53 ++++++++++ pwnagotchi/plugins/default/wigle.py | 153 +++++++++++++++++++++------- 3 files changed, 173 insertions(+), 36 deletions(-) create mode 100644 pwnagotchi/plugins/default/cache.py diff --git a/.gitignore b/.gitignore index 7e99e367..2a45ed72 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ -*.pyc \ No newline at end of file +*.pyc +.vscode diff --git a/pwnagotchi/plugins/default/cache.py b/pwnagotchi/plugins/default/cache.py new file mode 100644 index 00000000..92feff12 --- /dev/null +++ b/pwnagotchi/plugins/default/cache.py @@ -0,0 +1,53 @@ +import logging +import json +import os +import re + +import pwnagotchi.plugins as plugins +from pwnagotchi.ui.components import LabeledValue +from pwnagotchi.ui.view import BLACK +import pwnagotchi.ui.fonts as fonts + + +class Cache(plugins.Plugin): + __author__ = "fmatray" + __version__ = "1.0.0" + __license__ = "GPL3" + __description__ = "A simple plugin to cache AP informations" + + def __init__(self): + self.options = dict() + + def on_config_changed(self, config): + self.handshake_dir = config["bettercap"].get("handshakes") + self.cache_dir = os.path.join(self.handshake_dir, "cache") + if not (os.path.exists(self.cache_dir)): + os.mkdir(self.cache_dir) + + def get_cache(self, file): + cache_filename = os.path.basename( + re.sub(r"\.(pcap|gps\.json|geo\.json)$", ".cache", file) + ) + cache_filename = os.path.join(self.cache_dir, cache_filename) + if not os.path.exists(cache_filename): + return None + try: + with open(cache_filename, "r") as f: + return json.load(f) + except Exception as e: + return None + + def cache_ap(self, ap): + mac = ap["mac"].replace(":", "") + hostname = re.sub(r"[^a-zA-Z0-9]", "", ap["hostname"]) + filename = os.path.join(self.cache_dir, f"{hostname}_{mac}.cache") + with open(filename, "w") as f: + json.dump(ap, f) + + def on_unfiltered_ap_list(self, agent, aps): + for ap in filter(lambda ap: ap["hostname"] not in ["", ""], aps): + self.cache_ap(ap) + + def on_handshake(self, agent, filename, access_point, client_station): + logging.info(f"[WIGLE] on_handshake") + self.cache_ap(access_point) diff --git a/pwnagotchi/plugins/default/wigle.py b/pwnagotchi/plugins/default/wigle.py index 5f872dc5..2d9cb7a7 100644 --- a/pwnagotchi/plugins/default/wigle.py +++ b/pwnagotchi/plugins/default/wigle.py @@ -9,6 +9,7 @@ from glob import glob from threading import Lock from io import StringIO from datetime import datetime, UTC +from dataclasses import dataclass from flask import make_response, redirect from pwnagotchi.utils import ( @@ -28,6 +29,40 @@ from pwnagotchi.ui.view import BLACK from scapy.all import Scapy_Exception +@dataclass +class WigleStatistics: + ready: bool = False + username: str = None + rank: int = None + monthrank: int = None + discoveredwiFi: int = None + last: str = None + groupID: str = None + groupname: str = None + grouprank: int = None + + def update_user(self, json_res): + self.ready = True + self.username = json_res["user"] + self.rank = json_res["rank"] + self.monthrank = json_res["monthRank"] + self.discoveredwiFi = json_res["statistics"]["discoveredWiFi"] + last = json_res["statistics"]["last"] + self.last = f"{last[6:8]}/{last[4:6]}/{last[0:4]}" + + def update_user_group(self, json_res): + self.groupID = json_res["groupId"] + self.groupname = json_res["groupName"] + + def update_group(self, json_res): + rank = 1 + for group in json_res["groups"]: + if group['groupId'] == self.groupID: + self.grouprank = rank + rank += 1 + + + class Wigle(plugins.Plugin): __author__ = "Dadav and updated by Jayofelony and fmatray" __version__ = "4.0.0" @@ -41,14 +76,7 @@ class Wigle(plugins.Plugin): self.skip = list() self.lock = Lock() self.options = dict() - self.statistics = dict( - ready=False, - username=None, - rank=None, - monthrank=None, - discoveredwiFi=None, - last=None, - ) + self.statistics = WigleStatistics() self.last_stat = datetime.now(tz=UTC) self.ui_counter = 0 @@ -61,6 +89,9 @@ class Wigle(plugins.Plugin): self.handshake_dir = config["bettercap"].get("handshakes") report_filename = os.path.join(self.handshake_dir, ".wigle_uploads") self.report = StatusFile(report_filename, data_format="json") + self.cache_dir = os.path.join(self.handshake_dir, "cache") + if not (os.path.exists(self.cache_dir)): + os.mkdir(self.cache_dir) self.cvs_dir = self.options.get("cvs_dir", None) self.whitelist = config["main"].get("whitelist", []) self.timeout = self.options.get("timeout", 30) @@ -86,6 +117,17 @@ class Wigle(plugins.Plugin): return None return pcap_filename + def get_cache(self, pcap_file): + cache_filename = os.path.basename(pcap_file.replace(".pcap", ".cache")) + cache_filename = os.path.join(self.cache_dir, cache_filename) + if not os.path.exists(cache_filename): + return None + try: + with open(cache_filename, "r") as f: + return json.load(f) + except Exception as e: + return None + @staticmethod def extract_gps_data(path): """ @@ -120,8 +162,17 @@ class Wigle(plugins.Plugin): return None return gps_data - @staticmethod - def get_pcap_data(pcap_filename): + def get_pcap_data(self, pcap_filename): + if cache := self.get_cache(pcap_filename): + logging.info(f"[WIGLE] Using cache for {pcap_filename}") + return { + WifiInfo.BSSID: cache["mac"], + WifiInfo.ESSID: cache["hostname"], + WifiInfo.ENCRYPTION: cache["encryption"], + WifiInfo.CHANNEL: cache["channel"], + WifiInfo.FREQUENCY: cache["frequency"], + WifiInfo.RSSI: cache["rssi"], + } try: pcap_data = extract_from_pcap( pcap_filename, @@ -236,31 +287,46 @@ class Wigle(plugins.Plugin): self.post_wigle(reported, cvs_filename, cvs_content, no_err_entries) display.on_normal() - def get_statistics(self, force=False): - if not force and (datetime.now(tz=UTC) - self.last_stat).total_seconds() < 30: - return - self.last_stat = datetime.now(tz=UTC) + def request_statistics(self, url): try: - self.statistics["ready"] = False - json_res = requests.get( - "https://api.wigle.net/api/v2/stats/user", + return requests.get( + url, headers={ "Authorization": f"Basic {self.api_key}", "Accept": "application/json", }, timeout=self.timeout, ).json() - if not json_res["success"]: - return - self.statistics["ready"] = True - self.statistics["username"] = json_res["user"] - self.statistics["rank"] = json_res["rank"] - self.statistics["monthrank"] = json_res["monthRank"] - self.statistics["discoveredwiFi"] = json_res["statistics"]["discoveredWiFi"] - last = json_res["statistics"]["last"] - self.statistics["last"] = f"{last[6:8]}/{last[4:6]}/{last[0:4]}" except (requests.exceptions.RequestException, OSError) as exp: - pass + return None + + def get_user_statistics(self): + json_res = self.request_statistics( + "https://api.wigle.net/api/v2/stats/user", + ) + if json_res and json_res["success"]: + self.statistics.update_user(json_res) + + def get_usergroup_statistics(self): + if not self.statistics.username or self.statistics.groupID: + return + url = f"https://api.wigle.net/api/v2/group/groupForUser/{self.statistics.username}" + if json_res := self.request_statistics(url): + self.statistics.update_user_group(json_res) + + def get_group_statistics(self): + if not self.statistics.groupID: + return + json_res = self.request_statistics("https://api.wigle.net/api/v2/stats/group") + if json_res and json_res["success"]: + self.statistics.update_group(json_res) + + def get_statistics(self, force=False): + if force or (datetime.now(tz=UTC) - self.last_stat).total_seconds() > 30: + self.last_stat = datetime.now(tz=UTC) + self.get_user_statistics() + self.get_usergroup_statistics() + self.get_group_statistics() def on_internet_available(self, agent): if not self.ready: @@ -272,6 +338,21 @@ class Wigle(plugins.Plugin): else: self.get_statistics() + def cache_ap(self, ap): + mac = ap["mac"].replace(":", "") + hostname = re.sub(r"[^a-zA-Z0-9]", "", ap["hostname"]) + filename = os.path.join(self.cache_dir, f"{hostname}_{mac}.cache") + with open(filename, "w") as f: + json.dump(ap, f) + + def on_unfiltered_ap_list(self, agent, aps): + for ap in filter(lambda ap: ap["hostname"] not in ["", ""], aps): + self.cache_ap(ap) + + def on_handshake(self, agent, filename, access_point, client_station): + logging.info(f"[WIGLE] on_handshake") + self.cache_ap(access_point) + def on_ui_setup(self, ui): with ui._lock: ui.add_element( @@ -284,20 +365,22 @@ class Wigle(plugins.Plugin): ui.remove_element("wigle") def on_ui_update(self, ui): - if not self.ready: - return with ui._lock: - if not self.statistics["ready"]: + if not (self.ready and self.statistics.ready): ui.set("wigle", "We Will Wait Wigle") return msg = "-" - self.ui_counter = (self.ui_counter + 1) % 4 + self.ui_counter = (self.ui_counter + 1) % 6 if self.ui_counter == 0: - msg = f"User:{self.statistics['username']}" + msg = f"User:{self.statistics.username}" if self.ui_counter == 1: - msg = f"Rank:{self.statistics['rank']} Month:{self.statistics['monthrank']}" + msg = f"Rank:{self.statistics.rank} Month:{self.statistics.monthrank}" elif self.ui_counter == 2: - msg = f"{self.statistics['discoveredwiFi']} discovered WiFis" + msg = f"{self.statistics.discoveredwiFi} discovered WiFis" elif self.ui_counter == 3: - msg = f"Last upl.:{self.statistics['last']}" + msg = f"Last upl.:{self.statistics.last}" + elif self.ui_counter == 4: + msg = f"Grp:{self.statistics.groupname}" + elif self.ui_counter == 5: + msg = f"Grp rank:{self.statistics.grouprank}" ui.set("wigle", msg) From 9d0df49fb6e0d79082c3a4485f7595ef3338f8ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?fr=C3=A9d=C3=A9ric?= Date: Wed, 12 Feb 2025 13:04:14 +0100 Subject: [PATCH 25/33] add cache.py --- pwnagotchi/plugins/default/cache.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pwnagotchi/plugins/default/cache.py b/pwnagotchi/plugins/default/cache.py index 92feff12..0ab0368d 100644 --- a/pwnagotchi/plugins/default/cache.py +++ b/pwnagotchi/plugins/default/cache.py @@ -8,6 +8,8 @@ from pwnagotchi.ui.components import LabeledValue from pwnagotchi.ui.view import BLACK import pwnagotchi.ui.fonts as fonts +def get_cache(): + return None class Cache(plugins.Plugin): __author__ = "fmatray" From 5d668ae34ef0ee94af5f85118bd9eae936ed32d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?fr=C3=A9d=C3=A9ric?= Date: Wed, 12 Feb 2025 13:04:37 +0100 Subject: [PATCH 26/33] update wigle.py --- pwnagotchi/plugins/default/wigle.py | 32 +---------------------------- 1 file changed, 1 insertion(+), 31 deletions(-) diff --git a/pwnagotchi/plugins/default/wigle.py b/pwnagotchi/plugins/default/wigle.py index 2d9cb7a7..ec120a67 100644 --- a/pwnagotchi/plugins/default/wigle.py +++ b/pwnagotchi/plugins/default/wigle.py @@ -57,12 +57,11 @@ class WigleStatistics: def update_group(self, json_res): rank = 1 for group in json_res["groups"]: - if group['groupId'] == self.groupID: + if group["groupId"] == self.groupID: self.grouprank = rank rank += 1 - class Wigle(plugins.Plugin): __author__ = "Dadav and updated by Jayofelony and fmatray" __version__ = "4.0.0" @@ -89,9 +88,6 @@ class Wigle(plugins.Plugin): self.handshake_dir = config["bettercap"].get("handshakes") report_filename = os.path.join(self.handshake_dir, ".wigle_uploads") self.report = StatusFile(report_filename, data_format="json") - self.cache_dir = os.path.join(self.handshake_dir, "cache") - if not (os.path.exists(self.cache_dir)): - os.mkdir(self.cache_dir) self.cvs_dir = self.options.get("cvs_dir", None) self.whitelist = config["main"].get("whitelist", []) self.timeout = self.options.get("timeout", 30) @@ -117,17 +113,6 @@ class Wigle(plugins.Plugin): return None return pcap_filename - def get_cache(self, pcap_file): - cache_filename = os.path.basename(pcap_file.replace(".pcap", ".cache")) - cache_filename = os.path.join(self.cache_dir, cache_filename) - if not os.path.exists(cache_filename): - return None - try: - with open(cache_filename, "r") as f: - return json.load(f) - except Exception as e: - return None - @staticmethod def extract_gps_data(path): """ @@ -338,21 +323,6 @@ class Wigle(plugins.Plugin): else: self.get_statistics() - def cache_ap(self, ap): - mac = ap["mac"].replace(":", "") - hostname = re.sub(r"[^a-zA-Z0-9]", "", ap["hostname"]) - filename = os.path.join(self.cache_dir, f"{hostname}_{mac}.cache") - with open(filename, "w") as f: - json.dump(ap, f) - - def on_unfiltered_ap_list(self, agent, aps): - for ap in filter(lambda ap: ap["hostname"] not in ["", ""], aps): - self.cache_ap(ap) - - def on_handshake(self, agent, filename, access_point, client_station): - logging.info(f"[WIGLE] on_handshake") - self.cache_ap(access_point) - def on_ui_setup(self, ui): with ui._lock: ui.add_element( From 1d635d955bd6c90e4660d54348eacd5056aaf41c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?fr=C3=A9d=C3=A9ric?= Date: Wed, 12 Feb 2025 14:10:33 +0100 Subject: [PATCH 27/33] BT-Tether: add a check to DNS config --- pwnagotchi/plugins/default/bt-tether.py | 107 +++++++++++++++--------- 1 file changed, 67 insertions(+), 40 deletions(-) diff --git a/pwnagotchi/plugins/default/bt-tether.py b/pwnagotchi/plugins/default/bt-tether.py index 7d797c1f..ecd0ed06 100644 --- a/pwnagotchi/plugins/default/bt-tether.py +++ b/pwnagotchi/plugins/default/bt-tether.py @@ -123,6 +123,7 @@ MAC_PTTRN = r"^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})$" IP_PTTRN = r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$" DNS_PTTRN = r"^\s*((\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\s*[ ,;]\s*)+((\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\s*[ ,;]?\s*)$" + class BTTether(plugins.Plugin): __author__ = "Jayofelony, modified my fmatray" __version__ = "1.4" @@ -138,8 +139,7 @@ class BTTether(plugins.Plugin): @staticmethod def exec_cmd(cmd, args, pattern=None): try: - result = subprocess.run([cmd] + args, - check=True, capture_output=True, text=True) + result = subprocess.run([cmd] + args, check=True, capture_output=True, text=True) if pattern: return result.stdout.find(pattern) return result @@ -152,7 +152,7 @@ class BTTether(plugins.Plugin): def nmcli(self, args, pattern=None): return self.exec_cmd("nmcli", args, pattern) - + def on_loaded(self): logging.info("[BT-Tether] plugin loaded.") @@ -162,7 +162,7 @@ class BTTether(plugins.Plugin): return if not ("mac" in self.options and re.match(MAC_PTTRN, self.options["mac"])): logging.error("[BT-Tether] Error with mac address") - return + return if not ("phone" in self.options and self.options["phone"].lower() in ["android", "ios"]): logging.error("[BT-Tether] Phone type not supported") @@ -181,23 +181,30 @@ class BTTether(plugins.Plugin): self.mac = self.options["mac"] dns = self.options.get("dns", "8.8.8.8 1.1.1.1") if not re.match(DNS_PTTRN, dns): - logging.error(f"[BT-Tether] DNS error: {dns}") + if dns == "": + logging.error(f"[BT-Tether] Empty DNS setting") + else: + logging.error(f"[BT-Tether] Wrong DNS setting: '{dns}'") return - dns = re.sub("[\s,;]+", " ", dns).strip() # DNS cleaning + dns = re.sub("[\s,;]+", " ", dns).strip() # DNS cleaning try: # Configure connection. Metric is set to 200 to prefer connection over USB - self.nmcli(["connection", "modify", f"{self.phone_name}", - "connection.type", "bluetooth", - "bluetooth.type", "panu", - "bluetooth.bdaddr", f"{self.mac}", - "connection.autoconnect", "yes", - "connection.autoconnect-retries", "0", - "ipv4.method", "manual", - "ipv4.dns", f"{dns}", - "ipv4.addresses", f"{address}/24", - "ipv4.gateway", f"{gateway}", - "ipv4.route-metric", "200" ]) + self.nmcli( + [ + "connection", "modify", f"{self.phone_name}", + "connection.type", "bluetooth", + "bluetooth.type", "panu", + "bluetooth.bdaddr", f"{self.mac}", + "connection.autoconnect", "yes", + "connection.autoconnect-retries", "0", + "ipv4.method", "manual", + "ipv4.dns", f"{dns}", + "ipv4.addresses", f"{address}/24", + "ipv4.gateway", f"{gateway}", + "ipv4.route-metric", "200", + ] + ) self.nmcli(["connection", "reload"]) self.ready = True logging.info(f"[BT-Tether] Connection {self.phone_name} configured") @@ -205,7 +212,7 @@ class BTTether(plugins.Plugin): logging.error(f"[BT-Tether] Error while configuring: {e}") return try: - time.sleep(5) # Give some delay to configure before going up + time.sleep(5) # Give some delay to configure before going up self.nmcli(["connection", "up", f"{self.phone_name}"]) except Exception as e: logging.error(f"[BT-Tether] Failed to connect to device: {e}") @@ -230,9 +237,18 @@ class BTTether(plugins.Plugin): def on_ui_setup(self, ui): with ui._lock: - ui.add_element('bluetooth', LabeledValue(color=BLACK, label='BT', value='-', - position=(ui.width() / 2 - 10, 0), - label_font=fonts.Bold, text_font=fonts.Medium)) + ui.add_element( + "bluetooth", + LabeledValue( + color=BLACK, + label="BT", + value="-", + position=(ui.width() / 2 - 10, 0), + label_font=fonts.Bold, + text_font=fonts.Medium, + ), + ) + def on_ui_update(self, ui): if not self.ready: return @@ -240,17 +256,26 @@ class BTTether(plugins.Plugin): status = "" try: # Checking connection - if self.nmcli(["-w", "0", "-g", "GENERAL.STATE", "connection", "show", self.phone_name], - "activated") != -1: + if ( + self.nmcli(["-w", "0", "-g", "GENERAL.STATE", "connection", "show", self.phone_name], + "activated", + ) + != -1 + ): ui.set("bluetooth", "U") return else: ui.set("bluetooth", "D") status = "BT Conn. down" - + # Checking device - if self.nmcli(["-w", "0", "-g", "GENERAL.STATE", "device", "show", self.mac], - "(connected)") != -1: + if ( + self.nmcli( + ["-w", "0", "-g", "GENERAL.STATE", "device", "show", self.mac], + "(connected)", + ) + != -1 + ): ui.set("bluetooth", "C") status += "\nBT dev conn." else: @@ -269,26 +294,28 @@ class BTTether(plugins.Plugin): if path == "/" or not path: try: bluetooth = self.bluetoothctl(["info", self.mac]) - bluetooth = bluetooth.stdout.replace('\n', '
') + bluetooth = bluetooth.stdout.replace("\n", "
") except Exception as e: bluetooth = "Error while checking bluetoothctl" - - try: - device = self.nmcli(["-w", "0","device", "show", self.mac]) - device = device.stdout.replace('\n', '
') + + try: + device = self.nmcli(["-w", "0", "device", "show", self.mac]) + device = device.stdout.replace("\n", "
") except Exception as e: device = "Error while checking nmcli device" - + try: - connection = self.nmcli(["-w", "0","connection", "show", self.phone_name]) - connection = connection.stdout.replace('\n', '
') + connection = self.nmcli(["-w", "0", "connection", "show", self.phone_name]) + connection = connection.stdout.replace("\n", "
") except Exception as e: connection = "Error while checking nmcli connection" logging.debug(device) - return render_template_string(TEMPLATE, - title="BT-Tether", - bluetooth=bluetooth, - device=device, - connection=connection) - abort(404) \ No newline at end of file + return render_template_string( + TEMPLATE, + title="BT-Tether", + bluetooth=bluetooth, + device=device, + connection=connection, + ) + abort(404) From 29d1ca6728f0f9fb6cb4b54aba2e185249587ee8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?fr=C3=A9d=C3=A9ric?= Date: Thu, 13 Feb 2025 22:17:09 +0100 Subject: [PATCH 28/33] Bt-tether: add autoconnect to device Cache: - add cache to all hools with access point - add cleaning Wigle: use cache --- pwnagotchi/plugins/default/bt-tether.py | 11 ++- pwnagotchi/plugins/default/cache.py | 125 ++++++++++++++++++------ pwnagotchi/plugins/default/wigle.py | 47 +++++---- 3 files changed, 133 insertions(+), 50 deletions(-) diff --git a/pwnagotchi/plugins/default/bt-tether.py b/pwnagotchi/plugins/default/bt-tether.py index ecd0ed06..afced4d3 100644 --- a/pwnagotchi/plugins/default/bt-tether.py +++ b/pwnagotchi/plugins/default/bt-tether.py @@ -144,7 +144,10 @@ class BTTether(plugins.Plugin): return result.stdout.find(pattern) return result except Exception as exp: - logging.error(f"[BT-Tether] Error with {cmd} : {exp}") + logging.error(f"[BT-Tether] Error with {cmd}") + logging.error(f"[BT-Tether] Exception : {exp}") + logging.error(f"[BT-Tether] STDOUT : {result.stdout}") + logging.error(f"[BT-Tether] STDERR : {result.stderr}") raise exp def bluetoothctl(self, args, pattern=None): @@ -205,6 +208,12 @@ class BTTether(plugins.Plugin): "ipv4.route-metric", "200", ] ) + # Configure Device to autoconnect + self.nmcli([ + "device", "set", f"{self.mac}", + "autoconnect", "yes", + "managed", "yes" + ]) self.nmcli(["connection", "reload"]) self.ready = True logging.info(f"[BT-Tether] Connection {self.phone_name} configured") diff --git a/pwnagotchi/plugins/default/cache.py b/pwnagotchi/plugins/default/cache.py index 0ab0368d..a94bc3dc 100644 --- a/pwnagotchi/plugins/default/cache.py +++ b/pwnagotchi/plugins/default/cache.py @@ -2,14 +2,25 @@ import logging import json import os import re - +import pathlib import pwnagotchi.plugins as plugins -from pwnagotchi.ui.components import LabeledValue -from pwnagotchi.ui.view import BLACK -import pwnagotchi.ui.fonts as fonts +from datetime import datetime, UTC +from threading import Lock + + +def read_ap_cache(cache_dir, file): + cache_filename = os.path.basename(re.sub(r"\.(pcap|gps\.json|geo\.json)$", ".cache", file)) + cache_filename = os.path.join(cache_dir, cache_filename) + if not os.path.exists(cache_filename): + logging.info("Cache not exist") + return None + try: + with open(cache_filename, "r") as f: + return json.load(f) + except Exception as e: + logging.info(f"Exception {e}") + return None -def get_cache(): - return None class Cache(plugins.Plugin): __author__ = "fmatray" @@ -19,37 +30,87 @@ class Cache(plugins.Plugin): def __init__(self): self.options = dict() + self.ready = False + self.lock = Lock() + + def on_loaded(self): + logging.info("[CACHE] plugin loaded.") def on_config_changed(self, config): - self.handshake_dir = config["bettercap"].get("handshakes") - self.cache_dir = os.path.join(self.handshake_dir, "cache") - if not (os.path.exists(self.cache_dir)): - os.mkdir(self.cache_dir) - - def get_cache(self, file): - cache_filename = os.path.basename( - re.sub(r"\.(pcap|gps\.json|geo\.json)$", ".cache", file) - ) - cache_filename = os.path.join(self.cache_dir, cache_filename) - if not os.path.exists(cache_filename): - return None try: - with open(cache_filename, "r") as f: - return json.load(f) - except Exception as e: - return None + handshake_dir = config["bettercap"].get("handshakes") + self.cache_dir = os.path.join(handshake_dir, "cache") + os.makedirs(self.cache_dir, exist_ok=True) + except Exception: + logging.info(f"[CACHE] Cannot access to the cache directory") + return + self.last_clean = datetime.now(tz=UTC) + self.ready = True + logging.info(f"[CACHE] Cache plugin configured") + self.clean_ap_cache() - def cache_ap(self, ap): - mac = ap["mac"].replace(":", "") - hostname = re.sub(r"[^a-zA-Z0-9]", "", ap["hostname"]) - filename = os.path.join(self.cache_dir, f"{hostname}_{mac}.cache") - with open(filename, "w") as f: - json.dump(ap, f) + def on_unload(self, ui): + self.clean_ap_cache() + + def clean_ap_cache(self): + if not self.ready: + return + with self.lock: + ctime = datetime.now(tz=UTC) + cache_to_delete = list() + for cache_file in pathlib.Path(self.cache_dir).glob("*.apcache"): + mtime = datetime.fromtimestamp(cache_file.lstat().st_mtime, tz=UTC) + if (ctime - mtime).total_seconds() > 60 * 5: + cache_to_delete.append(cache_file) + if cache_to_delete: + logging.info(f"[CACHE] Cleaning {len(cache_to_delete)} files") + for cache_file in cache_to_delete: + try: + cache_file.unlink() + except Exception as e: + logging.error(f"[CACHE] Cannot delete {cache_file}: {e}") + + def write_ap_cache(self, access_point): + with self.lock: + try: + mac = access_point["mac"].replace(":", "") + hostname = re.sub(r"[^a-zA-Z0-9]", "", access_point["hostname"]) + except KeyError: + return + cache_file = os.path.join(self.cache_dir, f"{hostname}_{mac}.apcache") + try: + with open(cache_file, "w") as f: + json.dump(access_point, f) + except Exception as e: + logging.error(f"[CACHE] Cannot write {cache_file}: {e}") + pass + + def on_wifi_update(self, agent, access_points): + if self.ready: + for ap in filter(lambda ap: ap["hostname"] not in ["", ""], access_points): + self.write_ap_cache(ap) def on_unfiltered_ap_list(self, agent, aps): - for ap in filter(lambda ap: ap["hostname"] not in ["", ""], aps): - self.cache_ap(ap) + if self.ready: + for ap in filter(lambda ap: ap["hostname"] not in ["", ""], aps): + self.write_ap_cache(ap) + + def on_association(self, agent, access_point): + if self.ready: + self.write_ap_cache(access_point) + + def on_deauthentication(self, agent, access_point, client_station): + if self.ready: + self.write_ap_cache(access_point) def on_handshake(self, agent, filename, access_point, client_station): - logging.info(f"[WIGLE] on_handshake") - self.cache_ap(access_point) + if self.ready: + self.write_ap_cache(access_point) + + def on_ui_update(self, ui): + if not self.ready: + return + current_time = datetime.now(tz=UTC) + if (current_time - self.last_clean).total_seconds() > 60: + self.clean_ap_cache() + self.last_clean = current_time diff --git a/pwnagotchi/plugins/default/wigle.py b/pwnagotchi/plugins/default/wigle.py index 2b5a367c..5000629d 100644 --- a/pwnagotchi/plugins/default/wigle.py +++ b/pwnagotchi/plugins/default/wigle.py @@ -20,6 +20,7 @@ from pwnagotchi.utils import ( remove_whitelisted, ) from pwnagotchi import plugins +from pwnagotchi.plugins.default.cache import read_ap_cache from pwnagotchi._version import __version__ as __pwnagotchi_version__ import pwnagotchi.ui.fonts as fonts @@ -64,7 +65,7 @@ class WigleStatistics: class Wigle(plugins.Plugin): __author__ = "Dadav and updated by Jayofelony and fmatray" - __version__ = "4.0.0" + __version__ = "4.1.0" __license__ = "GPL3" __description__ = "This plugin automatically uploads collected WiFi to wigle.net" LABEL_SPACING = 0 @@ -79,6 +80,9 @@ class Wigle(plugins.Plugin): self.last_stat = datetime.now(tz=UTC) self.ui_counter = 0 + def on_loaded(self): + logging.info("[WIGLE] plugin loaded.") + def on_config_changed(self, config): self.api_key = self.options.get("api_key", None) if not self.api_key: @@ -88,6 +92,7 @@ class Wigle(plugins.Plugin): self.handshake_dir = config["bettercap"].get("handshakes") report_filename = os.path.join(self.handshake_dir, ".wigle_uploads") self.report = StatusFile(report_filename, data_format="json") + self.cache_dir = os.path.join(self.handshake_dir, "cache") self.cvs_dir = self.options.get("cvs_dir", None) self.whitelist = config["main"].get("whitelist", []) self.timeout = self.options.get("timeout", 30) @@ -148,16 +153,19 @@ class Wigle(plugins.Plugin): return gps_data def get_pcap_data(self, pcap_filename): - if cache := self.get_cache(pcap_filename): - logging.info(f"[WIGLE] Using cache for {pcap_filename}") - return { - WifiInfo.BSSID: cache["mac"], - WifiInfo.ESSID: cache["hostname"], - WifiInfo.ENCRYPTION: cache["encryption"], - WifiInfo.CHANNEL: cache["channel"], - WifiInfo.FREQUENCY: cache["frequency"], - WifiInfo.RSSI: cache["rssi"], - } + try: + if cache := read_ap_cache(self.cache_dir, self.pcap_filename): + logging.info(f"[WIGLE] Using cache for {pcap_filename}") + return { + WifiInfo.BSSID: cache["mac"], + WifiInfo.ESSID: cache["hostname"], + WifiInfo.ENCRYPTION: cache["encryption"], + WifiInfo.CHANNEL: cache["channel"], + WifiInfo.FREQUENCY: cache["frequency"], + WifiInfo.RSSI: cache["rssi"], + } + except (AttributeError, KeyError): + pass try: pcap_data = extract_from_pcap( pcap_filename, @@ -190,14 +198,16 @@ class Wigle(plugins.Plugin): f"device={pwnagotchi.name()},display=kismet,board=RaspberryPi,brand=pwnagotchi,star=Sol,body=3,subBody=0\n" f"MAC,SSID,AuthMode,FirstSeen,Channel,Frequency,RSSI,CurrentLatitude,CurrentLongitude,AltitudeMeters,AccuracyMeters,RCOIs,MfgrId,Type\n" ) - writer = csv.writer( - content, delimiter=",", quoting=csv.QUOTE_NONE, escapechar="\\" - ) + writer = csv.writer(content, delimiter=",", quoting=csv.QUOTE_NONE, escapechar="\\") for gps_data, pcap_data in data: # write WIFIs try: - timestamp = datetime.strptime(gps_data["Updated"].rsplit(".")[0], "%Y-%m-%dT%H:%M:%S").strftime("%Y-%m-%d %H:%M:%S") + timestamp = datetime.strptime( + gps_data["Updated"].rsplit(".")[0], "%Y-%m-%dT%H:%M:%S" + ).strftime("%Y-%m-%d %H:%M:%S") except ValueError: - timestamp = datetime.strptime(gps_data["Updated"].rsplit(".")[0], "%Y-%m-%d %H:%M:%S").strftime("%Y-%m-%d %H:%M:%S") + timestamp = datetime.strptime( + gps_data["Updated"].rsplit(".")[0], "%Y-%m-%d %H:%M:%S" + ).strftime("%Y-%m-%d %H:%M:%S") writer.writerow( [ pcap_data[WifiInfo.BSSID], @@ -334,7 +344,10 @@ class Wigle(plugins.Plugin): def on_unload(self, ui): with ui._lock: - ui.remove_element("wigle") + try: + ui.remove_element("wigle") + except KeyError: + pass def on_ui_update(self, ui): with ui._lock: From 2dc45bc4b49a63df0144ee13d5802e6a1c2f8708 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?fr=C3=A9d=C3=A9ric?= Date: Sun, 16 Feb 2025 20:07:22 +0100 Subject: [PATCH 29/33] - Add more sentences to voice.py, with some geek references --- pwnagotchi/voice.py | 69 ++++++++++++++++++++++++++++++++++----------- 1 file changed, 52 insertions(+), 17 deletions(-) diff --git a/pwnagotchi/voice.py b/pwnagotchi/voice.py index fad1eea8..38dbaefd 100644 --- a/pwnagotchi/voice.py +++ b/pwnagotchi/voice.py @@ -27,11 +27,19 @@ class Voice: self._('Hack the Planet!'), self._('No more mister Wi-Fi!!'), self._('Pretty fly 4 a Wi-Fi!'), + self._('Good Pwning!'), # Battlestar Galactica + self._('Ensign, Engage!'), # Star trek + self._('Free your Wi-Fi!'), # Matrix + self._('Chevron Seven, locked.'), # Stargate + self._('May the Wi-fi be with you'), # Star wars ]) def on_keys_generation(self): return random.choice([ - self._('Generating keys, do not turn off ...')]) + self._('Generating keys, do not turn off ...'), + self._('Are you the keymaster?'), # Ghostbusters + self._('I am the keymaster!'), # Ghostbusters + ]) def on_normal(self): return random.choice([ @@ -44,8 +52,7 @@ class Voice: def on_reading_logs(self, lines_so_far=0): if lines_so_far == 0: return self._('Reading last session logs ...') - else: - return self._('Read {lines_so_far} log lines so far ...').format(lines_so_far=lines_so_far) + return self._('Read {lines_so_far} log lines so far ...').format(lines_so_far=lines_so_far) def on_bored(self): return random.choice([ @@ -53,7 +60,11 @@ class Voice: self._('Let\'s go for a walk!')]) def on_motivated(self, reward): - return self._('This is the best day of my life!') + return random.choice([ + self._('This is the best day of my life!'), + self._('All your base are belong to us'), + self._('Fascinating!'), # Star trek + ]) def on_demotivated(self, reward): return self._('Shitty day :/') @@ -63,6 +74,8 @@ class Voice: self._('I\'m extremely bored ...'), self._('I\'m very sad ...'), self._('I\'m sad'), + self._('I\'m so happy ...'), # Marvin in H2G2 + self._('Life? Don\'t talk to me about life.'), # Also Marvin in H2G2 '...']) def on_angry(self): @@ -78,17 +91,17 @@ class Voice: self._('I pwn therefore I am.'), self._('So many networks!!!'), self._('I\'m having so much fun!'), + self._('It\'s a Wi-Fi system! I know this!'), # Jurassic park self._('My crime is that of curiosity ...')]) def on_new_peer(self, peer): if peer.first_encounter(): return random.choice([ self._('Hello {name}! Nice to meet you.').format(name=peer.name())]) - else: - return random.choice([ - self._('Yo {name}! Sup?').format(name=peer.name()), - self._('Hey {name} how are you doing?').format(name=peer.name()), - self._('Unit {name} is nearby!').format(name=peer.name())]) + return random.choice([ + self._('Yo {name}! Sup?').format(name=peer.name()), + self._('Hey {name} how are you doing?').format(name=peer.name()), + self._('Unit {name} is nearby!').format(name=peer.name())]) def on_lost_peer(self, peer): return random.choice([ @@ -104,19 +117,23 @@ class Voice: def on_grateful(self): return random.choice([ self._('Good friends are a blessing!'), - self._('I love my friends!')]) + self._('I love my friends!') + ]) def on_lonely(self): return random.choice([ self._('Nobody wants to play with me ...'), self._('I feel so alone ...'), + self._('Let\'s find friends'), self._('Where\'s everybody?!')]) def on_napping(self, secs): return random.choice([ self._('Napping for {secs}s ...').format(secs=secs), self._('Zzzzz'), - self._('ZzzZzzz ({secs}s)').format(secs=secs)]) + self._('Snoring ...'), + self._('ZzzZzzz ({secs}s)').format(secs=secs), + ]) def on_shutdown(self): return random.choice([ @@ -124,12 +141,17 @@ class Voice: self._('Zzz')]) def on_awakening(self): - return random.choice(['...', '!']) + return random.choice([ + '...', + '!', + 'Hello World!', + self._('I dreamed of electric sheep'), + ]) def on_waiting(self, secs): return random.choice([ - self._('Waiting for {secs}s ...').format(secs=secs), '...', + self._('Waiting for {secs}s ...').format(secs=secs), self._('Looking around ({secs}s)').format(secs=secs)]) def on_assoc(self, ap): @@ -138,12 +160,16 @@ class Voice: return random.choice([ self._('Hey {what} let\'s be friends!').format(what=what), self._('Associating to {what}').format(what=what), - self._('Yo {what}!').format(what=what)]) + self._('Yo {what}!').format(what=what), + self._('Rise and Shine Mr. {what}!').format(what=what), # Half Life + ]) def on_deauth(self, sta): return random.choice([ - self._('Just decided that {mac} needs no WiFi!').format(mac=sta['mac']), + self._('Just decided that {mac} needs no Wi-Fi!').format(mac=sta['mac']), self._('Deauthenticating {mac}').format(mac=sta['mac']), + self._('No more Wi-Fi for {mac}').format(mac=sta['mac']), + self._('It\'s a trap! {mac}').format(mac=sta['mac']), # Star wars self._('Kickbanning {mac}!').format(mac=sta['mac'])]) def on_handshakes(self, new_shakes): @@ -155,10 +181,19 @@ class Voice: return self._('You have {count} new message{plural}!').format(count=count, plural=s) def on_rebooting(self): - return self._("Oops, something went wrong ... Rebooting ...") + return random.choice([ + self._("Oops, something went wrong ... Rebooting ..."), + self._("Have you tried turning it off and on again?"), # The IT crew + self._("I\'m afraid Dave"), # 2001 Space Odyssey + self._("I\'m dead, Jim!"), # Star Trek + self._("I have a bad feeling about this"), # Star wars + ]) def on_uploading(self, to): - return self._("Uploading data to {to} ...").format(to=to) + return random.choice([ + self._("Uploading data to {to} ...").format(to=to), + self._("Beam me up to {to}").format(to=to), + ]) def on_downloading(self, name): return self._("Downloading from {name} ...").format(name=name) From d7f7dac0d743c205b999edc42c9d72aa84de8a06 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?fr=C3=A9d=C3=A9ric?= Date: Sun, 16 Feb 2025 20:13:01 +0100 Subject: [PATCH 30/33] - Catch an exception on lstat() --- pwnagotchi/plugins/default/cache.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/pwnagotchi/plugins/default/cache.py b/pwnagotchi/plugins/default/cache.py index a94bc3dc..64c8f566 100644 --- a/pwnagotchi/plugins/default/cache.py +++ b/pwnagotchi/plugins/default/cache.py @@ -59,9 +59,12 @@ class Cache(plugins.Plugin): ctime = datetime.now(tz=UTC) cache_to_delete = list() for cache_file in pathlib.Path(self.cache_dir).glob("*.apcache"): - mtime = datetime.fromtimestamp(cache_file.lstat().st_mtime, tz=UTC) - if (ctime - mtime).total_seconds() > 60 * 5: - cache_to_delete.append(cache_file) + try: + mtime = datetime.fromtimestamp(cache_file.lstat().st_mtime, tz=UTC) + if (ctime - mtime).total_seconds() > 60 * 5: + cache_to_delete.append(cache_file) + except FileNotFoundError: + pass if cache_to_delete: logging.info(f"[CACHE] Cleaning {len(cache_to_delete)} files") for cache_file in cache_to_delete: From 7dd809afe01acfd5e4bd1c57d0d75f4eff618d23 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?fr=C3=A9d=C3=A9ric?= Date: Sun, 16 Feb 2025 20:29:06 +0100 Subject: [PATCH 31/33] add otion for cache --- pwnagotchi/defaults.toml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pwnagotchi/defaults.toml b/pwnagotchi/defaults.toml index 532e9d6a..8bdb5955 100644 --- a/pwnagotchi/defaults.toml +++ b/pwnagotchi/defaults.toml @@ -33,6 +33,8 @@ main.plugins.bt-tether.dns = "" # optional, default (google): "8.8.8.8 1.1.1.1". main.plugins.fix_services.enabled = true +main.plugins.cache.enabled = true + main.plugins.gdrivesync.enabled = false main.plugins.gdrivesync.backupfiles = [''] main.plugins.gdrivesync.backup_folder = "PwnagotchiBackups" From c433a6c2d54c56c63d38e7529a36321cc006f0c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?fr=C3=A9d=C3=A9ric?= Date: Mon, 17 Feb 2025 00:49:46 +0100 Subject: [PATCH 32/33] remove debug messages --- pwnagotchi/plugins/default/bt-tether.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/pwnagotchi/plugins/default/bt-tether.py b/pwnagotchi/plugins/default/bt-tether.py index afced4d3..dc12e71d 100644 --- a/pwnagotchi/plugins/default/bt-tether.py +++ b/pwnagotchi/plugins/default/bt-tether.py @@ -146,8 +146,6 @@ class BTTether(plugins.Plugin): except Exception as exp: logging.error(f"[BT-Tether] Error with {cmd}") logging.error(f"[BT-Tether] Exception : {exp}") - logging.error(f"[BT-Tether] STDOUT : {result.stdout}") - logging.error(f"[BT-Tether] STDERR : {result.stderr}") raise exp def bluetoothctl(self, args, pattern=None): From 393981e0ba27a0cd63aa66fb479292f2bccb23df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?fr=C3=A9d=C3=A9ric?= Date: Mon, 17 Feb 2025 11:01:16 +0100 Subject: [PATCH 33/33] Remove debgging mesasge in cache --- pwnagotchi/plugins/default/cache.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pwnagotchi/plugins/default/cache.py b/pwnagotchi/plugins/default/cache.py index 64c8f566..f3b826ed 100644 --- a/pwnagotchi/plugins/default/cache.py +++ b/pwnagotchi/plugins/default/cache.py @@ -70,8 +70,8 @@ class Cache(plugins.Plugin): for cache_file in cache_to_delete: try: cache_file.unlink() - except Exception as e: - logging.error(f"[CACHE] Cannot delete {cache_file}: {e}") + except FileNotFoundError as e: + pass def write_ap_cache(self, access_point): with self.lock: