mirror of
https://github.com/jayofelony/pwnagotchi.git
synced 2025-07-01 18:37:27 -04:00
Added couple of plugins, and some other stuff to work on more than 1 raspberry
Signed-off-by: Jeroen Oudshoorn <oudshoorn.jeroen@gmail.com> Signed-off-by: Jeroen Oudshoorn <oudshoorn.jeroen@gmail.com>
This commit is contained in:
@ -183,9 +183,9 @@ class AutoUpdate(plugins.Plugin):
|
||||
|
||||
to_install = []
|
||||
to_check = [
|
||||
('bettercap/bettercap', parse_version('bettercap -version'), True, 'bettercap'),
|
||||
('jayofelony/bettercap', parse_version('bettercap -version'), True, 'bettercap'),
|
||||
('evilsocket/pwngrid', parse_version('pwngrid -version'), True, 'pwngrid-peer'),
|
||||
('evilsocket/pwnagotchi', pwnagotchi.__version__, False, 'pwnagotchi')
|
||||
('jayofelony/pwnagotchi', pwnagotchi.__version__, False, 'pwnagotchi')
|
||||
]
|
||||
|
||||
for repo, local_version, is_native, svc_name in to_check:
|
||||
|
182
pwnagotchi/plugins/default/bluetoothsniffer.py
Normal file
182
pwnagotchi/plugins/default/bluetoothsniffer.py
Normal file
@ -0,0 +1,182 @@
|
||||
import logging
|
||||
import os
|
||||
import subprocess
|
||||
import json
|
||||
import re
|
||||
import time
|
||||
|
||||
import pwnagotchi
|
||||
import pwnagotchi.agent
|
||||
import pwnagotchi.plugins as plugins
|
||||
import pwnagotchi.ui.fonts as fonts
|
||||
from pwnagotchi.ui.components import LabeledValue
|
||||
from pwnagotchi.ui.view import BLACK
|
||||
from datetime import datetime
|
||||
|
||||
class BluetoothSniffer(plugins.Plugin):
|
||||
__author__ = 'diytechtinker'
|
||||
__version__ = '0.1.3'
|
||||
__license__ = 'GPL3'
|
||||
__description__ = 'A plugin that sniffs Bluetooth devices and saves their MAC addresses, name and counts to a JSON file'
|
||||
|
||||
def __init__(self):
|
||||
# Defining the instance variables
|
||||
self.options = {
|
||||
'timer': 45,
|
||||
'devices_file': '/root/handshakes/bluetooth_devices.json',
|
||||
'count_interval': 86400,
|
||||
'bt_x_coord': 160,
|
||||
'bt_y_coord': 66
|
||||
}
|
||||
self.data = {}
|
||||
self.last_scan_time = 0
|
||||
|
||||
|
||||
def on_loaded(self):
|
||||
logging.info("[BtS] bluetoothsniffer plugin loaded.")
|
||||
logging.info("[BtS] Bluetooth devices file location: %s", self.options['devices_file'])
|
||||
# Creating the device file path if it does not exist
|
||||
if not os.path.exists(os.path.dirname(self.options['devices_file'])):
|
||||
os.makedirs(os.path.dirname(self.options['devices_file']))
|
||||
|
||||
# Creating the device file if it does not exist
|
||||
if not os.path.exists(self.options['devices_file']):
|
||||
with open(self.options['devices_file'], 'w') as f:
|
||||
json.dump({}, f)
|
||||
|
||||
# Loading the data from the device file
|
||||
with open(self.options['devices_file'], 'r') as f:
|
||||
self.data = json.load(f)
|
||||
|
||||
|
||||
def on_ui_setup(self, ui):
|
||||
ui.add_element('BtS', LabeledValue(color=BLACK,
|
||||
label='BT SNFD',
|
||||
value=" ",
|
||||
position=(int(self.options["bt_x_coord"]),
|
||||
int(self.options["bt_y_coord"])),
|
||||
label_font=fonts.Small,
|
||||
text_font=fonts.Small))
|
||||
|
||||
def on_unload(self, ui):
|
||||
with ui._lock:
|
||||
ui.remove_element('BtS')
|
||||
|
||||
|
||||
def on_ui_update(self, ui):
|
||||
current_time = time.time()
|
||||
# Checking the time elapsed since last scan
|
||||
if current_time - self.last_scan_time >= self.options['timer']:
|
||||
self.last_scan_time = current_time
|
||||
#logging.info("[BtS] Bluetooth sniffed: %s", str(self.bt_sniff_info()))
|
||||
ui.set('BtS', str(self.bt_sniff_info()))
|
||||
self.scan()
|
||||
|
||||
|
||||
# Method for scanning the nearby bluetooth devices
|
||||
def scan(self):
|
||||
logging.info("[BtS] Scanning for bluetooths...")
|
||||
current_time = time.time()
|
||||
changed = False
|
||||
|
||||
# Running the system command hcitool to scan nearby bluetooth devices
|
||||
cmd_inq = "hcitool inq --flush"
|
||||
try:
|
||||
inq_output = subprocess.check_output(cmd_inq.split())
|
||||
except subprocess.CalledProcessError as e:
|
||||
logging.error("[BtS] Error running command: %s", e)
|
||||
|
||||
for line in inq_output.splitlines()[1:]:
|
||||
fields = line.split()
|
||||
mac_address = fields[0].decode()
|
||||
for i in range(len(fields)):
|
||||
if fields[i].decode() == "class:" and i+1 < len(fields):
|
||||
device_class = fields[i+1].decode()
|
||||
logging.info("[BtS] Found bluetooth %s", mac_address)
|
||||
|
||||
# Update the count, first_seen, and last_seen time of the device
|
||||
if mac_address in self.data and len(self.data) > 0:
|
||||
if 'Unknown' == self.data[mac_address]['name']:
|
||||
name = self.get_device_name(mac_address)
|
||||
self.data[mac_address]['name'] = name
|
||||
self.data[mac_address]['new_info'] = 2
|
||||
logging.info("[BtS] Updated bluetooth name: %s", name)
|
||||
changed = True
|
||||
|
||||
if 'Unknown' == self.data[mac_address]['manufacturer']:
|
||||
manufacturer = self.get_device_manufacturer(mac_address)
|
||||
self.data[mac_address]['manufacturer'] = manufacturer
|
||||
self.data[mac_address]['new_info'] = 2
|
||||
logging.info("[BtS] Updated bluetooth manufacturer: %s", manufacturer)
|
||||
changed = True
|
||||
|
||||
if device_class != self.data[mac_address]['class']:
|
||||
self.data[mac_address]['class'] = device_class
|
||||
self.data[mac_address]['new_info'] = 2
|
||||
logging.info("[BtS] Updated bluetooth class: %s", device_class)
|
||||
changed = True
|
||||
|
||||
last_seen_time = int(datetime.strptime(self.data[mac_address]['last_seen'], '%H:%M:%S %d-%m-%Y').timestamp())
|
||||
if current_time - last_seen_time >= self.options['count_interval']:
|
||||
self.data[mac_address]['count'] += 1
|
||||
self.data[mac_address]['last_seen'] = time.strftime('%H:%M:%S %d-%m-%Y', time.localtime(current_time))
|
||||
self.data[mac_address]['new_info'] = 2
|
||||
logging.info("[BtS] Updated bluetooth count.")
|
||||
changed = True
|
||||
else:
|
||||
name = self.get_device_name(mac_address)
|
||||
manufacturer = self.get_device_manufacturer(mac_address)
|
||||
self.data[mac_address] = {'name': name, 'count': 1, 'class': device_class, 'manufacturer': manufacturer, 'first_seen': time.strftime('%H:%M:%S %d-%m-%Y', time.localtime(current_time)), 'last_seen': time.strftime('%H:%M:%S %d-%m-%Y', time.localtime(current_time)), 'new_info': True}
|
||||
logging.info("[BtS] Added new bluetooth device %s with MAC: %s", name, mac_address)
|
||||
changed = True
|
||||
|
||||
# Save the updated devices to the JSON file
|
||||
if changed:
|
||||
with open(self.options['devices_file'], 'w') as f:
|
||||
logging.info("[BtS] Saving bluetooths %s into json.", name)
|
||||
json.dump(self.data, f)
|
||||
display.set('status', 'Bluetooth sniffed and stored!')
|
||||
display.update(force=True)
|
||||
|
||||
# Method to get the device name
|
||||
def get_device_name(self, mac_address):
|
||||
logging.info("[BtS] Trying to get name for %s", mac_address)
|
||||
name = 'Unknown'
|
||||
hcitool_process = subprocess.Popen(["hcitool", "name", mac_address], stdout=subprocess.PIPE)
|
||||
output, error = hcitool_process.communicate()
|
||||
if output.decode().strip() != '':
|
||||
name = output.decode().strip()
|
||||
logging.info("[BtS] Got name %s for %s", name, mac_address)
|
||||
return name
|
||||
|
||||
# Method to get the device manufacturer
|
||||
def get_device_manufacturer(self, mac_address):
|
||||
manufacturer = 'Unknown'
|
||||
cmd_info = f"hcitool info {mac_address} | grep 'Manufacturer:' | cut -d ' ' -f 2-"
|
||||
try:
|
||||
logging.info("[BtS] Trying to get manufacturer for %s", mac_address)
|
||||
start_time = time.time()
|
||||
process = subprocess.Popen(cmd_info, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
|
||||
while process.poll() is None:
|
||||
time.sleep(0.1)
|
||||
if time.time() - start_time > 7:
|
||||
logging.info("[BtS] Timeout while trying to get manufacturer for %s", mac_address)
|
||||
process.kill()
|
||||
return manufacturer
|
||||
output, error = process.communicate(timeout=1)
|
||||
if output.decode().strip() != '':
|
||||
manufacturer = output.decode().strip()
|
||||
logging.info("[BtS] Got manufacturer %s for %s", manufacturer, mac_address)
|
||||
except Exception as e:
|
||||
logging.info("[BtS] Error while trying to get manufacturer for %s: %s", mac_address, str(e))
|
||||
return manufacturer
|
||||
|
||||
def bt_sniff_info(self):
|
||||
num_devices = len(self.data)
|
||||
if num_devices > 0:
|
||||
num_unknown = sum(1 for device in self.data.values() if device['name'] == 'Unknown' or device['manufacturer'] == 'Unknown')
|
||||
num_known = num_devices - num_unknown
|
||||
return_text = "%s|%s" % (num_devices, num_known)
|
||||
else:
|
||||
return_text = "0|0"
|
||||
return return_text
|
336
pwnagotchi/plugins/default/exp.py
Normal file
336
pwnagotchi/plugins/default/exp.py
Normal file
@ -0,0 +1,336 @@
|
||||
import logging
|
||||
import os
|
||||
import random
|
||||
import json
|
||||
|
||||
import pwnagotchi
|
||||
import pwnagotchi.agent
|
||||
import pwnagotchi.plugins as plugins
|
||||
import pwnagotchi.ui.fonts as fonts
|
||||
from pwnagotchi.ui.components import LabeledValue
|
||||
from pwnagotchi.ui.view import BLACK
|
||||
|
||||
# Static Variables
|
||||
MULTIPLIER_ASSOCIATION = 1
|
||||
MULTIPLIER_DEAUTH = 2
|
||||
MULTIPLIER_HANDSHAKE = 3
|
||||
MULTIPLIER_AI_BEST_REWARD = 5
|
||||
TAG = "[EXP Plugin]"
|
||||
FACE_LEVELUP = '(≧◡◡≦)'
|
||||
BAR_ERROR = "| error |"
|
||||
FILE_SAVE = "exp_stats"
|
||||
FILE_SAVE_LEGACY = "exp"
|
||||
JSON_KEY_LEVEL = "level"
|
||||
JSON_KEY_EXP = "exp"
|
||||
JSON_KEY_EXP_TOT = "exp_tot"
|
||||
|
||||
|
||||
class EXP(plugins.Plugin):
|
||||
__author__ = 'GaelicThunder'
|
||||
__version__ = '1.0.5'
|
||||
__license__ = 'GPL3'
|
||||
__description__ = 'Get exp every time a handshake get captured.'
|
||||
|
||||
# Attention number masking
|
||||
def LogInfo(self, text):
|
||||
logging.info(TAG + " " + text)
|
||||
|
||||
# Attention number masking
|
||||
def LogDebug(self, text):
|
||||
logging.debug(TAG + " " + text)
|
||||
|
||||
def __init__(self):
|
||||
self.percent = 0
|
||||
self.calculateInitialXP = False
|
||||
self.exp = 0
|
||||
self.lv = 1
|
||||
self.exp_tot = 0
|
||||
# Sets the file type I recommend json
|
||||
self.save_file_mode = self.save_file_modes("json")
|
||||
self.save_file = self.getSaveFileName(self.save_file_mode)
|
||||
# Migrate from old save system
|
||||
self.migrateLegacySave()
|
||||
|
||||
# Create save file
|
||||
if not os.path.exists(self.save_file):
|
||||
self.Save(self.save_file, self.save_file_mode)
|
||||
else:
|
||||
try:
|
||||
# Try loading
|
||||
self.Load(self.save_file, self.save_file_mode)
|
||||
except:
|
||||
# Likely throws an exception if json file is corrupted, so we need to calculate from scratch
|
||||
self.calculateInitialXP = True
|
||||
|
||||
# No previous data, try get it
|
||||
if self.lv == 1 and self.exp == 0:
|
||||
self.calculateInitialXP = True
|
||||
if self.exp_tot == 0:
|
||||
self.LogInfo("Need to calculate Total Exp")
|
||||
self.exp_tot = self.calcActualSum(self.lv, self.exp)
|
||||
self.Save(self.save_file, self.save_file_mode)
|
||||
|
||||
self.expneeded = self.calcExpNeeded(self.lv)
|
||||
|
||||
def on_loaded(self):
|
||||
# logging.info("Exp plugin loaded for %s" % self.options['device'])
|
||||
self.LogInfo("Plugin Loaded")
|
||||
|
||||
def save_file_modes(self, argument):
|
||||
switcher = {
|
||||
"txt": 0,
|
||||
"json": 1,
|
||||
}
|
||||
return switcher.get(argument, 0)
|
||||
|
||||
def Save(self, file, save_file_mode):
|
||||
self.LogDebug('Saving Exp')
|
||||
if save_file_mode == 0:
|
||||
self.saveToTxtFile(file)
|
||||
if save_file_mode == 1:
|
||||
self.saveToJsonFile(file)
|
||||
|
||||
def saveToTxtFile(self, file):
|
||||
outfile = open(file, 'w')
|
||||
print(self.exp, file=outfile)
|
||||
print(self.lv, file=outfile)
|
||||
print(self.exp_tot, file=outfile)
|
||||
outfile.close()
|
||||
|
||||
def loadFromTxtFile(self, file):
|
||||
if os.path.exists(file):
|
||||
outfile = open(file, 'r+')
|
||||
lines = outfile.readlines()
|
||||
linecounter = 1
|
||||
for line in lines:
|
||||
if linecounter == 1:
|
||||
self.exp = int(line)
|
||||
elif linecounter == 2:
|
||||
self.lv == int(line)
|
||||
elif linecounter == 3:
|
||||
self.exp_tot == int(line)
|
||||
linecounter += 1
|
||||
outfile.close()
|
||||
|
||||
def saveToJsonFile(self, file):
|
||||
data = {
|
||||
JSON_KEY_LEVEL: self.lv,
|
||||
JSON_KEY_EXP: self.exp,
|
||||
JSON_KEY_EXP_TOT: self.exp_tot
|
||||
}
|
||||
|
||||
with open(file, 'w') as f:
|
||||
f.write(json.dumps(data, sort_keys=True, indent=4, separators=(',', ': ')))
|
||||
|
||||
def loadFromJsonFile(self, file):
|
||||
# Tot exp is introduced with json, no check needed
|
||||
data = {}
|
||||
with open(file, 'r') as f:
|
||||
data = json.loads(f.read())
|
||||
|
||||
if bool(data):
|
||||
self.lv = data[JSON_KEY_LEVEL]
|
||||
self.exp = data[JSON_KEY_EXP]
|
||||
self.exp_tot = data[JSON_KEY_EXP_TOT]
|
||||
else:
|
||||
self.LogInfo("Empty json")
|
||||
|
||||
# TODO: one day change save file mode to file date
|
||||
def Load(self, file, save_file_mode):
|
||||
self.LogDebug('Loading Exp')
|
||||
if save_file_mode == 0:
|
||||
self.loadFromTxtFile(file)
|
||||
if save_file_mode == 1:
|
||||
self.loadFromJsonFile(file)
|
||||
|
||||
def getSaveFileName(self, save_file_mode):
|
||||
file = os.path.dirname(os.path.realpath(__file__))
|
||||
file = file + "/" + FILE_SAVE
|
||||
if save_file_mode == 0:
|
||||
file = file + ".txt"
|
||||
elif save_file_mode == 1:
|
||||
file = file + ".json"
|
||||
else:
|
||||
# See switcher
|
||||
file = file + ".txt"
|
||||
return file
|
||||
|
||||
def migrateLegacySave(self):
|
||||
legacyFile = os.path.dirname(os.path.realpath(__file__))
|
||||
legacyFile = legacyFile + "/" + FILE_SAVE_LEGACY + ".txt"
|
||||
if os.path.exists(legacyFile):
|
||||
self.loadFromTxtFile(legacyFile)
|
||||
self.LogInfo("Migrating Legacy Save...")
|
||||
self.Save(self.save_file, self.save_file_mode)
|
||||
os.remove(legacyFile)
|
||||
|
||||
def barString(self, symbols_count, p):
|
||||
if p > 100:
|
||||
return BAR_ERROR
|
||||
length = symbols_count - 2
|
||||
bar_char = '▥'
|
||||
blank_char = ' '
|
||||
bar_length = int(round((length / 100) * p))
|
||||
blank_length = length - bar_length
|
||||
res = '|' + bar_char * bar_length + blank_char * blank_length + '|'
|
||||
return res
|
||||
|
||||
def on_ui_setup(self, ui):
|
||||
ui.add_element('Lv', LabeledValue(color=BLACK, label='Lv', value=0,
|
||||
position=(int(self.options["lvl_x_coord"]),
|
||||
int(self.options["lvl_y_coord"])),
|
||||
label_font=fonts.Bold, text_font=fonts.Medium))
|
||||
ui.add_element('Exp', LabeledValue(color=BLACK, label='Exp', value=0,
|
||||
position=(int(self.options["exp_x_coord"]),
|
||||
int(self.options["exp_y_coord"])),
|
||||
label_font=fonts.Bold, text_font=fonts.Medium))
|
||||
|
||||
def on_ui_update(self, ui):
|
||||
self.expneeded = self.calcExpNeeded(self.lv)
|
||||
self.percent = int((self.exp / self.expneeded) * 100)
|
||||
symbols_count = int(self.options["bar_symbols_count"])
|
||||
bar = self.barString(symbols_count, self.percent)
|
||||
ui.set('Lv', "%d" % self.lv)
|
||||
ui.set('Exp', "%s" % bar)
|
||||
|
||||
def calcExpNeeded(self, level):
|
||||
# If the pwnagotchi is lvl <1 it causes the keys to be deleted
|
||||
if level == 1:
|
||||
return 5
|
||||
return int((level ** 3) / 2)
|
||||
|
||||
def exp_check(self, agent):
|
||||
self.LogDebug("EXP CHECK")
|
||||
if self.exp >= self.expneeded:
|
||||
self.exp = 1
|
||||
self.lv = self.lv + 1
|
||||
self.expneeded = self.calcExpNeeded(self.lv)
|
||||
self.displayLevelUp(agent)
|
||||
|
||||
def parseSessionStats(self):
|
||||
sum = 0
|
||||
dir = pwnagotchi.config['main']['plugins']['session-stats']['save_directory']
|
||||
# TODO: remove
|
||||
self.LogInfo("Session-Stats dir: " + dir)
|
||||
for filename in os.listdir(dir):
|
||||
self.LogInfo("Parsing " + filename + "...")
|
||||
if filename.endswith(".json") & filename.startswith("stats"):
|
||||
try:
|
||||
sum += self.parseSessionStatsFile(os.path.join(dir, filename))
|
||||
except:
|
||||
self.LogInfo("ERROR parsing File: " + filename)
|
||||
|
||||
return sum
|
||||
|
||||
def parseSessionStatsFile(self, path):
|
||||
sum = 0
|
||||
deauths = 0
|
||||
handshakes = 0
|
||||
associations = 0
|
||||
with open(path) as json_file:
|
||||
data = json.load(json_file)
|
||||
for entry in data["data"]:
|
||||
deauths += data["data"][entry]["num_deauths"]
|
||||
handshakes += data["data"][entry]["num_handshakes"]
|
||||
associations += data["data"][entry]["num_associations"]
|
||||
|
||||
sum += deauths * MULTIPLIER_DEAUTH
|
||||
sum += handshakes * MULTIPLIER_HANDSHAKE
|
||||
sum += associations * MULTIPLIER_ASSOCIATION
|
||||
|
||||
return sum
|
||||
|
||||
# If initial sum is 0, we try to parse it
|
||||
def calculateInitialSum(self, agent):
|
||||
sessionStatsActive = False
|
||||
sum = 0
|
||||
# Check if session stats is loaded
|
||||
for plugin in pwnagotchi.plugins.loaded:
|
||||
if plugin == "session-stats":
|
||||
sessionStatsActive = True
|
||||
break
|
||||
|
||||
if sessionStatsActive:
|
||||
try:
|
||||
self.LogInfo("parsing session-stats")
|
||||
sum = self.parseSessionStats()
|
||||
except:
|
||||
self.LogInfo("Error parsing session-stats")
|
||||
|
||||
|
||||
else:
|
||||
self.LogInfo("parsing last session")
|
||||
sum = self.lastSessionPoints(agent)
|
||||
|
||||
self.LogInfo(str(sum) + " Points calculated")
|
||||
return sum
|
||||
|
||||
# Get Last Sessions Points
|
||||
def lastSessionPoints(self, agent):
|
||||
summary = 0
|
||||
summary += agent.LastSession.handshakes * MULTIPLIER_HANDSHAKE
|
||||
summary += agent.LastSession.associated * MULTIPLIER_ASSOCIATION
|
||||
summary += agent.LastSession.deauthed * MULTIPLIER_DEAUTH
|
||||
return summary
|
||||
|
||||
# Helper function to calculate multiple Levels from a sum of EXPs
|
||||
def calcLevelFromSum(self, sum, agent):
|
||||
sum1 = sum
|
||||
level = 1
|
||||
while sum1 > self.calcExpNeeded(level):
|
||||
sum1 -= self.calcExpNeeded(level)
|
||||
level += 1
|
||||
self.lv = level
|
||||
self.exp = sum1
|
||||
self.expneeded = self.calcExpNeeded(level) - sum1
|
||||
if level > 1:
|
||||
# get Excited ;-)
|
||||
self.displayLevelUp(agent)
|
||||
|
||||
def calcActualSum(self, level, exp):
|
||||
lvlCounter = 1
|
||||
sum = exp
|
||||
# I know it wouldn't work if you change the lvl algorithm
|
||||
while lvlCounter < level:
|
||||
sum += self.calcExpNeeded(lvlCounter)
|
||||
lvlCounter += 1
|
||||
return sum
|
||||
|
||||
def displayLevelUp(self, agent):
|
||||
view = agent.view()
|
||||
view.set('face', FACE_LEVELUP)
|
||||
view.set('status', "Level Up!")
|
||||
view.update(force=True)
|
||||
|
||||
# Event Handling
|
||||
def on_association(self, agent, access_point):
|
||||
self.exp += MULTIPLIER_ASSOCIATION
|
||||
self.exp_tot += MULTIPLIER_ASSOCIATION
|
||||
self.exp_check(agent)
|
||||
self.Save(self.save_file, self.save_file_mode)
|
||||
|
||||
def on_deauthentication(self, agent, access_point, client_station):
|
||||
self.exp += MULTIPLIER_DEAUTH
|
||||
self.exp_tot += MULTIPLIER_DEAUTH
|
||||
self.exp_check(agent)
|
||||
self.Save(self.save_file, self.save_file_mode)
|
||||
|
||||
def on_handshake(self, agent, filename, access_point, client_station):
|
||||
self.exp += MULTIPLIER_HANDSHAKE
|
||||
self.exp_tot += MULTIPLIER_HANDSHAKE
|
||||
self.exp_check(agent)
|
||||
self.Save(self.save_file, self.save_file_mode)
|
||||
|
||||
def on_ai_best_reward(self, agent, reward):
|
||||
self.exp += MULTIPLIER_AI_BEST_REWARD
|
||||
self.exp_tot += MULTIPLIER_AI_BEST_REWARD
|
||||
self.exp_check(agent)
|
||||
self.Save(self.save_file, self.save_file_mode)
|
||||
|
||||
def on_ready(self, agent):
|
||||
if self.calculateInitialXP:
|
||||
self.LogInfo("Initial point calculation")
|
||||
sum = self.calculateInitialSum(agent)
|
||||
self.exp_tot = sum
|
||||
self.calcLevelFromSum(sum, agent)
|
||||
self.Save(self.save_file, self.save_file_mode)
|
384
pwnagotchi/plugins/default/fix_brcmf_plugin.py
Normal file
384
pwnagotchi/plugins/default/fix_brcmf_plugin.py
Normal file
@ -0,0 +1,384 @@
|
||||
import logging
|
||||
import re
|
||||
import subprocess
|
||||
import time
|
||||
import random
|
||||
from io import TextIOWrapper
|
||||
from pwnagotchi import plugins
|
||||
|
||||
import pwnagotchi.ui.faces as faces
|
||||
from pwnagotchi.bettercap import Client
|
||||
|
||||
from pwnagotchi.ui.components import Text, LabeledValue
|
||||
from pwnagotchi.ui.view import BLACK
|
||||
import pwnagotchi.ui.fonts as fonts
|
||||
|
||||
|
||||
class Fix_BRCMF(plugins.Plugin):
|
||||
__author__ = 'xBits'
|
||||
__version__ = '0.1.1'
|
||||
__license__ = 'GPL3'
|
||||
__description__ = 'Reload brcmfmac module when blindbug is detected, instead of rebooting. Adapted from WATCHDOG.'
|
||||
__name__ = 'Fix_BRCMF'
|
||||
__help__ = """
|
||||
Reload brcmfmac module when blindbug is detected, instead of rebooting. Adapted from WATCHDOG.
|
||||
"""
|
||||
__dependencies__ = {
|
||||
'pip': ['scapy']
|
||||
}
|
||||
__defaults__ = {
|
||||
'enabled': False,
|
||||
}
|
||||
|
||||
def __init__(self):
|
||||
self.options = dict()
|
||||
self.pattern = re.compile(r'brcmf_cfg80211_nexmon_set_channel.*?Set Channel failed')
|
||||
self.pattern2 = re.compile(r'wifi error while hopping to channel')
|
||||
self.isReloadingMon = False
|
||||
self.connection = None
|
||||
self.LASTTRY = 0
|
||||
self._status = "--"
|
||||
self._count = 0
|
||||
|
||||
def on_loaded(self):
|
||||
"""
|
||||
Gets called when the plugin gets loaded
|
||||
"""
|
||||
self.pattern = re.compile(r'brcmf_cfg80211_nexmon_set_channel.*?Set Channel failed')
|
||||
self._status = "ld"
|
||||
logging.info("[FixBRCMF] plugin loaded.")
|
||||
|
||||
def on_ready(self, agent):
|
||||
try:
|
||||
cmd_output = subprocess.check_output("ip link show wlan0mon", shell=True)
|
||||
logging.info("[FixBRCMF ip link show wlan0mon]: %s" % repr(cmd_output))
|
||||
if ",UP," in str(cmd_output):
|
||||
logging.info("wlan0mon is up.")
|
||||
self._status = "up"
|
||||
last_lines = ''.join(list(TextIOWrapper(subprocess.Popen(['journalctl', '-n10', '-k'],
|
||||
stdout=subprocess.PIPE).stdout))[-10:])
|
||||
if len(self.pattern.findall(last_lines)) >= 3:
|
||||
self._status = "XX"
|
||||
if hasattr(agent, 'view'):
|
||||
display = agent.view()
|
||||
display.set('status', 'Blind-Bug detected. Restarting.')
|
||||
display.update(force=True)
|
||||
logging.info('[FixBRCMF] Blind-Bug detected. Restarting.\n%s' % repr(last_lines))
|
||||
try:
|
||||
self._tryTurningItOffAndOnAgain(agent)
|
||||
except Exception as err:
|
||||
logging.warning("[FixBRCMF turnOffAndfOn] %s" % repr(err))
|
||||
else:
|
||||
logging.info("[FixBRCMF] Logs look good, too:\n%s" % last_lines)
|
||||
self._status = ""
|
||||
|
||||
except Exception as err:
|
||||
logging.error("[FixBRCMF ip link show wlan0mon]: %s" % repr(err))
|
||||
try:
|
||||
self._status = "xx"
|
||||
self._tryTurningItOffAndOnAgain(agent)
|
||||
except Exception as err:
|
||||
logging.error("[FixBRCMF OffNOn]: %s" % repr(err))
|
||||
|
||||
# bettercap sys_log event
|
||||
# search syslog events for the brcmf channel fail, and reset when it shows up
|
||||
# apparently this only gets messages from bettercap going to syslog, not from syslog
|
||||
def on_bcap_sys_log(self, agent, event):
|
||||
if re.search('wifi error while hopping to channel', event['data']['Message']):
|
||||
logging.info("[FixBRCMF]SYSLOG MATCH: %s" % event['data']['Message'])
|
||||
logging.info("[FixBRCMF]**** restarting wifi.recon")
|
||||
try:
|
||||
result = agent.run("wifi.recon off; wifi.recon on")
|
||||
if result["success"]:
|
||||
logging.info("[FixBRCMF] wifi.recon flip: success!")
|
||||
if hasattr(agent, 'view'):
|
||||
display = agent.view()
|
||||
if display: display.update(force=True, new_data={"status": "Wifi recon flipped!",
|
||||
"face": faces.COOL})
|
||||
else:
|
||||
print("Wifi recon flipped")
|
||||
else:
|
||||
logging.warning("[FixBRCMF] wifi.recon flip: FAILED: %s" % repr(result))
|
||||
except Exception as err:
|
||||
logging.error("[FixBRCMF]SYSLOG wifi.recon flip fail: %s" % err)
|
||||
|
||||
def on_epoch(self, agent, epoch, epoch_data):
|
||||
# don't check if we ran a reset recently
|
||||
logging.debug("[FixBRCMF]**** epoch")
|
||||
if time.time() - self.LASTTRY > 180:
|
||||
# get last 10 lines
|
||||
display = None
|
||||
last_lines = ''.join(list(TextIOWrapper(subprocess.Popen(['journalctl', '-n10', '-k'],
|
||||
stdout=subprocess.PIPE).stdout))[-10:])
|
||||
other_last_lines = ''.join(list(TextIOWrapper(subprocess.Popen(['journalctl', '-n10'],
|
||||
stdout=subprocess.PIPE).stdout))[-10:])
|
||||
logging.debug("[FixBRCMF]**** checking")
|
||||
if len(self.pattern.findall(last_lines)) >= 3:
|
||||
logging.info("[FixBRCMF]**** Should trigger a reload of the wlan0mon device:\n%s" % last_lines)
|
||||
if hasattr(agent, 'view'):
|
||||
display = agent.view()
|
||||
display.set('status', 'Blind-Bug detected. Restarting.')
|
||||
display.update(force=True)
|
||||
logging.info('[FixBRCMF] Blind-Bug detected. Restarting.')
|
||||
try:
|
||||
self._tryTurningItOffAndOnAgain(agent)
|
||||
except Exception as err:
|
||||
logging.warning("[FixBRCMF] TTOAOA: %s" % repr(err))
|
||||
elif len(self.pattern2.findall(other_last_lines)) >= 5:
|
||||
if hasattr(agent, 'view'):
|
||||
display = agent.view()
|
||||
display.set('status', 'Wifi channel stuck. Restarting recon.')
|
||||
display.update(force=True)
|
||||
logging.info('[FixBRCMF] Wifi channel stuck. Restarting recon.')
|
||||
|
||||
try:
|
||||
result = agent.run("wifi.recon off; wifi.recon on")
|
||||
if result["success"]:
|
||||
logging.info("[FixBRCMF] wifi.recon flip: success!")
|
||||
if display:
|
||||
display.update(force=True, new_data={"status": "Wifi recon flipped!",
|
||||
"brcmfmac_status": self._status,
|
||||
"face": faces.COOL})
|
||||
else:
|
||||
print("Wifi recon flipped\nthat was easy!")
|
||||
else:
|
||||
logging.warning("[FixBRCMF] wifi.recon flip: FAILED: %s" % repr(result))
|
||||
|
||||
except Exception as err:
|
||||
logging.error("[FixBRCMF wifi.recon flip] %s" % repr(err))
|
||||
|
||||
else:
|
||||
print("logs look good")
|
||||
|
||||
def logPrintView(self, level, message, ui=None, displayData=None, force=True):
|
||||
try:
|
||||
if level is "error":
|
||||
logging.error(message)
|
||||
elif level is "warning":
|
||||
logging.warning(message)
|
||||
elif level is "debug":
|
||||
logging.debug(message)
|
||||
else:
|
||||
logging.info(message)
|
||||
|
||||
if ui:
|
||||
ui.update(force=force, new_data=displayData)
|
||||
elif displayData and "status" in displayData:
|
||||
print(displayData["status"])
|
||||
else:
|
||||
print("[%s] %s" % (level, message))
|
||||
except Exception as err:
|
||||
logging.error("[logPrintView] ERROR %s" % repr(err))
|
||||
|
||||
def _tryTurningItOffAndOnAgain(self, connection):
|
||||
# avoid overlapping restarts, but allow it if it's been a while
|
||||
# (in case the last attempt failed before resetting "isReloadingMon")
|
||||
if self.isReloadingMon and (time.time() - self.LASTTRY) < 180:
|
||||
logging.info("[FixBRCMF] Duplicate attempt ignored")
|
||||
else:
|
||||
self.isReloadingMon = True
|
||||
self.LASTTRY = time.time()
|
||||
|
||||
self._status = "BL"
|
||||
if hasattr(connection, 'view'):
|
||||
display = connection.view()
|
||||
if display: display.update(force=True, new_data={"status": "I'm blind! Try turning it off and on again",
|
||||
"brcmfmac_status": self._status,
|
||||
"face": faces.BORED})
|
||||
else:
|
||||
display = None
|
||||
|
||||
# main divergence from WATCHDOG starts here
|
||||
#
|
||||
# instead of rebooting, and losing all that energy loading up the AI
|
||||
# pause wifi.recon, close wlan0mon, reload the brcmfmac kernel module
|
||||
# then recreate wlan0mon, ..., and restart wifi.recon
|
||||
|
||||
# Turn it off
|
||||
|
||||
# attempt a sanity check. does wlan0mon exist?
|
||||
# is it up?
|
||||
try:
|
||||
cmd_output = subprocess.check_output("ip link show wlan0mon", shell=True)
|
||||
logging.info("[FixBRCMF ip link show wlan0mon]: %s" % repr(cmd_output))
|
||||
if ",UP," in str(cmd_output):
|
||||
logging.info("wlan0mon is up. Skip reset?")
|
||||
# not reliable, so don't skip just yet
|
||||
# print("wlan0mon is up. Skipping reset.")
|
||||
# self.isReloadingMon = False
|
||||
# return
|
||||
except Exception as err:
|
||||
logging.error("[FixBRCMF ip link show wlan0mon]: %s" % repr(err))
|
||||
|
||||
try:
|
||||
result = connection.run("wifi.recon off")
|
||||
if "success" in result:
|
||||
self.logPrintView("info", "[FixBRCMF] wifi.recon off: %s!" % repr(result),
|
||||
display, {"status": "Wifi recon paused!", "face": faces.COOL})
|
||||
time.sleep(2)
|
||||
else:
|
||||
self.logPrintView("warning", "[FixBRCMF] wifi.recon off: FAILED: %s" % repr(result),
|
||||
display, {"status": "Recon was busted (probably)",
|
||||
"face": random.choice((faces.BROKEN, faces.DEBUG))})
|
||||
except Exception as err:
|
||||
logging.error("[FixBRCMF wifi.recon off] error %s" % (repr(err)))
|
||||
|
||||
logging.info("[FixBRCMF] recon paused. Now trying wlan0mon reload")
|
||||
|
||||
try:
|
||||
cmd_output = subprocess.check_output("sudo ifconfig wlan0mon down && sudo iw dev wlan0mon del", shell=True)
|
||||
self._status = "dn"
|
||||
self.logPrintView("info", "[FixBRCMF] wlan0mon down and deleted: %s" % cmd_output,
|
||||
display, {"status": "wlan0mon d-d-d-down!", "face": faces.BORED})
|
||||
except Exception as nope:
|
||||
logging.error("[FixBRCMF delete wlan0mon] %s" % nope)
|
||||
pass
|
||||
|
||||
logging.debug("[FixBRCMF] Now trying modprobe -r")
|
||||
|
||||
# Try this sequence 3 times until it is reloaded
|
||||
#
|
||||
# Future: while "not fixed yet": blah blah blah. if "max_attemts", then reboot like the old days
|
||||
#
|
||||
tries = 0
|
||||
while tries < 3:
|
||||
try:
|
||||
# unload the module
|
||||
cmd_output = subprocess.check_output("sudo modprobe -r brcmfmac", shell=True)
|
||||
self.logPrintView("info", "[FixBRCMF] unloaded brcmfmac", display,
|
||||
{"status": "Turning it off #%d" % tries, "face": faces.SMART})
|
||||
self._status = "ul"
|
||||
time.sleep(1 + tries)
|
||||
|
||||
# reload the module
|
||||
try:
|
||||
# reload the brcmfmac kernel module
|
||||
cmd_output = subprocess.check_output("sudo modprobe brcmfmac", shell=True)
|
||||
|
||||
self.logPrintView("info", "[FixBRCMF] reloaded brcmfmac")
|
||||
self._status = "rl"
|
||||
time.sleep(10 + 4 * tries) # give it some time for wlan device to stabilize, or whatever
|
||||
|
||||
# success! now make the mon0
|
||||
try:
|
||||
cmd_output = subprocess.check_output(
|
||||
"sudo iw phy \"$(iw phy | head -1 | cut -d' ' -f2)\" interface add mon0 type monitor && sudo ifconfig mon0 up",
|
||||
shell=True)
|
||||
self.logPrintView("info",
|
||||
"[FixBRCMF interface add mon0] worked #%d: %s" % (tries, cmd_output))
|
||||
self._status = "up"
|
||||
time.sleep(tries + 5)
|
||||
try:
|
||||
# try accessing mon0 in bettercap
|
||||
result = connection.run("set wifi.interface mon0")
|
||||
if "success" in result:
|
||||
logging.info("[FixBRCMF set wifi.interface mon0] worked: %s" % repr(result))
|
||||
self._status = ""
|
||||
self._count = self._count + 1
|
||||
time.sleep(1)
|
||||
# stop looping and get back to recon
|
||||
break
|
||||
else:
|
||||
logging.info("[FixBRCMF set wifi.interfaceface mon0] failed? %s" % repr(result))
|
||||
except Exception as err:
|
||||
logging.info(
|
||||
"[FixBRCMF set wifi.interface mon0] except: %s" % (repr(result), repr(err)))
|
||||
except Exception as cerr: #
|
||||
if not display: print("failed loading mon0 attempt #%d: %s" % (tries, repr(cerr)))
|
||||
except Exception as err: # from modprobe
|
||||
if not display: print("Failed reloading brcmfmac")
|
||||
logging.error("[FixBRCMF] Failed reloading brcmfmac %s" % repr(err))
|
||||
|
||||
except Exception as nope: # from modprobe -r
|
||||
# fails if already unloaded, so probably fine
|
||||
logging.error("[FixBRCMF #%d modprobe -r] %s" % (tries, repr(nope)))
|
||||
if not display: print("[FixBRCMF #%d modprobe -r] %s" % (tries, repr(nope)))
|
||||
pass
|
||||
|
||||
tries = tries + 1
|
||||
if tries < 3:
|
||||
logging.info("[FixBRCMF] wlan0mon didn't make it. trying again")
|
||||
if not display: print(" wlan0mon didn't make it. trying again")
|
||||
|
||||
# exited the loop, so hopefully it loaded
|
||||
if tries < 3:
|
||||
if display:
|
||||
display.update(force=True, new_data={"status": "And back on again...",
|
||||
"brcmfmac_status": self._status,
|
||||
"face": faces.INTENSE})
|
||||
else:
|
||||
print("And back on again...")
|
||||
logging.info("[FixBRCMF] mon0 back up")
|
||||
else:
|
||||
self.LASTTRY = time.time()
|
||||
|
||||
time.sleep(8 + tries * 2) # give it a bit before restarting recon in bettercap
|
||||
self.isReloadingMon = False
|
||||
|
||||
logging.info("[FixBRCMF] re-enable recon")
|
||||
try:
|
||||
result = connection.run("wifi.clear; wifi.recon on")
|
||||
|
||||
if "success" in result: # and result["success"] is True:
|
||||
self._status = ""
|
||||
if display:
|
||||
display.update(force=True, new_data={"status": "I can see again! (probably): %s" % repr(result),
|
||||
"brcmfmac_status": self._status,
|
||||
"face": faces.HAPPY})
|
||||
else:
|
||||
print("I can see again")
|
||||
logging.info("[FixBRCMF] wifi.recon on %s" % repr(result))
|
||||
self.LASTTRY = time.time() + 120 # 2-minute pause until next time.
|
||||
else:
|
||||
logging.error("[FixBRCMF] wifi.recon did not start up: %s" % repr(result))
|
||||
self.LASTTRY = time.time() - 300 # failed, so try again ASAP
|
||||
self.isReloadingMon = False
|
||||
|
||||
except Exception as err:
|
||||
logging.error("[FixBRCMF wifi.recon on] %s" % repr(err))
|
||||
|
||||
# called to setup the ui elements
|
||||
def on_ui_setup(self, ui):
|
||||
# add custom UI elements
|
||||
if "position" in self.options:
|
||||
pos = self.options['position'].split(',')
|
||||
pos = [int(x.strip()) for x in pos]
|
||||
else:
|
||||
pos = (ui.width() / 2 + 35, ui.height() - 11)
|
||||
|
||||
logging.info("Got here")
|
||||
ui.add_element('brcmfmac_status', Text(color=BLACK, value='--', position=pos, font=fonts.Small))
|
||||
|
||||
# called when the ui is updated
|
||||
|
||||
def on_ui_update(self, ui):
|
||||
# update those elements
|
||||
if self._status:
|
||||
ui.set('brcmfmac_status', "wlan0mon %s" % self._status)
|
||||
else:
|
||||
ui.set('brcmfmac_status', "rst#%s" % self._count)
|
||||
|
||||
def on_unload(self, ui):
|
||||
try:
|
||||
ui.remove_element('brcmfmac_status')
|
||||
logging.info("[FixBRCMF] unloaded")
|
||||
except Exception as err:
|
||||
logging.info("[FixBRCMF] unload err %s " % repr(err))
|
||||
pass
|
||||
|
||||
|
||||
# run from command line to brute force a reload
|
||||
if __name__ == "__main__":
|
||||
print("Performing brcmfmac reload and restart mon0 in 5 seconds...")
|
||||
fb = Fix_BRCMF()
|
||||
|
||||
data = {'Message': "kernel: brcmfmac: brcmf_cfg80211_nexmon_set_channel: Set Channel failed: chspec=1234"}
|
||||
event = {'data': data}
|
||||
|
||||
agent = Client('localhost', port=8081, username="pwnagotchi", password="pwnagotchi");
|
||||
|
||||
time.sleep(2)
|
||||
print("3 seconds")
|
||||
time.sleep(3)
|
||||
# fb.on_epoch(agent, event, None)
|
||||
fb._tryTurningItOffAndOnAgain(agent)
|
Reference in New Issue
Block a user