This commit is contained in:
hmax42
2019-12-08 16:25:23 +01:00
64 changed files with 29189 additions and 263 deletions

View File

@ -7,6 +7,7 @@ import platform
import shutil
import glob
import pkg_resources
from threading import Lock
import pwnagotchi
import pwnagotchi.plugins as plugins
@ -150,6 +151,7 @@ class AutoUpdate(plugins.Plugin):
def __init__(self):
self.ready = False
self.status = StatusFile('/root/.auto-update')
self.lock = Lock()
def on_loaded(self):
if 'interval' not in self.options or ('interval' in self.options and self.options['interval'] is None):
@ -159,60 +161,61 @@ class AutoUpdate(plugins.Plugin):
logging.info("[update] plugin loaded.")
def on_internet_available(self, agent):
logging.debug("[update] internet connectivity is available (ready %s)" % self.ready)
with self.lock:
logging.debug("[update] internet connectivity is available (ready %s)" % self.ready)
if not self.ready:
return
if not self.ready:
return
if self.status.newer_then_hours(self.options['interval']):
logging.debug("[update] last check happened less than %d hours ago" % self.options['interval'])
return
if self.status.newer_then_hours(self.options['interval']):
logging.debug("[update] last check happened less than %d hours ago" % self.options['interval'])
return
logging.info("[update] checking for updates ...")
logging.info("[update] checking for updates ...")
display = agent.view()
prev_status = display.get('status')
display = agent.view()
prev_status = display.get('status')
try:
display.update(force=True, new_data={'status': 'Checking for updates ...'})
try:
display.update(force=True, new_data={'status': 'Checking for updates ...'})
to_install = []
to_check = [
('bettercap/bettercap', parse_version('bettercap -version'), True, 'bettercap'),
('evilsocket/pwngrid', parse_version('pwngrid -version'), True, 'pwngrid-peer'),
('evilsocket/pwnagotchi', pwnagotchi.version, False, 'pwnagotchi')
]
to_install = []
to_check = [
('bettercap/bettercap', parse_version('bettercap -version'), True, 'bettercap'),
('evilsocket/pwngrid', parse_version('pwngrid -version'), True, 'pwngrid-peer'),
('evilsocket/pwnagotchi', pwnagotchi.version, False, 'pwnagotchi')
]
for repo, local_version, is_native, svc_name in to_check:
info = check(local_version, repo, is_native)
if info['url'] is not None:
logging.warning(
"update for %s available (local version is '%s'): %s" % (
repo, info['current'], info['url']))
info['service'] = svc_name
to_install.append(info)
for repo, local_version, is_native, svc_name in to_check:
info = check(local_version, repo, is_native)
if info['url'] is not None:
logging.warning(
"update for %s available (local version is '%s'): %s" % (
repo, info['current'], info['url']))
info['service'] = svc_name
to_install.append(info)
num_updates = len(to_install)
num_installed = 0
num_updates = len(to_install)
num_installed = 0
if num_updates > 0:
if self.options['install']:
for update in to_install:
plugins.on('updating')
if install(display, update):
num_installed += 1
else:
prev_status = '%d new update%c available!' % (num_updates, 's' if num_updates > 1 else '')
if num_updates > 0:
if self.options['install']:
for update in to_install:
plugins.on('updating')
if install(display, update):
num_installed += 1
else:
prev_status = '%d new update%c available!' % (num_updates, 's' if num_updates > 1 else '')
logging.info("[update] done")
logging.info("[update] done")
self.status.update()
self.status.update()
if num_installed > 0:
display.update(force=True, new_data={'status': 'Rebooting ...'})
pwnagotchi.reboot()
if num_installed > 0:
display.update(force=True, new_data={'status': 'Rebooting ...'})
pwnagotchi.reboot()
except Exception as e:
logging.error("[update] %s" % e)
except Exception as e:
logging.error("[update] %s" % e)
display.update(force=True, new_data={'status': prev_status if prev_status is not None else ''})
display.update(force=True, new_data={'status': prev_status if prev_status is not None else ''})

View File

@ -2,6 +2,7 @@ import logging
import os
import subprocess
import time
from threading import Lock
import dbus
@ -426,6 +427,7 @@ class BTTether(plugins.Plugin):
self.ready = False
self.options = dict()
self.devices = dict()
self.lock = Lock()
def on_loaded(self):
# new config
@ -466,114 +468,116 @@ class BTTether(plugins.Plugin):
logging.info("BT-TETHER: Successfully loaded ...")
self.ready = True
def on_unload(self):
self.ui.remove_element('bluetooth')
def on_unload(self, ui):
with ui._lock:
ui.remove_element('bluetooth')
def on_ui_setup(self, ui):
self.ui = ui
ui.add_element('bluetooth', LabeledValue(color=BLACK, label='BT', value='-', position=(ui.width() / 2 - 15, 0),
label_font=fonts.Bold, text_font=fonts.Medium))
with ui._lock:
ui.add_element('bluetooth', LabeledValue(color=BLACK, label='BT', value='-', position=(ui.width() / 2 - 15, 0),
label_font=fonts.Bold, text_font=fonts.Medium))
def on_ui_update(self, ui):
if not self.ready:
return
devices_to_try = list()
connected_priorities = list()
any_device_connected = False # if this is true, last status on screen should be C
with self.lock:
devices_to_try = list()
connected_priorities = list()
any_device_connected = False # if this is true, last status on screen should be C
for _, device in self.devices.items():
if device.connected():
connected_priorities.append(device.priority)
any_device_connected = True
continue
for _, device in self.devices.items():
if device.connected():
connected_priorities.append(device.priority)
any_device_connected = True
continue
if not device.max_tries or (device.max_tries > device.tries):
if not device.status.newer_then_minutes(device.interval):
devices_to_try.append(device)
device.status.update()
device.tries += 1
if not device.max_tries or (device.max_tries > device.tries):
if not device.status.newer_then_minutes(device.interval):
devices_to_try.append(device)
device.status.update()
device.tries += 1
sorted_devices = sorted(devices_to_try, key=lambda x: x.search_order)
sorted_devices = sorted(devices_to_try, key=lambda x: x.search_order)
for device in sorted_devices:
bt = BTNap(device.mac)
for device in sorted_devices:
bt = BTNap(device.mac)
try:
logging.debug('BT-TETHER: Search %d secs for %s ...', device.scantime, device.name)
dev_remote = bt.wait_for_device(timeout=device.scantime)
if dev_remote is None:
logging.debug('BT-TETHER: Could not find %s, try again in %d minutes.', device.name, device.interval)
try:
logging.debug('BT-TETHER: Search %d secs for %s ...', device.scantime, device.name)
dev_remote = bt.wait_for_device(timeout=device.scantime)
if dev_remote is None:
logging.debug('BT-TETHER: Could not find %s, try again in %d minutes.', device.name, device.interval)
ui.set('bluetooth', 'NF')
continue
except Exception as bt_ex:
logging.error(bt_ex)
ui.set('bluetooth', 'NF')
continue
except Exception as bt_ex:
logging.error(bt_ex)
ui.set('bluetooth', 'NF')
continue
paired = bt.is_paired()
if not paired:
if BTNap.pair(dev_remote):
logging.debug('BT-TETHER: Paired with %s.', device.name)
paired = bt.is_paired()
if not paired:
if BTNap.pair(dev_remote):
logging.debug('BT-TETHER: Paired with %s.', device.name)
else:
logging.debug('BT-TETHER: Pairing with %s failed ...', device.name)
ui.set('bluetooth', 'PE')
continue
else:
logging.debug('BT-TETHER: Pairing with %s failed ...', device.name)
ui.set('bluetooth', 'PE')
continue
else:
logging.debug('BT-TETHER: Already paired.')
logging.debug('BT-TETHER: Already paired.')
logging.debug('BT-TETHER: Try to create nap connection with %s ...', device.name)
device.network, success = BTNap.nap(dev_remote)
interface = None
logging.debug('BT-TETHER: Try to create nap connection with %s ...', device.name)
device.network, success = BTNap.nap(dev_remote)
interface = None
if success:
try:
interface = device.interface()
except Exception:
if success:
try:
interface = device.interface()
except Exception:
logging.debug('BT-TETHER: Could not establish nap connection with %s', device.name)
continue
if interface is None:
ui.set('bluetooth', 'BE')
logging.debug('BT-TETHER: Could not establish nap connection with %s', device.name)
continue
logging.debug('BT-TETHER: Created interface (%s)', interface)
ui.set('bluetooth', 'C')
any_device_connected = True
device.tries = 0 # reset tries
else:
logging.debug('BT-TETHER: Could not establish nap connection with %s', device.name)
ui.set('bluetooth', 'NF')
continue
if interface is None:
ui.set('bluetooth', 'BE')
logging.debug('BT-TETHER: Could not establish nap connection with %s', device.name)
addr = f"{device.ip}/{device.netmask}"
if device.gateway:
gateway = device.gateway
else:
gateway = ".".join(device.ip.split('.')[:-1] + ['1'])
wrapped_interface = IfaceWrapper(interface)
logging.debug('BT-TETHER: Add ip to %s', interface)
if not wrapped_interface.set_addr(addr):
ui.set('bluetooth', 'AE')
logging.debug("BT-TETHER: Could not add ip to %s", interface)
continue
logging.debug('BT-TETHER: Created interface (%s)', interface)
if device.share_internet:
if not connected_priorities or device.priority > max(connected_priorities):
logging.debug('BT-TETHER: Set default route to %s via %s', gateway, interface)
IfaceWrapper.set_route(gateway, interface)
connected_priorities.append(device.priority)
logging.debug('BT-TETHER: Change resolv.conf if necessary ...')
with open('/etc/resolv.conf', 'r+') as resolv:
nameserver = resolv.read()
if 'nameserver 9.9.9.9' not in nameserver:
logging.debug('BT-TETHER: Added nameserver')
resolv.seek(0)
resolv.write(nameserver + 'nameserver 9.9.9.9\n')
if any_device_connected:
ui.set('bluetooth', 'C')
any_device_connected = True
device.tries = 0 # reset tries
else:
logging.debug('BT-TETHER: Could not establish nap connection with %s', device.name)
ui.set('bluetooth', 'NF')
continue
addr = f"{device.ip}/{device.netmask}"
if device.gateway:
gateway = device.gateway
else:
gateway = ".".join(device.ip.split('.')[:-1] + ['1'])
wrapped_interface = IfaceWrapper(interface)
logging.debug('BT-TETHER: Add ip to %s', interface)
if not wrapped_interface.set_addr(addr):
ui.set('bluetooth', 'AE')
logging.debug("BT-TETHER: Could not add ip to %s", interface)
continue
if device.share_internet:
if not connected_priorities or device.priority > max(connected_priorities):
logging.debug('BT-TETHER: Set default route to %s via %s', gateway, interface)
IfaceWrapper.set_route(gateway, interface)
connected_priorities.append(device.priority)
logging.debug('BT-TETHER: Change resolv.conf if necessary ...')
with open('/etc/resolv.conf', 'r+') as resolv:
nameserver = resolv.read()
if 'nameserver 9.9.9.9' not in nameserver:
logging.debug('BT-TETHER: Added nameserver')
resolv.seek(0)
resolv.write(nameserver + 'nameserver 9.9.9.9\n')
if any_device_connected:
ui.set('bluetooth', 'C')

View File

@ -26,7 +26,7 @@ class Example(plugins.Plugin):
logging.warning("WARNING: this plugin should be disabled! options = " % self.options)
# called before the plugin is unloaded
def on_unload(self):
def on_unload(self, ui):
pass
# called hen there's internet connectivity

View File

@ -63,6 +63,11 @@ class GPS(plugins.Plugin):
lat_pos = (112, 30)
lon_pos = (112, 49)
alt_pos = (87, 63)
elif ui.is_waveshare144lcd():
# guessed values, add tested ones if you can
lat_pos = (67, 73)
lon_pos = (62, 83)
alt_pos = (67, 93)
else:
# guessed values, add tested ones if you can
lat_pos = (127, 51)

View File

@ -47,6 +47,9 @@ class MemTemp(plugins.Plugin):
elif ui.is_waveshare_v1():
h_pos = (170, 80)
v_pos = (170, 61)
elif ui.is_waveshare144lcd():
h_pos = (53, 77)
v_pos = (78, 67)
elif ui.is_inky():
h_pos = (140, 68)
v_pos = (165, 54)

View File

@ -1,6 +1,7 @@
import logging
import json
import os
import threading
import requests
import time
import pwnagotchi.plugins as plugins
@ -11,7 +12,7 @@ MOZILLA_API_URL = 'https://location.services.mozilla.com/v1/geolocate?key={api}'
class NetPos(plugins.Plugin):
__author__ = 'zenzen san'
__version__ = '2.0.1'
__version__ = '2.0.2'
__license__ = 'GPL3'
__description__ = """Saves a json file with the access points with more signal
whenever a handshake is captured.
@ -22,6 +23,7 @@ class NetPos(plugins.Plugin):
self.report = StatusFile('/root/.net_pos_saved', data_format='json')
self.skip = list()
self.ready = False
self.lock = threading.Lock()
def on_loaded(self):
if 'api_key' not in self.options or ('api_key' in self.options and self.options['api_key'] is None):
@ -45,60 +47,64 @@ class NetPos(plugins.Plugin):
saved_file.write(x + "\n")
def on_internet_available(self, agent):
if self.ready:
config = agent.config()
display = agent.view()
reported = self.report.data_field_or('reported', default=list())
handshake_dir = config['bettercap']['handshakes']
with self.lock:
if self.ready:
config = agent.config()
display = agent.view()
reported = self.report.data_field_or('reported', default=list())
handshake_dir = config['bettercap']['handshakes']
all_files = os.listdir(handshake_dir)
all_np_files = [os.path.join(handshake_dir, filename)
for filename in all_files
if filename.endswith('.net-pos.json')]
new_np_files = set(all_np_files) - set(reported) - set(self.skip)
all_files = os.listdir(handshake_dir)
all_np_files = [os.path.join(handshake_dir, filename)
for filename in all_files
if filename.endswith('.net-pos.json')]
new_np_files = set(all_np_files) - set(reported) - set(self.skip)
if new_np_files:
logging.info("NET-POS: Found %d new net-pos files. Fetching positions ...", len(new_np_files))
display.set('status', f"Found {len(new_np_files)} new net-pos files. Fetching positions ...")
display.update(force=True)
for idx, np_file in enumerate(new_np_files):
if new_np_files:
logging.debug("NET-POS: Found %d new net-pos files. Fetching positions ...", len(new_np_files))
display.set('status', f"Found {len(new_np_files)} new net-pos files. Fetching positions ...")
display.update(force=True)
for idx, np_file in enumerate(new_np_files):
geo_file = np_file.replace('.net-pos.json', '.geo.json')
if os.path.exists(geo_file):
# got already the position
reported.append(np_file)
self.report.update(data={'reported': reported})
continue
try:
geo_data = self._get_geo_data(np_file) # returns json obj
except requests.exceptions.RequestException as req_e:
logging.error("NET-POS: %s - RequestException: %s", np_file, req_e)
self.skip += np_file
continue
except json.JSONDecodeError as js_e:
logging.error("NET-POS: %s - JSONDecodeError: %s, removing it...", np_file, js_e)
os.remove(np_file)
continue
except OSError as os_e:
logging.error("NET-POS: %s - OSError: %s", np_file, os_e)
self.skip += np_file
continue
with open(geo_file, 'w+t') as sf:
json.dump(geo_data, sf)
geo_file = np_file.replace('.net-pos.json', '.geo.json')
if os.path.exists(geo_file):
# got already the position
reported.append(np_file)
self.report.update(data={'reported': reported})
continue
try:
geo_data = self._get_geo_data(np_file) # returns json obj
except requests.exceptions.RequestException as req_e:
logging.error("NET-POS: %s - RequestException: %s", np_file, req_e)
self.skip += np_file
continue
except json.JSONDecodeError as js_e:
logging.error("NET-POS: %s - JSONDecodeError: %s", np_file, js_e)
self.skip += np_file
continue
except OSError as os_e:
logging.error("NET-POS: %s - OSError: %s", np_file, os_e)
self.skip += np_file
continue
with open(geo_file, 'w+t') as sf:
json.dump(geo_data, sf)
reported.append(np_file)
self.report.update(data={'reported': reported})
display.set('status', f"Fetching positions ({idx + 1}/{len(new_np_files)})")
display.update(force=True)
display.set('status', f"Fetching positions ({idx + 1}/{len(new_np_files)})")
display.update(force=True)
def on_handshake(self, agent, filename, access_point, client_station):
netpos = self._get_netpos(agent)
if not netpos['wifiAccessPoints']:
return
netpos["ts"] = int("%.0f" % time.time())
netpos_filename = filename.replace('.pcap', '.net-pos.json')
logging.info("NET-POS: Saving net-location to %s", netpos_filename)
logging.debug("NET-POS: Saving net-location to %s", netpos_filename)
try:
with open(netpos_filename, 'w+t') as net_pos_file:
@ -106,6 +112,7 @@ class NetPos(plugins.Plugin):
except OSError as os_e:
logging.error("NET-POS: %s", os_e)
def _get_netpos(self, agent):
aps = agent.get_access_points()
netpos = dict()

View File

@ -2,20 +2,27 @@ import os
import logging
import re
import requests
from threading import Lock
from pwnagotchi.utils import StatusFile
import pwnagotchi.plugins as plugins
from json.decoder import JSONDecodeError
class OnlineHashCrack(plugins.Plugin):
__author__ = '33197631+dadav@users.noreply.github.com'
__version__ = '2.0.0'
__version__ = '2.0.1'
__license__ = 'GPL3'
__description__ = 'This plugin automatically uploads handshakes to https://onlinehashcrack.com'
def __init__(self):
self.ready = False
self.report = StatusFile('/root/.ohc_uploads', data_format='json')
try:
self.report = StatusFile('/root/.ohc_uploads', data_format='json')
except JSONDecodeError as json_err:
os.remove('/root/.ohc_uploads')
self.report = StatusFile('/root/.ohc_uploads', data_format='json')
self.skip = list()
self.lock = Lock()
def on_loaded(self):
"""
@ -68,38 +75,40 @@ class OnlineHashCrack(plugins.Plugin):
"""
Called in manual mode when there's internet connectivity
"""
if self.ready:
display = agent.view()
config = agent.config()
reported = self.report.data_field_or('reported', default=list())
with self.lock:
if self.ready:
display = agent.view()
config = agent.config()
reported = self.report.data_field_or('reported', default=list())
handshake_dir = config['bettercap']['handshakes']
handshake_filenames = os.listdir(handshake_dir)
handshake_paths = [os.path.join(handshake_dir, filename) for filename in handshake_filenames if
filename.endswith('.pcap')]
handshake_dir = config['bettercap']['handshakes']
handshake_filenames = os.listdir(handshake_dir)
handshake_paths = [os.path.join(handshake_dir, filename) for filename in handshake_filenames if
filename.endswith('.pcap')]
# pull out whitelisted APs
handshake_paths = filter(lambda path: self._filter_handshake_file(path), handshake_paths)
# pull out whitelisted APs
handshake_paths = filter(lambda path: self._filter_handshake_file(path), handshake_paths)
handshake_new = set(handshake_paths) - set(reported) - set(self.skip)
handshake_new = set(handshake_paths) - set(reported) - set(self.skip)
if handshake_new:
logging.info("OHC: Internet connectivity detected. Uploading new handshakes to onelinehashcrack.com")
if handshake_new:
logging.info("OHC: Internet connectivity detected. Uploading new handshakes to onelinehashcrack.com")
for idx, handshake in enumerate(handshake_new):
display.set('status',
f"Uploading handshake to onlinehashcrack.com ({idx + 1}/{len(handshake_new)})")
display.update(force=True)
try:
self._upload_to_ohc(handshake)
reported.append(handshake)
self.report.update(data={'reported': reported})
logging.info(f"OHC: Successfully uploaded {handshake}")
except requests.exceptions.RequestException as req_e:
self.skip.append(handshake)
logging.error("OHC: %s", req_e)
continue
except OSError as os_e:
self.skip.append(handshake)
logging.error("OHC: %s", os_e)
continue
for idx, handshake in enumerate(handshake_new):
display.set('status',
f"Uploading handshake to onlinehashcrack.com ({idx + 1}/{len(handshake_new)})")
display.update(force=True)
try:
self._upload_to_ohc(handshake)
if handshake not in reported:
reported.append(handshake)
self.report.update(data={'reported': reported})
logging.info(f"OHC: Successfully uploaded {handshake}")
except requests.exceptions.RequestException as req_e:
self.skip.append(handshake)
logging.error("OHC: %s", req_e)
continue
except OSError as os_e:
self.skip.append(handshake)
logging.error("OHC: %s", os_e)
continue

View File

@ -0,0 +1,195 @@
import os
import logging
import threading
from datetime import datetime
from pwnagotchi import plugins
from flask import render_template_string
from flask import jsonify
TEMPLATE = """
{% extends "base.html" %}
{% set active_page = "plugins" %}
{% block title %}
Session stats
{% endblock %}
{% block styles %}
<link rel="stylesheet" href="/css/jquery.jqplot.min.css"/>
<link rel="stylesheet" href="/css/jquery.jqplot.css"/>
{% endblock %}
{% block scripts %}
<script type="text/javascript" src="/js/jquery.jqplot.min.js"></script>
<script type="text/javascript" src="/js/jquery.jqplot.js"></script>
<script type="text/javascript" src="/js/plugins/jqplot.mobile.js"></script>
<script type="text/javascript" src="/js/plugins/jqplot.json2.js"></script>
<script type="text/javascript" src="/js/plugins/jqplot.dateAxisRenderer.js"></script>
<script type="text/javascript" src="/js/plugins/jqplot.highlighter.js"></script>
<script type="text/javascript" src="/js/plugins/jqplot.cursor.js"></script>
<script type="text/javascript" src="/js/plugins/jqplot.enhancedLegendRenderer.js"></script>
{% endblock %}
{% block script %}
$(document).ready(function(){
var ajaxDataRenderer = function(url, plot, options) {
var ret = null;
$.ajax({
async: false,
url: url,
dataType:"json",
success: function(data) {
ret = data;
}
});
return ret;
};
function loadData(url, elm, title) {
var data = ajaxDataRenderer(url);
var plot_os = $.jqplot(elm, data.values,{
title: title,
stackSeries: true,
seriesDefaults: {
showMarker: false,
fill: true,
fillAndStroke: true
},
legend: {
show: true,
renderer: $.jqplot.EnhancedLegendRenderer,
placement: 'outsideGrid',
labels: data.labels,
location: 's',
rendererOptions: {
numberRows: '2',
},
rowSpacing: '0px'
},
axes:{
xaxis:{
renderer:$.jqplot.DateAxisRenderer,
tickOptions:{formatString:'%H:%M:%S'}
},
yaxis:{
min: 0,
tickOptions:{formatString:'%.2f'}
}
},
highlighter: {
show: true,
sizeAdjust: 7.5
},
cursor:{
show: true,
tooltipLocation:'sw'
}
}).replot({
axes:{
xaxis:{
renderer:$.jqplot.DateAxisRenderer,
tickOptions:{formatString:'%H:%M:%S'}
},
yaxis:{
min: 0,
tickOptions:{formatString:'%.2f'}
}
}
});
}
function loadAll() {
loadData('/plugins/session-stats/os', 'chart_os', 'OS')
loadData('/plugins/session-stats/temp', 'chart_temp', 'Temp')
loadData('/plugins/session-stats/nums', 'chart_nums', 'Wifi')
loadData('/plugins/session-stats/duration', 'chart_duration', 'Sleeping')
loadData('/plugins/session-stats/epoch', 'chart_epoch', 'Epochs')
}
loadAll();
setInterval(loadAll, 60000);
});
{% endblock %}
{% block content %}
<div id="chart_os" style="height:400px;width:100%; "></div>
<div id="chart_temp" style="height:400px;width:100%; "></div>
<div id="chart_nums" style="height:400px;width:100%; "></div>
<div id="chart_duration" style="height:400px;width:100%; "></div>
<div id="chart_epoch" style="height:400px;width:100%; "></div>
{% endblock %}
"""
class SessionStats(plugins.Plugin):
__author__ = '33197631+dadav@users.noreply.github.com'
__version__ = '0.1.0'
__license__ = 'GPL3'
__description__ = 'This plugin displays stats of the current session.'
def __init__(self):
self.ready = False
self.lock = threading.Lock()
self.options = dict()
self.stats = dict()
def on_loaded(self):
"""
Gets called when the plugin gets loaded
"""
logging.info("Session-stats plugin loaded.")
self.ready = True
def on_unloaded(self, ui):
pass
def on_epoch(self, agent, epoch, epoch_data):
"""
Save the epoch_data to self.stats
"""
with self.lock:
self.stats[datetime.now().strftime("%H:%M:%S")] = epoch_data
@staticmethod
def extract_key_values(data, subkeys):
result = dict()
result['values'] = list()
result['labels'] = subkeys
for plot_key in subkeys:
v = [ [ts,d[plot_key]] for ts, d in data.items()]
result['values'].append(v)
return result
def on_webhook(self, path, request):
if not path or path == "/":
return render_template_string(TEMPLATE)
if path == "os":
extract_keys = ['cpu_load','mem_usage',]
elif path == "temp":
extract_keys = ['temperature']
elif path == "nums":
extract_keys = [
'missed_interactions',
'num_hops',
'num_peers',
'tot_bond',
'avg_bond',
'num_deauths',
'num_associations',
'num_handshakes',
]
elif path == "duration":
extract_keys = [
'duration_secs',
'slept_for_secs',
]
elif path == "epoch":
extract_keys = [
'blind_for_epochs',
'inactive_for_epochs',
'active_for_epochs',
]
with self.lock:
return jsonify(SessionStats.extract_key_values(self.stats, extract_keys))

View File

@ -1,8 +1,8 @@
import os
import logging
import threading
import requests
from datetime import datetime
from threading import Lock
from pwnagotchi.utils import StatusFile
from pwnagotchi import plugins
from json.decoder import JSONDecodeError
@ -16,7 +16,7 @@ class WpaSec(plugins.Plugin):
def __init__(self):
self.ready = False
self.lock = threading.Lock()
self.lock = Lock()
try:
self.report = StatusFile('/root/.wpa_sec_uploads', data_format='json')
except JSONDecodeError as json_err: