Merge pull request #341 from fmatray/noai

Add Cache plugin, improve Wigle, debug bt-tether, add more sentences to voices
This commit is contained in:
Jayofelony
2025-02-17 14:16:58 +01:00
committed by GitHub
6 changed files with 358 additions and 101 deletions

1
.gitignore vendored
View File

@ -1 +1,2 @@
*.pyc *.pyc
.vscode

View File

@ -33,6 +33,8 @@ main.plugins.bt-tether.dns = "" # optional, default (google): "8.8.8.8 1.1.1.1".
main.plugins.fix_services.enabled = true main.plugins.fix_services.enabled = true
main.plugins.cache.enabled = true
main.plugins.gdrivesync.enabled = false main.plugins.gdrivesync.enabled = false
main.plugins.gdrivesync.backupfiles = [''] main.plugins.gdrivesync.backupfiles = ['']
main.plugins.gdrivesync.backup_folder = "PwnagotchiBackups" main.plugins.gdrivesync.backup_folder = "PwnagotchiBackups"

View File

@ -123,6 +123,7 @@ MAC_PTTRN = r"^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})$"
IP_PTTRN = r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$" IP_PTTRN = r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$"
DNS_PTTRN = r"^\s*((\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\s*[ ,;]\s*)+((\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\s*[ ,;]?\s*)$" DNS_PTTRN = r"^\s*((\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\s*[ ,;]\s*)+((\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\s*[ ,;]?\s*)$"
class BTTether(plugins.Plugin): class BTTether(plugins.Plugin):
__author__ = "Jayofelony, modified my fmatray" __author__ = "Jayofelony, modified my fmatray"
__version__ = "1.4" __version__ = "1.4"
@ -138,13 +139,13 @@ class BTTether(plugins.Plugin):
@staticmethod @staticmethod
def exec_cmd(cmd, args, pattern=None): def exec_cmd(cmd, args, pattern=None):
try: try:
result = subprocess.run([cmd] + args, result = subprocess.run([cmd] + args, check=True, capture_output=True, text=True)
check=True, capture_output=True, text=True)
if pattern: if pattern:
return result.stdout.find(pattern) return result.stdout.find(pattern)
return result return result
except Exception as exp: except Exception as exp:
logging.error(f"[BT-Tether] Error with {cmd} : {exp}") logging.error(f"[BT-Tether] Error with {cmd}")
logging.error(f"[BT-Tether] Exception : {exp}")
raise exp raise exp
def bluetoothctl(self, args, pattern=None): def bluetoothctl(self, args, pattern=None):
@ -181,23 +182,36 @@ class BTTether(plugins.Plugin):
self.mac = self.options["mac"] self.mac = self.options["mac"]
dns = self.options.get("dns", "8.8.8.8 1.1.1.1") dns = self.options.get("dns", "8.8.8.8 1.1.1.1")
if not re.match(DNS_PTTRN, dns): if not re.match(DNS_PTTRN, dns):
logging.error(f"[BT-Tether] DNS error: {dns}") if dns == "":
logging.error(f"[BT-Tether] Empty DNS setting")
else:
logging.error(f"[BT-Tether] Wrong DNS setting: '{dns}'")
return return
dns = re.sub("[\s,;]+", " ", dns).strip() # DNS cleaning dns = re.sub("[\s,;]+", " ", dns).strip() # DNS cleaning
try: try:
# Configure connection. Metric is set to 200 to prefer connection over USB # Configure connection. Metric is set to 200 to prefer connection over USB
self.nmcli(["connection", "modify", f"{self.phone_name}", self.nmcli(
"connection.type", "bluetooth", [
"bluetooth.type", "panu", "connection", "modify", f"{self.phone_name}",
"bluetooth.bdaddr", f"{self.mac}", "connection.type", "bluetooth",
"connection.autoconnect", "yes", "bluetooth.type", "panu",
"connection.autoconnect-retries", "0", "bluetooth.bdaddr", f"{self.mac}",
"ipv4.method", "manual", "connection.autoconnect", "yes",
"ipv4.dns", f"{dns}", "connection.autoconnect-retries", "0",
"ipv4.addresses", f"{address}/24", "ipv4.method", "manual",
"ipv4.gateway", f"{gateway}", "ipv4.dns", f"{dns}",
"ipv4.route-metric", "200" ]) "ipv4.addresses", f"{address}/24",
"ipv4.gateway", f"{gateway}",
"ipv4.route-metric", "200",
]
)
# Configure Device to autoconnect
self.nmcli([
"device", "set", f"{self.mac}",
"autoconnect", "yes",
"managed", "yes"
])
self.nmcli(["connection", "reload"]) self.nmcli(["connection", "reload"])
self.ready = True self.ready = True
logging.info(f"[BT-Tether] Connection {self.phone_name} configured") logging.info(f"[BT-Tether] Connection {self.phone_name} configured")
@ -205,7 +219,7 @@ class BTTether(plugins.Plugin):
logging.error(f"[BT-Tether] Error while configuring: {e}") logging.error(f"[BT-Tether] Error while configuring: {e}")
return return
try: try:
time.sleep(5) # Give some delay to configure before going up time.sleep(5) # Give some delay to configure before going up
self.nmcli(["connection", "up", f"{self.phone_name}"]) self.nmcli(["connection", "up", f"{self.phone_name}"])
except Exception as e: except Exception as e:
logging.error(f"[BT-Tether] Failed to connect to device: {e}") logging.error(f"[BT-Tether] Failed to connect to device: {e}")
@ -230,9 +244,18 @@ class BTTether(plugins.Plugin):
def on_ui_setup(self, ui): def on_ui_setup(self, ui):
with ui._lock: with ui._lock:
ui.add_element('bluetooth', LabeledValue(color=BLACK, label='BT', value='-', ui.add_element(
position=(ui.width() / 2 - 10, 0), "bluetooth",
label_font=fonts.Bold, text_font=fonts.Medium)) LabeledValue(
color=BLACK,
label="BT",
value="-",
position=(ui.width() / 2 - 10, 0),
label_font=fonts.Bold,
text_font=fonts.Medium,
),
)
def on_ui_update(self, ui): def on_ui_update(self, ui):
if not self.ready: if not self.ready:
return return
@ -240,8 +263,12 @@ class BTTether(plugins.Plugin):
status = "" status = ""
try: try:
# Checking connection # Checking connection
if self.nmcli(["-w", "0", "-g", "GENERAL.STATE", "connection", "show", self.phone_name], if (
"activated") != -1: self.nmcli(["-w", "0", "-g", "GENERAL.STATE", "connection", "show", self.phone_name],
"activated",
)
!= -1
):
ui.set("bluetooth", "U") ui.set("bluetooth", "U")
return return
else: else:
@ -249,8 +276,13 @@ class BTTether(plugins.Plugin):
status = "BT Conn. down" status = "BT Conn. down"
# Checking device # Checking device
if self.nmcli(["-w", "0", "-g", "GENERAL.STATE", "device", "show", self.mac], if (
"(connected)") != -1: self.nmcli(
["-w", "0", "-g", "GENERAL.STATE", "device", "show", self.mac],
"(connected)",
)
!= -1
):
ui.set("bluetooth", "C") ui.set("bluetooth", "C")
status += "\nBT dev conn." status += "\nBT dev conn."
else: else:
@ -269,26 +301,28 @@ class BTTether(plugins.Plugin):
if path == "/" or not path: if path == "/" or not path:
try: try:
bluetooth = self.bluetoothctl(["info", self.mac]) bluetooth = self.bluetoothctl(["info", self.mac])
bluetooth = bluetooth.stdout.replace('\n', '<br>') bluetooth = bluetooth.stdout.replace("\n", "<br>")
except Exception as e: except Exception as e:
bluetooth = "Error while checking bluetoothctl" bluetooth = "Error while checking bluetoothctl"
try: try:
device = self.nmcli(["-w", "0","device", "show", self.mac]) device = self.nmcli(["-w", "0", "device", "show", self.mac])
device = device.stdout.replace('\n', '<br>') device = device.stdout.replace("\n", "<br>")
except Exception as e: except Exception as e:
device = "Error while checking nmcli device" device = "Error while checking nmcli device"
try: try:
connection = self.nmcli(["-w", "0","connection", "show", self.phone_name]) connection = self.nmcli(["-w", "0", "connection", "show", self.phone_name])
connection = connection.stdout.replace('\n', '<br>') connection = connection.stdout.replace("\n", "<br>")
except Exception as e: except Exception as e:
connection = "Error while checking nmcli connection" connection = "Error while checking nmcli connection"
logging.debug(device) logging.debug(device)
return render_template_string(TEMPLATE, return render_template_string(
title="BT-Tether", TEMPLATE,
bluetooth=bluetooth, title="BT-Tether",
device=device, bluetooth=bluetooth,
connection=connection) device=device,
connection=connection,
)
abort(404) abort(404)

View File

@ -0,0 +1,119 @@
import logging
import json
import os
import re
import pathlib
import pwnagotchi.plugins as plugins
from datetime import datetime, UTC
from threading import Lock
def read_ap_cache(cache_dir, file):
cache_filename = os.path.basename(re.sub(r"\.(pcap|gps\.json|geo\.json)$", ".cache", file))
cache_filename = os.path.join(cache_dir, cache_filename)
if not os.path.exists(cache_filename):
logging.info("Cache not exist")
return None
try:
with open(cache_filename, "r") as f:
return json.load(f)
except Exception as e:
logging.info(f"Exception {e}")
return None
class Cache(plugins.Plugin):
__author__ = "fmatray"
__version__ = "1.0.0"
__license__ = "GPL3"
__description__ = "A simple plugin to cache AP informations"
def __init__(self):
self.options = dict()
self.ready = False
self.lock = Lock()
def on_loaded(self):
logging.info("[CACHE] plugin loaded.")
def on_config_changed(self, config):
try:
handshake_dir = config["bettercap"].get("handshakes")
self.cache_dir = os.path.join(handshake_dir, "cache")
os.makedirs(self.cache_dir, exist_ok=True)
except Exception:
logging.info(f"[CACHE] Cannot access to the cache directory")
return
self.last_clean = datetime.now(tz=UTC)
self.ready = True
logging.info(f"[CACHE] Cache plugin configured")
self.clean_ap_cache()
def on_unload(self, ui):
self.clean_ap_cache()
def clean_ap_cache(self):
if not self.ready:
return
with self.lock:
ctime = datetime.now(tz=UTC)
cache_to_delete = list()
for cache_file in pathlib.Path(self.cache_dir).glob("*.apcache"):
try:
mtime = datetime.fromtimestamp(cache_file.lstat().st_mtime, tz=UTC)
if (ctime - mtime).total_seconds() > 60 * 5:
cache_to_delete.append(cache_file)
except FileNotFoundError:
pass
if cache_to_delete:
logging.info(f"[CACHE] Cleaning {len(cache_to_delete)} files")
for cache_file in cache_to_delete:
try:
cache_file.unlink()
except FileNotFoundError as e:
pass
def write_ap_cache(self, access_point):
with self.lock:
try:
mac = access_point["mac"].replace(":", "")
hostname = re.sub(r"[^a-zA-Z0-9]", "", access_point["hostname"])
except KeyError:
return
cache_file = os.path.join(self.cache_dir, f"{hostname}_{mac}.apcache")
try:
with open(cache_file, "w") as f:
json.dump(access_point, f)
except Exception as e:
logging.error(f"[CACHE] Cannot write {cache_file}: {e}")
pass
def on_wifi_update(self, agent, access_points):
if self.ready:
for ap in filter(lambda ap: ap["hostname"] not in ["", "<hidden>"], access_points):
self.write_ap_cache(ap)
def on_unfiltered_ap_list(self, agent, aps):
if self.ready:
for ap in filter(lambda ap: ap["hostname"] not in ["", "<hidden>"], aps):
self.write_ap_cache(ap)
def on_association(self, agent, access_point):
if self.ready:
self.write_ap_cache(access_point)
def on_deauthentication(self, agent, access_point, client_station):
if self.ready:
self.write_ap_cache(access_point)
def on_handshake(self, agent, filename, access_point, client_station):
if self.ready:
self.write_ap_cache(access_point)
def on_ui_update(self, ui):
if not self.ready:
return
current_time = datetime.now(tz=UTC)
if (current_time - self.last_clean).total_seconds() > 60:
self.clean_ap_cache()
self.last_clean = current_time

View File

@ -9,6 +9,7 @@ from glob import glob
from threading import Lock from threading import Lock
from io import StringIO from io import StringIO
from datetime import datetime, UTC from datetime import datetime, UTC
from dataclasses import dataclass
from flask import make_response, redirect from flask import make_response, redirect
from pwnagotchi.utils import ( from pwnagotchi.utils import (
@ -19,6 +20,7 @@ from pwnagotchi.utils import (
remove_whitelisted, remove_whitelisted,
) )
from pwnagotchi import plugins from pwnagotchi import plugins
from pwnagotchi.plugins.default.cache import read_ap_cache
from pwnagotchi._version import __version__ as __pwnagotchi_version__ from pwnagotchi._version import __version__ as __pwnagotchi_version__
import pwnagotchi.ui.fonts as fonts import pwnagotchi.ui.fonts as fonts
@ -28,9 +30,42 @@ from pwnagotchi.ui.view import BLACK
from scapy.all import Scapy_Exception from scapy.all import Scapy_Exception
@dataclass
class WigleStatistics:
ready: bool = False
username: str = None
rank: int = None
monthrank: int = None
discoveredwiFi: int = None
last: str = None
groupID: str = None
groupname: str = None
grouprank: int = None
def update_user(self, json_res):
self.ready = True
self.username = json_res["user"]
self.rank = json_res["rank"]
self.monthrank = json_res["monthRank"]
self.discoveredwiFi = json_res["statistics"]["discoveredWiFi"]
last = json_res["statistics"]["last"]
self.last = f"{last[6:8]}/{last[4:6]}/{last[0:4]}"
def update_user_group(self, json_res):
self.groupID = json_res["groupId"]
self.groupname = json_res["groupName"]
def update_group(self, json_res):
rank = 1
for group in json_res["groups"]:
if group["groupId"] == self.groupID:
self.grouprank = rank
rank += 1
class Wigle(plugins.Plugin): class Wigle(plugins.Plugin):
__author__ = "Dadav and updated by Jayofelony and fmatray" __author__ = "Dadav and updated by Jayofelony and fmatray"
__version__ = "4.0.0" __version__ = "4.1.0"
__license__ = "GPL3" __license__ = "GPL3"
__description__ = "This plugin automatically uploads collected WiFi to wigle.net" __description__ = "This plugin automatically uploads collected WiFi to wigle.net"
LABEL_SPACING = 0 LABEL_SPACING = 0
@ -41,17 +76,13 @@ class Wigle(plugins.Plugin):
self.skip = list() self.skip = list()
self.lock = Lock() self.lock = Lock()
self.options = dict() self.options = dict()
self.statistics = dict( self.statistics = WigleStatistics()
ready=False,
username=None,
rank=None,
monthrank=None,
discoveredwiFi=None,
last=None,
)
self.last_stat = datetime.now(tz=UTC) self.last_stat = datetime.now(tz=UTC)
self.ui_counter = 0 self.ui_counter = 0
def on_loaded(self):
logging.info("[WIGLE] plugin loaded.")
def on_config_changed(self, config): def on_config_changed(self, config):
self.api_key = self.options.get("api_key", None) self.api_key = self.options.get("api_key", None)
if not self.api_key: if not self.api_key:
@ -61,6 +92,7 @@ class Wigle(plugins.Plugin):
self.handshake_dir = config["bettercap"].get("handshakes") self.handshake_dir = config["bettercap"].get("handshakes")
report_filename = os.path.join(self.handshake_dir, ".wigle_uploads") report_filename = os.path.join(self.handshake_dir, ".wigle_uploads")
self.report = StatusFile(report_filename, data_format="json") self.report = StatusFile(report_filename, data_format="json")
self.cache_dir = os.path.join(self.handshake_dir, "cache")
self.cvs_dir = self.options.get("cvs_dir", None) self.cvs_dir = self.options.get("cvs_dir", None)
self.whitelist = config["main"].get("whitelist", []) self.whitelist = config["main"].get("whitelist", [])
self.timeout = self.options.get("timeout", 30) self.timeout = self.options.get("timeout", 30)
@ -120,8 +152,20 @@ class Wigle(plugins.Plugin):
return None return None
return gps_data return gps_data
@staticmethod def get_pcap_data(self, pcap_filename):
def get_pcap_data(pcap_filename): try:
if cache := read_ap_cache(self.cache_dir, self.pcap_filename):
logging.info(f"[WIGLE] Using cache for {pcap_filename}")
return {
WifiInfo.BSSID: cache["mac"],
WifiInfo.ESSID: cache["hostname"],
WifiInfo.ENCRYPTION: cache["encryption"],
WifiInfo.CHANNEL: cache["channel"],
WifiInfo.FREQUENCY: cache["frequency"],
WifiInfo.RSSI: cache["rssi"],
}
except (AttributeError, KeyError):
pass
try: try:
pcap_data = extract_from_pcap( pcap_data = extract_from_pcap(
pcap_filename, pcap_filename,
@ -154,14 +198,16 @@ class Wigle(plugins.Plugin):
f"device={pwnagotchi.name()},display=kismet,board=RaspberryPi,brand=pwnagotchi,star=Sol,body=3,subBody=0\n" f"device={pwnagotchi.name()},display=kismet,board=RaspberryPi,brand=pwnagotchi,star=Sol,body=3,subBody=0\n"
f"MAC,SSID,AuthMode,FirstSeen,Channel,Frequency,RSSI,CurrentLatitude,CurrentLongitude,AltitudeMeters,AccuracyMeters,RCOIs,MfgrId,Type\n" f"MAC,SSID,AuthMode,FirstSeen,Channel,Frequency,RSSI,CurrentLatitude,CurrentLongitude,AltitudeMeters,AccuracyMeters,RCOIs,MfgrId,Type\n"
) )
writer = csv.writer( writer = csv.writer(content, delimiter=",", quoting=csv.QUOTE_NONE, escapechar="\\")
content, delimiter=",", quoting=csv.QUOTE_NONE, escapechar="\\"
)
for gps_data, pcap_data in data: # write WIFIs for gps_data, pcap_data in data: # write WIFIs
try: try:
timestamp = datetime.strptime(gps_data["Updated"].rsplit(".")[0], "%Y-%m-%dT%H:%M:%S").strftime("%Y-%m-%d %H:%M:%S") timestamp = datetime.strptime(
gps_data["Updated"].rsplit(".")[0], "%Y-%m-%dT%H:%M:%S"
).strftime("%Y-%m-%d %H:%M:%S")
except ValueError: except ValueError:
timestamp = datetime.strptime(gps_data["Updated"].rsplit(".")[0], "%Y-%m-%d %H:%M:%S").strftime("%Y-%m-%d %H:%M:%S") timestamp = datetime.strptime(
gps_data["Updated"].rsplit(".")[0], "%Y-%m-%d %H:%M:%S"
).strftime("%Y-%m-%d %H:%M:%S")
writer.writerow( writer.writerow(
[ [
pcap_data[WifiInfo.BSSID], pcap_data[WifiInfo.BSSID],
@ -238,31 +284,46 @@ class Wigle(plugins.Plugin):
self.post_wigle(reported, cvs_filename, cvs_content, no_err_entries) self.post_wigle(reported, cvs_filename, cvs_content, no_err_entries)
display.on_normal() display.on_normal()
def get_statistics(self, force=False): def request_statistics(self, url):
if not force and (datetime.now(tz=UTC) - self.last_stat).total_seconds() < 30:
return
self.last_stat = datetime.now(tz=UTC)
try: try:
self.statistics["ready"] = False return requests.get(
json_res = requests.get( url,
"https://api.wigle.net/api/v2/stats/user",
headers={ headers={
"Authorization": f"Basic {self.api_key}", "Authorization": f"Basic {self.api_key}",
"Accept": "application/json", "Accept": "application/json",
}, },
timeout=self.timeout, timeout=self.timeout,
).json() ).json()
if not json_res["success"]:
return
self.statistics["ready"] = True
self.statistics["username"] = json_res["user"]
self.statistics["rank"] = json_res["rank"]
self.statistics["monthrank"] = json_res["monthRank"]
self.statistics["discoveredwiFi"] = json_res["statistics"]["discoveredWiFi"]
last = json_res["statistics"]["last"]
self.statistics["last"] = f"{last[6:8]}/{last[4:6]}/{last[0:4]}"
except (requests.exceptions.RequestException, OSError) as exp: except (requests.exceptions.RequestException, OSError) as exp:
pass return None
def get_user_statistics(self):
json_res = self.request_statistics(
"https://api.wigle.net/api/v2/stats/user",
)
if json_res and json_res["success"]:
self.statistics.update_user(json_res)
def get_usergroup_statistics(self):
if not self.statistics.username or self.statistics.groupID:
return
url = f"https://api.wigle.net/api/v2/group/groupForUser/{self.statistics.username}"
if json_res := self.request_statistics(url):
self.statistics.update_user_group(json_res)
def get_group_statistics(self):
if not self.statistics.groupID:
return
json_res = self.request_statistics("https://api.wigle.net/api/v2/stats/group")
if json_res and json_res["success"]:
self.statistics.update_group(json_res)
def get_statistics(self, force=False):
if force or (datetime.now(tz=UTC) - self.last_stat).total_seconds() > 30:
self.last_stat = datetime.now(tz=UTC)
self.get_user_statistics()
self.get_usergroup_statistics()
self.get_group_statistics()
def on_internet_available(self, agent): def on_internet_available(self, agent):
if not self.ready: if not self.ready:
@ -283,23 +344,28 @@ class Wigle(plugins.Plugin):
def on_unload(self, ui): def on_unload(self, ui):
with ui._lock: with ui._lock:
ui.remove_element("wigle") try:
ui.remove_element("wigle")
except KeyError:
pass
def on_ui_update(self, ui): def on_ui_update(self, ui):
if not self.ready:
return
with ui._lock: with ui._lock:
if not self.statistics["ready"]: if not (self.ready and self.statistics.ready):
ui.set("wigle", "We Will Wait Wigle") ui.set("wigle", "We Will Wait Wigle")
return return
msg = "-" msg = "-"
self.ui_counter = (self.ui_counter + 1) % 4 self.ui_counter = (self.ui_counter + 1) % 6
if self.ui_counter == 0: if self.ui_counter == 0:
msg = f"User:{self.statistics['username']}" msg = f"User:{self.statistics.username}"
if self.ui_counter == 1: if self.ui_counter == 1:
msg = f"Rank:{self.statistics['rank']} Month:{self.statistics['monthrank']}" msg = f"Rank:{self.statistics.rank} Month:{self.statistics.monthrank}"
elif self.ui_counter == 2: elif self.ui_counter == 2:
msg = f"{self.statistics['discoveredwiFi']} discovered WiFis" msg = f"{self.statistics.discoveredwiFi} discovered WiFis"
elif self.ui_counter == 3: elif self.ui_counter == 3:
msg = f"Last upl.:{self.statistics['last']}" msg = f"Last upl.:{self.statistics.last}"
elif self.ui_counter == 4:
msg = f"Grp:{self.statistics.groupname}"
elif self.ui_counter == 5:
msg = f"Grp rank:{self.statistics.grouprank}"
ui.set("wigle", msg) ui.set("wigle", msg)

View File

@ -27,11 +27,19 @@ class Voice:
self._('Hack the Planet!'), self._('Hack the Planet!'),
self._('No more mister Wi-Fi!!'), self._('No more mister Wi-Fi!!'),
self._('Pretty fly 4 a Wi-Fi!'), self._('Pretty fly 4 a Wi-Fi!'),
self._('Good Pwning!'), # Battlestar Galactica
self._('Ensign, Engage!'), # Star trek
self._('Free your Wi-Fi!'), # Matrix
self._('Chevron Seven, locked.'), # Stargate
self._('May the Wi-fi be with you'), # Star wars
]) ])
def on_keys_generation(self): def on_keys_generation(self):
return random.choice([ return random.choice([
self._('Generating keys, do not turn off ...')]) self._('Generating keys, do not turn off ...'),
self._('Are you the keymaster?'), # Ghostbusters
self._('I am the keymaster!'), # Ghostbusters
])
def on_normal(self): def on_normal(self):
return random.choice([ return random.choice([
@ -44,8 +52,7 @@ class Voice:
def on_reading_logs(self, lines_so_far=0): def on_reading_logs(self, lines_so_far=0):
if lines_so_far == 0: if lines_so_far == 0:
return self._('Reading last session logs ...') return self._('Reading last session logs ...')
else: return self._('Read {lines_so_far} log lines so far ...').format(lines_so_far=lines_so_far)
return self._('Read {lines_so_far} log lines so far ...').format(lines_so_far=lines_so_far)
def on_bored(self): def on_bored(self):
return random.choice([ return random.choice([
@ -53,7 +60,11 @@ class Voice:
self._('Let\'s go for a walk!')]) self._('Let\'s go for a walk!')])
def on_motivated(self, reward): def on_motivated(self, reward):
return self._('This is the best day of my life!') return random.choice([
self._('This is the best day of my life!'),
self._('All your base are belong to us'),
self._('Fascinating!'), # Star trek
])
def on_demotivated(self, reward): def on_demotivated(self, reward):
return self._('Shitty day :/') return self._('Shitty day :/')
@ -63,6 +74,8 @@ class Voice:
self._('I\'m extremely bored ...'), self._('I\'m extremely bored ...'),
self._('I\'m very sad ...'), self._('I\'m very sad ...'),
self._('I\'m sad'), self._('I\'m sad'),
self._('I\'m so happy ...'), # Marvin in H2G2
self._('Life? Don\'t talk to me about life.'), # Also Marvin in H2G2
'...']) '...'])
def on_angry(self): def on_angry(self):
@ -78,17 +91,17 @@ class Voice:
self._('I pwn therefore I am.'), self._('I pwn therefore I am.'),
self._('So many networks!!!'), self._('So many networks!!!'),
self._('I\'m having so much fun!'), self._('I\'m having so much fun!'),
self._('It\'s a Wi-Fi system! I know this!'), # Jurassic park
self._('My crime is that of curiosity ...')]) self._('My crime is that of curiosity ...')])
def on_new_peer(self, peer): def on_new_peer(self, peer):
if peer.first_encounter(): if peer.first_encounter():
return random.choice([ return random.choice([
self._('Hello {name}! Nice to meet you.').format(name=peer.name())]) self._('Hello {name}! Nice to meet you.').format(name=peer.name())])
else: return random.choice([
return random.choice([ self._('Yo {name}! Sup?').format(name=peer.name()),
self._('Yo {name}! Sup?').format(name=peer.name()), self._('Hey {name} how are you doing?').format(name=peer.name()),
self._('Hey {name} how are you doing?').format(name=peer.name()), self._('Unit {name} is nearby!').format(name=peer.name())])
self._('Unit {name} is nearby!').format(name=peer.name())])
def on_lost_peer(self, peer): def on_lost_peer(self, peer):
return random.choice([ return random.choice([
@ -104,19 +117,23 @@ class Voice:
def on_grateful(self): def on_grateful(self):
return random.choice([ return random.choice([
self._('Good friends are a blessing!'), self._('Good friends are a blessing!'),
self._('I love my friends!')]) self._('I love my friends!')
])
def on_lonely(self): def on_lonely(self):
return random.choice([ return random.choice([
self._('Nobody wants to play with me ...'), self._('Nobody wants to play with me ...'),
self._('I feel so alone ...'), self._('I feel so alone ...'),
self._('Let\'s find friends'),
self._('Where\'s everybody?!')]) self._('Where\'s everybody?!')])
def on_napping(self, secs): def on_napping(self, secs):
return random.choice([ return random.choice([
self._('Napping for {secs}s ...').format(secs=secs), self._('Napping for {secs}s ...').format(secs=secs),
self._('Zzzzz'), self._('Zzzzz'),
self._('ZzzZzzz ({secs}s)').format(secs=secs)]) self._('Snoring ...'),
self._('ZzzZzzz ({secs}s)').format(secs=secs),
])
def on_shutdown(self): def on_shutdown(self):
return random.choice([ return random.choice([
@ -124,12 +141,17 @@ class Voice:
self._('Zzz')]) self._('Zzz')])
def on_awakening(self): def on_awakening(self):
return random.choice(['...', '!']) return random.choice([
'...',
'!',
'Hello World!',
self._('I dreamed of electric sheep'),
])
def on_waiting(self, secs): def on_waiting(self, secs):
return random.choice([ return random.choice([
self._('Waiting for {secs}s ...').format(secs=secs),
'...', '...',
self._('Waiting for {secs}s ...').format(secs=secs),
self._('Looking around ({secs}s)').format(secs=secs)]) self._('Looking around ({secs}s)').format(secs=secs)])
def on_assoc(self, ap): def on_assoc(self, ap):
@ -138,12 +160,16 @@ class Voice:
return random.choice([ return random.choice([
self._('Hey {what} let\'s be friends!').format(what=what), self._('Hey {what} let\'s be friends!').format(what=what),
self._('Associating to {what}').format(what=what), self._('Associating to {what}').format(what=what),
self._('Yo {what}!').format(what=what)]) self._('Yo {what}!').format(what=what),
self._('Rise and Shine Mr. {what}!').format(what=what), # Half Life
])
def on_deauth(self, sta): def on_deauth(self, sta):
return random.choice([ return random.choice([
self._('Just decided that {mac} needs no WiFi!').format(mac=sta['mac']), self._('Just decided that {mac} needs no Wi-Fi!').format(mac=sta['mac']),
self._('Deauthenticating {mac}').format(mac=sta['mac']), self._('Deauthenticating {mac}').format(mac=sta['mac']),
self._('No more Wi-Fi for {mac}').format(mac=sta['mac']),
self._('It\'s a trap! {mac}').format(mac=sta['mac']), # Star wars
self._('Kickbanning {mac}!').format(mac=sta['mac'])]) self._('Kickbanning {mac}!').format(mac=sta['mac'])])
def on_handshakes(self, new_shakes): def on_handshakes(self, new_shakes):
@ -155,10 +181,19 @@ class Voice:
return self._('You have {count} new message{plural}!').format(count=count, plural=s) return self._('You have {count} new message{plural}!').format(count=count, plural=s)
def on_rebooting(self): def on_rebooting(self):
return self._("Oops, something went wrong ... Rebooting ...") return random.choice([
self._("Oops, something went wrong ... Rebooting ..."),
self._("Have you tried turning it off and on again?"), # The IT crew
self._("I\'m afraid Dave"), # 2001 Space Odyssey
self._("I\'m dead, Jim!"), # Star Trek
self._("I have a bad feeling about this"), # Star wars
])
def on_uploading(self, to): def on_uploading(self, to):
return self._("Uploading data to {to} ...").format(to=to) return random.choice([
self._("Uploading data to {to} ...").format(to=to),
self._("Beam me up to {to}").format(to=to),
])
def on_downloading(self, name): def on_downloading(self, name):
return self._("Downloading from {name} ...").format(name=name) return self._("Downloading from {name} ...").format(name=name)