mirror of
https://github.com/jayofelony/pwnagotchi.git
synced 2025-07-01 18:37:27 -04:00
New bt-tether.py plugin, easier set up.
This commit is contained in:
@ -25,28 +25,9 @@ main.plugins.auto-update.install = true
|
|||||||
main.plugins.auto-update.interval = 1
|
main.plugins.auto-update.interval = 1
|
||||||
|
|
||||||
main.plugins.bt-tether.enabled = false
|
main.plugins.bt-tether.enabled = false
|
||||||
|
main.plugins.bt-tether.mac = ""
|
||||||
main.plugins.bt-tether.devices.android-phone.enabled = false
|
main.plugins.bt-tether.phone = "" # android or ios
|
||||||
main.plugins.bt-tether.devices.android-phone.search_order = 1
|
main.plugins.bt-tether.ip = "" # 192.168.44.2 android / 172.20.10.2 ios
|
||||||
main.plugins.bt-tether.devices.android-phone.mac = ""
|
|
||||||
main.plugins.bt-tether.devices.android-phone.ip = "192.168.44.44"
|
|
||||||
main.plugins.bt-tether.devices.android-phone.netmask = 24
|
|
||||||
main.plugins.bt-tether.devices.android-phone.interval = 1
|
|
||||||
main.plugins.bt-tether.devices.android-phone.scantime = 10
|
|
||||||
main.plugins.bt-tether.devices.android-phone.max_tries = 10
|
|
||||||
main.plugins.bt-tether.devices.android-phone.share_internet = true
|
|
||||||
main.plugins.bt-tether.devices.android-phone.priority = 1
|
|
||||||
|
|
||||||
main.plugins.bt-tether.devices.ios-phone.enabled = false
|
|
||||||
main.plugins.bt-tether.devices.ios-phone.search_order = 2
|
|
||||||
main.plugins.bt-tether.devices.ios-phone.mac = ""
|
|
||||||
main.plugins.bt-tether.devices.ios-phone.ip = "172.20.10.6"
|
|
||||||
main.plugins.bt-tether.devices.ios-phone.netmask = 24
|
|
||||||
main.plugins.bt-tether.devices.ios-phone.interval = 5
|
|
||||||
main.plugins.bt-tether.devices.ios-phone.scantime = 20
|
|
||||||
main.plugins.bt-tether.devices.ios-phone.max_tries = 0
|
|
||||||
main.plugins.bt-tether.devices.ios-phone.share_internet = true
|
|
||||||
main.plugins.bt-tether.devices.ios-phone.priority = 999
|
|
||||||
|
|
||||||
main.plugins.fix_services.enabled = true
|
main.plugins.fix_services.enabled = true
|
||||||
|
|
||||||
|
@ -1,579 +1,63 @@
|
|||||||
import logging
|
import logging
|
||||||
import os
|
|
||||||
import subprocess
|
import subprocess
|
||||||
import time
|
|
||||||
from threading import Lock
|
|
||||||
|
|
||||||
import dbus
|
|
||||||
|
|
||||||
import pwnagotchi.plugins as plugins
|
import pwnagotchi.plugins as plugins
|
||||||
import pwnagotchi.ui.fonts as fonts
|
import pwnagotchi.ui.fonts as fonts
|
||||||
from pwnagotchi.ui.components import LabeledValue
|
from pwnagotchi.ui.components import LabeledValue
|
||||||
from pwnagotchi.ui.view import BLACK
|
from pwnagotchi.ui.view import BLACK
|
||||||
from pwnagotchi.utils import StatusFile
|
|
||||||
|
|
||||||
|
|
||||||
class BTError(Exception):
|
|
||||||
"""
|
|
||||||
Custom bluetooth exception
|
|
||||||
"""
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class BTNap:
|
|
||||||
"""
|
|
||||||
This class creates a bluetooth connection to the specified bt-mac
|
|
||||||
|
|
||||||
see https://github.com/bablokb/pi-btnap/blob/master/files/usr/local/sbin/btnap.service.py
|
|
||||||
"""
|
|
||||||
|
|
||||||
IFACE_BASE = 'org.bluez'
|
|
||||||
IFACE_DEV = 'org.bluez.Device1'
|
|
||||||
IFACE_ADAPTER = 'org.bluez.Adapter1'
|
|
||||||
IFACE_PROPS = 'org.freedesktop.DBus.Properties'
|
|
||||||
|
|
||||||
def __init__(self, mac):
|
|
||||||
self._mac = mac
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def get_bus():
|
|
||||||
"""
|
|
||||||
Get systembus obj
|
|
||||||
"""
|
|
||||||
bus = getattr(BTNap.get_bus, 'cached_obj', None)
|
|
||||||
if not bus:
|
|
||||||
bus = BTNap.get_bus.cached_obj = dbus.SystemBus()
|
|
||||||
return bus
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def get_manager():
|
|
||||||
"""
|
|
||||||
Get manager obj
|
|
||||||
"""
|
|
||||||
manager = getattr(BTNap.get_manager, 'cached_obj', None)
|
|
||||||
if not manager:
|
|
||||||
manager = BTNap.get_manager.cached_obj = dbus.Interface(
|
|
||||||
BTNap.get_bus().get_object(BTNap.IFACE_BASE, '/'),
|
|
||||||
'org.freedesktop.DBus.ObjectManager')
|
|
||||||
return manager
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def prop_get(obj, k, iface=None):
|
|
||||||
"""
|
|
||||||
Get a property of the obj
|
|
||||||
"""
|
|
||||||
if iface is None:
|
|
||||||
iface = obj.dbus_interface
|
|
||||||
return obj.Get(iface, k, dbus_interface=BTNap.IFACE_PROPS)
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def prop_set(obj, k, v, iface=None):
|
|
||||||
"""
|
|
||||||
Set a property of the obj
|
|
||||||
"""
|
|
||||||
if iface is None:
|
|
||||||
iface = obj.dbus_interface
|
|
||||||
return obj.Set(iface, k, v, dbus_interface=BTNap.IFACE_PROPS)
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def find_adapter(pattern=None):
|
|
||||||
"""
|
|
||||||
Find the bt adapter
|
|
||||||
"""
|
|
||||||
|
|
||||||
return BTNap.find_adapter_in_objects(BTNap.get_manager().GetManagedObjects(), pattern)
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def find_adapter_in_objects(objects, pattern=None):
|
|
||||||
"""
|
|
||||||
Finds the obj with a pattern
|
|
||||||
"""
|
|
||||||
bus, obj = BTNap.get_bus(), None
|
|
||||||
for path, ifaces in objects.items():
|
|
||||||
adapter = ifaces.get(BTNap.IFACE_ADAPTER)
|
|
||||||
if adapter is None:
|
|
||||||
continue
|
|
||||||
if not pattern or pattern == adapter['Address'] or path.endswith(pattern):
|
|
||||||
obj = bus.get_object(BTNap.IFACE_BASE, path)
|
|
||||||
yield dbus.Interface(obj, BTNap.IFACE_ADAPTER)
|
|
||||||
if obj is None:
|
|
||||||
raise BTError('Bluetooth adapter not found')
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def find_device(device_address, adapter_pattern=None):
|
|
||||||
"""
|
|
||||||
Finds the device
|
|
||||||
"""
|
|
||||||
return BTNap.find_device_in_objects(BTNap.get_manager().GetManagedObjects(),
|
|
||||||
device_address, adapter_pattern)
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def find_device_in_objects(objects, device_address, adapter_pattern=None):
|
|
||||||
"""
|
|
||||||
Finds the device in objects
|
|
||||||
"""
|
|
||||||
bus = BTNap.get_bus()
|
|
||||||
path_prefix = ''
|
|
||||||
if adapter_pattern:
|
|
||||||
if not isinstance(adapter_pattern, str):
|
|
||||||
adapter = adapter_pattern
|
|
||||||
else:
|
|
||||||
adapter = BTNap.find_adapter_in_objects(objects, adapter_pattern)
|
|
||||||
path_prefix = adapter.object_path
|
|
||||||
for path, ifaces in objects.items():
|
|
||||||
device = ifaces.get(BTNap.IFACE_DEV)
|
|
||||||
if device is None:
|
|
||||||
continue
|
|
||||||
if str(device['Address']).lower() == device_address.lower() and path.startswith(path_prefix):
|
|
||||||
obj = bus.get_object(BTNap.IFACE_BASE, path)
|
|
||||||
return dbus.Interface(obj, BTNap.IFACE_DEV)
|
|
||||||
raise BTError('Bluetooth device not found')
|
|
||||||
|
|
||||||
def power(self, on=True):
|
|
||||||
"""
|
|
||||||
Set power of devices to on/off
|
|
||||||
"""
|
|
||||||
logging.debug("BT-TETHER: Changing bluetooth device to %s", str(on))
|
|
||||||
|
|
||||||
try:
|
|
||||||
devs = list(BTNap.find_adapter())
|
|
||||||
devs = dict((BTNap.prop_get(dev, 'Address'), dev) for dev in devs)
|
|
||||||
except BTError as bt_err:
|
|
||||||
logging.error(bt_err)
|
|
||||||
return None
|
|
||||||
|
|
||||||
for dev_addr, dev in devs.items():
|
|
||||||
BTNap.prop_set(dev, 'Powered', on)
|
|
||||||
logging.debug('Set power of %s (addr %s) to %s', dev.object_path, dev_addr, str(on))
|
|
||||||
|
|
||||||
if devs:
|
|
||||||
return list(devs.values())[0]
|
|
||||||
|
|
||||||
return None
|
|
||||||
|
|
||||||
def is_paired(self):
|
|
||||||
"""
|
|
||||||
Check if already connected
|
|
||||||
"""
|
|
||||||
logging.debug("BT-TETHER: Checking if device is paired")
|
|
||||||
|
|
||||||
bt_dev = self.power(True)
|
|
||||||
|
|
||||||
if not bt_dev:
|
|
||||||
logging.debug("BT-TETHER: No bluetooth device found.")
|
|
||||||
return False
|
|
||||||
|
|
||||||
try:
|
|
||||||
dev_remote = BTNap.find_device(self._mac, bt_dev)
|
|
||||||
return bool(BTNap.prop_get(dev_remote, 'Paired'))
|
|
||||||
except BTError:
|
|
||||||
logging.debug("BT-TETHER: Device is not paired.")
|
|
||||||
return False
|
|
||||||
|
|
||||||
def wait_for_device(self, timeout=15):
|
|
||||||
"""
|
|
||||||
Wait for device
|
|
||||||
|
|
||||||
returns device if found, None if not
|
|
||||||
"""
|
|
||||||
logging.debug("BT-TETHER: Waiting for device")
|
|
||||||
|
|
||||||
bt_dev = self.power(True)
|
|
||||||
|
|
||||||
if not bt_dev:
|
|
||||||
logging.debug("BT-TETHER: No bluetooth device found.")
|
|
||||||
return None
|
|
||||||
|
|
||||||
try:
|
|
||||||
logging.debug("BT-TETHER: Starting discovery ...")
|
|
||||||
bt_dev.StartDiscovery()
|
|
||||||
except Exception as bt_ex:
|
|
||||||
logging.error(bt_ex)
|
|
||||||
raise bt_ex
|
|
||||||
|
|
||||||
dev_remote = None
|
|
||||||
|
|
||||||
# could be set to 0, so check if > -1
|
|
||||||
while timeout > -1:
|
|
||||||
try:
|
|
||||||
dev_remote = BTNap.find_device(self._mac, bt_dev)
|
|
||||||
logging.debug("BT-TETHER: Using remote device (addr: %s): %s",
|
|
||||||
BTNap.prop_get(dev_remote, 'Address'), dev_remote.object_path)
|
|
||||||
break
|
|
||||||
except BTError:
|
|
||||||
logging.debug("BT-TETHER: Not found yet ...")
|
|
||||||
|
|
||||||
time.sleep(1)
|
|
||||||
timeout -= 1
|
|
||||||
|
|
||||||
try:
|
|
||||||
logging.debug("BT-TETHER: Stopping Discovery ...")
|
|
||||||
bt_dev.StopDiscovery()
|
|
||||||
except Exception as bt_ex:
|
|
||||||
logging.error(bt_ex)
|
|
||||||
raise bt_ex
|
|
||||||
|
|
||||||
return dev_remote
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def pair(device):
|
|
||||||
logging.debug('BT-TETHER: Trying to pair ...')
|
|
||||||
try:
|
|
||||||
device.Pair()
|
|
||||||
logging.debug('BT-TETHER: Successful paired with device ;)')
|
|
||||||
return True
|
|
||||||
except dbus.exceptions.DBusException as err:
|
|
||||||
if err.get_dbus_name() == 'org.bluez.Error.AlreadyExists':
|
|
||||||
logging.debug('BT-TETHER: Already paired ...')
|
|
||||||
return True
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
return False
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def nap(device):
|
|
||||||
logging.debug('BT-TETHER: Trying to nap ...')
|
|
||||||
|
|
||||||
try:
|
|
||||||
logging.debug('BT-TETHER: Connecting to profile ...')
|
|
||||||
device.ConnectProfile('nap')
|
|
||||||
except Exception: # raises exception, but still works
|
|
||||||
pass
|
|
||||||
|
|
||||||
net = dbus.Interface(device, 'org.bluez.Network1')
|
|
||||||
|
|
||||||
try:
|
|
||||||
logging.debug('BT-TETHER: Connecting to nap network ...')
|
|
||||||
net.Connect('nap')
|
|
||||||
return net, True
|
|
||||||
except dbus.exceptions.DBusException as err:
|
|
||||||
if err.get_dbus_name() == 'org.bluez.Error.AlreadyConnected':
|
|
||||||
return net, True
|
|
||||||
|
|
||||||
connected = BTNap.prop_get(net, 'Connected')
|
|
||||||
if not connected:
|
|
||||||
return None, False
|
|
||||||
return net, True
|
|
||||||
|
|
||||||
|
|
||||||
class SystemdUnitWrapper:
|
|
||||||
"""
|
|
||||||
systemd wrapper
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, unit):
|
|
||||||
self.unit = unit
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _action_on_unit(action, unit):
|
|
||||||
process = subprocess.Popen(f"systemctl {action} {unit}", shell=True, stdin=None,
|
|
||||||
stdout=open("/dev/null", "w"), stderr=None, executable="/bin/bash")
|
|
||||||
process.wait()
|
|
||||||
if process.returncode > 0:
|
|
||||||
return False
|
|
||||||
return True
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def daemon_reload():
|
|
||||||
"""
|
|
||||||
Calls systemctl daemon-reload
|
|
||||||
"""
|
|
||||||
process = subprocess.Popen("systemctl daemon-reload", shell=True, stdin=None,
|
|
||||||
stdout=open("/dev/null", "w"), stderr=None, executable="/bin/bash")
|
|
||||||
process.wait()
|
|
||||||
if process.returncode > 0:
|
|
||||||
return False
|
|
||||||
return True
|
|
||||||
|
|
||||||
def is_active(self):
|
|
||||||
"""
|
|
||||||
Checks if unit is active
|
|
||||||
"""
|
|
||||||
return SystemdUnitWrapper._action_on_unit('is-active', self.unit)
|
|
||||||
|
|
||||||
def is_enabled(self):
|
|
||||||
"""
|
|
||||||
Checks if unit is enabled
|
|
||||||
"""
|
|
||||||
return SystemdUnitWrapper._action_on_unit('is-enabled', self.unit)
|
|
||||||
|
|
||||||
def is_failed(self):
|
|
||||||
"""
|
|
||||||
Checks if unit is failed
|
|
||||||
"""
|
|
||||||
return SystemdUnitWrapper._action_on_unit('is-failed', self.unit)
|
|
||||||
|
|
||||||
def enable(self):
|
|
||||||
"""
|
|
||||||
Enables the unit
|
|
||||||
"""
|
|
||||||
return SystemdUnitWrapper._action_on_unit('enable', self.unit)
|
|
||||||
|
|
||||||
def disable(self):
|
|
||||||
"""
|
|
||||||
Disables the unit
|
|
||||||
"""
|
|
||||||
return SystemdUnitWrapper._action_on_unit('disable', self.unit)
|
|
||||||
|
|
||||||
def start(self):
|
|
||||||
"""
|
|
||||||
Starts the unit
|
|
||||||
"""
|
|
||||||
return SystemdUnitWrapper._action_on_unit('start', self.unit)
|
|
||||||
|
|
||||||
def stop(self):
|
|
||||||
"""
|
|
||||||
Stops the unit
|
|
||||||
"""
|
|
||||||
return SystemdUnitWrapper._action_on_unit('stop', self.unit)
|
|
||||||
|
|
||||||
def restart(self):
|
|
||||||
"""
|
|
||||||
Restarts the unit
|
|
||||||
"""
|
|
||||||
return SystemdUnitWrapper._action_on_unit('restart', self.unit)
|
|
||||||
|
|
||||||
|
|
||||||
class IfaceWrapper:
|
|
||||||
"""
|
|
||||||
Small wrapper to check and manage ifaces
|
|
||||||
|
|
||||||
see: https://github.com/rlisagor/pynetlinux/blob/master/pynetlinux/ifconfig.py
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, iface):
|
|
||||||
self.iface = iface
|
|
||||||
self.path = f"/sys/class/net/{iface}"
|
|
||||||
|
|
||||||
def exists(self):
|
|
||||||
"""
|
|
||||||
Checks if iface exists
|
|
||||||
"""
|
|
||||||
return os.path.exists(self.path)
|
|
||||||
|
|
||||||
def is_up(self):
|
|
||||||
"""
|
|
||||||
Checks if iface is ip
|
|
||||||
"""
|
|
||||||
return open(f"{self.path}/operstate", 'r').read().rsplit('\n') == 'up'
|
|
||||||
|
|
||||||
def set_addr(self, addr):
|
|
||||||
"""
|
|
||||||
Set the netmask
|
|
||||||
"""
|
|
||||||
process = subprocess.Popen(f"ip addr add {addr} dev {self.iface}", shell=True, stdin=None,
|
|
||||||
stdout=open("/dev/null", "w"), stderr=None, executable="/bin/bash")
|
|
||||||
process.wait()
|
|
||||||
|
|
||||||
if process.returncode == 2 or process.returncode == 0: # 2 = already set
|
|
||||||
return True
|
|
||||||
|
|
||||||
return False
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def set_route(gateway, device):
|
|
||||||
process = subprocess.Popen(f"ip route replace default via {gateway} dev {device}", shell=True, stdin=None,
|
|
||||||
stdout=open("/dev/null", "w"), stderr=None, executable="/bin/bash")
|
|
||||||
process.wait()
|
|
||||||
|
|
||||||
if process.returncode > 0:
|
|
||||||
return False
|
|
||||||
|
|
||||||
return True
|
|
||||||
|
|
||||||
|
|
||||||
class Device:
|
|
||||||
def __init__(self, name, share_internet, mac, ip, netmask, interval, gateway=None, priority=10, scantime=15, search_order=0, max_tries=0, **kwargs):
|
|
||||||
self.name = name
|
|
||||||
self.status = StatusFile(f'/root/.bt-tether-{name}')
|
|
||||||
self.status.update()
|
|
||||||
self.tries = 0
|
|
||||||
self.network = None
|
|
||||||
|
|
||||||
self.max_tries = max_tries
|
|
||||||
self.search_order = search_order
|
|
||||||
self.share_internet = share_internet
|
|
||||||
self.ip = ip
|
|
||||||
self.netmask = netmask
|
|
||||||
self.gateway = gateway
|
|
||||||
self.interval = interval
|
|
||||||
self.mac = mac
|
|
||||||
self.scantime = scantime
|
|
||||||
self.priority = priority
|
|
||||||
|
|
||||||
def connected(self):
|
|
||||||
"""
|
|
||||||
Checks if device is connected
|
|
||||||
"""
|
|
||||||
return self.network and BTNap.prop_get(self.network, 'Connected')
|
|
||||||
|
|
||||||
def interface(self):
|
|
||||||
"""
|
|
||||||
Returns the interface name or None
|
|
||||||
"""
|
|
||||||
if not self.connected():
|
|
||||||
return None
|
|
||||||
return BTNap.prop_get(self.network, 'Interface')
|
|
||||||
|
|
||||||
|
|
||||||
class BTTether(plugins.Plugin):
|
class BTTether(plugins.Plugin):
|
||||||
__author__ = '33197631+dadav@users.noreply.github.com'
|
__author__ = 'Jayofelony'
|
||||||
__version__ = '1.1.0'
|
__version__ = '1.0'
|
||||||
__license__ = 'GPL3'
|
__license__ = 'GPL3'
|
||||||
__description__ = 'This makes the display reachable over bluetooth'
|
__description__ = 'A new BT-Tether plugin'
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.ready = False
|
self.ready = False
|
||||||
self.options = dict()
|
self.options = dict()
|
||||||
self.devices = dict()
|
|
||||||
self.lock = Lock()
|
|
||||||
self.running = True
|
|
||||||
self.status = '-'
|
self.status = '-'
|
||||||
|
|
||||||
def on_loaded(self):
|
def on_loaded(self):
|
||||||
# new config
|
logging.info("[BT-Tether] plugin loaded.")
|
||||||
if 'devices' in self.options:
|
|
||||||
for device, options in self.options['devices'].items():
|
|
||||||
if 'enabled' in options and options['enabled']:
|
|
||||||
for device_opt in ['enabled', 'priority', 'scantime', 'search_order',
|
|
||||||
'max_tries', 'share_internet', 'mac', 'ip',
|
|
||||||
'netmask', 'interval']:
|
|
||||||
if device_opt not in options or options[device_opt] is None:
|
|
||||||
logging.error("BT-TETHER: Please specify the %s for device %s.",
|
|
||||||
device_opt, device)
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
if options['enabled']:
|
|
||||||
self.devices[device] = Device(name=device, **options)
|
|
||||||
|
|
||||||
# legacy
|
|
||||||
if 'mac' in self.options:
|
|
||||||
for opt in ['share_internet', 'mac', 'ip', 'netmask', 'interval']:
|
|
||||||
if opt not in self.options or self.options[opt] is None:
|
|
||||||
logging.error("BT-TETHER: Please specify the %s in your config.toml.", opt)
|
|
||||||
return
|
|
||||||
|
|
||||||
self.devices['legacy'] = Device(name='legacy', **self.options)
|
|
||||||
|
|
||||||
if not self.devices:
|
|
||||||
logging.error("BT-TETHER: No valid devices found")
|
|
||||||
return
|
|
||||||
|
|
||||||
# ensure bluetooth is running
|
|
||||||
bt_unit = SystemdUnitWrapper('bluetooth.service')
|
|
||||||
if not bt_unit.is_active():
|
|
||||||
if not bt_unit.start():
|
|
||||||
logging.error("BT-TETHER: Can't start bluetooth.service")
|
|
||||||
return
|
|
||||||
|
|
||||||
logging.info("BT-TETHER: Successfully loaded ...")
|
|
||||||
|
|
||||||
while self.running:
|
|
||||||
time.sleep(1)
|
|
||||||
|
|
||||||
devices_to_try = list()
|
|
||||||
connected_priorities = list()
|
|
||||||
any_device_connected = False # if this is true, last status on screen should be C
|
|
||||||
|
|
||||||
for _, device in self.devices.items():
|
|
||||||
if device.connected():
|
|
||||||
connected_priorities.append(device.priority)
|
|
||||||
any_device_connected = True
|
|
||||||
continue
|
|
||||||
|
|
||||||
if not device.max_tries or (device.max_tries > device.tries):
|
|
||||||
if not device.status.newer_then_minutes(device.interval):
|
|
||||||
devices_to_try.append(device)
|
|
||||||
device.status.update()
|
|
||||||
device.tries += 1
|
|
||||||
|
|
||||||
sorted_devices = sorted(devices_to_try, key=lambda x: x.search_order)
|
|
||||||
|
|
||||||
for device in sorted_devices:
|
|
||||||
bt = BTNap(device.mac.lower())
|
|
||||||
|
|
||||||
|
def on_ready(self, agent):
|
||||||
|
ip = self.options['ip']
|
||||||
|
if self.options['phone'] == 'android':
|
||||||
|
address = f'{ip}/24,192.168.44.1'
|
||||||
|
route = '192.168.44.0/24,192.168.44.1'
|
||||||
|
elif self.options['phone'] == 'ios':
|
||||||
|
address = f'{ip}/24,172.20.10.1'
|
||||||
|
route = '172.20.10.0/24,172.20.10.1'
|
||||||
|
file = f'''
|
||||||
|
[connection]
|
||||||
|
id=bluetooth
|
||||||
|
type=bluetooth
|
||||||
|
autoconnect=yes
|
||||||
|
[bluetooth]
|
||||||
|
bdaddr={self.options['mac']}
|
||||||
|
type=panu
|
||||||
|
[ipv4]
|
||||||
|
address1={address}
|
||||||
|
route1={route}
|
||||||
|
dns=8.8.8.8;1.1.1.1;
|
||||||
|
method=manual
|
||||||
|
[ipv6]
|
||||||
|
addr-gen-mode=default
|
||||||
|
method=auto
|
||||||
|
[proxy]
|
||||||
|
'''
|
||||||
try:
|
try:
|
||||||
logging.debug('BT-TETHER: Search %d secs for %s ...', device.scantime, device.name)
|
file = '\n'.join(line.strip() for line in file.strip().splitlines() if line.strip())
|
||||||
dev_remote = bt.wait_for_device(timeout=device.scantime)
|
with open('/etc/NetworkManager/system-connections/bluetooth.nmconnection', 'w+') as bt_file:
|
||||||
if dev_remote is None:
|
bt_file.write(file)
|
||||||
logging.debug('BT-TETHER: Could not find %s, try again in %d minutes.', device.name, device.interval)
|
subprocess.run(['chmod', '600', '/etc/NetworkManager/system-connections/bluetooth.nmconnection'], check=True)
|
||||||
self.status = 'NF'
|
|
||||||
continue
|
|
||||||
except Exception as bt_ex:
|
|
||||||
logging.error(bt_ex)
|
|
||||||
self.status = 'NF'
|
|
||||||
continue
|
|
||||||
|
|
||||||
paired = bt.is_paired()
|
|
||||||
if not paired:
|
|
||||||
if BTNap.pair(dev_remote):
|
|
||||||
logging.debug('BT-TETHER: Paired with %s.', device.name)
|
|
||||||
else:
|
|
||||||
logging.debug('BT-TETHER: Pairing with %s failed ...', device.name)
|
|
||||||
self.status = 'PE'
|
|
||||||
continue
|
|
||||||
else:
|
|
||||||
logging.debug('BT-TETHER: Already paired.')
|
|
||||||
|
|
||||||
logging.debug('BT-TETHER: Try to create nap connection with %s ...', device.name)
|
|
||||||
device.network, success = BTNap.nap(dev_remote)
|
|
||||||
interface = None
|
|
||||||
|
|
||||||
if success:
|
|
||||||
try:
|
try:
|
||||||
interface = device.interface()
|
mac = self.options['mac']
|
||||||
except Exception:
|
subprocess.run(['nmcli', 'device', 'connect', f'{mac}'], check=True)
|
||||||
logging.debug('BT-TETHER: Could not establish nap connection with %s', device.name)
|
except Exception as e:
|
||||||
continue
|
logging.error(f"[BT-Tether] Failed to connect to device: {e}")
|
||||||
|
except Exception as e:
|
||||||
if interface is None:
|
logging.error(f"[BT-Tether] Failed to save Bluetooth connection file: {e}")
|
||||||
self.status = 'BE'
|
self.ready = True
|
||||||
logging.debug('BT-TETHER: Could not establish nap connection with %s', device.name)
|
|
||||||
continue
|
|
||||||
|
|
||||||
logging.debug('BT-TETHER: Created interface (%s)', interface)
|
|
||||||
self.status = 'C'
|
|
||||||
any_device_connected = True
|
|
||||||
device.tries = 0 # reset tries
|
|
||||||
else:
|
|
||||||
logging.debug('BT-TETHER: Could not establish nap connection with %s', device.name)
|
|
||||||
self.status = 'NF'
|
|
||||||
continue
|
|
||||||
|
|
||||||
addr = f"{device.ip}/{device.netmask}"
|
|
||||||
if device.gateway:
|
|
||||||
gateway = device.gateway
|
|
||||||
else:
|
|
||||||
gateway = ".".join(device.ip.split('.')[:-1] + ['1'])
|
|
||||||
|
|
||||||
wrapped_interface = IfaceWrapper(interface)
|
|
||||||
logging.debug('BT-TETHER: Add ip to %s', interface)
|
|
||||||
if not wrapped_interface.set_addr(addr):
|
|
||||||
self.status = 'AE'
|
|
||||||
logging.debug("BT-TETHER: Could not add ip to %s", interface)
|
|
||||||
continue
|
|
||||||
|
|
||||||
if device.share_internet:
|
|
||||||
if not connected_priorities or device.priority > max(connected_priorities):
|
|
||||||
logging.debug('BT-TETHER: Set default route to %s via %s', gateway, interface)
|
|
||||||
IfaceWrapper.set_route(gateway, interface)
|
|
||||||
connected_priorities.append(device.priority)
|
|
||||||
|
|
||||||
logging.debug('BT-TETHER: Change resolv.conf if necessary ...')
|
|
||||||
with open('/etc/resolv.conf', 'r+') as resolv:
|
|
||||||
nameserver = resolv.read()
|
|
||||||
if 'nameserver 9.9.9.9' not in nameserver:
|
|
||||||
logging.debug('BT-TETHER: Added nameserver')
|
|
||||||
resolv.seek(0)
|
|
||||||
resolv.write(nameserver + 'nameserver 9.9.9.9\n')
|
|
||||||
|
|
||||||
if any_device_connected:
|
|
||||||
self.status = 'C'
|
|
||||||
|
|
||||||
def on_unload(self, ui):
|
|
||||||
self.running = False
|
|
||||||
with ui._lock:
|
|
||||||
ui.remove_element('bluetooth')
|
|
||||||
|
|
||||||
def on_ui_setup(self, ui):
|
def on_ui_setup(self, ui):
|
||||||
with ui._lock:
|
with ui._lock:
|
||||||
@ -582,4 +66,17 @@ class BTTether(plugins.Plugin):
|
|||||||
label_font=fonts.Bold, text_font=fonts.Medium))
|
label_font=fonts.Bold, text_font=fonts.Medium))
|
||||||
|
|
||||||
def on_ui_update(self, ui):
|
def on_ui_update(self, ui):
|
||||||
|
if (subprocess.run(['bluetoothctl', 'info'], capture_output=True, text=True)).stdout.find('Connected: yes') != -1:
|
||||||
|
self.status = 'C'
|
||||||
|
else:
|
||||||
|
self.status = '-'
|
||||||
|
try:
|
||||||
|
mac = self.options['mac']
|
||||||
|
subprocess.run(['nmcli', 'device', 'connect', f'{mac}'], check=True)
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(f"[BT-Tether] Failed to connect to device: {e}")
|
||||||
ui.set('bluetooth', self.status)
|
ui.set('bluetooth', self.status)
|
||||||
|
|
||||||
|
def on_unload(self, ui):
|
||||||
|
with ui._lock:
|
||||||
|
ui.remove_element('bluetooth')
|
Reference in New Issue
Block a user