Merge pull request #1 from dadav/fix/wifips

Fix/wifips: improvement in the plugin style and renamed to net-pos
This commit is contained in:
Zenzen San
2019-10-08 19:00:28 +00:00
committed by GitHub
48 changed files with 1289 additions and 841 deletions

View File

@ -1,6 +1,10 @@
import subprocess
import os
import logging
import time
import pwnagotchi.ui.view as view
version = '1.0.0plz4'
version = '1.0.0RC1'
_name = None
@ -46,3 +50,13 @@ def temperature(celsius=True):
temp = int(fp.read().strip())
c = int(temp / 1000)
return c if celsius else ((c * (9 / 5)) + 32)
def shutdown():
logging.warning("shutting down ...")
if view.ROOT:
view.ROOT.on_shutdown()
# give it some time to refresh the ui
time.sleep(5)
os.system("sync")
os.system("halt")

View File

@ -9,6 +9,7 @@ import _thread
import pwnagotchi.utils as utils
import pwnagotchi.plugins as plugins
from pwnagotchi.log import LastSession
from pwnagotchi.bettercap import Client
from pwnagotchi.mesh.utils import AsyncAdvertiser
from pwnagotchi.ai.train import AsyncTrainer
@ -17,13 +18,13 @@ RECOVERY_DATA_FILE = '/root/.pwnagotchi-recovery'
class Agent(Client, AsyncAdvertiser, AsyncTrainer):
def __init__(self, view, config):
def __init__(self, view, config, keypair):
Client.__init__(self, config['bettercap']['hostname'],
config['bettercap']['scheme'],
config['bettercap']['port'],
config['bettercap']['username'],
config['bettercap']['password'])
AsyncAdvertiser.__init__(self, config, view)
AsyncAdvertiser.__init__(self, config, view, keypair)
AsyncTrainer.__init__(self, config)
self._started_at = time.time()
@ -35,6 +36,7 @@ class Agent(Client, AsyncAdvertiser, AsyncTrainer):
self._last_pwnd = None
self._history = {}
self._handshakes = {}
self.last_session = LastSession(self._config)
@staticmethod
def is_connected():
@ -48,6 +50,9 @@ class Agent(Client, AsyncAdvertiser, AsyncTrainer):
def config(self):
return self._config
def view(self):
return self._view
def supported_channels(self):
return self._supported_channels
@ -296,7 +301,8 @@ class Agent(Client, AsyncAdvertiser, AsyncTrainer):
def _update_peers(self):
peer = self._advertiser.closest_peer()
self._view.set_closest_peer(peer)
tot = self._advertiser.num_peers()
self._view.set_closest_peer(peer, tot)
def _save_recovery_data(self):
logging.warning("writing recovery data to %s ..." % RECOVERY_DATA_FILE)

View File

@ -39,4 +39,4 @@ def load(config, agent, epoch, from_disk=True):
for key, value in config['params'].items():
logging.info(" %s: %s" % (key, value))
return a2c
return a2c

View File

@ -6,6 +6,11 @@ main:
custom_plugins:
# which plugins to load and enable
plugins:
grid:
enabled: true
report: false # don't report pwned networks by default!
exclude: # do not report the following networks (accepts both ESSIDs and BSSIDs)
- YourHomeNetworkHere
auto-update:
enabled: false
interval: 1 # every day
@ -15,9 +20,8 @@ main:
files:
- /root/brain.nn
- /root/brain.json
- /root/custom.yml
- /root/handshakes
- /etc/ssh
- /root/handshakes/
- /etc/pwnagotchi/
- /etc/hostname
- /etc/hosts
- /etc/motd
@ -29,6 +33,8 @@ main:
enabled: false
gps:
enabled: false
speed: 19200
device: /dev/ttyUSB0
twitter:
enabled: false
consumer_key: aaa
@ -41,6 +47,12 @@ main:
wpa-sec:
enabled: false
api_key: ~
wigle:
enabled: false
api_key: ~
screen_refresh:
enabled: false
refresh_interval: 50
# monitor interface to use
iface: mon0

47
pwnagotchi/identity.py Normal file
View File

@ -0,0 +1,47 @@
from Crypto.Signature import PKCS1_PSS
from Crypto.PublicKey import RSA
import Crypto.Hash.SHA256 as SHA256
import base64
import hashlib
import os
import logging
DefaultPath = "/etc/pwnagotchi/"
class KeyPair(object):
def __init__(self, path=DefaultPath):
self.path = path
self.priv_path = os.path.join(path, "id_rsa")
self.priv_key = None
self.pub_path = "%s.pub" % self.priv_path
self.pub_key = None
if not os.path.exists(self.path):
os.makedirs(self.path)
if not os.path.exists(self.priv_path) or not os.path.exists(self.pub_path):
logging.info("generating %s ..." % self.priv_path)
os.system("/usr/bin/ssh-keygen -t rsa -m PEM -b 4096 -N '' -f '%s'" % self.priv_path)
with open(self.priv_path) as fp:
self.priv_key = RSA.importKey(fp.read())
with open(self.pub_path) as fp:
self.pub_key = RSA.importKey(fp.read())
self.pub_key_pem = self.pub_key.exportKey('PEM').decode("ascii")
# python is special
if 'RSA PUBLIC KEY' not in self.pub_key_pem:
self.pub_key_pem = self.pub_key_pem.replace('PUBLIC KEY', 'RSA PUBLIC KEY')
pem = self.pub_key_pem.encode("ascii")
self.pub_key_pem_b64 = base64.b64encode(pem).decode("ascii")
self.fingerprint = hashlib.sha256(pem).hexdigest()
def sign(self, message):
hasher = SHA256.new(message.encode("ascii"))
signer = PKCS1_PSS.new(self.priv_key, saltLen=16)
signature = signer.sign(hasher)
signature_b64 = base64.b64encode(signature).decode("ascii")
return signature, signature_b64

View File

@ -33,11 +33,11 @@ msgid "AI ready."
msgstr "ΤΝ έτοιμη."
msgid "The neural network is ready."
msgstr "Το νευρωνικό δίκτυοείναι έτοιμο."
msgstr "Το νευρωνικό δίκτυο είναι έτοιμο."
#, python-brace-format
msgid "Hey, channel {channel} is free! Your AP will say thanks."
msgstr "Ε, το κανάλι {channel} είναιελεύθερο! Το AP σου θαείναι ευγνώμων."
msgstr "Ε, το κανάλι {channel} είναιελεύθερο! Το AP σου θα είναι ευγνώμων."
msgid "I'm bored ..."
msgstr "Βαριέμαι ..."

View File

@ -25,20 +25,20 @@ msgid "Hi, I'm Pwnagotchi! Starting ..."
msgstr "Bonjour, je suis Pwnagotchi! Démarrage ..."
msgid "New day, new hunt, new pwns!"
msgstr "Nouvelle journée, nouvelle chasse, nouveau pwns!"
msgstr "Nouveau jour, nouvelle chasse, nouveaux pwns !"
msgid "Hack the Planet!"
msgstr "Hack la planète!"
msgid "AI ready."
msgstr "IA prête."
msgstr "L'IA est prête."
msgid "The neural network is ready."
msgstr "Le réseau neuronal est prêt."
#, python-brace-format
msgid "Hey, channel {channel} is free! Your AP will say thanks."
msgstr "Hey, le channel {channel} est libre! Ton AP va dis merci."
msgstr "Hey, le channel {channel} est libre! Ton point d'accès va te remercier."
msgid "I'm bored ..."
msgstr "Je m'ennuie ..."
@ -68,17 +68,17 @@ msgid "I pwn therefore I am."
msgstr "Je pwn donc je suis."
msgid "So many networks!!!"
msgstr "Autant de réseaux!!!"
msgstr "Tellement de réseaux!!!"
msgid "I'm having so much fun!"
msgstr "Je m'amuse tellement!"
msgid "My crime is that of curiosity ..."
msgstr "Mon crime est celui de la curiosité ..."
msgstr "Mon crime, c'est la curiosité ..."
#, python-brace-format
msgid "Hello {name}! Nice to meet you. {name}"
msgstr "Bonjour {name}! Ravis de te rencontrer. {name}"
msgstr "Bonjour {name}! Ravi de te rencontrer. {name}"
#, python-brace-format
msgid "Unit {name} is nearby! {name}"
@ -145,7 +145,7 @@ msgstr ""
#, python-brace-format
msgid "Just decided that {mac} needs no WiFi!"
msgstr "Décidé à l'instant que {mac} n'a pas besoin de WiFi!"
msgstr "Je viens de décider que {mac} n'a pas besoin de WiFi!"
#, python-brace-format
msgid "Deauthenticating {mac}"
@ -153,11 +153,11 @@ msgstr "Désauthentification de {mac}"
#, python-brace-format
msgid "Kickbanning {mac}!"
msgstr ""
msgstr "Je kick et je bannis {mac}!"
#, python-brace-format
msgid "Cool, we got {num} new handshake{plural}!"
msgstr "Cool, nous avons {num} nouveaux handshake{plural}!"
msgstr "Cool, on a {num} nouveaux handshake{plural}!"
msgid "Ops, something went wrong ... Rebooting ..."
msgstr "Oups, quelque chose s'est mal passé ... Redémarrage ..."
@ -188,7 +188,7 @@ msgid ""
"#pwnlog #pwnlife #hacktheplanet #skynet"
msgstr ""
"J'ai pwn durant {duration} et kick {deauthed} clients! J'ai aussi rencontré "
"{associated} nouveaux amis and mangé {handshakes} handshakes! #pwnagotchi "
"{associated} nouveaux amis et dévoré {handshakes} handshakes! #pwnagotchi "
"#pwnlog #pwnlife #hacktheplanet #skynet"
msgid "hours"

View File

@ -11,9 +11,9 @@ from file_read_backwards import FileReadBackwards
LAST_SESSION_FILE = '/root/.pwnagotchi-last-session'
class SessionParser(object):
class LastSession(object):
EPOCH_TOKEN = '[epoch '
EPOCH_PARSER = re.compile(r'^\s*\[epoch (\d+)\] (.+)')
EPOCH_PARSER = re.compile(r'^.+\[epoch (\d+)\] (.+)')
EPOCH_DATA_PARSER = re.compile(r'([a-z_]+)=([^\s]+)')
TRAINING_TOKEN = ' training epoch '
START_TOKEN = 'connecting to http'
@ -70,27 +70,27 @@ class SessionParser(object):
if started_at is None:
started_at = stopped_at
if SessionParser.DEAUTH_TOKEN in line and line not in cache:
if LastSession.DEAUTH_TOKEN in line and line not in cache:
self.deauthed += 1
cache[line] = 1
elif SessionParser.ASSOC_TOKEN in line and line not in cache:
elif LastSession.ASSOC_TOKEN in line and line not in cache:
self.associated += 1
cache[line] = 1
elif SessionParser.HANDSHAKE_TOKEN in line and line not in cache:
elif LastSession.HANDSHAKE_TOKEN in line and line not in cache:
self.handshakes += 1
cache[line] = 1
elif SessionParser.TRAINING_TOKEN in line:
elif LastSession.TRAINING_TOKEN in line:
self.train_epochs += 1
elif SessionParser.EPOCH_TOKEN in line:
elif LastSession.EPOCH_TOKEN in line:
self.epochs += 1
m = SessionParser.EPOCH_PARSER.findall(line)
m = LastSession.EPOCH_PARSER.findall(line)
if m:
epoch_num, epoch_data = m[0]
m = SessionParser.EPOCH_DATA_PARSER.findall(epoch_data)
m = LastSession.EPOCH_DATA_PARSER.findall(epoch_data)
for key, value in m:
if key == 'reward':
reward = float(value)
@ -101,7 +101,7 @@ class SessionParser(object):
elif reward > self.max_reward:
self.max_reward = reward
elif SessionParser.PEER_TOKEN in line:
elif LastSession.PEER_TOKEN in line:
m = self._peer_parser.findall(line)
if m:
name, pubkey, rssi, sid, pwnd_tot, uptime = m[0]
@ -134,6 +134,30 @@ class SessionParser(object):
self.duration_human = ', '.join(self.duration_human)
self.avg_reward /= (self.epochs if self.epochs else 1)
def parse(self):
lines = []
if os.path.exists(self.path):
with FileReadBackwards(self.path, encoding="utf-8") as fp:
for line in fp:
line = line.strip()
if line != "" and line[0] != '[':
continue
lines.append(line)
if LastSession.START_TOKEN in line:
break
lines.reverse()
if len(lines) == 0:
lines.append("Initial Session");
self.last_session = lines
self.last_session_id = hashlib.md5(lines[0].encode()).hexdigest()
self.last_saved_session_id = self._get_last_saved_session_id()
self._parse_stats()
self.parsed = True
def __init__(self, config):
self.config = config
self.voice = Voice(lang=config['main']['lang'])
@ -150,28 +174,10 @@ class SessionParser(object):
self.last_peer = None
self._peer_parser = re.compile(
'detected unit (.+)@(.+) \(v.+\) on channel \d+ \(([\d\-]+) dBm\) \[sid:(.+) pwnd_tot:(\d+) uptime:(\d+)\]')
lines = []
if os.path.exists(self.path):
with FileReadBackwards(self.path, encoding="utf-8") as fp:
for line in fp:
line = line.strip()
if line != "" and line[0] != '[':
continue
lines.append(line)
if SessionParser.START_TOKEN in line:
break
lines.reverse()
if len(lines) == 0:
lines.append("Initial Session");
self.last_session = lines
self.last_session_id = hashlib.md5(lines[0].encode()).hexdigest()
self.last_saved_session_id = self._get_last_saved_session_id()
self._parse_stats()
self.last_session = []
self.last_session_id = None
self.last_saved_session_id = None
self.parsed = False
def is_new(self):
return self.last_session_id != self.last_saved_session_id

View File

@ -1,14 +1,4 @@
import os
from Crypto.PublicKey import RSA
import hashlib
def new_session_id():
return ':'.join(['%02x' % b for b in os.urandom(6)])
def get_identity(config):
pubkey = None
with open(config['main']['pubkey']) as fp:
pubkey = RSA.importKey(fp.read())
return pubkey, hashlib.sha1(pubkey.exportKey('DER')).hexdigest()

View File

@ -152,7 +152,7 @@ class Advertiser(object):
if self._is_broadcasted_advertisement(dot11):
try:
dot11elt = p.getlayer(Dot11Elt)
if dot11elt.ID == wifi.Dot11ElemID_Identity:
if dot11elt.ID == wifi.Dot11ElemID_Whisper:
self._parse_identity(p[RadioTap], dot11, dot11elt)
else:

View File

@ -3,16 +3,18 @@ import logging
import pwnagotchi
import pwnagotchi.plugins as plugins
from pwnagotchi.mesh import get_identity
class AsyncAdvertiser(object):
def __init__(self, config, view):
def __init__(self, config, view, keypair):
self._config = config
self._view = view
self._public_key, self._identity = get_identity(config)
self._keypair = keypair
self._advertiser = None
def keypair(self):
return self._keypair
def start_advertising(self):
_thread.start_new_thread(self._adv_worker, ())
@ -24,7 +26,7 @@ class AsyncAdvertiser(object):
self._config['main']['iface'],
pwnagotchi.name(),
pwnagotchi.version,
self._identity,
self._keypair.fingerprint,
period=0.3,
data=self._config['personality'])

View File

@ -1,6 +1,6 @@
SignatureAddress = 'de:ad:be:ef:de:ad'
BroadcastAddress = 'ff:ff:ff:ff:ff:ff'
Dot11ElemID_Identity = 222
Dot11ElemID_Whisper = 222
NumChannels = 140
def freq_to_channel(freq):
@ -30,7 +30,7 @@ def encapsulate(payload, addr_from, addr_to=BroadcastAddress):
while data_left > 0:
sz = min(chunk_size, data_left)
chunk = payload[data_off: data_off + sz]
frame /= Dot11Elt(ID=Dot11ElemID_Identity, info=chunk, len=sz)
frame /= Dot11Elt(ID=Dot11ElemID_Whisper, info=chunk, len=sz)
data_off += sz
data_left -= sz

View File

@ -33,7 +33,7 @@ def on_loaded():
logging.info("AUTO-BACKUP: Successfuly loaded.")
def on_internet_available(display, config, log):
def on_internet_available(agent):
global STATUS
if READY:
@ -42,6 +42,8 @@ def on_internet_available(display, config, log):
files_to_backup = " ".join(OPTIONS['files'])
try:
display = agent.view()
logging.info("AUTO-BACKUP: Backing up ...")
display.set('status', 'Backing up ...')
display.update()

View File

@ -23,13 +23,15 @@ def on_loaded():
READY = True
def on_internet_available(display, config, log):
def on_internet_available(agent):
global STATUS
if READY:
if STATUS.newer_then_days(OPTIONS['interval']):
return
display = agent.view()
try:
display.set('status', 'Updating ...')
display.update()

View File

@ -20,7 +20,7 @@ def on_loaded():
# called in manual mode when there's internet connectivity
def on_internet_available(ui, config, log):
def on_internet_available(agent):
pass

View File

@ -8,9 +8,8 @@ import logging
import json
import os
device = '/dev/ttyUSB0'
speed = 19200
running = False
OPTIONS = dict()
def on_loaded():
@ -27,8 +26,8 @@ def on_ready(agent):
except:
pass
agent.run('set gps.device %s' % device)
agent.run('set gps.speed %d' % speed)
agent.run('set gps.device %s' % OPTIONS['device'])
agent.run('set gps.speed %d' % OPTIONS['speed'])
agent.run('gps on')
running = True
else:

View File

@ -0,0 +1,163 @@
__author__ = 'evilsocket@gmail.com'
__version__ = '1.0.0'
__name__ = 'grid'
__license__ = 'GPL3'
__description__ = 'This plugin signals the unit cryptographic identity and list of pwned networks and list of pwned networks to api.pwnagotchi.ai'
import os
import logging
import requests
import glob
import subprocess
import pwnagotchi
import pwnagotchi.utils as utils
from pwnagotchi.utils import WifiInfo, extract_from_pcap
OPTIONS = dict()
AUTH = utils.StatusFile('/root/.api-enrollment.json', data_format='json')
REPORT = utils.StatusFile('/root/.api-report.json', data_format='json')
def on_loaded():
logging.info("grid plugin loaded.")
def get_api_token(last_session, keys):
global AUTH
if AUTH.newer_then_minutes(25) and AUTH.data is not None and 'token' in AUTH.data:
return AUTH.data['token']
if AUTH.data is None:
logging.info("grid: enrolling unit ...")
else:
logging.info("grid: refreshing token ...")
identity = "%s@%s" % (pwnagotchi.name(), keys.fingerprint)
# sign the identity string to prove we own both keys
_, signature_b64 = keys.sign(identity)
api_address = 'https://api.pwnagotchi.ai/api/v1/unit/enroll'
enrollment = {
'identity': identity,
'public_key': keys.pub_key_pem_b64,
'signature': signature_b64,
'data': {
'duration': last_session.duration,
'epochs': last_session.epochs,
'train_epochs': last_session.train_epochs,
'avg_reward': last_session.avg_reward,
'min_reward': last_session.min_reward,
'max_reward': last_session.max_reward,
'deauthed': last_session.deauthed,
'associated': last_session.associated,
'handshakes': last_session.handshakes,
'peers': last_session.peers,
'uname': subprocess.getoutput("uname -a")
}
}
r = requests.post(api_address, json=enrollment)
if r.status_code != 200:
raise Exception("(status %d) %s" % (r.status_code, r.json()))
AUTH.update(data=r.json())
logging.info("grid: done")
return AUTH.data["token"]
def parse_pcap(filename):
logging.info("grid: parsing %s ..." % filename)
net_id = os.path.basename(filename).replace('.pcap', '')
if '_' in net_id:
# /root/handshakes/ESSID_BSSID.pcap
essid, bssid = net_id.split('_')
else:
# /root/handshakes/BSSID.pcap
essid, bssid = '', net_id
it = iter(bssid)
bssid = ':'.join([a + b for a, b in zip(it, it)])
info = {
WifiInfo.ESSID: essid,
WifiInfo.BSSID: bssid,
}
try:
info = extract_from_pcap(filename, [WifiInfo.BSSID, WifiInfo.ESSID])
except Exception as e:
logging.error("grid: %s" % e)
return info[WifiInfo.ESSID], info[WifiInfo.BSSID]
def api_report_ap(last_session, keys, token, essid, bssid):
while True:
token = AUTH.data['token']
logging.info("grid: reporting %s (%s)" % (essid, bssid))
try:
api_address = 'https://api.pwnagotchi.ai/api/v1/unit/report/ap'
headers = {'Authorization': 'access_token %s' % token}
report = {
'essid': essid,
'bssid': bssid,
}
r = requests.post(api_address, headers=headers, json=report)
if r.status_code != 200:
if r.status_code == 401:
logging.warning("token expired")
token = get_api_token(last_session, keys)
continue
else:
raise Exception("(status %d) %s" % (r.status_code, r.text))
else:
return True
except Exception as e:
logging.error("grid: %s" % e)
return False
def on_internet_available(agent):
global REPORT
try:
config = agent.config()
keys = agent.keypair()
pcap_files = glob.glob(os.path.join(config['bettercap']['handshakes'], "*.pcap"))
num_networks = len(pcap_files)
reported = REPORT.data_field_or('reported', default=[])
num_reported = len(reported)
num_new = num_networks - num_reported
if num_new > 0:
if OPTIONS['report']:
logging.info("grid: %d new networks to report" % num_new)
token = get_api_token(agent.last_session, agent.keypair())
for pcap_file in pcap_files:
net_id = os.path.basename(pcap_file).replace('.pcap', '')
do_skip = False
for skip in OPTIONS['exclude']:
skip = skip.lower()
net = net_id.lower()
if skip in net or skip.replace(':', '') in net:
do_skip = True
break
if net_id not in reported and not do_skip:
essid, bssid = parse_pcap(pcap_file)
if bssid:
if api_report_ap(agent.last_session, keys, token, essid, bssid):
reported.append(net_id)
REPORT.update(data={'reported': reported})
else:
logging.debug("grid: reporting disabled")
except Exception as e:
logging.exception("error while enrolling the unit")

View File

@ -0,0 +1,144 @@
__author__ = 'zenzen san'
__version__ = '1.0.0'
__name__ = 'net-pos'
__license__ = 'GPL3'
__description__ = """Saves a json file with the access points with more signal
whenever a handshake is captured.
When internet is available the files are converted in geo locations
using Mozilla LocationService """
import logging
import json
import os
import requests
MOZILLA_API_URL = 'https://location.services.mozilla.com/v1/geolocate?key={api}'
ALREADY_SAVED = None
SKIP = None
READY = False
OPTIONS = {}
def on_loaded():
global ALREADY_SAVED
global SKIP
global READY
SKIP = list()
if 'api_key' not in OPTIONS or ('api_key' in OPTIONS and OPTIONS['api_key'] is None):
logging.error("NET-POS: api_key isn't set. Can't use mozilla's api.")
return
try:
with open('/root/.net_pos_saved', 'r') as f:
ALREADY_SAVED = f.read().splitlines()
except OSError:
logging.warning('NET-POS: No net-pos-file found.')
ALREADY_SAVED = []
READY = True
logging.info("net-pos plugin loaded.")
def _append_saved(path):
to_save = list()
if isinstance(path, str):
to_save.append(path)
elif isinstance(path, list):
to_save += path
else:
raise TypeError("Expected list or str, got %s" % type(path))
with open('/root/.net_pos_saved', 'a') as saved_file:
for x in to_save:
saved_file.write(x + "\n")
def on_internet_available(agent):
global SKIP
if READY:
config = agent.config()
display = agent.view()
handshake_dir = config['bettercap']['handshakes']
all_files = os.listdir(handshake_dir)
all_np_files = [os.path.join(handshake_dir, filename)
for filename in all_files
if filename.endswith('.net-pos.json')]
new_np_files = set(all_np_files) - set(ALREADY_SAVED) - set(SKIP)
if new_np_files:
logging.info("NET-POS: Found {num} new net-pos files. Fetching positions ...", len(new_np_files))
display.set('status', f"Found {len(new_np_files)} new net-pos files. Fetching positions ...")
display.update(force=True)
for idx, np_file in enumerate(new_np_files):
geo_file = np_file.replace('.net-pos.json', '.geo.json')
if os.path.exists(geo_file):
# got already the position
ALREADY_SAVED.append(np_file)
_append_saved(np_file)
continue
try:
geo_data = _get_geo_data(np_file) # returns json obj
except requests.exceptions.RequestException as req_e:
logging.error("NET-POS: %s", req_e)
SKIP += np_file
except json.JSONDecodeError as js_e:
logging.error("NET-POS: %s", js_e)
SKIP += np_file
except OSError as os_e:
logging.error("NET-POS: %s", os_e)
SKIP += np_file
with open(geo_file, 'w+t') as sf:
json.dump(geo_data, sf)
ALREADY_SAVED.append(np_file)
_append_saved(np_file)
display.set('status', f"Fetching positions ({idx+1}/{len(new_np_files)}")
display.update(force=True)
def on_handshake(agent, filename, access_point, client_station):
netpos = _get_netpos(agent)
netpos_filename = filename.replace('.pcap', '.net-pos.json')
logging.info("NET-POS: Saving net-location to %s", netpos_filename)
try:
with open(netpos_filename, 'w+t') as fp:
json.dump(netpos, fp)
except OSError as os_e:
logging.error("NET-POS: %s", os_e)
def _get_netpos(agent):
aps = agent.get_access_points()
netpos = {}
netpos['wifiAccessPoints'] = list()
# 6 seems a good number to save a wifi networks location
for access_point in sorted(aps, key=lambda i: i['rssi'], reverse=True)[:6]:
netpos['wifiAccessPoints'].append({'macAddress': access_point['mac'],
'signalStrength': access_point['rssi']})
return netpos
def _get_geo_data(path, timeout=30):
geourl = MOZILLA_API_URL.format(api=OPTIONS['api_key'])
try:
with open(path, "r") as json_file:
data = json.load(json_file)
except json.JSONDecodeError as js_e:
raise js_e
except OSError as os_e:
raise os_e
try:
result = requests.post(geourl,
json=data,
timeout=timeout)
return result.json()
except requests.exceptions.RequestException as req_e:
raise req_e

View File

@ -55,14 +55,17 @@ def _upload_to_ohc(path, timeout=30):
raise e
def on_internet_available(display, config, log):
def on_internet_available(agent):
"""
Called in manual mode when there's internet connectivity
"""
if READY:
display = agent.view()
config = agent.config()
handshake_dir = config['bettercap']['handshakes']
handshake_filenames = os.listdir(handshake_dir)
handshake_paths = [os.path.join(handshake_dir, filename) for filename in handshake_filenames]
handshake_paths = [os.path.join(handshake_dir, filename) for filename in handshake_filenames if filename.endswith('.pcap')]
handshake_new = set(handshake_paths) - set(ALREADY_UPLOADED)
if handshake_new:

View File

@ -0,0 +1,24 @@
__author__ = 'pwnagotcchi [at] rossmarks [dot] uk'
__version__ = '1.0.0'
__name__ = 'screen_refresh'
__license__ = 'GPL3'
__description__ = 'Refresh he e-ink display after X amount of updates'
import logging
OPTIONS = dict()
update_count = 0;
def on_loaded():
logging.info("Screen refresh plugin loaded")
def on_ui_update(ui):
global update_count
update_count += 1
if update_count == OPTIONS['refresh_interval']:
ui._init_display()
ui.set('status', "Screen cleaned")
logging.info("Screen refreshing")
update_count = 0

View File

@ -14,8 +14,12 @@ def on_loaded():
# called in manual mode when there's internet connectivity
def on_internet_available(ui, config, log):
if log.is_new() and log.handshakes > 0:
def on_internet_available(agent):
config = agent.config()
display = agent.view()
last_session = agent.last_session
if last_session.is_new() and last_session.handshakes > 0:
try:
import tweepy
except ImportError:
@ -26,20 +30,20 @@ def on_internet_available(ui, config, log):
picture = '/dev/shm/pwnagotchi.png'
ui.on_manual_mode(log)
ui.update(force=True)
ui.image().save(picture, 'png')
ui.set('status', 'Tweeting...')
ui.update(force=True)
display.on_manual_mode(last_session)
display.update(force=True)
display.image().save(picture, 'png')
display.set('status', 'Tweeting...')
display.update(force=True)
try:
auth = tweepy.OAuthHandler(OPTIONS['consumer_key'], OPTIONS['consumer_secret'])
auth.set_access_token(OPTIONS['access_token_key'], OPTIONS['access_token_secret'])
api = tweepy.API(auth)
tweet = Voice(lang=config['main']['lang']).on_log_tweet(log)
tweet = Voice(lang=config['main']['lang']).on_last_session_tweet(last_session)
api.update_with_media(filename=picture, status=tweet)
log.save_session_id()
last_session.save_session_id()
logging.info("tweeted: %s" % tweet)
except Exception as e:

View File

@ -0,0 +1,278 @@
__author__ = '33197631+dadav@users.noreply.github.com'
__version__ = '1.0.0'
__name__ = 'wigle'
__license__ = 'GPL3'
__description__ = 'This plugin automatically uploades collected wifis to wigle.net'
import os
import logging
import json
from io import StringIO
import csv
from datetime import datetime
import requests
from pwnagotchi.mesh.wifi import freq_to_channel
from pwnagotchi.utils import WifiInfo, FieldNotFoundError, extract_from_pcap
READY = False
ALREADY_UPLOADED = None
SKIP = None
OPTIONS = dict()
AKMSUITE_TYPES = {
0x00: "Reserved",
0x01: "802.1X",
0x02: "PSK",
}
def _handle_packet(packet, result):
from scapy.all import RadioTap, Dot11Elt, Dot11Beacon, rdpcap, Scapy_Exception, Dot11, Dot11ProbeResp, Dot11AssoReq, \
Dot11ReassoReq, Dot11EltRSN, Dot11EltVendorSpecific, Dot11EltMicrosoftWPA
"""
Analyze each packet and extract the data from Dot11 layers
"""
if hasattr(packet, 'cap') and 'privacy' in packet.cap:
# packet is encrypted
if 'encryption' not in result:
result['encryption'] = set()
if packet.haslayer(Dot11Beacon):
if packet.haslayer(Dot11Beacon)\
or packet.haslayer(Dot11ProbeResp)\
or packet.haslayer(Dot11AssoReq)\
or packet.haslayer(Dot11ReassoReq):
if 'bssid' not in result and hasattr(packet[Dot11], 'addr3'):
result['bssid'] = packet[Dot11].addr3
if 'essid' not in result and hasattr(packet[Dot11Elt], 'info'):
result['essid'] = packet[Dot11Elt].info
if 'channel' not in result and hasattr(packet[Dot11Elt:3], 'info'):
result['channel'] = int(ord(packet[Dot11Elt:3].info))
if packet.haslayer(RadioTap):
if 'rssi' not in result and hasattr(packet[RadioTap], 'dBm_AntSignal'):
result['rssi'] = packet[RadioTap].dBm_AntSignal
if 'channel' not in result and hasattr(packet[RadioTap], 'ChannelFrequency'):
result['channel'] = freq_to_channel(packet[RadioTap].ChannelFrequency)
# see: https://fossies.org/linux/scapy/scapy/layers/dot11.py
if packet.haslayer(Dot11EltRSN):
if hasattr(packet[Dot11EltRSN], 'akm_suites'):
auth = AKMSUITE_TYPES.get(packet[Dot11EltRSN].akm_suites[0].suite)
result['encryption'].add(f"WPA2/{auth}")
else:
result['encryption'].add("WPA2")
if packet.haslayer(Dot11EltVendorSpecific)\
and (packet.haslayer(Dot11EltMicrosoftWPA)
or packet.info.startswith(b'\x00P\xf2\x01\x01\x00')):
if hasattr(packet, 'akm_suites'):
auth = AKMSUITE_TYPES.get(packet.akm_suites[0].suite)
result['encryption'].add(f"WPA2/{auth}")
else:
result['encryption'].add("WPA2")
# end see
return result
def _analyze_pcap(pcap):
from scapy.all import RadioTap, Dot11Elt, Dot11Beacon, rdpcap, Scapy_Exception, Dot11, Dot11ProbeResp, Dot11AssoReq, \
Dot11ReassoReq, Dot11EltRSN, Dot11EltVendorSpecific, Dot11EltMicrosoftWPA
"""
Iterate over the packets and extract data
"""
result = dict()
try:
packets = rdpcap(pcap)
for packet in packets:
result = _handle_packet(packet, result)
except Scapy_Exception as sc_e:
raise sc_e
return result
def on_loaded():
"""
Gets called when the plugin gets loaded
"""
global READY
global ALREADY_UPLOADED
global SKIP
SKIP = list()
if 'api_key' not in OPTIONS or ('api_key' in OPTIONS and OPTIONS['api_key'] is None):
logging.error("WIGLE: api_key isn't set. Can't upload to wigle.net")
return
try:
with open('/root/.wigle_uploads', 'r') as f:
ALREADY_UPLOADED = f.read().splitlines()
except OSError:
logging.warning('WIGLE: No upload-file found.')
ALREADY_UPLOADED = []
READY = True
def _extract_gps_data(path):
"""
Extract data from gps-file
return json-obj
"""
try:
with open(path, 'r') as json_file:
return json.load(json_file)
except OSError as os_err:
raise os_err
except json.JSONDecodeError as json_err:
raise json_err
def _format_auth(data):
out = ""
for auth in data:
out = f"{out}[{auth}]"
return out
def _transform_wigle_entry(gps_data, pcap_data):
"""
Transform to wigle entry in file
"""
dummy = StringIO()
# write kismet header
dummy.write("WigleWifi-1.4,appRelease=20190201,model=Kismet,release=2019.02.01.{},device=kismet,display=kismet,board=kismet,brand=kismet\n")
dummy.write("MAC,SSID,AuthMode,FirstSeen,Channel,RSSI,CurrentLatitude,CurrentLongitude,AltitudeMeters,AccuracyMeters,Type")
writer = csv.writer(dummy, delimiter=",", quoting=csv.QUOTE_NONE)
writer.writerow([
pcap_data[WifiInfo.BSSID],
pcap_data[WifiInfo.ESSID],
_format_auth(pcap_data[WifiInfo.ENCRYPTION]),
datetime.strptime(gps_data['Updated'].rsplit('.')[0],
"%Y-%m-%dT%H:%M:%S").strftime('%Y-%m-%d %H:%M:%S'),
pcap_data[WifiInfo.CHANNEL],
pcap_data[WifiInfo.RSSI],
gps_data['Latitude'],
gps_data['Longitude'],
gps_data['Altitude'],
0, # accuracy?
'WIFI'])
return dummy.getvalue()
def _send_to_wigle(lines, api_key, timeout=30):
"""
Uploads the file to wigle-net
"""
dummy = StringIO()
for line in lines:
dummy.write(f"{line}")
dummy.seek(0)
headers = {'Authorization': f"Basic {api_key}",
'Accept': 'application/json'}
data = {'donate': 'false'}
payload = {'file': dummy, 'type': 'text/csv'}
try:
res = requests.post('https://api.wigle.net/api/v2/file/upload',
data=data,
headers=headers,
files=payload,
timeout=timeout)
json_res = res.json()
if not json_res['success']:
raise requests.exceptions.RequestException(json_res['message'])
except requests.exceptions.RequestException as re_e:
raise re_e
def on_internet_available(agent):
from scapy.all import RadioTap, Dot11Elt, Dot11Beacon, rdpcap, Scapy_Exception, Dot11, Dot11ProbeResp, Dot11AssoReq, \
Dot11ReassoReq, Dot11EltRSN, Dot11EltVendorSpecific, Dot11EltMicrosoftWPA
"""
Called in manual mode when there's internet connectivity
"""
global ALREADY_UPLOADED
global SKIP
if READY:
config = agent.config()
display = agent.view()
handshake_dir = config['bettercap']['handshakes']
all_files = os.listdir(handshake_dir)
all_gps_files = [os.path.join(handshake_dir, filename)
for filename in all_files
if filename.endswith('.gps.json')]
new_gps_files = set(all_gps_files) - set(ALREADY_UPLOADED) - set(SKIP)
if new_gps_files:
logging.info("WIGLE: Internet connectivity detected. Uploading new handshakes to wigle.net")
csv_entries = list()
no_err_entries = list()
for gps_file in new_gps_files:
pcap_filename = gps_file.replace('.gps.json', '.pcap')
if not os.path.exists(pcap_filename):
logging.error("WIGLE: Can't find pcap for %s", gps_file)
SKIP.append(gps_file)
continue
try:
gps_data = _extract_gps_data(gps_file)
except OSError as os_err:
logging.error("WIGLE: %s", os_err)
SKIP.append(gps_file)
continue
except json.JSONDecodeError as json_err:
logging.error("WIGLE: %s", json_err)
SKIP.append(gps_file)
continue
try:
pcap_data = extract_from_pcap(pcap_filename, [WifiInfo.BSSID,
WifiInfo.ESSID,
WifiInfo.ENCRYPTION,
WifiInfo.CHANNEL,
WifiInfo.RSSI])
except FieldNotFoundError:
logging.error("WIGLE: Could not extract all informations. Skip %s", gps_file)
SKIP.append(gps_file)
continue
except Scapy_Exception as sc_e:
logging.error("WIGLE: %s", sc_e)
SKIP.append(gps_file)
continue
new_entry = _transform_wigle_entry(gps_data, pcap_data)
csv_entries.append(new_entry)
no_err_entries.append(gps_file)
if csv_entries:
display.set('status', "Uploading gps-data to wigle.net ...")
display.update(force=True)
try:
_send_to_wigle(csv_entries, OPTIONS['api_key'])
ALREADY_UPLOADED += no_err_entries
with open('/root/.wigle_uploads', 'a') as up_file:
for gps in no_err_entries:
up_file.write(gps + "\n")
logging.info("WIGLE: Successfuly uploaded %d files", len(no_err_entries))
except requests.exceptions.RequestException as re_e:
SKIP += no_err_entries
logging.error("WIGLE: Got an exception while uploading %s", re_e)
except OSError as os_e:
SKIP += no_err_entries
logging.error("WIGLE: Got the following error: %s", os_e)

View File

@ -54,14 +54,17 @@ def _upload_to_wpasec(path, timeout=30):
raise e
def on_internet_available(display, config, log):
def on_internet_available(agent):
"""
Called in manual mode when there's internet connectivity
"""
if READY:
config = agent.config()
display = agent.view()
handshake_dir = config['bettercap']['handshakes']
handshake_filenames = os.listdir(handshake_dir)
handshake_paths = [os.path.join(handshake_dir, filename) for filename in handshake_filenames]
handshake_paths = [os.path.join(handshake_dir, filename) for filename in handshake_filenames if filename.endswith('.pcap')]
handshake_new = set(handshake_paths) - set(ALREADY_UPLOADED)
if handshake_new:

View File

@ -15,11 +15,29 @@ class VideoHandler(BaseHTTPRequestHandler):
_lock = Lock()
_index = """<html>
<head>
<title>%s</title>
<title>%s</title>
<style>
.block {
-webkit-appearance: button;
-moz-appearance: button;
appearance: button;
display: block;
cursor: pointer;
text-align: center;
}
</style>
</head>
<body>
<img src="/ui" id="ui"/>
<div style="position: absolute; top:0; left:0; width:100%%;">
<img src="/ui" id="ui" style="width:100%%"/>
<br/>
<hr/>
<form action="/shutdown" onsubmit="return confirm('This will halt the unit, continue?');">
<input type="submit" class="block" value="Shutdown"/>
</form>
</div>
<script type="text/javascript">
window.onload = function() {
var image = document.getElementById("ui");
@ -50,6 +68,9 @@ class VideoHandler(BaseHTTPRequestHandler):
except:
pass
elif self.path.startswith('/shutdown'):
pwnagotchi.shutdown()
elif self.path.startswith('/ui'):
with self._lock:
self.send_response(200)

View File

@ -15,7 +15,7 @@ from pwnagotchi.ui.state import State
WHITE = 0xff
BLACK = 0x00
ROOT = None
def setup_display_specifics(config):
width = 0
@ -57,9 +57,12 @@ def setup_display_specifics(config):
class View(object):
def __init__(self, config, state={}):
global ROOT
self._render_cbs = []
self._config = config
self._canvas = None
self._frozen = False
self._lock = Lock()
self._voice = Voice(lang=config['main']['lang'])
@ -119,6 +122,8 @@ class View(object):
logging.warning("ui.fps is 0, the display will only update for major changes")
self._ignore_changes = ('uptime', 'name')
ROOT = self
def add_element(self, key, elem):
self._state.add_element(key, elem)
@ -153,22 +158,22 @@ class View(object):
self.set('face', faces.AWAKE)
def on_ai_ready(self):
self.set('mode', '')
self.set('mode', ' AI')
self.set('face', faces.HAPPY)
self.set('status', self._voice.on_ai_ready())
self.update()
def on_manual_mode(self, log):
def on_manual_mode(self, last_session):
self.set('mode', 'MANU')
self.set('face', faces.SAD if log.handshakes == 0 else faces.HAPPY)
self.set('status', self._voice.on_log(log))
self.set('epoch', "%04d" % log.epochs)
self.set('uptime', log.duration)
self.set('face', faces.SAD if last_session.handshakes == 0 else faces.HAPPY)
self.set('status', self._voice.on_last_session_data(last_session))
self.set('epoch', "%04d" % last_session.epochs)
self.set('uptime', last_session.duration)
self.set('channel', '-')
self.set('aps', "%d" % log.associated)
self.set('shakes', '%d (%s)' % (log.handshakes, \
self.set('aps', "%d" % last_session.associated)
self.set('shakes', '%d (%s)' % (last_session.handshakes, \
utils.total_unique_handshakes(self._config['bettercap']['handshakes'])))
self.set_closest_peer(log.last_peer)
self.set_closest_peer(last_session.last_peer, last_session.peers)
def is_normal(self):
return self._state.get('face') not in (
@ -188,7 +193,7 @@ class View(object):
self.set('status', self._voice.on_normal())
self.update()
def set_closest_peer(self, peer):
def set_closest_peer(self, peer, num_total):
if peer is None:
self.set('friend_face', None)
self.set('friend_name', None)
@ -207,6 +212,12 @@ class View(object):
name += '' * (4 - num_bars)
name += ' %s %d (%d)' % (peer.name(), peer.pwnd_run(), peer.pwnd_total())
if num_total > 1:
if num_total > 9000:
name += ' of over 9000'
else:
name += ' of %d' % num_total
self.set('friend_face', peer.face())
self.set('friend_name', name)
self.update()
@ -255,6 +266,12 @@ class View(object):
self.on_normal()
def on_shutdown(self):
self.set('face', faces.SLEEP)
self.set('status', self._voice.on_shutdown())
self.update(force=True)
self._frozen = True
def on_bored(self):
self.set('face', faces.BORED)
self.set('status', self._voice.on_bored())
@ -317,6 +334,9 @@ class View(object):
def update(self, force=False):
with self._lock:
if self._frozen:
return
changes = self._state.changes(ignore=self._ignore_changes)
if force or len(changes):
self._canvas = Image.new('1', (self._width, self._height), WHITE)

View File

@ -1,10 +1,12 @@
from datetime import datetime
from enum import Enum
import logging
import glob
import os
import time
import subprocess
import yaml
import json
# https://stackoverflow.com/questions/823196/yaml-merge-in-python
@ -19,13 +21,15 @@ def merge_config(user, default):
def load_config(args):
with open(args.config, 'rt') as fp:
with open(args.config) as fp:
config = yaml.safe_load(fp)
if os.path.exists(args.user_config):
with open(args.user_config, 'rt') as fp:
with open(args.user_config) as fp:
user_config = yaml.safe_load(fp)
config = merge_config(user_config, config)
# if the file is empty, safe_load will return None and merge_config will boom.
if user_config:
config = merge_config(user_config, config)
return config
@ -80,19 +84,138 @@ def blink(times=1, delay=0.3):
time.sleep(delay)
led(True)
class WifiInfo(Enum):
"""
Fields you can extract from a pcap file
"""
BSSID = 0
ESSID = 1
ENCRYPTION = 2
CHANNEL = 3
RSSI = 4
class FieldNotFoundError(Exception):
pass
def extract_from_pcap(path, fields):
"""
Search in pcap-file for specified information
path: Path to pcap file
fields: Array of fields that should be extracted
If a field is not found, FieldNotFoundError is raised
"""
results = dict()
for field in fields:
if not isinstance(field, WifiInfo):
raise TypeError("Invalid field")
subtypes = set()
if field == WifiInfo.BSSID:
from scapy.all import Dot11Beacon, Dot11ProbeResp, Dot11AssoReq, Dot11ReassoReq, Dot11, sniff
subtypes.add('beacon')
bpf_filter = " or ".join([f"wlan type mgt subtype {subtype}" for subtype in subtypes])
packets = sniff(offline=path, filter=bpf_filter)
try:
for packet in packets:
if packet.haslayer(Dot11Beacon):
if hasattr(packet[Dot11], 'addr3'):
results[field] = packet[Dot11].addr3
break
else: # magic
raise FieldNotFoundError("Could not find field [BSSID]")
except Exception:
raise FieldNotFoundError("Could not find field [BSSID]")
elif field == WifiInfo.ESSID:
from scapy.all import Dot11Beacon, Dot11ReassoReq, Dot11AssoReq, Dot11, sniff, Dot11Elt
subtypes.add('beacon')
subtypes.add('assoc-req')
subtypes.add('reassoc-req')
bpf_filter = " or ".join([f"wlan type mgt subtype {subtype}" for subtype in subtypes])
packets = sniff(offline=path, filter=bpf_filter)
try:
for packet in packets:
if packet.haslayer(Dot11Elt) and hasattr(packet[Dot11Elt], 'info'):
results[field] = packet[Dot11Elt].info.decode('utf-8')
break
else: # magic
raise FieldNotFoundError("Could not find field [ESSID]")
except Exception:
raise FieldNotFoundError("Could not find field [ESSID]")
elif field == WifiInfo.ENCRYPTION:
from scapy.all import Dot11Beacon, sniff
subtypes.add('beacon')
bpf_filter = " or ".join([f"wlan type mgt subtype {subtype}" for subtype in subtypes])
packets = sniff(offline=path, filter=bpf_filter)
try:
for packet in packets:
if packet.haslayer(Dot11Beacon) and hasattr(packet[Dot11Beacon], 'network_stats'):
stats = packet[Dot11Beacon].network_stats()
if 'crypto' in stats:
results[field] = stats['crypto'] # set with encryption types
break
else: # magic
raise FieldNotFoundError("Could not find field [ENCRYPTION]")
except Exception:
raise FieldNotFoundError("Could not find field [ENCRYPTION]")
elif field == WifiInfo.CHANNEL:
from scapy.all import sniff, RadioTap
from pwnagotchi.mesh.wifi import freq_to_channel
packets = sniff(offline=path, count=1)
try:
results[field] = freq_to_channel(packets[0][RadioTap].ChannelFrequency)
except Exception:
raise FieldNotFoundError("Could not find field [CHANNEL]")
elif field == WifiInfo.RSSI:
from scapy.all import sniff, RadioTap
from pwnagotchi.mesh.wifi import freq_to_channel
packets = sniff(offline=path, count=1)
try:
results[field] = packets[0][RadioTap].dBm_AntSignal
except Exception:
raise FieldNotFoundError("Could not find field [RSSI]")
return results
class StatusFile(object):
def __init__(self, path):
def __init__(self, path, data_format='raw'):
self._path = path
self._updated = None
self._format = data_format
self.data = None
if os.path.exists(path):
self._updated = datetime.fromtimestamp(os.path.getmtime(path))
with open(path) as fp:
if data_format == 'json':
self.data = json.load(fp)
else:
self.data = fp.read()
def data_field_or(self, name, default=""):
if self.data is not None and name in self.data:
return self.data[name]
return default
def newer_then_minutes(self, minutes):
return self._updated is not None and ((datetime.now() - self._updated).seconds / 60) < minutes
def newer_then_days(self, days):
return self._updated is not None and (datetime.now() - self._updated).days < days
def update(self, data=None):
self._updated = datetime.now()
self.data = data
with open(self._path, 'w') as fp:
fp.write(str(self._updated) if data is None else data)
if data is None:
fp.write(str(self._updated))
elif self._format == 'json':
json.dump(self.data, fp)
else:
fp.write(data)

View File

@ -90,6 +90,11 @@ class Voice:
self._('Zzzzz'),
self._('ZzzZzzz ({secs}s)').format(secs=secs)])
def on_shutdown(self):
return random.choice([
self._('Good night.'),
self._('Zzz')])
def on_awakening(self):
return random.choice(['...', '!'])
@ -120,23 +125,23 @@ class Voice:
def on_rebooting(self):
return self._("Ops, something went wrong ... Rebooting ...")
def on_log(self, log):
status = self._('Kicked {num} stations\n').format(num=log.deauthed)
status += self._('Made {num} new friends\n').format(num=log.associated)
status += self._('Got {num} handshakes\n').format(num=log.handshakes)
if log.peers == 1:
def on_last_session_data(self, last_session):
status = self._('Kicked {num} stations\n').format(num=last_session.deauthed)
status += self._('Made {num} new friends\n').format(num=last_session.associated)
status += self._('Got {num} handshakes\n').format(num=last_session.handshakes)
if last_session.peers == 1:
status += self._('Met 1 peer')
elif log.peers > 0:
status += self._('Met {num} peers').format(num=log.peers)
elif last_session.peers > 0:
status += self._('Met {num} peers').format(num=last_session.peers)
return status
def on_log_tweet(self, log):
def on_last_session_tweet(self, last_session):
return self._(
'I\'ve been pwning for {duration} and kicked {deauthed} clients! I\'ve also met {associated} new friends and ate {handshakes} handshakes! #pwnagotchi #pwnlog #pwnlife #hacktheplanet #skynet').format(
duration=log.duration_human,
deauthed=log.deauthed,
associated=log.associated,
handshakes=log.handshakes)
duration=last_session.duration_human,
deauthed=last_session.deauthed,
associated=last_session.associated,
handshakes=last_session.handshakes)
def hhmmss(self, count, fmt):
if count > 1: