new: using pwngrid for mesh advertising (we got rid of scapy loading times)

This commit is contained in:
Simone Margaritelli
2019-10-13 17:24:47 +02:00
parent 77c16c38f4
commit 5d28557608
10 changed files with 197 additions and 366 deletions

View File

@ -2,7 +2,11 @@ import _thread
import logging
import pwnagotchi
import pwnagotchi.utils as utils
import pwnagotchi.ui.faces as faces
import pwnagotchi.plugins as plugins
import pwnagotchi.grid as grid
from pwnagotchi.mesh.peer import Peer
class AsyncAdvertiser(object):
@ -10,38 +14,76 @@ class AsyncAdvertiser(object):
self._config = config
self._view = view
self._keypair = keypair
self._advertiser = None
self._advertisement = {
'name': pwnagotchi.name(),
'version': pwnagotchi.version,
'identity': self._keypair.fingerprint,
'face': faces.FRIEND,
'pwnd_run': 0,
'pwnd_tot': 0,
'uptime': 0,
'epoch': 0,
'policy': self._config['personality']
}
self._peers = {}
self._closest_peer = None
def keypair(self):
return self._keypair
_thread.start_new_thread(self._adv_poller, ())
def _update_advertisement(self, s):
self._advertisement['pwnd_run'] = len(self._handshakes)
self._advertisement['pwnd_tot'] = utils.total_unique_handshakes(self._config['bettercap']['handshakes'])
self._advertisement['uptime'] = pwnagotchi.uptime()
self._advertisement['epoch'] = self._epoch.epoch
grid.set_advertisement_data(self._advertisement)
def start_advertising(self):
_thread.start_new_thread(self._adv_worker, ())
def _adv_worker(self):
# this will take some time due to scapy being slow to be imported ...
from pwnagotchi.mesh.advertise import Advertiser
self._advertiser = Advertiser(
self._config['main']['iface'],
pwnagotchi.name(),
pwnagotchi.version,
self._keypair.fingerprint,
period=0.3,
data=self._config['personality'])
self._advertiser.on_peer(self._on_new_unit, self._on_lost_unit)
if self._config['personality']['advertise']:
self._advertiser.start()
self._view.on_state_change('face', self._advertiser.on_face_change)
grid.set_advertisement_data(self._advertisement)
grid.advertise(True)
self._view.on_state_change('face', self._on_face_change)
else:
logging.warning("advertising is disabled")
def _on_new_unit(self, peer):
self._view.on_new_peer(peer)
plugins.on('peer_detected', self, peer)
def _on_face_change(self, old, new):
self._advertisement['face'] = new
grid.set_advertisement_data(self._advertisement)
def _on_lost_unit(self, peer):
self._view.on_lost_peer(peer)
plugins.on('peer_lost', self, peer)
def _adv_poller(self):
while True:
logging.debug("polling pwngrid-peer for peers ...")
try:
grid_peers = grid.peers()
new_peers = {}
self._closest_peer = None
for obj in grid_peers:
peer = Peer(obj)
new_peers[peer.identity()] = peer
if self._closest_peer is None:
self._closest_peer = peer
# check who's gone
to_delete = []
for ident, peer in self._peers.items():
if ident not in new_peers:
self._view.on_lost_peer(peer)
plugins.on('peer_lost', self, peer)
to_delete.append(ident)
for ident in to_delete:
del self._peers[ident]
for ident, peer in new_peers:
# check who's new
if ident not in self._peers:
self._peers[ident] = peer
self._view.on_new_peer(peer)
plugins.on('peer_detected', self, peer)
# update the rest
else:
self._peers[ident].update(peer)
except Exception as e:
logging.error("error while polling pwngrid-peer: %s" % e)