Add display

- Now the plugins display some statistics from WIGLE

Signed-off-by: Frédéric <fmatray@users.noreply.github.com>
This commit is contained in:
Frédéric
2025-02-09 18:44:02 +01:00
committed by GitHub
parent 0b3b38bb44
commit 2893632ee5

View File

@ -20,6 +20,11 @@ from pwnagotchi.utils import (
) )
from pwnagotchi import plugins from pwnagotchi import plugins
from pwnagotchi._version import __version__ as __pwnagotchi_version__ from pwnagotchi._version import __version__ as __pwnagotchi_version__
import pwnagotchi.ui.fonts as fonts
from pwnagotchi.ui.components import Text
from pwnagotchi.ui.view import BLACK
from scapy.all import Scapy_Exception from scapy.all import Scapy_Exception
@ -28,6 +33,7 @@ class Wigle(plugins.Plugin):
__version__ = "4.0.0" __version__ = "4.0.0"
__license__ = "GPL3" __license__ = "GPL3"
__description__ = "This plugin automatically uploads collected WiFi to wigle.net" __description__ = "This plugin automatically uploads collected WiFi to wigle.net"
LABEL_SPACING = 0
def __init__(self): def __init__(self):
self.ready = False self.ready = False
@ -35,6 +41,16 @@ class Wigle(plugins.Plugin):
self.skip = list() self.skip = list()
self.lock = Lock() self.lock = Lock()
self.options = dict() self.options = dict()
self.statistics = dict(
ready=False,
username=None,
rank=None,
monthrank=None,
discoveredwiFi=None,
last=None,
)
self.last_stat = datetime.now(tz=UTC)
self.ui_counter = 0
def on_config_changed(self, config): def on_config_changed(self, config):
api_name = self.options.get("api_name", None) api_name = self.options.get("api_name", None)
@ -44,10 +60,11 @@ class Wigle(plugins.Plugin):
return return
self.auth = (api_name, api_key) self.auth = (api_name, api_key)
self.donate = self.options.get("donate", False) self.donate = self.options.get("donate", False)
self.handshake_dir = config["bettercap"].get("handshakes", None) self.handshake_dir = config["bettercap"].get("handshakes")
self.cvs_dir = self.options.get("cvs_dir", None) self.cvs_dir = self.options.get("cvs_dir", None)
self.whitelist = config["main"].get("whitelist", None) self.whitelist = config["main"].get("whitelist", [])
self.timeout = config["main"].get("timeout", 30) self.timeout = self.options.get("timeout", 30)
self.position = self.options.get("position", (10, 10))
self.ready = True self.ready = True
logging.info("[WIGLE] Ready for wardriving!!!") logging.info("[WIGLE] Ready for wardriving!!!")
@ -174,7 +191,7 @@ class Wigle(plugins.Plugin):
except Exception as exp: except Exception as exp:
logging.error(f"[WIGLE] Error while writing CSV file(skipping): {exp}") logging.error(f"[WIGLE] Error while writing CSV file(skipping): {exp}")
def upload_to_wigle(self, reported, cvs_filename, cvs_content, no_err_entries): def post_wigle(self, reported, cvs_filename, cvs_content, no_err_entries):
try: try:
json_res = requests.post( json_res = requests.post(
"https://api.wigle.net/api/v2/file/upload", "https://api.wigle.net/api/v2/file/upload",
@ -188,37 +205,94 @@ class Wigle(plugins.Plugin):
raise requests.exceptions.RequestException(json_res["message"]) raise requests.exceptions.RequestException(json_res["message"])
reported += no_err_entries reported += no_err_entries
self.report.update(data={"reported": reported}) self.report.update(data={"reported": reported})
logging.info("[WIGLE] Successfully uploaded %d wifis", len(no_err_entries)) logging.info(f"[WIGLE] Successfully uploaded {len(no_err_entries)} wifis")
except (requests.exceptions.RequestException, OSError) as exp: except (requests.exceptions.RequestException, OSError) as exp:
self.skip += no_err_entries self.skip += no_err_entries
logging.debug(f"[WIGLE] Exception while uploading: {exp}") logging.debug(f"[WIGLE] Exception while uploading: {exp}")
def upload_new_handshakes(self, reported, new_gps_files, agent):
logging.info("[WIGLE] Uploading new handshakes to wigle.net")
csv_entries, no_err_entries = list(), list()
for gps_file in new_gps_files:
logging.info(f"[WIGLE] Processing {os.path.basename(gps_file)}")
if (
(pcap_filename := self.get_pcap_filename(gps_file))
and (gps_data := self.get_gps_data(gps_file))
and (pcap_data := self.get_pcap_data(pcap_filename))
):
csv_entries.append((gps_data, pcap_data))
no_err_entries.append(gps_file)
else:
self.skip.append(gps_file)
logging.info(f"[WIGLE] Wifi to upload: {len(csv_entries)}")
if csv_entries:
cvs_filename, cvs_content = self.generate_csv(csv_entries)
self.save_to_file(cvs_filename, cvs_content)
display = agent.view()
display.on_uploading("wigle.net")
self.post_wigle(reported, cvs_filename, cvs_content, no_err_entries)
display.on_normal()
def get_statistics(self):
if (datetime.now(tz=UTC) - self.last_stat).total_seconds() < 30:
return
self.last_stat = datetime.now(tz=UTC)
try:
self.statistics["ready"] = False
json_res = requests.get(
"https://api.wigle.net/api/v2/stats/user",
headers={"Accept": "application/json"},
auth=self.auth,
timeout=self.timeout,
).json()
if not json_res["success"]:
return
self.statistics["ready"] = True
self.statistics["username"] = json_res["user"]
self.statistics["rank"] = json_res["rank"]
self.statistics["monthrank"] = json_res["monthRank"]
self.statistics["discoveredwiFi"] = json_res["statistics"]["discoveredWiFi"]
last = json_res["statistics"]["last"]
self.statistics["last"] = f"{last[6:8]}/{last[4:6]}/{last[0:4]}"
except (requests.exceptions.RequestException, OSError) as exp:
pass
def on_internet_available(self, agent): def on_internet_available(self, agent):
if not self.ready: if not self.ready:
return return
with self.lock: with self.lock:
reported = self.report.data_field_or("reported", default=list()) reported = self.report.data_field_or("reported", default=list())
if new_gps_files := self.get_new_gps_files(reported): if new_gps_files := self.get_new_gps_files(reported):
logging.info("[WIGLE] Uploading new handshakes to wigle.net") self.upload_new_handshakes(reported, new_gps_files, agent)
csv_entries, no_err_entries = list(), list() else:
for gps_file in new_gps_files: self.get_statistics()
logging.info(f"[WIGLE] Processing {os.path.basename(gps_file)}")
if ( def on_ui_setup(self, ui):
(pcap_filename := self.get_pcap_filename(gps_file)) with ui._lock:
and (gps_data := self.get_gps_data(gps_file)) ui.add_element(
and (pcap_data := self.get_pcap_data(pcap_filename)) "wigle",
): Text(value="-", position=self.position, font=fonts.Small, color=BLACK),
csv_entries.append((gps_data, pcap_data)) )
no_err_entries.append(gps_file)
else: def on_unload(self, ui):
self.skip.append(gps_file) with ui._lock:
logging.info(f"[WIGLE] Wifi to upload: {len(csv_entries)}") ui.remove_element("wigle")
if csv_entries:
cvs_filename, cvs_content = self.generate_csv(csv_entries) def on_ui_update(self, ui):
self.save_to_file(cvs_filename, cvs_content) if not self.ready:
display = agent.view() return
display.on_uploading("wigle.net") with ui._lock:
self.upload_to_wigle( if not self.statistics["ready"]:
reported, cvs_filename, cvs_content, no_err_entries ui.set("wigle", "We Will Wait Wigle")
) return
display.on_normal() msg = "-"
self.ui_counter = (self.ui_counter + 1) % 4
if self.ui_counter == 0:
msg = f"User:{self.statistics['username']}"
if self.ui_counter == 1:
msg = f"Rank:{self.statistics['rank']} Monthly:{self.statistics['monthrank']}"
elif self.ui_counter == 2:
msg = f"{self.statistics['discoveredwiFi']} discovered WiFis"
elif self.ui_counter == 3:
msg = f"Last report:{self.statistics['last']}"
ui.set("wigle", msg)