Merge branch 'jayofelony:noai' into noai

This commit is contained in:
wpa-2
2025-02-24 15:57:12 +00:00
committed by GitHub
10 changed files with 1077 additions and 341 deletions

1
.gitignore vendored
View File

@ -1 +1,2 @@
*.pyc *.pyc
.vscode

View File

@ -1 +1 @@
__version__ = '2.9.5.3' __version__ = '2.9.5.4'

View File

@ -12,7 +12,8 @@ main.custom_plugin_repos = [
"https://github.com/Sniffleupagus/pwnagotchi_plugins/archive/master.zip", "https://github.com/Sniffleupagus/pwnagotchi_plugins/archive/master.zip",
"https://github.com/NeonLightning/pwny/archive/master.zip", "https://github.com/NeonLightning/pwny/archive/master.zip",
"https://github.com/marbasec/UPSLite_Plugin_1_3/archive/master.zip", "https://github.com/marbasec/UPSLite_Plugin_1_3/archive/master.zip",
"https://github.com/wpa-2/Pwnagotchi-Plugins/archive/master.zip" "https://github.com/wpa-2/Pwnagotchi-Plugins/archive/master.zip",
"https://github.com/cyberartemio/wardriver-pwnagotchi-plugin/archive/main.zip",
] ]
main.custom_plugins = "/usr/local/share/pwnagotchi/custom-plugins/" main.custom_plugins = "/usr/local/share/pwnagotchi/custom-plugins/"
@ -27,14 +28,17 @@ main.plugins.bt-tether.enabled = false
main.plugins.bt-tether.phone-name = "" # name as shown on the phone i.e. "Pwnagotchi's Phone" main.plugins.bt-tether.phone-name = "" # name as shown on the phone i.e. "Pwnagotchi's Phone"
main.plugins.bt-tether.mac = "" main.plugins.bt-tether.mac = ""
main.plugins.bt-tether.phone = "" # android or ios main.plugins.bt-tether.phone = "" # android or ios
main.plugins.bt-tether.ip = "" # 192.168.44.2 android / 172.20.10.2 ios main.plugins.bt-tether.ip = "" # optional, default : 192.168.44.2 if android or 172.20.10.2 if ios
main.plugins.bt-tether.dns = "" # optional, default (google): "8.8.8.8 1.1.1.1". Consider using anonymous DNS like OpenNic :-)
main.plugins.fix_services.enabled = true main.plugins.fix_services.enabled = true
main.plugins.cache.enabled = true
main.plugins.gdrivesync.enabled = false main.plugins.gdrivesync.enabled = false
main.plugins.gdrivesync.backupfiles = [''] main.plugins.gdrivesync.backupfiles = ['']
main.plugins.gdrivesync.backup_folder = "PwnagotchiBackups" main.plugins.gdrivesync.backup_folder = "PwnagotchiBackups"
main.plugin.gdrivesync.interval = 1 main.plugins.gdrivesync.interval = 1
main.plugins.gpio_buttons.enabled = false main.plugins.gpio_buttons.enabled = false
@ -65,6 +69,9 @@ main.plugins.pwndroid.display_altitude = false # show altitude on display
main.plugins.pisugarx.enabled = false main.plugins.pisugarx.enabled = false
main.plugins.pisugarx.rotation = false main.plugins.pisugarx.rotation = false
main.plugins.pisugarx.default_display = "percentage" main.plugins.pisugarx.default_display = "percentage"
main.plugins.pisugarx.lowpower_shutdown = true
main.plugins.pisugarx.lowpower_shutdown_level = 10 # battery percent at which the device will turn off
main.plugins.pisugarx.max_charge_voltage_protection = false #It will limit the battery voltage to about 80% to extend battery life
main.plugins.session-stats.enabled = false main.plugins.session-stats.enabled = false
main.plugins.session-stats.save_directory = "/var/tmp/pwnagotchi/sessions/" main.plugins.session-stats.save_directory = "/var/tmp/pwnagotchi/sessions/"
@ -83,8 +90,11 @@ main.plugins.webcfg.enabled = true
main.plugins.webgpsmap.enabled = false main.plugins.webgpsmap.enabled = false
main.plugins.wigle.enabled = false main.plugins.wigle.enabled = false
main.plugins.wigle.api_key = "" main.plugins.wigle.api_key = "" # mandatory
main.plugins.wigle.donate = false main.plugins.wigle.cvs_dir = "/tmp" # optionnal, is set, the CVS is written to this directory
main.plugins.wigle.donate = false # default: off
main.plugins.wigle.timeout = 30 # default: 30
main.plugins.wigle.position = [7, 85] # optionnal
main.plugins.wpa-sec.enabled = false main.plugins.wpa-sec.enabled = false
main.plugins.wpa-sec.api_key = "" main.plugins.wpa-sec.api_key = ""
@ -176,10 +186,11 @@ bettercap.handshakes = "/home/pi/handshakes"
bettercap.silence = [ bettercap.silence = [
"ble.device.new", "ble.device.new",
"ble.device.lost", "ble.device.lost",
"ble.device.disconnected",
"ble.device.connected",
"ble.device.service.discovered", "ble.device.service.discovered",
"ble.device.characteristic.discovered", "ble.device.characteristic.discovered",
"ble.device.disconnected",
"ble.device.connected",
"ble.connection.timeout",
"wifi.client.new", "wifi.client.new",
"wifi.client.lost", "wifi.client.lost",
"wifi.client.probe", "wifi.client.probe",

View File

@ -1,93 +1,328 @@
import logging import logging
import subprocess import subprocess
import re
import time
from flask import abort, render_template_string
import pwnagotchi.plugins as plugins import pwnagotchi.plugins as plugins
import pwnagotchi.ui.fonts as fonts import pwnagotchi.ui.fonts as fonts
from pwnagotchi.ui.components import LabeledValue from pwnagotchi.ui.components import LabeledValue
from pwnagotchi.ui.view import BLACK from pwnagotchi.ui.view import BLACK
TEMPLATE = """
{% extends "base.html" %}
{% set active_page = "bt-tether" %}
{% block title %}
{{ title }}
{% endblock %}
{% block meta %}
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, user-scalable=0" />
{% endblock %}
{% block styles %}
{{ super() }}
<style>
#searchText {
width: 100%;
}
table {
table-layout: auto;
width: 100%;
}
table, th, td {
border: 1px solid;
border-collapse: collapse;
}
th, td {
padding: 15px;
text-align: left;
}
@media screen and (max-width:700px) {
table, tr, td {
padding:0;
border:1px solid;
}
table {
border:none;
}
tr:first-child, thead, th {
display:none;
border:none;
}
tr {
float: left;
width: 100%;
margin-bottom: 2em;
}
td {
float: left;
width: 100%;
padding:1em;
}
td::before {
content:attr(data-label);
word-wrap: break-word;
color: white;
border-right:2px solid;
width: 20%;
float:left;
padding:1em;
font-weight: bold;
margin:-1em 1em -1em -1em;
}
}
</style>
{% endblock %}
{% block script %}
var searchInput = document.getElementById("searchText");
searchInput.onkeyup = function() {
var filter, table, tr, td, i, txtValue;
filter = searchInput.value.toUpperCase();
table = document.getElementById("tableOptions");
if (table) {
tr = table.getElementsByTagName("tr");
for (i = 0; i < tr.length; i++) {
td = tr[i].getElementsByTagName("td")[0];
if (td) {
txtValue = td.textContent || td.innerText;
if (txtValue.toUpperCase().indexOf(filter) > -1) {
tr[i].style.display = "";
}else{
tr[i].style.display = "none";
}
}
}
}
}
{% endblock %}
{% block content %}
<input type="text" id="searchText" placeholder="Search for ..." title="Type in a filter">
<table id="tableOptions">
<tr>
<th>Item</th>
<th>Configuration</th>
</tr>
<tr>
<td data-label="bluetooth">Bluetooth</td>
<td>{{bluetooth|safe}}</td>
</tr>
<tr>
<td data-label="device">Device</td>
<td>{{device|safe}}</td>
</tr>
<tr>
<td data-label="connection">Connection</td>
<td>{{connection|safe}}</td>
</tr>
</table>
{% endblock %}
"""
# We all love crazy regex patterns
MAC_PTTRN = r"^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})$"
IP_PTTRN = r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$"
DNS_PTTRN = r"^\s*((\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\s*[ ,;]\s*)+((\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\s*[ ,;]?\s*)$"
class BTTether(plugins.Plugin): class BTTether(plugins.Plugin):
__author__ = 'Jayofelony' __author__ = "Jayofelony, modified my fmatray"
__version__ = '1.2' __version__ = "1.4"
__license__ = 'GPL3' __license__ = "GPL3"
__description__ = 'A new BT-Tether plugin' __description__ = "A new BT-Tether plugin"
def __init__(self): def __init__(self):
self.ready = False self.ready = False
self.options = dict() self.options = dict()
self.status = '-' self.phone_name = None
self.mac = None
@staticmethod
def exec_cmd(cmd, args, pattern=None):
try:
result = subprocess.run([cmd] + args, check=True, capture_output=True, text=True)
if pattern:
return result.stdout.find(pattern)
return result
except Exception as exp:
logging.error(f"[BT-Tether] Error with {cmd}")
logging.error(f"[BT-Tether] Exception : {exp}")
raise exp
def bluetoothctl(self, args, pattern=None):
return self.exec_cmd("bluetoothctl", args, pattern)
def nmcli(self, args, pattern=None):
return self.exec_cmd("nmcli", args, pattern)
def on_loaded(self): def on_loaded(self):
logging.info("[BT-Tether] plugin loaded.") logging.info("[BT-Tether] plugin loaded.")
def on_config_changed(self, config): def on_config_changed(self, config):
if any(self.options[key] == '' for key in ['phone', 'phone-name', 'ip', 'mac']): if "phone-name" not in self.options:
self.ready = False logging.error("[BT-Tether] Phone name not provided")
ip = self.options['ip'] return
mac = self.options['mac'] if not ("mac" in self.options and re.match(MAC_PTTRN, self.options["mac"])):
phone_name = self.options['phone-name'] + ' Network' logging.error("[BT-Tether] Error with mac address")
if self.options['phone'].lower() == 'android': return
address = f'{ip}'
gateway = '192.168.44.1' if not ("phone" in self.options and self.options["phone"].lower() in ["android", "ios"]):
elif self.options['phone'].lower() == 'ios': logging.error("[BT-Tether] Phone type not supported")
address = f'{ip}' return
gateway = '172.20.10.1' if self.options["phone"].lower() == "android":
address = self.options.get("ip", "192.168.44.2")
gateway = "192.168.44.1"
elif self.options["phone"].lower() == "ios":
address = self.options.get("ip", "172.20.10.2")
gateway = "172.20.10.1"
if not re.match(IP_PTTRN, address):
logging.error(f"[BT-Tether] IP error: {address}")
return
self.phone_name = self.options["phone-name"] + " Network"
self.mac = self.options["mac"]
dns = self.options.get("dns", "8.8.8.8 1.1.1.1")
if not re.match(DNS_PTTRN, dns):
if dns == "":
logging.error(f"[BT-Tether] Empty DNS setting")
else: else:
logging.error("[BT-Tether] Phone type not supported.") logging.error(f"[BT-Tether] Wrong DNS setting: '{dns}'")
return
dns = re.sub("[\s,;]+", " ", dns).strip() # DNS cleaning
try:
# Configure connection. Metric is set to 200 to prefer connection over USB
self.nmcli(
[
"connection", "modify", f"{self.phone_name}",
"connection.type", "bluetooth",
"bluetooth.type", "panu",
"bluetooth.bdaddr", f"{self.mac}",
"connection.autoconnect", "yes",
"connection.autoconnect-retries", "0",
"ipv4.method", "manual",
"ipv4.dns", f"{dns}",
"ipv4.addresses", f"{address}/24",
"ipv4.gateway", f"{gateway}",
"ipv4.route-metric", "200",
]
)
# Configure Device to autoconnect
self.nmcli([
"device", "set", f"{self.mac}",
"autoconnect", "yes",
"managed", "yes"
])
self.nmcli(["connection", "reload"])
self.ready = True
logging.info(f"[BT-Tether] Connection {self.phone_name} configured")
except Exception as e:
logging.error(f"[BT-Tether] Error while configuring: {e}")
return return
try: try:
subprocess.run([ time.sleep(5) # Give some delay to configure before going up
'nmcli', 'connection', 'modify', f'{phone_name}', self.nmcli(["connection", "up", f"{self.phone_name}"])
'connection.type', 'bluetooth',
'bluetooth.type', 'panu',
'bluetooth.bdaddr', f'{mac}',
'ipv4.method', 'manual',
'ipv4.dns', '8.8.8.8 1.1.1.1',
'ipv4.addresses', f'{address}/24',
'ipv4.gateway', f'{gateway}',
'ipv4.route-metric', '100'
], check=True)
subprocess.run(['nmcli', 'connection', 'reload'], check=True)
subprocess.run(['nmcli', 'connection', 'up', f'{phone_name}'], check=True)
except Exception as e: except Exception as e:
logging.error(f"[BT-Tether] Failed to connect to device: {e}") logging.error(f"[BT-Tether] Failed to connect to device: {e}")
logging.error(f"[BT-Tether] Failed to connect to device: have you enabled bluetooth tethering on your phone?") logging.error(
self.ready = True f"[BT-Tether] Failed to connect to device: have you enabled bluetooth tethering on your phone?"
)
def on_ready(self, agent): def on_ready(self, agent):
if any(self.options[key] == '' for key in ['phone', 'phone-name', 'ip', 'mac']): try:
self.ready = False logging.info(f"[BT-Tether] Disabling bettercap's BLE module")
self.ready = True agent.run("ble.recon off", verbose_errors=False)
except Exception as e:
logging.info(f"[BT-Tether] Bettercap BLE was already off.")
def on_unload(self, ui):
with ui._lock:
ui.remove_element("bluetooth")
try:
self.nmcli(["connection", "down", f"{self.phone_name}"])
except Exception as e:
logging.error(f"[BT-Tether] Failed to disconnect from device: {e}")
def on_ui_setup(self, ui): def on_ui_setup(self, ui):
with ui._lock: with ui._lock:
ui.add_element('bluetooth', LabeledValue(color=BLACK, label='BT', value='-', ui.add_element(
"bluetooth",
LabeledValue(
color=BLACK,
label="BT",
value="-",
position=(ui.width() / 2 - 10, 0), position=(ui.width() / 2 - 10, 0),
label_font=fonts.Bold, text_font=fonts.Medium)) label_font=fonts.Bold,
text_font=fonts.Medium,
),
)
def on_ui_update(self, ui): def on_ui_update(self, ui):
if self.ready: if not self.ready:
phone_name = self.options['phone-name'] + ' Network'
if (subprocess.run(['bluetoothctl', 'info'], capture_output=True, text=True)).stdout.find('Connected: yes') != -1:
self.status = 'C'
else:
self.status = '-'
try:
subprocess.run(['nmcli', 'connection', 'up', f'{phone_name}'], check=True)
except Exception as e:
logging.debug(f"[BT-Tether] Failed to connect to device: {e}")
logging.error(f"[BT-Tether] Failed to connect to device: have you enabled bluetooth tethering on your phone?")
ui.set('bluetooth', self.status)
return return
def on_unload(self, ui):
phone_name = self.options['phone-name'] + ' Network'
with ui._lock: with ui._lock:
ui.remove_element('bluetooth') status = ""
try: try:
if (subprocess.run(['bluetoothctl', 'info'], capture_output=True, text=True)).stdout.find('Connected: yes') != -1: # Checking connection
subprocess.run(['nmcli', 'connection', 'down', f'{phone_name}'], check=True) if (
logging.info(f"[BT-Tether] Disconnected from device with name: {phone_name}") self.nmcli(["-w", "0", "-g", "GENERAL.STATE", "connection", "show", self.phone_name],
"activated",
)
!= -1
):
ui.set("bluetooth", "U")
return
else: else:
logging.info(f"[BT-Tether] Device with name {phone_name} is not connected, not disconnecting") ui.set("bluetooth", "D")
status = "BT Conn. down"
# Checking device
if (
self.nmcli(
["-w", "0", "-g", "GENERAL.STATE", "device", "show", self.mac],
"(connected)",
)
!= -1
):
ui.set("bluetooth", "C")
status += "\nBT dev conn."
else:
ui.set("bluetooth", "-")
status += "\nBT dev disconn."
ui.set("status", status)
except Exception as e: except Exception as e:
logging.error(f"[BT-Tether] Failed to disconnect from device: {e}") logging.error(f"[BT-Tether] Error on update: {e}")
def on_webhook(self, path, request):
if not self.ready:
return """<html>
<head><title>BT-tether: Error</title></head>
<body><code>Plugin not ready</code></body>
</html>"""
if path == "/" or not path:
try:
bluetooth = self.bluetoothctl(["info", self.mac])
bluetooth = bluetooth.stdout.replace("\n", "<br>")
except Exception as e:
bluetooth = "Error while checking bluetoothctl"
try:
device = self.nmcli(["-w", "0", "device", "show", self.mac])
device = device.stdout.replace("\n", "<br>")
except Exception as e:
device = "Error while checking nmcli device"
try:
connection = self.nmcli(["-w", "0", "connection", "show", self.phone_name])
connection = connection.stdout.replace("\n", "<br>")
except Exception as e:
connection = "Error while checking nmcli connection"
logging.debug(device)
return render_template_string(
TEMPLATE,
title="BT-Tether",
bluetooth=bluetooth,
device=device,
connection=connection,
)
abort(404)

View File

@ -0,0 +1,119 @@
import logging
import json
import os
import re
import pathlib
import pwnagotchi.plugins as plugins
from datetime import datetime, UTC
from threading import Lock
def read_ap_cache(cache_dir, file):
cache_filename = os.path.basename(re.sub(r"\.(pcap|gps\.json|geo\.json)$", ".cache", file))
cache_filename = os.path.join(cache_dir, cache_filename)
if not os.path.exists(cache_filename):
logging.info("Cache not exist")
return None
try:
with open(cache_filename, "r") as f:
return json.load(f)
except Exception as e:
logging.info(f"Exception {e}")
return None
class Cache(plugins.Plugin):
__author__ = "fmatray"
__version__ = "1.0.0"
__license__ = "GPL3"
__description__ = "A simple plugin to cache AP informations"
def __init__(self):
self.options = dict()
self.ready = False
self.lock = Lock()
def on_loaded(self):
logging.info("[CACHE] plugin loaded.")
def on_config_changed(self, config):
try:
handshake_dir = config["bettercap"].get("handshakes")
self.cache_dir = os.path.join(handshake_dir, "cache")
os.makedirs(self.cache_dir, exist_ok=True)
except Exception:
logging.info(f"[CACHE] Cannot access to the cache directory")
return
self.last_clean = datetime.now(tz=UTC)
self.ready = True
logging.info(f"[CACHE] Cache plugin configured")
self.clean_ap_cache()
def on_unload(self, ui):
self.clean_ap_cache()
def clean_ap_cache(self):
if not self.ready:
return
with self.lock:
ctime = datetime.now(tz=UTC)
cache_to_delete = list()
for cache_file in pathlib.Path(self.cache_dir).glob("*.apcache"):
try:
mtime = datetime.fromtimestamp(cache_file.lstat().st_mtime, tz=UTC)
if (ctime - mtime).total_seconds() > 60 * 5:
cache_to_delete.append(cache_file)
except FileNotFoundError:
pass
if cache_to_delete:
logging.info(f"[CACHE] Cleaning {len(cache_to_delete)} files")
for cache_file in cache_to_delete:
try:
cache_file.unlink()
except FileNotFoundError as e:
pass
def write_ap_cache(self, access_point):
with self.lock:
try:
mac = access_point["mac"].replace(":", "")
hostname = re.sub(r"[^a-zA-Z0-9]", "", access_point["hostname"])
except KeyError:
return
cache_file = os.path.join(self.cache_dir, f"{hostname}_{mac}.apcache")
try:
with open(cache_file, "w") as f:
json.dump(access_point, f)
except Exception as e:
logging.error(f"[CACHE] Cannot write {cache_file}: {e}")
pass
def on_wifi_update(self, agent, access_points):
if self.ready:
for ap in filter(lambda ap: ap["hostname"] not in ["", "<hidden>"], access_points):
self.write_ap_cache(ap)
def on_unfiltered_ap_list(self, agent, aps):
if self.ready:
for ap in filter(lambda ap: ap["hostname"] not in ["", "<hidden>"], aps):
self.write_ap_cache(ap)
def on_association(self, agent, access_point):
if self.ready:
self.write_ap_cache(access_point)
def on_deauthentication(self, agent, access_point, client_station):
if self.ready:
self.write_ap_cache(access_point)
def on_handshake(self, agent, filename, access_point, client_station):
if self.ready:
self.write_ap_cache(access_point)
def on_ui_update(self, ui):
if not self.ready:
return
current_time = datetime.now(tz=UTC)
if (current_time - self.last_clean).total_seconds() > 60:
self.clean_ap_cache()
self.last_clean = current_time

View File

@ -75,20 +75,16 @@ class ohcapi(plugins.Plugin):
return return
# Check if the internet is still available by pinging Google # Check if the internet is still available by pinging Google
self.internet_active = False
try: try:
response = requests.get('https://www.google.com', timeout=5) response = requests.get('https://www.google.com', timeout=5)
except requests.ConnectionError:
self.internet_active = False
return
if response.status_code == 200: if response.status_code == 200:
self.internet_active = True self.internet_active = True
else: except requests.ConnectionError:
self.internet_active = False
return return
current_time = time.time() current_time = time.time()
if current_time - self.last_run >= self.options['sleep']: if self.internet_active and current_time - self.last_run >= self.options['sleep']:
self._run_tasks(agent) self._run_tasks(agent)
self.last_run = current_time self.last_run = current_time

View File

@ -29,7 +29,7 @@ curve1200 = [
(3.49, 3.2), (3.49, 3.2),
(3.1, 0.0), (3.1, 0.0),
] ]
curve1200_3= [ curve1200_3 = [
(4.2, 100.0), # 高电量阶段 (100%) (4.2, 100.0), # 高电量阶段 (100%)
(4.0, 80.0), # 中电量阶段 (80%) (4.0, 80.0), # 中电量阶段 (80%)
(3.7, 60.0), # 中电量阶段 (60%) (3.7, 60.0), # 中电量阶段 (60%)
@ -50,26 +50,42 @@ curve5000 = [
] ]
class PiSugarServer: class PiSugarServer:
def __init__(self): def __init__(self):
""" """
PiSugar initialization, if unable to connect to any version of PiSugar, return false PiSugar initialization, if unable to connect to any version of PiSugar, return false
""" """
self._bus = smbus.SMBus(1) self._bus = smbus.SMBus(1)
self.ready = False
self.modle = None self.modle = None
self.i2creg = [] self.i2creg = []
self.address = 0 self.address = 0
self.battery_voltage = 0 self.battery_voltage = 0.00
self.voltage_history = deque(maxlen=10) self.voltage_history = deque(maxlen=10)
self.battery_level = 0 self.battery_level = 0
self.battery_charging = 0 self.battery_charging = 0
self.temperature = 0 self.temperature = 0
self.power_plugged = False self.power_plugged = False
self.allow_charging = True self.allow_charging = True
self.lowpower_shutdown = False
self.lowpower_shutdown_level = 10
self.max_charge_voltage_protection = False
self.max_protection_level=80
# Start the device connection in a background thread
self.connection_thread = threading.Thread(
target=self._connect_device, daemon=True)
self.connection_thread.start()
def _connect_device(self):
"""
Attempt to connect to the PiSugar device in a background thread.
"""
while self.modle is None: while self.modle is None:
if self.check_device(PiSugar_addresses["PiSugar2"]) is not None: if self.check_device(PiSugar_addresses["PiSugar2"]) is not None:
self.address = PiSugar_addresses["PiSugar2"] self.address = PiSugar_addresses["PiSugar2"]
if self.check_device(PiSugar_addresses["PiSugar2"], 0Xc2) != 0: if self.check_device(PiSugar_addresses["PiSugar2"], 0xC2) != 0:
self.modle = "PiSugar2Plus" self.modle = "PiSugar2Plus"
else: else:
self.modle = "PiSugar2" self.modle = "PiSugar2"
@ -77,16 +93,20 @@ class PiSugarServer:
elif self.check_device(PiSugar_addresses["PiSugar3"]) is not None: elif self.check_device(PiSugar_addresses["PiSugar3"]) is not None:
self.modle = 'PiSugar3' self.modle = 'PiSugar3'
self.address = PiSugar_addresses["PiSugar3"] self.address = PiSugar_addresses["PiSugar3"]
self.device_init()
else: else:
self.modle = None self.modle = None
logging.error( logging.info(
"No PiSugar device was found. Please check if the PiSugar device is powered on.") "No PiSugar device was found. Please check if the PiSugar device is powered on."
)
time.sleep(5) time.sleep(5)
logging.info(f"{self.modle} is connected")
# self.update_value() # Once connected, start the timer
self.start_timer() self.start_timer()
while len(self.i2creg) < 256: while len(self.i2creg) < 256:
time.sleep(1) time.sleep(1)
self.ready = True
logging.info(f"{self.modle} is ready")
def start_timer(self): def start_timer(self):
@ -99,6 +119,9 @@ class PiSugarServer:
"""每三秒更新pisugar状态包括触发自动关机""" """每三秒更新pisugar状态包括触发自动关机"""
while True: while True:
try: try:
if( self.modle == 'PiSugar2') | (self.modle == 'PiSugar2Plus'):
self.set_battery_notallow_charging() #短暂关闭充电以获取准确电池电压
time.sleep(0.05)
self.i2creg = [] self.i2creg = []
for i in range(0, 256, 32): for i in range(0, 256, 32):
# 计算当前读取的起始寄存器地址 # 计算当前读取的起始寄存器地址
@ -121,6 +144,20 @@ class PiSugarServer:
ctr1 = self.i2creg[0x02] # 读取控制寄存器 1 ctr1 = self.i2creg[0x02] # 读取控制寄存器 1
self.power_plugged = (ctr1 & (1 << 7)) != 0 # 检查电源是否插入 self.power_plugged = (ctr1 & (1 << 7)) != 0 # 检查电源是否插入
self.allow_charging = (ctr1 & (1 << 6)) != 0 # 检查是否允许充电 self.allow_charging = (ctr1 & (1 << 6)) != 0 # 检查是否允许充电
if self.max_charge_voltage_protection:
self._bus.write_byte_data(
self.address, 0x0B, 0x29) # 关闭写保护
self._bus.write_byte_data(self.address, 0x20, self._bus.read_byte_data(
self.address, 0x20) | 0b10000000)
self._bus.write_byte_data(
self.address, 0x0B, 0x00) # 开启写保护
else:
self._bus.write_byte_data(
self.address, 0x0B, 0x29) # 关闭写保护
self._bus.write_byte_data(self.address, 0x20, self._bus.read_byte_data(
self.address, 0x20) & 0b01111111)
self._bus.write_byte_data(
self.address, 0x0B, 0x00) # 开启写保护
elif self.modle == 'PiSugar2': elif self.modle == 'PiSugar2':
high = self.i2creg[0xa3] high = self.i2creg[0xa3]
low = self.i2creg[0xa2] low = self.i2creg[0xa2]
@ -129,20 +166,60 @@ class PiSugarServer:
2600.0 + (((high & 0x1f) << 8) + low) * 0.26855) / 1000.0 2600.0 + (((high & 0x1f) << 8) + low) * 0.26855) / 1000.0
self.power_plugged = (self.i2creg[0x55] & 0b00010000) != 0 self.power_plugged = (self.i2creg[0x55] & 0b00010000) != 0
if self.max_charge_voltage_protection:
self.voltage_history.append(self.battery_voltage)
self.battery_level = self.convert_battery_voltage_to_level()
if (self.battery_level) > self.max_protection_level:
self.set_battery_notallow_charging()
else:
self.set_battery_allow_charging()
else:
self.set_battery_allow_charging()
elif self.modle == 'PiSugar2Plus': elif self.modle == 'PiSugar2Plus':
low = self.i2creg[0xd0] low = self.i2creg[0xd0]
high = self.i2creg[0xd1] high = self.i2creg[0xd1]
self.battery_voltage = ( self.battery_voltage = (
(((high & 0b00111111) << 8) + low) * 0.26855 + 2600.0)/1000 (((high & 0b00111111) << 8) + low) * 0.26855 + 2600.0)/1000
self.power_plugged = self.i2creg[0xdd] == 0x1f self.power_plugged = self.i2creg[0xdd] == 0x1f
if self.max_charge_voltage_protection:
self.voltage_history.append(self.battery_voltage)
self.battery_level = self.convert_battery_voltage_to_level()
if (self.battery_level) > self.max_protection_level:
self.set_battery_notallow_charging()
else:
self.set_battery_allow_charging()
else:
self.set_battery_allow_charging()
self.voltage_history.append(self.battery_voltage) self.voltage_history.append(self.battery_voltage)
self.battery_level=self.convert_battery_voltage_to_level() self.battery_level = self.convert_battery_voltage_to_level()
if self.lowpower_shutdown:
if self.battery_level < self.lowpower_shutdown_level:
logging.info("[PiSugarX] low power shutdown now.")
self.shutdown()
pwnagotchi.shutdown()
time.sleep(3) time.sleep(3)
except: except Exception as e:
logging.error(f"read error") logging.error(f"read error{e}")
time.sleep(3) time.sleep(3)
def shutdown(self):
# logging.info("[PiSugarX] PiSugar set shutdown .")
if self.modle == 'PiSugar3':
# 10秒后关闭电源
self._bus.write_byte_data(self.address, 0x0B, 0x29) # 关闭写保护
self._bus.write_byte_data(self.address, 0x09, 10)
self._bus.write_byte_data(self.address, 0x02, self._bus.read_byte_data(
self.address, 0x02) & 0b11011111)
self._bus.write_byte_data(self.address, 0x0B, 0x00) # 开启写保护
logging.info("[PiSugarX] PiSugar shutdown in 10s.")
elif self.modle == 'PiSugar2':
pass
elif self.modle == 'PiSugar2Plus':
pass
def check_device(self, address, reg=0): def check_device(self, address, reg=0):
"""Check if a device is present at the specified address""" """Check if a device is present at the specified address"""
try: try:
@ -278,6 +355,58 @@ class PiSugarServer:
""" """
return self.allow_charging return self.allow_charging
def set_battery_allow_charging(self):
if self.modle == 'PiSugar3':
pass
elif self.modle == 'PiSugar2':
# 禁止 gpio2 输出
self._bus.write_byte_data(self.address, 0x54, self._bus.read_byte_data(
self.address, 0x54) & 0b11111011)
# 开启充电
self._bus.write_byte_data(self.address, 0x55, self._bus.read_byte_data(
self.address, 0x55) & 0b11111011)
# 开启 gpio2 输出
self._bus.write_byte_data(self.address, 0x54, self._bus.read_byte_data(
self.address, 0x54) | 0b00000100)
elif self.modle == 'PiSugar2Plus':
# 禁止 gpio2 输出
self._bus.write_byte_data(self.address, 0x56, self._bus.read_byte_data(
self.address, 0x56) & 0b11111011)
# 开启充电
self._bus.write_byte_data(self.address, 0x58, self._bus.read_byte_data(
self.address, 0x58) & 0b11111011)
# 开启 gpio2 输出
self._bus.write_byte_data(self.address, 0x56, self._bus.read_byte_data(
self.address, 0x56) | 0b00000100)
return
def set_battery_notallow_charging(self):
if self.modle == 'PiSugar3':
pass
elif self.modle == 'PiSugar2':
# 禁止 gpio2 输出
print(self._bus.write_byte_data(self.address, 0x54, self._bus.read_byte_data(
self.address, 0x54) & 0b11111011))
# 关闭充电
self._bus.write_byte_data(self.address, 0x55, self._bus.read_byte_data(
self.address, 0x55) | 0b00000100)
# 开启 gpio2 输出
self._bus.write_byte_data(self.address, 0x54, self._bus.read_byte_data(
self.address, 0x54) | 0b00000100)
elif self.modle == 'PiSugar2Plus':
# 禁止 gpio2 输出
self._bus.write_byte_data(self.address, 0x56, self._bus.read_byte_data(
self.address, 0x56) & 0b11111011)
# 关闭充电
self._bus.write_byte_data(self.address, 0x58, self._bus.read_byte_data(
self.address, 0x58) | 0b00000100)
# 开启 gpio2 输出
self._bus.write_byte_data(self.address, 0x56, self._bus.read_byte_data(
self.address, 0x56) | 0b00000100)
return
def get_battery_charging_range(self): def get_battery_charging_range(self):
""" """
Get the battery charging range. Get the battery charging range.
@ -406,6 +535,8 @@ class PiSugarServer:
""" """
pass pass
class PiSugar(plugins.Plugin): class PiSugar(plugins.Plugin):
__author__ = "jayofelony" __author__ = "jayofelony"
__version__ = "1.2" __version__ = "1.2"
@ -418,14 +549,25 @@ class PiSugar(plugins.Plugin):
def __init__(self): def __init__(self):
self._agent = None self._agent = None
self.is_new_model = False
self.options = dict() self.options = dict()
"""
self.options = {
'enabled': True,
'rotation': False,
'default_display': 'percentage',
'lowpower_shutdown': True,
'lowpower_shutdown_level': 10,
'max_charge_voltage_protection': True
}
"""
self.ps = None self.ps = None
# logging.debug(f"[PiSugarX] {self.options}")
try: try:
self.ps = PiSugarServer() self.ps = PiSugarServer()
except Exception as e: except Exception as e:
# Log at debug to avoid clutter since it might be a false positive # Log at debug to avoid clutter since it might be a false positive
logging.debug("[PiSugarX] Unable to establish connection: %s", repr(e)) logging.debug(
"[PiSugarX] Unable to establish connection: %s", repr(e))
self.ready = False self.ready = False
self.lasttemp = 69 self.lasttemp = 69
@ -444,7 +586,8 @@ class PiSugar(plugins.Plugin):
try: try:
return func() return func()
except Exception as e: except Exception as e:
logging.debug("[PiSugarX] Failed to get data using %s: %s", func.__name__, e) logging.debug(
"[PiSugarX] Failed to get data using %s: %s", func.__name__, e)
return default return default
def on_loaded(self): def on_loaded(self):
@ -455,15 +598,25 @@ class PiSugar(plugins.Plugin):
valid_displays = ['voltage', 'percentage', 'temp'] valid_displays = ['voltage', 'percentage', 'temp']
if self.default_display not in valid_displays: if self.default_display not in valid_displays:
logging.warning(f"[PiSugarX] Invalid default_display '{self.default_display}'. Using 'voltage'.") logging.warning(
f"[PiSugarX] Invalid default_display '{self.default_display}'. Using 'voltage'.")
self.default_display = 'voltage' self.default_display = 'voltage'
logging.info(f"[PiSugarX] Rotation is {'enabled' if self.rotation_enabled else 'disabled'}.") logging.info(
logging.info(f"[PiSugarX] Default display (when rotation disabled): {self.default_display}") f"[PiSugarX] Rotation is {'enabled' if self.rotation_enabled else 'disabled'}.")
logging.info(
f"[PiSugarX] Default display (when rotation disabled): {self.default_display}")
self.ps.lowpower_shutdown = self.options['lowpower_shutdown']
self.ps.lowpower_shutdown_level = self.options['lowpower_shutdown_level']
self.ps.max_charge_voltage_protection = self.options['max_charge_voltage_protection']
def on_ready(self, agent): def on_ready(self, agent):
self.ready = True try:
self._agent = agent self.ready = self.ps.ready
except Exception as e:
# Log at debug to avoid clutter since it might be a false positive
logging.warning(f"[PiSugarX] {e}")
def on_internet_available(self, agent): def on_internet_available(self, agent):
self._agent = agent self._agent = agent
@ -477,30 +630,52 @@ class PiSugar(plugins.Plugin):
try: try:
if request.method == "GET": if request.method == "GET":
if path == "/" or not path: if path == "/" or not path:
version = self.safe_get(self.ps.get_version, default='Unknown') version = self.safe_get(
self.ps.get_version, default='Unknown')
model = self.safe_get(self.ps.get_model, default='Unknown') model = self.safe_get(self.ps.get_model, default='Unknown')
battery_level = self.safe_get(self.ps.get_battery_level, default='N/A') battery_level = self.safe_get(
battery_voltage = self.safe_get(self.ps.get_battery_voltage, default='N/A') self.ps.get_battery_level, default='N/A')
battery_current = self.safe_get(self.ps.get_battery_current, default='N/A') battery_voltage = self.safe_get(
battery_allow_charging = self.safe_get(self.ps.get_battery_allow_charging, default=False) self.ps.get_battery_voltage, default='N/A')
battery_charging_range = self.safe_get(self.ps.get_battery_charging_range, default='N/A') if self.is_new_model or model == 'Pisugar 3' else 'Not supported' battery_current = self.safe_get(
battery_full_charge_duration = getattr(self.ps, 'get_battery_full_charge_duration', lambda: 'N/A')() self.ps.get_battery_current, default='N/A')
safe_shutdown_level = self.safe_get(self.ps.get_battery_safe_shutdown_level, default=None) battery_allow_charging = self.safe_get(
self.ps.get_battery_allow_charging, default=False)
battery_charging_range = self.safe_get(
self.ps.get_battery_charging_range, default='N/A')
battery_full_charge_duration = getattr(
self.ps, 'get_battery_full_charge_duration', lambda: 'N/A')()
safe_shutdown_level = self.safe_get(
self.ps.get_battery_safe_shutdown_level, default=None)
battery_safe_shutdown_level = f"{safe_shutdown_level}%" if safe_shutdown_level is not None else 'Not set' battery_safe_shutdown_level = f"{safe_shutdown_level}%" if safe_shutdown_level is not None else 'Not set'
battery_safe_shutdown_delay = self.safe_get(self.ps.get_battery_safe_shutdown_delay, default='N/A') battery_safe_shutdown_delay = self.safe_get(
battery_auto_power_on = self.safe_get(self.ps.get_battery_auto_power_on, default=False) self.ps.get_battery_safe_shutdown_delay, default='N/A')
battery_soft_poweroff = self.safe_get(self.ps.get_battery_soft_poweroff, default=False) if model == 'Pisugar 3' else False battery_auto_power_on = self.safe_get(
system_time = self.safe_get(self.ps.get_system_time, default='N/A') self.ps.get_battery_auto_power_on, default=False)
rtc_adjust_ppm = self.safe_get(self.ps.get_rtc_adjust_ppm, default='Not supported') if model == 'Pisugar 3' else 'Not supported' battery_soft_poweroff = self.safe_get(
rtc_alarm_repeat = self.safe_get(self.ps.get_rtc_alarm_repeat, default='N/A') self.ps.get_battery_soft_poweroff, default=False) if model == 'Pisugar 3' else False
single_tap_enabled = self.safe_get(lambda: self.ps.get_tap_enable(tap='single'), default=False) system_time = self.safe_get(
double_tap_enabled = self.safe_get(lambda: self.ps.get_tap_enable(tap='double'), default=False) self.ps.get_system_time, default='N/A')
long_tap_enabled = self.safe_get(lambda: self.ps.get_tap_enable(tap='long'), default=False) rtc_adjust_ppm = self.safe_get(
single_tap_shell = self.safe_get(lambda: self.ps.get_tap_shell(tap='single'), default='N/A') self.ps.get_rtc_adjust_ppm, default='Not supported') if model == 'Pisugar 3' else 'Not supported'
double_tap_shell = self.safe_get(lambda: self.ps.get_tap_shell(tap='double'), default='N/A') rtc_alarm_repeat = self.safe_get(
long_tap_shell = self.safe_get(lambda: self.ps.get_tap_shell(tap='long'), default='N/A') self.ps.get_rtc_alarm_repeat, default='N/A')
anti_mistouch = self.safe_get(self.ps.get_anti_mistouch, default=False) if model == 'Pisugar 3' else False single_tap_enabled = self.safe_get(
temperature = self.safe_get(self.ps.get_temperature, default='N/A') lambda: self.ps.get_tap_enable(tap='single'), default=False)
double_tap_enabled = self.safe_get(
lambda: self.ps.get_tap_enable(tap='double'), default=False)
long_tap_enabled = self.safe_get(
lambda: self.ps.get_tap_enable(tap='long'), default=False)
single_tap_shell = self.safe_get(
lambda: self.ps.get_tap_shell(tap='single'), default='N/A')
double_tap_shell = self.safe_get(
lambda: self.ps.get_tap_shell(tap='double'), default='N/A')
long_tap_shell = self.safe_get(
lambda: self.ps.get_tap_shell(tap='long'), default='N/A')
anti_mistouch = self.safe_get(
self.ps.get_anti_mistouch, default=False) if model == 'Pisugar 3' else False
temperature = self.safe_get(
self.ps.get_temperature, default='N/A')
ret = ''' ret = '''
<!DOCTYPE html> <!DOCTYPE html>
@ -561,7 +736,7 @@ class PiSugar(plugins.Plugin):
<tr><td>Battery Level</td><td>{battery_level}%</td></tr> <tr><td>Battery Level</td><td>{battery_level}%</td></tr>
<tr><td>Battery Voltage</td><td>{battery_voltage}V</td></tr> <tr><td>Battery Voltage</td><td>{battery_voltage}V</td></tr>
<tr><td>Battery Current</td><td>{battery_current}A</td></tr> <tr><td>Battery Current</td><td>{battery_current}A</td></tr>
<tr><td>Battery Allow Charging</td><td>{"Yes" if battery_allow_charging and self.is_new_model else "No"}</td></tr> <tr><td>Battery Allow Charging</td><td>{"Yes" if battery_allow_charging else "No"}</td></tr>
<tr><td>Battery Charging Range</td><td>{battery_charging_range}</td></tr> <tr><td>Battery Charging Range</td><td>{battery_charging_range}</td></tr>
<tr><td>Duration of Keep Charging When Full</td><td>{battery_full_charge_duration} seconds</td></tr> <tr><td>Duration of Keep Charging When Full</td><td>{battery_full_charge_duration} seconds</td></tr>
<tr><td>Battery Safe Shutdown Level</td><td>{battery_safe_shutdown_level}</td></tr> <tr><td>Battery Safe Shutdown Level</td><td>{battery_safe_shutdown_level}</td></tr>
@ -628,13 +803,25 @@ class PiSugar(plugins.Plugin):
# Make sure "bat" is in the UI state (guard to prevent KeyError) # Make sure "bat" is in the UI state (guard to prevent KeyError)
if 'bat' not in ui._state._state: if 'bat' not in ui._state._state:
return return
try:
self.ready = self.ps.ready
except Exception as e:
# Log at debug to avoid clutter since it might be a false positive
logging.warning(f"[PiSugarX] {e}")
if self.ready:
capacity = self.safe_get(self.ps.get_battery_level, default=0) capacity = self.safe_get(self.ps.get_battery_level, default=0)
voltage = self.safe_get(self.ps.get_battery_voltage, default=0.00) voltage = self.safe_get(self.ps.get_battery_voltage, default=0.00)
temp = self.safe_get(self.ps.get_temperature, default=0) temp = self.safe_get(self.ps.get_temperature, default=0)
else:
capacity = 0
voltage = 0.00
temp = 0
logging.info(f"[PiSugarX] PiSugar is not ready")
# Check if battery is plugged in # Check if battery is plugged in
battery_plugged = self.safe_get(self.ps.get_battery_power_plugged, default=False) battery_plugged = self.safe_get(
self.ps.get_battery_power_plugged, default=False)
if battery_plugged: if battery_plugged:
# If plugged in, display "CHG" # If plugged in, display "CHG"
@ -663,13 +850,3 @@ class PiSugar(plugins.Plugin):
ui.set('bat', f"{capacity:.0f}%") ui.set('bat', f"{capacity:.0f}%")
elif self.default_display == 'temp': elif self.default_display == 'temp':
ui.set('bat', f"{temp}°C") ui.set('bat', f"{temp}°C")
charging = self.safe_get(self.ps.get_battery_charging, default=None)
safe_shutdown_level = self.safe_get(self.ps.get_battery_safe_shutdown_level, default=0)
if charging is not None:
if capacity <= safe_shutdown_level:
logging.info(
f"[PiSugarX] Empty battery (<= {safe_shutdown_level}%): shutting down"
)
ui.update(force=True, new_data={"status": "Battery exhausted, bye ..."})

View File

@ -4,212 +4,368 @@ import json
import csv import csv
import requests import requests
import pwnagotchi import pwnagotchi
import re
from io import StringIO from glob import glob
from datetime import datetime
from pwnagotchi.utils import WifiInfo, FieldNotFoundError, extract_from_pcap, StatusFile, remove_whitelisted
from threading import Lock from threading import Lock
from io import StringIO
from datetime import datetime, UTC
from dataclasses import dataclass
from flask import make_response, redirect
from pwnagotchi.utils import (
WifiInfo,
FieldNotFoundError,
extract_from_pcap,
StatusFile,
remove_whitelisted,
)
from pwnagotchi import plugins from pwnagotchi import plugins
from pwnagotchi.plugins.default.cache import read_ap_cache
from pwnagotchi._version import __version__ as __pwnagotchi_version__ from pwnagotchi._version import __version__ as __pwnagotchi_version__
import pwnagotchi.ui.fonts as fonts
from pwnagotchi.ui.components import Text
from pwnagotchi.ui.view import BLACK
def _extract_gps_data(path): from scapy.all import Scapy_Exception
"""
Extract data from gps-file
return json-obj
"""
try:
if path.endswith('.geo.json'):
with open(path, 'r') as json_file:
tempJson = json.load(json_file)
d = datetime.utcfromtimestamp(int(tempJson["ts"]))
return {"Latitude": tempJson["location"]["lat"],
"Longitude": tempJson["location"]["lng"],
"Altitude": 10,
"Accuracy": tempJson["accuracy"],
"Updated": d.strftime('%Y-%m-%dT%H:%M:%S.%f')}
else:
with open(path, 'r') as json_file:
return json.load(json_file)
except OSError as os_err:
raise os_err
except json.JSONDecodeError as json_err:
raise json_err
def _format_auth(data): @dataclass
out = "" class WigleStatistics:
for auth in data: ready: bool = False
out = f"{out}[{auth}]" username: str = None
return [f"{auth}" for auth in data] rank: int = None
monthrank: int = None
discoveredwiFi: int = None
last: str = None
groupID: str = None
groupname: str = None
grouprank: int = None
def update_user(self, json_res):
self.ready = True
self.username = json_res["user"]
self.rank = json_res["rank"]
self.monthrank = json_res["monthRank"]
self.discoveredwiFi = json_res["statistics"]["discoveredWiFi"]
last = json_res["statistics"]["last"]
self.last = f"{last[6:8]}/{last[4:6]}/{last[0:4]}"
def _transform_wigle_entry(gps_data, pcap_data, plugin_version): def update_user_group(self, json_res):
""" self.groupID = json_res["groupId"]
Transform to wigle entry in file self.groupname = json_res["groupName"]
"""
dummy = StringIO()
# write kismet header
dummy.write(f"WigleWifi-1.6,appRelease={plugin_version},model=pwnagotchi,release={__pwnagotchi_version__},"
f"device={pwnagotchi.name()},display=kismet,board=RaspberryPi,brand=pwnagotchi,star=Sol,body=3,subBody=0\n")
dummy.write(
"MAC,SSID,AuthMode,FirstSeen,Channel,RSSI,CurrentLatitude,CurrentLongitude,AltitudeMeters,AccuracyMeters,Type\n")
writer = csv.writer(dummy, delimiter=",", quoting=csv.QUOTE_NONE, escapechar="\\") def update_group(self, json_res):
writer.writerow([ rank = 1
pcap_data[WifiInfo.BSSID], for group in json_res["groups"]:
pcap_data[WifiInfo.ESSID], if group["groupId"] == self.groupID:
_format_auth(pcap_data[WifiInfo.ENCRYPTION]), self.grouprank = rank
datetime.strptime(gps_data['Updated'].rsplit('.')[0], rank += 1
"%Y-%m-%dT%H:%M:%S").strftime('%Y-%m-%d %H:%M:%S'),
pcap_data[WifiInfo.CHANNEL],
pcap_data[WifiInfo.RSSI],
gps_data['Latitude'],
gps_data['Longitude'],
gps_data['Altitude'],
gps_data['Accuracy'],
'WIFI'])
return dummy.getvalue()
def _send_to_wigle(lines, api_key, donate=True, timeout=30):
"""
Uploads the file to wigle-net
"""
dummy = StringIO()
for line in lines:
dummy.write(f"{line}")
dummy.seek(0)
headers = {"Authorization": f"Basic {api_key}",
"Accept": "application/json",
"HTTP_USER_AGENT": "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:15.0) Gecko/20100101 Firefox/15.0.1"}
data = {"donate": "on" if donate else "false"}
payload = {"file": (pwnagotchi.name() + ".csv", dummy, "multipart/form-data", {"Expires": "0"})}
try:
res = requests.post('https://api.wigle.net/api/v2/file/upload',
data=data,
headers=headers,
files=payload,
timeout=timeout)
json_res = res.json()
if not json_res['success']:
raise requests.exceptions.RequestException(json_res['message'])
except requests.exceptions.RequestException as re_e:
raise re_e
class Wigle(plugins.Plugin): class Wigle(plugins.Plugin):
__author__ = "Dadav and updated by Jayofelony" __author__ = "Dadav and updated by Jayofelony and fmatray"
__version__ = "3.1.0" __version__ = "4.1.0"
__license__ = "GPL3" __license__ = "GPL3"
__description__ = "This plugin automatically uploads collected WiFi to wigle.net" __description__ = "This plugin automatically uploads collected WiFi to wigle.net"
LABEL_SPACING = 0
def __init__(self): def __init__(self):
self.ready = False self.ready = False
self.report = StatusFile('/root/.wigle_uploads', data_format='json') self.report = None
self.skip = list() self.skip = list()
self.lock = Lock() self.lock = Lock()
self.options = dict() self.options = dict()
self.statistics = WigleStatistics()
self.last_stat = datetime.now(tz=UTC)
self.ui_counter = 0
def on_loaded(self): def on_loaded(self):
if 'api_key' not in self.options or ('api_key' in self.options and self.options['api_key'] is None): logging.info("[WIGLE] plugin loaded.")
logging.debug("WIGLE: api_key isn't set. Can't upload to wigle.net")
def on_config_changed(self, config):
self.api_key = self.options.get("api_key", None)
if not self.api_key:
logging.info("[WIGLE] api_key must be set.")
return return
self.donate = self.options.get("donate", False)
if 'donate' not in self.options: self.handshake_dir = config["bettercap"].get("handshakes")
self.options['donate'] = False report_filename = os.path.join(self.handshake_dir, ".wigle_uploads")
self.report = StatusFile(report_filename, data_format="json")
self.cache_dir = os.path.join(self.handshake_dir, "cache")
self.cvs_dir = self.options.get("cvs_dir", None)
self.whitelist = config["main"].get("whitelist", [])
self.timeout = self.options.get("timeout", 30)
self.position = self.options.get("position", (10, 10))
self.ready = True self.ready = True
logging.info("WIGLE: ready") logging.info("[WIGLE] Ready for wardriving!!!")
self.get_statistics(force=True)
def on_webhook(self, path, request): def on_webhook(self, path, request):
from flask import make_response, redirect return make_response(redirect("https://www.wigle.net/", code=302))
response = make_response(redirect("https://www.wigle.net/", code=302))
return response
def on_internet_available(self, agent): def get_new_gps_files(self, reported):
""" all_gps_files = glob(os.path.join(self.handshake_dir, "*.gps.json"))
Called when there's internet connectivity all_gps_files += glob(os.path.join(self.handshake_dir, "*.geo.json"))
""" all_gps_files = remove_whitelisted(all_gps_files, self.whitelist)
if not self.ready or self.lock.locked(): return set(all_gps_files) - set(reported) - set(self.skip)
return
from scapy.all import Scapy_Exception @staticmethod
def get_pcap_filename(gps_file):
config = agent.config() pcap_filename = re.sub(r"\.(geo|gps)\.json$", ".pcap", gps_file)
display = agent.view()
reported = self.report.data_field_or('reported', default=list())
handshake_dir = config['bettercap']['handshakes']
all_files = os.listdir(handshake_dir)
all_gps_files = [os.path.join(handshake_dir, filename)
for filename in all_files
if filename.endswith('.gps.json') or filename.endswith('.geo.json')]
all_gps_files = remove_whitelisted(all_gps_files, config['main']['whitelist'])
new_gps_files = set(all_gps_files) - set(reported) - set(self.skip)
if new_gps_files:
logging.info("WIGLE: Internet connectivity detected. Uploading new handshakes to wigle.net")
csv_entries = list()
no_err_entries = list()
for gps_file in new_gps_files:
if gps_file.endswith('.gps.json'):
pcap_filename = gps_file.replace('.gps.json', '.pcap')
if gps_file.endswith('.geo.json'):
pcap_filename = gps_file.replace('.geo.json', '.pcap')
if not os.path.exists(pcap_filename): if not os.path.exists(pcap_filename):
logging.debug("WIGLE: Can't find pcap for %s", gps_file) logging.debug("[WIGLE] Can't find pcap for %s", gps_file)
self.skip.append(gps_file) return None
continue return pcap_filename
@staticmethod
def extract_gps_data(path):
"""
Extract data from gps-file
return json-obj
"""
try: try:
gps_data = _extract_gps_data(gps_file) if path.endswith(".geo.json"):
except OSError as os_err: with open(path, "r") as json_file:
logging.debug("WIGLE: %s", os_err) tempJson = json.load(json_file)
self.skip.append(gps_file) d = datetime.fromtimestamp(int(tempJson["ts"]), tz=UTC)
continue return {
except json.JSONDecodeError as json_err: "Latitude": tempJson["location"]["lat"],
logging.debug("WIGLE: %s", json_err) "Longitude": tempJson["location"]["lng"],
self.skip.append(gps_file) "Altitude": 10,
continue "Accuracy": tempJson["accuracy"],
if gps_data['Latitude'] == 0 and gps_data['Longitude'] == 0: "Updated": d.strftime("%Y-%m-%dT%H:%M:%S.%f"),
logging.debug("WIGLE: Not enough gps-information for %s. Trying again next time.", gps_file) }
self.skip.append(gps_file) with open(path, "r") as json_file:
continue return json.load(json_file)
except (OSError, json.JSONDecodeError) as exp:
raise exp
def get_gps_data(self, gps_file):
try: try:
pcap_data = extract_from_pcap(pcap_filename, [WifiInfo.BSSID, gps_data = self.extract_gps_data(gps_file)
except (OSError, json.JSONDecodeError) as exp:
logging.debug(f"[WIGLE] Error while extracting GPS data: {exp}")
return None
if gps_data["Latitude"] == 0 and gps_data["Longitude"] == 0:
logging.debug(f"[WIGLE] Not enough gps data for {gps_file}. Next time.")
return None
return gps_data
def get_pcap_data(self, pcap_filename):
try:
if cache := read_ap_cache(self.cache_dir, self.pcap_filename):
logging.info(f"[WIGLE] Using cache for {pcap_filename}")
return {
WifiInfo.BSSID: cache["mac"],
WifiInfo.ESSID: cache["hostname"],
WifiInfo.ENCRYPTION: cache["encryption"],
WifiInfo.CHANNEL: cache["channel"],
WifiInfo.FREQUENCY: cache["frequency"],
WifiInfo.RSSI: cache["rssi"],
}
except (AttributeError, KeyError):
pass
try:
pcap_data = extract_from_pcap(
pcap_filename,
[
WifiInfo.BSSID,
WifiInfo.ESSID, WifiInfo.ESSID,
WifiInfo.ENCRYPTION, WifiInfo.ENCRYPTION,
WifiInfo.CHANNEL, WifiInfo.CHANNEL,
WifiInfo.RSSI]) WifiInfo.FREQUENCY,
WifiInfo.RSSI,
],
)
logging.debug(f"[WIGLE] PCAP data for {pcap_filename}: {pcap_data}")
except FieldNotFoundError: except FieldNotFoundError:
logging.debug("WIGLE: Could not extract all information. Skip %s", gps_file) logging.debug(f"[WIGLE] Cannot extract all data: {pcap_filename} (skipped)")
self.skip.append(gps_file) return None
continue
except Scapy_Exception as sc_e: except Scapy_Exception as sc_e:
logging.debug("WIGLE: %s", sc_e) logging.debug(f"[WIGLE] {sc_e}")
self.skip.append(gps_file) return None
continue return pcap_data
new_entry = _transform_wigle_entry(gps_data, pcap_data, self.__version__)
csv_entries.append(new_entry)
no_err_entries.append(gps_file)
if csv_entries:
display.on_uploading('wigle.net')
def generate_csv(self, data):
date = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"{pwnagotchi.name()}_{date}.csv"
content = StringIO()
# write kismet header + header
content.write(
f"WigleWifi-1.6,appRelease={self.__version__},model=pwnagotchi,release={__pwnagotchi_version__},"
f"device={pwnagotchi.name()},display=kismet,board=RaspberryPi,brand=pwnagotchi,star=Sol,body=3,subBody=0\n"
f"MAC,SSID,AuthMode,FirstSeen,Channel,Frequency,RSSI,CurrentLatitude,CurrentLongitude,AltitudeMeters,AccuracyMeters,RCOIs,MfgrId,Type\n"
)
writer = csv.writer(content, delimiter=",", quoting=csv.QUOTE_NONE, escapechar="\\")
for gps_data, pcap_data in data: # write WIFIs
try: try:
_send_to_wigle(csv_entries, self.options['api_key'], donate=self.options['donate']) timestamp = datetime.strptime(
reported += no_err_entries gps_data["Updated"].rsplit(".")[0], "%Y-%m-%dT%H:%M:%S"
self.report.update(data={'reported': reported}) ).strftime("%Y-%m-%d %H:%M:%S")
logging.info("WIGLE: Successfully uploaded %d files", len(no_err_entries)) except ValueError:
except requests.exceptions.RequestException as re_e: timestamp = datetime.strptime(
self.skip += no_err_entries gps_data["Updated"].rsplit(".")[0], "%Y-%m-%d %H:%M:%S"
logging.debug("WIGLE: Got an exception while uploading %s", re_e) ).strftime("%Y-%m-%d %H:%M:%S")
except OSError as os_e: writer.writerow(
self.skip += no_err_entries [
logging.debug("WIGLE: Got the following error: %s", os_e) pcap_data[WifiInfo.BSSID],
pcap_data[WifiInfo.ESSID],
f"[{']['.join(pcap_data[WifiInfo.ENCRYPTION])}]",
timestamp,
pcap_data[WifiInfo.CHANNEL],
pcap_data[WifiInfo.FREQUENCY],
pcap_data[WifiInfo.RSSI],
gps_data["Latitude"],
gps_data["Longitude"],
gps_data["Altitude"],
gps_data["Accuracy"],
"", # RCOIs to populate
"", # MfgrId always empty
"WIFI",
]
)
content.seek(0)
return filename, content
def save_to_file(self, cvs_filename, cvs_content):
if not self.cvs_dir:
return
filename = os.path.join(self.cvs_dir, cvs_filename)
logging.info(f"[WIGLE] Saving to file {filename}")
try:
with open(filename, mode="w") as f:
f.write(cvs_content.getvalue())
except Exception as exp:
logging.error(f"[WIGLE] Error while writing CSV file(skipping): {exp}")
def post_wigle(self, reported, cvs_filename, cvs_content, no_err_entries):
try:
json_res = requests.post(
"https://api.wigle.net/api/v2/file/upload",
headers={
"Authorization": f"Basic {self.api_key}",
"Accept": "application/json",
},
data={"donate": "on" if self.donate else "false"},
files=dict(file=(cvs_filename, cvs_content, "text/csv")),
timeout=self.timeout,
).json()
if not json_res["success"]:
raise requests.exceptions.RequestException(json_res["message"])
reported += no_err_entries
self.report.update(data={"reported": reported})
logging.info(f"[WIGLE] Successfully uploaded {len(no_err_entries)} wifis")
except (requests.exceptions.RequestException, OSError) as exp:
self.skip += no_err_entries
logging.debug(f"[WIGLE] Exception while uploading: {exp}")
def upload_new_handshakes(self, reported, new_gps_files, agent):
logging.info("[WIGLE] Uploading new handshakes to wigle.net")
csv_entries, no_err_entries = list(), list()
for gps_file in new_gps_files:
logging.info(f"[WIGLE] Processing {os.path.basename(gps_file)}")
if (
(pcap_filename := self.get_pcap_filename(gps_file))
and (gps_data := self.get_gps_data(gps_file))
and (pcap_data := self.get_pcap_data(pcap_filename))
):
csv_entries.append((gps_data, pcap_data))
no_err_entries.append(gps_file)
else:
self.skip.append(gps_file)
logging.info(f"[WIGLE] Wifi to upload: {len(csv_entries)}")
if csv_entries:
cvs_filename, cvs_content = self.generate_csv(csv_entries)
self.save_to_file(cvs_filename, cvs_content)
display = agent.view()
display.on_uploading("wigle.net")
self.post_wigle(reported, cvs_filename, cvs_content, no_err_entries)
display.on_normal() display.on_normal()
def request_statistics(self, url):
try:
return requests.get(
url,
headers={
"Authorization": f"Basic {self.api_key}",
"Accept": "application/json",
},
timeout=self.timeout,
).json()
except (requests.exceptions.RequestException, OSError) as exp:
return None
def get_user_statistics(self):
json_res = self.request_statistics(
"https://api.wigle.net/api/v2/stats/user",
)
if json_res and json_res["success"]:
self.statistics.update_user(json_res)
def get_usergroup_statistics(self):
if not self.statistics.username or self.statistics.groupID:
return
url = f"https://api.wigle.net/api/v2/group/groupForUser/{self.statistics.username}"
if json_res := self.request_statistics(url):
self.statistics.update_user_group(json_res)
def get_group_statistics(self):
if not self.statistics.groupID:
return
json_res = self.request_statistics("https://api.wigle.net/api/v2/stats/group")
if json_res and json_res["success"]:
self.statistics.update_group(json_res)
def get_statistics(self, force=False):
if force or (datetime.now(tz=UTC) - self.last_stat).total_seconds() > 30:
self.last_stat = datetime.now(tz=UTC)
self.get_user_statistics()
self.get_usergroup_statistics()
self.get_group_statistics()
def on_internet_available(self, agent):
if not self.ready:
return
with self.lock:
reported = self.report.data_field_or("reported", default=list())
if new_gps_files := self.get_new_gps_files(reported):
self.upload_new_handshakes(reported, new_gps_files, agent)
else:
self.get_statistics()
def on_ui_setup(self, ui):
with ui._lock:
ui.add_element(
"wigle",
Text(value="-", position=self.position, font=fonts.Small, color=BLACK),
)
def on_unload(self, ui):
with ui._lock:
try:
ui.remove_element("wigle")
except KeyError:
pass
def on_ui_update(self, ui):
with ui._lock:
if not (self.ready and self.statistics.ready):
ui.set("wigle", "We Will Wait Wigle")
return
msg = "-"
self.ui_counter = (self.ui_counter + 1) % 6
if self.ui_counter == 0:
msg = f"User:{self.statistics.username}"
if self.ui_counter == 1:
msg = f"Rank:{self.statistics.rank} Month:{self.statistics.monthrank}"
elif self.ui_counter == 2:
msg = f"{self.statistics.discoveredwiFi} discovered WiFis"
elif self.ui_counter == 3:
msg = f"Last upl.:{self.statistics.last}"
elif self.ui_counter == 4:
msg = f"Grp:{self.statistics.groupname}"
elif self.ui_counter == 5:
msg = f"Grp rank:{self.statistics.grouprank}"
ui.set("wigle", msg)

View File

@ -564,7 +564,8 @@ class WifiInfo(Enum):
ESSID = 1 ESSID = 1
ENCRYPTION = 2 ENCRYPTION = 2
CHANNEL = 3 CHANNEL = 3
RSSI = 4 FREQUENCY = 4
RSSI = 5
class FieldNotFoundError(Exception): class FieldNotFoundError(Exception):
@ -594,9 +595,6 @@ def extract_from_pcap(path, fields):
""" """
results = dict() results = dict()
for field in fields: for field in fields:
if not isinstance(field, WifiInfo):
raise TypeError("Invalid field")
subtypes = set() subtypes = set()
if field == WifiInfo.BSSID: if field == WifiInfo.BSSID:
@ -606,8 +604,7 @@ def extract_from_pcap(path, fields):
packets = sniff(offline=path, filter=bpf_filter) packets = sniff(offline=path, filter=bpf_filter)
try: try:
for packet in packets: for packet in packets:
if packet.haslayer(Dot11Beacon): if packet.haslayer(Dot11Beacon) and hasattr(packet[Dot11], 'addr3'):
if hasattr(packet[Dot11], 'addr3'):
results[field] = packet[Dot11].addr3 results[field] = packet[Dot11].addr3
break break
else: # magic else: # magic
@ -654,6 +651,14 @@ def extract_from_pcap(path, fields):
results[field] = freq_to_channel(packets[0][RadioTap].ChannelFrequency) results[field] = freq_to_channel(packets[0][RadioTap].ChannelFrequency)
except Exception: except Exception:
raise FieldNotFoundError("Could not find field [CHANNEL]") raise FieldNotFoundError("Could not find field [CHANNEL]")
elif field == WifiInfo.FREQUENCY:
from scapy.layers.dot11 import sniff, RadioTap
from pwnagotchi.mesh.wifi import freq_to_channel
packets = sniff(offline=path, count=1)
try:
results[field] = packets[0][RadioTap].ChannelFrequency
except Exception:
raise FieldNotFoundError("Could not find field [FREQUENCY]")
elif field == WifiInfo.RSSI: elif field == WifiInfo.RSSI:
from scapy.layers.dot11 import sniff, RadioTap from scapy.layers.dot11 import sniff, RadioTap
from pwnagotchi.mesh.wifi import freq_to_channel from pwnagotchi.mesh.wifi import freq_to_channel
@ -662,7 +667,8 @@ def extract_from_pcap(path, fields):
results[field] = packets[0][RadioTap].dBm_AntSignal results[field] = packets[0][RadioTap].dBm_AntSignal
except Exception: except Exception:
raise FieldNotFoundError("Could not find field [RSSI]") raise FieldNotFoundError("Could not find field [RSSI]")
else:
raise TypeError("Invalid field")
return results return results

View File

@ -27,11 +27,19 @@ class Voice:
self._('Hack the Planet!'), self._('Hack the Planet!'),
self._('No more mister Wi-Fi!!'), self._('No more mister Wi-Fi!!'),
self._('Pretty fly 4 a Wi-Fi!'), self._('Pretty fly 4 a Wi-Fi!'),
self._('Good Pwning!'), # Battlestar Galactica
self._('Ensign, Engage!'), # Star trek
self._('Free your Wi-Fi!'), # Matrix
self._('Chevron Seven, locked.'), # Stargate
self._('May the Wi-fi be with you'), # Star wars
]) ])
def on_keys_generation(self): def on_keys_generation(self):
return random.choice([ return random.choice([
self._('Generating keys, do not turn off ...')]) self._('Generating keys, do not turn off ...'),
self._('Are you the keymaster?'), # Ghostbusters
self._('I am the keymaster!'), # Ghostbusters
])
def on_normal(self): def on_normal(self):
return random.choice([ return random.choice([
@ -44,7 +52,6 @@ class Voice:
def on_reading_logs(self, lines_so_far=0): def on_reading_logs(self, lines_so_far=0):
if lines_so_far == 0: if lines_so_far == 0:
return self._('Reading last session logs ...') return self._('Reading last session logs ...')
else:
return self._('Read {lines_so_far} log lines so far ...').format(lines_so_far=lines_so_far) return self._('Read {lines_so_far} log lines so far ...').format(lines_so_far=lines_so_far)
def on_bored(self): def on_bored(self):
@ -53,7 +60,11 @@ class Voice:
self._('Let\'s go for a walk!')]) self._('Let\'s go for a walk!')])
def on_motivated(self, reward): def on_motivated(self, reward):
return self._('This is the best day of my life!') return random.choice([
self._('This is the best day of my life!'),
self._('All your base are belong to us'),
self._('Fascinating!'), # Star trek
])
def on_demotivated(self, reward): def on_demotivated(self, reward):
return self._('Shitty day :/') return self._('Shitty day :/')
@ -63,6 +74,8 @@ class Voice:
self._('I\'m extremely bored ...'), self._('I\'m extremely bored ...'),
self._('I\'m very sad ...'), self._('I\'m very sad ...'),
self._('I\'m sad'), self._('I\'m sad'),
self._('I\'m so happy ...'), # Marvin in H2G2
self._('Life? Don\'t talk to me about life.'), # Also Marvin in H2G2
'...']) '...'])
def on_angry(self): def on_angry(self):
@ -78,13 +91,13 @@ class Voice:
self._('I pwn therefore I am.'), self._('I pwn therefore I am.'),
self._('So many networks!!!'), self._('So many networks!!!'),
self._('I\'m having so much fun!'), self._('I\'m having so much fun!'),
self._('It\'s a Wi-Fi system! I know this!'), # Jurassic park
self._('My crime is that of curiosity ...')]) self._('My crime is that of curiosity ...')])
def on_new_peer(self, peer): def on_new_peer(self, peer):
if peer.first_encounter(): if peer.first_encounter():
return random.choice([ return random.choice([
self._('Hello {name}! Nice to meet you.').format(name=peer.name())]) self._('Hello {name}! Nice to meet you.').format(name=peer.name())])
else:
return random.choice([ return random.choice([
self._('Yo {name}! Sup?').format(name=peer.name()), self._('Yo {name}! Sup?').format(name=peer.name()),
self._('Hey {name} how are you doing?').format(name=peer.name()), self._('Hey {name} how are you doing?').format(name=peer.name()),
@ -104,19 +117,23 @@ class Voice:
def on_grateful(self): def on_grateful(self):
return random.choice([ return random.choice([
self._('Good friends are a blessing!'), self._('Good friends are a blessing!'),
self._('I love my friends!')]) self._('I love my friends!')
])
def on_lonely(self): def on_lonely(self):
return random.choice([ return random.choice([
self._('Nobody wants to play with me ...'), self._('Nobody wants to play with me ...'),
self._('I feel so alone ...'), self._('I feel so alone ...'),
self._('Let\'s find friends'),
self._('Where\'s everybody?!')]) self._('Where\'s everybody?!')])
def on_napping(self, secs): def on_napping(self, secs):
return random.choice([ return random.choice([
self._('Napping for {secs}s ...').format(secs=secs), self._('Napping for {secs}s ...').format(secs=secs),
self._('Zzzzz'), self._('Zzzzz'),
self._('ZzzZzzz ({secs}s)').format(secs=secs)]) self._('Snoring ...'),
self._('ZzzZzzz ({secs}s)').format(secs=secs),
])
def on_shutdown(self): def on_shutdown(self):
return random.choice([ return random.choice([
@ -124,12 +141,17 @@ class Voice:
self._('Zzz')]) self._('Zzz')])
def on_awakening(self): def on_awakening(self):
return random.choice(['...', '!']) return random.choice([
'...',
'!',
'Hello World!',
self._('I dreamed of electric sheep'),
])
def on_waiting(self, secs): def on_waiting(self, secs):
return random.choice([ return random.choice([
self._('Waiting for {secs}s ...').format(secs=secs),
'...', '...',
self._('Waiting for {secs}s ...').format(secs=secs),
self._('Looking around ({secs}s)').format(secs=secs)]) self._('Looking around ({secs}s)').format(secs=secs)])
def on_assoc(self, ap): def on_assoc(self, ap):
@ -138,12 +160,16 @@ class Voice:
return random.choice([ return random.choice([
self._('Hey {what} let\'s be friends!').format(what=what), self._('Hey {what} let\'s be friends!').format(what=what),
self._('Associating to {what}').format(what=what), self._('Associating to {what}').format(what=what),
self._('Yo {what}!').format(what=what)]) self._('Yo {what}!').format(what=what),
self._('Rise and Shine Mr. {what}!').format(what=what), # Half Life
])
def on_deauth(self, sta): def on_deauth(self, sta):
return random.choice([ return random.choice([
self._('Just decided that {mac} needs no WiFi!').format(mac=sta['mac']), self._('Just decided that {mac} needs no Wi-Fi!').format(mac=sta['mac']),
self._('Deauthenticating {mac}').format(mac=sta['mac']), self._('Deauthenticating {mac}').format(mac=sta['mac']),
self._('No more Wi-Fi for {mac}').format(mac=sta['mac']),
self._('It\'s a trap! {mac}').format(mac=sta['mac']), # Star wars
self._('Kickbanning {mac}!').format(mac=sta['mac'])]) self._('Kickbanning {mac}!').format(mac=sta['mac'])])
def on_handshakes(self, new_shakes): def on_handshakes(self, new_shakes):
@ -155,10 +181,19 @@ class Voice:
return self._('You have {count} new message{plural}!').format(count=count, plural=s) return self._('You have {count} new message{plural}!').format(count=count, plural=s)
def on_rebooting(self): def on_rebooting(self):
return self._("Oops, something went wrong ... Rebooting ...") return random.choice([
self._("Oops, something went wrong ... Rebooting ..."),
self._("Have you tried turning it off and on again?"), # The IT crew
self._("I\'m afraid Dave"), # 2001 Space Odyssey
self._("I\'m dead, Jim!"), # Star Trek
self._("I have a bad feeling about this"), # Star wars
])
def on_uploading(self, to): def on_uploading(self, to):
return self._("Uploading data to {to} ...").format(to=to) return random.choice([
self._("Uploading data to {to} ...").format(to=to),
self._("Beam me up to {to}").format(to=to),
])
def on_downloading(self, name): def on_downloading(self, name):
return self._("Downloading from {name} ...").format(name=name) return self._("Downloading from {name} ...").format(name=name)