mirror of
https://github.com/jayofelony/pwnagotchi.git
synced 2025-07-01 18:37:27 -04:00
misc: refactored plugin system to use classes
This commit is contained in:
@ -1,9 +1,3 @@
|
||||
__author__ = '33197631+dadav@users.noreply.github.com'
|
||||
__version__ = '1.0.0'
|
||||
__name__ = 'bt-tether'
|
||||
__license__ = 'GPL3'
|
||||
__description__ = 'This makes the display reachable over bluetooth'
|
||||
|
||||
import os
|
||||
import time
|
||||
import re
|
||||
@ -14,11 +8,8 @@ from pwnagotchi.ui.components import LabeledValue
|
||||
from pwnagotchi.ui.view import BLACK
|
||||
import pwnagotchi.ui.fonts as fonts
|
||||
from pwnagotchi.utils import StatusFile
|
||||
import pwnagotchi.plugins as plugins
|
||||
|
||||
READY = False
|
||||
INTERVAL = StatusFile('/root/.bt-tether')
|
||||
OPTIONS = dict()
|
||||
NETWORK = None
|
||||
|
||||
class BTError(Exception):
|
||||
"""
|
||||
@ -26,6 +17,7 @@ class BTError(Exception):
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class BTNap:
|
||||
"""
|
||||
This class creates a bluetooth connection to the specified bt-mac
|
||||
@ -41,7 +33,6 @@ class BTNap:
|
||||
def __init__(self, mac):
|
||||
self._mac = mac
|
||||
|
||||
|
||||
@staticmethod
|
||||
def get_bus():
|
||||
"""
|
||||
@ -59,9 +50,9 @@ class BTNap:
|
||||
"""
|
||||
manager = getattr(BTNap.get_manager, 'cached_obj', None)
|
||||
if not manager:
|
||||
manager = BTNap.get_manager.cached_obj = dbus.Interface(
|
||||
BTNap.get_bus().get_object(BTNap.IFACE_BASE, '/'),
|
||||
'org.freedesktop.DBus.ObjectManager' )
|
||||
manager = BTNap.get_manager.cached_obj = dbus.Interface(
|
||||
BTNap.get_bus().get_object(BTNap.IFACE_BASE, '/'),
|
||||
'org.freedesktop.DBus.ObjectManager')
|
||||
return manager
|
||||
|
||||
@staticmethod
|
||||
@ -82,7 +73,6 @@ class BTNap:
|
||||
iface = obj.dbus_interface
|
||||
return obj.Set(iface, k, v, dbus_interface=BTNap.IFACE_PROPS)
|
||||
|
||||
|
||||
@staticmethod
|
||||
def find_adapter(pattern=None):
|
||||
"""
|
||||
@ -98,14 +88,14 @@ class BTNap:
|
||||
"""
|
||||
bus, obj = BTNap.get_bus(), None
|
||||
for path, ifaces in objects.items():
|
||||
adapter = ifaces.get(BTNap.IFACE_ADAPTER)
|
||||
if adapter is None:
|
||||
continue
|
||||
if not pattern or pattern == adapter['Address'] or path.endswith(pattern):
|
||||
obj = bus.get_object(BTNap.IFACE_BASE, path)
|
||||
yield dbus.Interface(obj, BTNap.IFACE_ADAPTER)
|
||||
adapter = ifaces.get(BTNap.IFACE_ADAPTER)
|
||||
if adapter is None:
|
||||
continue
|
||||
if not pattern or pattern == adapter['Address'] or path.endswith(pattern):
|
||||
obj = bus.get_object(BTNap.IFACE_BASE, path)
|
||||
yield dbus.Interface(obj, BTNap.IFACE_ADAPTER)
|
||||
if obj is None:
|
||||
raise BTError('Bluetooth adapter not found')
|
||||
raise BTError('Bluetooth adapter not found')
|
||||
|
||||
@staticmethod
|
||||
def find_device(device_address, adapter_pattern=None):
|
||||
@ -178,7 +168,6 @@ class BTNap:
|
||||
logging.debug("BT-TETHER: Device is not connected.")
|
||||
return None, False
|
||||
|
||||
|
||||
def is_paired(self):
|
||||
"""
|
||||
Check if already connected
|
||||
@ -198,7 +187,6 @@ class BTNap:
|
||||
logging.debug("BT-TETHER: Device is not paired.")
|
||||
return False
|
||||
|
||||
|
||||
def wait_for_device(self, timeout=15):
|
||||
"""
|
||||
Wait for device
|
||||
@ -227,7 +215,7 @@ class BTNap:
|
||||
try:
|
||||
dev_remote = BTNap.find_device(self._mac, bt_dev)
|
||||
logging.debug("BT-TETHER: Using remote device (addr: %s): %s",
|
||||
BTNap.prop_get(dev_remote, 'Address'), dev_remote.object_path )
|
||||
BTNap.prop_get(dev_remote, 'Address'), dev_remote.object_path)
|
||||
break
|
||||
except BTError:
|
||||
logging.debug("BT-TETHER: Not found yet ...")
|
||||
@ -259,7 +247,6 @@ class BTNap:
|
||||
pass
|
||||
return False
|
||||
|
||||
|
||||
@staticmethod
|
||||
def nap(device):
|
||||
logging.debug('BT-TETHER: Trying to nap ...')
|
||||
@ -267,7 +254,7 @@ class BTNap:
|
||||
try:
|
||||
logging.debug('BT-TETHER: Connecting to profile ...')
|
||||
device.ConnectProfile('nap')
|
||||
except Exception: # raises exception, but still works
|
||||
except Exception: # raises exception, but still works
|
||||
pass
|
||||
|
||||
net = dbus.Interface(device, 'org.bluez.Network1')
|
||||
@ -297,7 +284,7 @@ class SystemdUnitWrapper:
|
||||
@staticmethod
|
||||
def _action_on_unit(action, unit):
|
||||
process = subprocess.Popen(f"systemctl {action} {unit}", shell=True, stdin=None,
|
||||
stdout=open("/dev/null", "w"), stderr=None, executable="/bin/bash")
|
||||
stdout=open("/dev/null", "w"), stderr=None, executable="/bin/bash")
|
||||
process.wait()
|
||||
if process.returncode > 0:
|
||||
return False
|
||||
@ -309,7 +296,7 @@ class SystemdUnitWrapper:
|
||||
Calls systemctl daemon-reload
|
||||
"""
|
||||
process = subprocess.Popen("systemctl daemon-reload", shell=True, stdin=None,
|
||||
stdout=open("/dev/null", "w"), stderr=None, executable="/bin/bash")
|
||||
stdout=open("/dev/null", "w"), stderr=None, executable="/bin/bash")
|
||||
process.wait()
|
||||
if process.returncode > 0:
|
||||
return False
|
||||
@ -387,16 +374,15 @@ class IfaceWrapper:
|
||||
"""
|
||||
return open(f"{self.path}/operstate", 'r').read().rsplit('\n') == 'up'
|
||||
|
||||
|
||||
def set_addr(self, addr):
|
||||
"""
|
||||
Set the netmask
|
||||
"""
|
||||
process = subprocess.Popen(f"ip addr add {addr} dev {self.iface}", shell=True, stdin=None,
|
||||
stdout=open("/dev/null", "w"), stderr=None, executable="/bin/bash")
|
||||
stdout=open("/dev/null", "w"), stderr=None, executable="/bin/bash")
|
||||
process.wait()
|
||||
|
||||
if process.returncode == 2 or process.returncode == 0: # 2 = already set
|
||||
if process.returncode == 2 or process.returncode == 0: # 2 = already set
|
||||
return True
|
||||
|
||||
return False
|
||||
@ -404,7 +390,7 @@ class IfaceWrapper:
|
||||
@staticmethod
|
||||
def set_route(addr):
|
||||
process = subprocess.Popen(f"ip route replace default via {addr}", shell=True, stdin=None,
|
||||
stdout=open("/dev/null", "w"), stderr=None, executable="/bin/bash")
|
||||
stdout=open("/dev/null", "w"), stderr=None, executable="/bin/bash")
|
||||
process.wait()
|
||||
|
||||
if process.returncode > 0:
|
||||
@ -413,44 +399,47 @@ class IfaceWrapper:
|
||||
return True
|
||||
|
||||
|
||||
class BTTether(plugins.Plugin):
|
||||
__author__ = '33197631+dadav@users.noreply.github.com'
|
||||
__version__ = '1.0.0'
|
||||
__license__ = 'GPL3'
|
||||
__description__ = 'This makes the display reachable over bluetooth'
|
||||
|
||||
def on_loaded():
|
||||
"""
|
||||
Gets called when the plugin gets loaded
|
||||
"""
|
||||
global READY
|
||||
global INTERVAL
|
||||
def __init__(self):
|
||||
self.ready = False
|
||||
self.interval = StatusFile('/root/.bt-tether')
|
||||
self.network = None
|
||||
|
||||
for opt in ['share_internet', 'mac', 'ip', 'netmask', 'interval']:
|
||||
if opt not in OPTIONS or (opt in OPTIONS and OPTIONS[opt] is None):
|
||||
logging.error("BT-TET: Please specify the %s in your config.yml.", opt)
|
||||
def on_loaded(self):
|
||||
for opt in ['share_internet', 'mac', 'ip', 'netmask', 'interval']:
|
||||
if opt not in self.options or (opt in self.options and self.options[opt] is None):
|
||||
logging.error("BT-TET: Please specify the %s in your config.yml.", opt)
|
||||
return
|
||||
|
||||
# ensure bluetooth is running
|
||||
bt_unit = SystemdUnitWrapper('bluetooth.service')
|
||||
if not bt_unit.is_active():
|
||||
if not bt_unit.start():
|
||||
logging.error("BT-TET: Can't start bluetooth.service")
|
||||
return
|
||||
|
||||
self.interval.update()
|
||||
self.ready = True
|
||||
|
||||
def on_ui_setup(self, ui):
|
||||
ui.add_element('bluetooth', LabeledValue(color=BLACK, label='BT', value='-', position=(ui.width() / 2 - 15, 0),
|
||||
label_font=fonts.Bold, text_font=fonts.Medium))
|
||||
|
||||
def on_ui_update(self, ui):
|
||||
if not self.ready:
|
||||
return
|
||||
|
||||
# ensure bluetooth is running
|
||||
bt_unit = SystemdUnitWrapper('bluetooth.service')
|
||||
if not bt_unit.is_active():
|
||||
if not bt_unit.start():
|
||||
logging.error("BT-TET: Can't start bluetooth.service")
|
||||
if self.interval.newer_then_minutes(self.options['interval']):
|
||||
return
|
||||
|
||||
INTERVAL.update()
|
||||
READY = True
|
||||
self.interval.update()
|
||||
|
||||
|
||||
def on_ui_update(ui):
|
||||
"""
|
||||
Try to connect to device
|
||||
"""
|
||||
|
||||
if READY:
|
||||
global INTERVAL
|
||||
global NETWORK
|
||||
if INTERVAL.newer_then_minutes(OPTIONS['interval']):
|
||||
return
|
||||
|
||||
INTERVAL.update()
|
||||
|
||||
bt = BTNap(OPTIONS['mac'])
|
||||
bt = BTNap(self.options['mac'])
|
||||
|
||||
logging.debug('BT-TETHER: Check if already connected and paired')
|
||||
dev_remote, connected = bt.is_connected()
|
||||
@ -483,14 +472,13 @@ def on_ui_update(ui):
|
||||
else:
|
||||
logging.debug('BT-TETHER: Already paired.')
|
||||
|
||||
|
||||
btnap_iface = IfaceWrapper('bnep0')
|
||||
logging.debug('BT-TETHER: Check interface')
|
||||
if not btnap_iface.exists():
|
||||
# connected and paired but not napping
|
||||
logging.debug('BT-TETHER: Try to connect to nap ...')
|
||||
network, status = BTNap.nap(dev_remote)
|
||||
NETWORK = network
|
||||
self.network = network
|
||||
if status:
|
||||
logging.info('BT-TETHER: Napping!')
|
||||
ui.set('bluetooth', 'C')
|
||||
@ -504,7 +492,7 @@ def on_ui_update(ui):
|
||||
logging.debug('BT-TETHER: Interface found')
|
||||
|
||||
# check ip
|
||||
addr = f"{OPTIONS['ip']}/{OPTIONS['netmask']}"
|
||||
addr = f"{self.options['ip']}/{self.options['netmask']}"
|
||||
|
||||
logging.debug('BT-TETHER: Try to set ADDR to interface')
|
||||
if not btnap_iface.set_addr(addr):
|
||||
@ -515,9 +503,10 @@ def on_ui_update(ui):
|
||||
logging.debug('BT-TETHER: Set ADDR to interface')
|
||||
|
||||
# change route if sharking
|
||||
if OPTIONS['share_internet']:
|
||||
if self.options['share_internet']:
|
||||
logging.debug('BT-TETHER: Set routing and change resolv.conf')
|
||||
IfaceWrapper.set_route(".".join(OPTIONS['ip'].split('.')[:-1] + ['1'])) # im not proud about that
|
||||
IfaceWrapper.set_route(
|
||||
".".join(self.options['ip'].split('.')[:-1] + ['1'])) # im not proud about that
|
||||
# fix resolv.conf; dns over https ftw!
|
||||
with open('/etc/resolv.conf', 'r+') as resolv:
|
||||
nameserver = resolv.read()
|
||||
@ -530,8 +519,3 @@ def on_ui_update(ui):
|
||||
else:
|
||||
logging.error('BT-TETHER: bnep0 not found')
|
||||
ui.set('bluetooth', 'BE')
|
||||
|
||||
|
||||
def on_ui_setup(ui):
|
||||
ui.add_element('bluetooth', LabeledValue(color=BLACK, label='BT', value='-', position=(ui.width() / 2 - 15, 0),
|
||||
label_font=fonts.Bold, text_font=fonts.Medium))
|
||||
|
Reference in New Issue
Block a user