modified: .gitignore

new file:   pwnagotchi/plugins/default/cache.py
	modified:   pwnagotchi/plugins/default/wigle.py
- Add a cache plugin
- Add statistics to wigle
This commit is contained in:
frédéric
2025-02-12 13:02:22 +01:00
parent 54986ef831
commit e1f22cd6a0
3 changed files with 173 additions and 36 deletions

3
.gitignore vendored
View File

@ -1 +1,2 @@
*.pyc
*.pyc
.vscode

View File

@ -0,0 +1,53 @@
import logging
import json
import os
import re
import pwnagotchi.plugins as plugins
from pwnagotchi.ui.components import LabeledValue
from pwnagotchi.ui.view import BLACK
import pwnagotchi.ui.fonts as fonts
class Cache(plugins.Plugin):
__author__ = "fmatray"
__version__ = "1.0.0"
__license__ = "GPL3"
__description__ = "A simple plugin to cache AP informations"
def __init__(self):
self.options = dict()
def on_config_changed(self, config):
self.handshake_dir = config["bettercap"].get("handshakes")
self.cache_dir = os.path.join(self.handshake_dir, "cache")
if not (os.path.exists(self.cache_dir)):
os.mkdir(self.cache_dir)
def get_cache(self, file):
cache_filename = os.path.basename(
re.sub(r"\.(pcap|gps\.json|geo\.json)$", ".cache", file)
)
cache_filename = os.path.join(self.cache_dir, cache_filename)
if not os.path.exists(cache_filename):
return None
try:
with open(cache_filename, "r") as f:
return json.load(f)
except Exception as e:
return None
def cache_ap(self, ap):
mac = ap["mac"].replace(":", "")
hostname = re.sub(r"[^a-zA-Z0-9]", "", ap["hostname"])
filename = os.path.join(self.cache_dir, f"{hostname}_{mac}.cache")
with open(filename, "w") as f:
json.dump(ap, f)
def on_unfiltered_ap_list(self, agent, aps):
for ap in filter(lambda ap: ap["hostname"] not in ["", "<hidden>"], aps):
self.cache_ap(ap)
def on_handshake(self, agent, filename, access_point, client_station):
logging.info(f"[WIGLE] on_handshake")
self.cache_ap(access_point)

View File

@ -9,6 +9,7 @@ from glob import glob
from threading import Lock
from io import StringIO
from datetime import datetime, UTC
from dataclasses import dataclass
from flask import make_response, redirect
from pwnagotchi.utils import (
@ -28,6 +29,40 @@ from pwnagotchi.ui.view import BLACK
from scapy.all import Scapy_Exception
@dataclass
class WigleStatistics:
ready: bool = False
username: str = None
rank: int = None
monthrank: int = None
discoveredwiFi: int = None
last: str = None
groupID: str = None
groupname: str = None
grouprank: int = None
def update_user(self, json_res):
self.ready = True
self.username = json_res["user"]
self.rank = json_res["rank"]
self.monthrank = json_res["monthRank"]
self.discoveredwiFi = json_res["statistics"]["discoveredWiFi"]
last = json_res["statistics"]["last"]
self.last = f"{last[6:8]}/{last[4:6]}/{last[0:4]}"
def update_user_group(self, json_res):
self.groupID = json_res["groupId"]
self.groupname = json_res["groupName"]
def update_group(self, json_res):
rank = 1
for group in json_res["groups"]:
if group['groupId'] == self.groupID:
self.grouprank = rank
rank += 1
class Wigle(plugins.Plugin):
__author__ = "Dadav and updated by Jayofelony and fmatray"
__version__ = "4.0.0"
@ -41,14 +76,7 @@ class Wigle(plugins.Plugin):
self.skip = list()
self.lock = Lock()
self.options = dict()
self.statistics = dict(
ready=False,
username=None,
rank=None,
monthrank=None,
discoveredwiFi=None,
last=None,
)
self.statistics = WigleStatistics()
self.last_stat = datetime.now(tz=UTC)
self.ui_counter = 0
@ -61,6 +89,9 @@ class Wigle(plugins.Plugin):
self.handshake_dir = config["bettercap"].get("handshakes")
report_filename = os.path.join(self.handshake_dir, ".wigle_uploads")
self.report = StatusFile(report_filename, data_format="json")
self.cache_dir = os.path.join(self.handshake_dir, "cache")
if not (os.path.exists(self.cache_dir)):
os.mkdir(self.cache_dir)
self.cvs_dir = self.options.get("cvs_dir", None)
self.whitelist = config["main"].get("whitelist", [])
self.timeout = self.options.get("timeout", 30)
@ -86,6 +117,17 @@ class Wigle(plugins.Plugin):
return None
return pcap_filename
def get_cache(self, pcap_file):
cache_filename = os.path.basename(pcap_file.replace(".pcap", ".cache"))
cache_filename = os.path.join(self.cache_dir, cache_filename)
if not os.path.exists(cache_filename):
return None
try:
with open(cache_filename, "r") as f:
return json.load(f)
except Exception as e:
return None
@staticmethod
def extract_gps_data(path):
"""
@ -120,8 +162,17 @@ class Wigle(plugins.Plugin):
return None
return gps_data
@staticmethod
def get_pcap_data(pcap_filename):
def get_pcap_data(self, pcap_filename):
if cache := self.get_cache(pcap_filename):
logging.info(f"[WIGLE] Using cache for {pcap_filename}")
return {
WifiInfo.BSSID: cache["mac"],
WifiInfo.ESSID: cache["hostname"],
WifiInfo.ENCRYPTION: cache["encryption"],
WifiInfo.CHANNEL: cache["channel"],
WifiInfo.FREQUENCY: cache["frequency"],
WifiInfo.RSSI: cache["rssi"],
}
try:
pcap_data = extract_from_pcap(
pcap_filename,
@ -236,31 +287,46 @@ class Wigle(plugins.Plugin):
self.post_wigle(reported, cvs_filename, cvs_content, no_err_entries)
display.on_normal()
def get_statistics(self, force=False):
if not force and (datetime.now(tz=UTC) - self.last_stat).total_seconds() < 30:
return
self.last_stat = datetime.now(tz=UTC)
def request_statistics(self, url):
try:
self.statistics["ready"] = False
json_res = requests.get(
"https://api.wigle.net/api/v2/stats/user",
return requests.get(
url,
headers={
"Authorization": f"Basic {self.api_key}",
"Accept": "application/json",
},
timeout=self.timeout,
).json()
if not json_res["success"]:
return
self.statistics["ready"] = True
self.statistics["username"] = json_res["user"]
self.statistics["rank"] = json_res["rank"]
self.statistics["monthrank"] = json_res["monthRank"]
self.statistics["discoveredwiFi"] = json_res["statistics"]["discoveredWiFi"]
last = json_res["statistics"]["last"]
self.statistics["last"] = f"{last[6:8]}/{last[4:6]}/{last[0:4]}"
except (requests.exceptions.RequestException, OSError) as exp:
pass
return None
def get_user_statistics(self):
json_res = self.request_statistics(
"https://api.wigle.net/api/v2/stats/user",
)
if json_res and json_res["success"]:
self.statistics.update_user(json_res)
def get_usergroup_statistics(self):
if not self.statistics.username or self.statistics.groupID:
return
url = f"https://api.wigle.net/api/v2/group/groupForUser/{self.statistics.username}"
if json_res := self.request_statistics(url):
self.statistics.update_user_group(json_res)
def get_group_statistics(self):
if not self.statistics.groupID:
return
json_res = self.request_statistics("https://api.wigle.net/api/v2/stats/group")
if json_res and json_res["success"]:
self.statistics.update_group(json_res)
def get_statistics(self, force=False):
if force or (datetime.now(tz=UTC) - self.last_stat).total_seconds() > 30:
self.last_stat = datetime.now(tz=UTC)
self.get_user_statistics()
self.get_usergroup_statistics()
self.get_group_statistics()
def on_internet_available(self, agent):
if not self.ready:
@ -272,6 +338,21 @@ class Wigle(plugins.Plugin):
else:
self.get_statistics()
def cache_ap(self, ap):
mac = ap["mac"].replace(":", "")
hostname = re.sub(r"[^a-zA-Z0-9]", "", ap["hostname"])
filename = os.path.join(self.cache_dir, f"{hostname}_{mac}.cache")
with open(filename, "w") as f:
json.dump(ap, f)
def on_unfiltered_ap_list(self, agent, aps):
for ap in filter(lambda ap: ap["hostname"] not in ["", "<hidden>"], aps):
self.cache_ap(ap)
def on_handshake(self, agent, filename, access_point, client_station):
logging.info(f"[WIGLE] on_handshake")
self.cache_ap(access_point)
def on_ui_setup(self, ui):
with ui._lock:
ui.add_element(
@ -284,20 +365,22 @@ class Wigle(plugins.Plugin):
ui.remove_element("wigle")
def on_ui_update(self, ui):
if not self.ready:
return
with ui._lock:
if not self.statistics["ready"]:
if not (self.ready and self.statistics.ready):
ui.set("wigle", "We Will Wait Wigle")
return
msg = "-"
self.ui_counter = (self.ui_counter + 1) % 4
self.ui_counter = (self.ui_counter + 1) % 6
if self.ui_counter == 0:
msg = f"User:{self.statistics['username']}"
msg = f"User:{self.statistics.username}"
if self.ui_counter == 1:
msg = f"Rank:{self.statistics['rank']} Month:{self.statistics['monthrank']}"
msg = f"Rank:{self.statistics.rank} Month:{self.statistics.monthrank}"
elif self.ui_counter == 2:
msg = f"{self.statistics['discoveredwiFi']} discovered WiFis"
msg = f"{self.statistics.discoveredwiFi} discovered WiFis"
elif self.ui_counter == 3:
msg = f"Last upl.:{self.statistics['last']}"
msg = f"Last upl.:{self.statistics.last}"
elif self.ui_counter == 4:
msg = f"Grp:{self.statistics.groupname}"
elif self.ui_counter == 5:
msg = f"Grp rank:{self.statistics.grouprank}"
ui.set("wigle", msg)