Compare commits

..

119 Commits
v2.9.5 ... noai

Author SHA1 Message Date
c3df982870 Merge pull request #375 from Sniffleupagus/fixRot90
Allow rotation 90 and 270 degrees on screens
2025-03-21 15:47:00 +01:00
6f7b07f21f Merge pull request #380 from wpa-2/patch-3
Update defaults.toml
2025-03-21 15:42:44 +01:00
4c583cf184 Update defaults.toml
removed dupe entries 
iface = "wlan0mon"
mon_start_cmd = "/usr/bin/monstart"
mon_stop_cmd = "/usr/bin/monstop"
mon_max_blind_epochs = 5
no_restart = false

Signed-off-by: wpa-2 <9049886+wpa-2@users.noreply.github.com>
2025-03-20 17:27:31 +00:00
9a8616df51 Merge branch 'jayofelony:noai' into fixRot90 2025-03-17 16:02:24 -07:00
28c2ef294c Support 90 and 270 rotation 2025-03-17 15:36:16 -07:00
255ca31837 Merge pull request #374 from wpa-2/noai
Update defaults.toml
2025-03-14 07:51:52 +01:00
661687e180 Update defaults.toml
Signed-off-by: wpa-2 <9049886+wpa-2@users.noreply.github.com>
2025-03-13 19:20:02 +00:00
fb8ee9dbee Keep toml installed
Signed-off-by: Jeroen Oudshoorn <oudshoorn.jeroen@gmail.com>
2025-03-09 12:04:50 +01:00
d98be99c6b Add PwnCrack.org plugin
Signed-off-by: Jeroen Oudshoorn <oudshoorn.jeroen@gmail.com>
2025-03-09 11:59:33 +01:00
08e9656bb5 Merge pull request #367 from Sniffleupagus/toml_two_times_baby
Toml two times baby
2025-03-08 23:50:27 +01:00
dbcc488900 Load dotted toml files with old toml library, new ones with tomlkit to preserve format and comments 2025-03-08 13:26:13 -08:00
94b01b2fc7 get_bbox fails on 2.9.5. get_size works 2025-03-08 13:25:32 -08:00
9b4b239deb Merge pull request #364 from wpa-2/noai
Update defaults.toml
2025-03-08 13:00:16 +01:00
4fc029d638 Update defaults.toml
Signed-off-by: wpa-2 <9049886+wpa-2@users.noreply.github.com>
2025-03-08 10:35:45 +00:00
9d0ada2a33 defaults.toml small edit
Signed-off-by: Jeroen Oudshoorn <oudshoorn.jeroen@gmail.com>
2025-03-08 09:12:08 +01:00
da156fde4b defaults.toml small edit
Signed-off-by: Jeroen Oudshoorn <oudshoorn.jeroen@gmail.com>
2025-03-08 09:12:00 +01:00
edaadfdbd4 auto-update using a github token, to avoid getting rate limited. This is optional
Signed-off-by: Jeroen Oudshoorn <oudshoorn.jeroen@gmail.com>
2025-03-08 09:09:44 +01:00
0341ac0202 Changed TOML format and parser
Signed-off-by: Jeroen Oudshoorn <oudshoorn.jeroen@gmail.com>
2025-03-08 09:08:52 +01:00
60832e788d Merge pull request #362 from B0r1s-B4d3n0v/noai
Log entry when Pwnagotchi is Re|Started
2025-03-08 07:27:41 +01:00
87a51d1de7 Merge pull request #361 from wpa-2/noai
Add files via upload
2025-03-08 07:27:19 +01:00
db85c68e63 Log Pwnagotchi Re|Started
Adds a line to the log file to identify when pwnagotchi was restarted
2025-03-07 10:51:08 -06:00
c98745cd1c Add files via upload
Changed log interval time 

Signed-off-by: wpa-2 <9049886+wpa-2@users.noreply.github.com>
2025-03-07 14:32:06 +00:00
d91fe8e17e Merge pull request #358 from JdaieLin/noai
fix: use same curves as pisugar-power-manager
2025-03-03 09:59:52 +01:00
8da625e7e2 fix: use same curves as pisugar-power-manager 2025-03-03 08:57:00 +08:00
6bab0b36d1 Merge pull request #353 from wpa-2/noai
Makes backups work better inside windows and Linux
2025-02-27 17:55:45 +01:00
19de2a2d8f Update defaults.toml
Signed-off-by: wpa-2 <9049886+wpa-2@users.noreply.github.com>
2025-02-26 19:13:15 +00:00
5d4c2e3b3a Add files via upload
Signed-off-by: wpa-2 <9049886+wpa-2@users.noreply.github.com>
2025-02-26 19:12:49 +00:00
e7eba208bd Merge branch 'jayofelony:noai' into noai 2025-02-26 18:45:37 +00:00
b47371cfe6 Update defaults.toml
Signed-off-by: wpa-2 <9049886+wpa-2@users.noreply.github.com>
2025-02-26 18:45:28 +00:00
56b70fcc83 Merge pull request #351 from wpa-2/noai
sorry missed a line
2025-02-25 22:59:15 +01:00
d698c35fef Merge branch 'jayofelony:noai' into noai 2025-02-25 18:31:33 +00:00
1f5785f823 Update defaults.toml
Signed-off-by: wpa-2 <9049886+wpa-2@users.noreply.github.com>
2025-02-25 18:31:21 +00:00
d78220b9ba Merge pull request #349 from C0d3-5t3w/patch-1
Update README.md
2025-02-25 09:44:39 +01:00
faca5b2904 Update README.md
Twitter is now X.

Signed-off-by: 5T3W <stewy1994@gmail.com>
2025-02-24 22:57:44 -08:00
234b38e4e4 Merge pull request #348 from wpa-2/noai
New autobackup plugin as default, fixes to backup scripts,
2025-02-24 18:17:23 +01:00
27b9ec49da Add files via upload
Signed-off-by: wpa-2 <9049886+wpa-2@users.noreply.github.com>
2025-02-24 16:20:59 +00:00
87730543d1 Add files via upload
Signed-off-by: wpa-2 <9049886+wpa-2@users.noreply.github.com>
2025-02-24 16:20:27 +00:00
67e28fa7ab Update defaults.toml
Signed-off-by: wpa-2 <9049886+wpa-2@users.noreply.github.com>
2025-02-24 16:20:02 +00:00
7285863c00 Merge branch 'jayofelony:noai' into noai 2025-02-24 15:57:12 +00:00
23eae9c65a Merge pull request #341 from fmatray/noai
Add Cache plugin, improve Wigle, debug bt-tether, add more sentences to voices
2025-02-17 14:16:58 +01:00
89a85bed78 Merge pull request #342 from wlmh110/noai
PiSugar Plugin update
2025-02-17 14:14:43 +01:00
393981e0ba Remove debgging mesasge in cache 2025-02-17 11:01:16 +01:00
31e542f45a Merge branch 'noai' into noai
Signed-off-by: 铲屎将军 <37292630+wlmh110@users.noreply.github.com>
2025-02-17 16:26:59 +08:00
c433a6c2d5 remove debug messages 2025-02-17 00:49:46 +01:00
7dd809afe0 add otion for cache 2025-02-16 20:29:06 +01:00
d7f7dac0d7 - Catch an exception on lstat() 2025-02-16 20:13:01 +01:00
2dc45bc4b4 - Add more sentences to voice.py, with some geek references 2025-02-16 20:07:22 +01:00
29d1ca6728 Bt-tether: add autoconnect to device
Cache:
- add cache to all hools with access point
- add cleaning
Wigle: use cache
2025-02-13 22:17:09 +01:00
1d635d955b BT-Tether: add a check to DNS config 2025-02-12 14:10:33 +01:00
f20d4017bd Merge branch 'noai' of https://github.com/fmatray/pwnagotchi into noai 2025-02-12 13:04:41 +01:00
5d668ae34e update wigle.py 2025-02-12 13:04:37 +01:00
9d0df49fb6 add cache.py 2025-02-12 13:04:14 +01:00
e1f22cd6a0 modified: .gitignore
new file:   pwnagotchi/plugins/default/cache.py
	modified:   pwnagotchi/plugins/default/wigle.py
- Add a cache plugin
- Add statistics to wigle
2025-02-12 13:02:22 +01:00
f4a7588a48 Update for wigle
Signed-off-by: Jeroen Oudshoorn <oudshoorn.jeroen@gmail.com>
2025-02-12 12:59:28 +01:00
c66728f03e Edits for defaults.toml
Signed-off-by: Jeroen Oudshoorn <oudshoorn.jeroen@gmail.com>
2025-02-12 10:42:13 +01:00
20ddc9425c Update defaults.toml
The charging protection feature is disabled by default.
2025-02-12 11:32:14 +08:00
e1bcea681a Next version will be 2.9.5.4
Signed-off-by: Jeroen Oudshoorn <oudshoorn.jeroen@gmail.com>
2025-02-11 10:33:23 +01:00
43d58b77c9 Remove wardriver config from defaults.toml
Signed-off-by: Jeroen Oudshoorn <oudshoorn.jeroen@gmail.com>
2025-02-11 10:31:57 +01:00
40918029e7 Small changes
Signed-off-by: Jeroen Oudshoorn <oudshoorn.jeroen@gmail.com>
2025-02-11 10:28:20 +01:00
54986ef831 Merge pull request #333 from fmatray/noai
Updates for BT-Tether, Wigle and OHCAPI
2025-02-10 21:41:17 +01:00
1400e8aac8 Update defaults.toml
Add options for bt-tether and Wigle

Signed-off-by: Frédéric <fmatray@users.noreply.github.com>
2025-02-10 20:29:13 +01:00
5a398c70bb wigle.py update
- '.wigle_uploads' is no longer hardcoded but moved to the handshakes directory

Signed-off-by: Frédéric <fmatray@users.noreply.github.com>
2025-02-10 19:42:51 +01:00
81d377f491 Set bettercap "ble.recon off"
Signed-off-by: Frédéric <fmatray@users.noreply.github.com>
2025-02-10 19:34:19 +01:00
1135ec3df1 Update pisugarx.py
bug fix
2025-02-10 19:11:49 +08:00
d3231a11ce Update pisugarx.py
bug fix
2025-02-10 19:08:19 +08:00
a00d94b520 Update pisugarx.py 2025-02-10 18:26:52 +08:00
beb8308969 Update pisugarx.py
Added charging voltage protection function
2025-02-10 18:23:26 +08:00
2893632ee5 Add display
- Now the plugins display some statistics from WIGLE

Signed-off-by: Frédéric <fmatray@users.noreply.github.com>
2025-02-09 18:44:02 +01:00
0b3b38bb44 Wigle.py update
- Correct the CSV format 
- Add 2 missing columns: frequency(populated) and RCOI(not populated)
- Before sending to wigle, if cvs_dir is set, the CVS data ti written to that directory
- Update auth method to match the api: api_name + api_key
- Refactoring into only one plugin class


Signed-off-by: Frédéric <fmatray@users.noreply.github.com>
2025-02-09 01:50:57 +01:00
d4874a18e1 utils.py update
- Add WifiInfo.FREQUENCY to extract_from_pcap()
- Move the "Invalid field" to the the "else" as a default

Signed-off-by: Frédéric <fmatray@users.noreply.github.com>
2025-02-09 01:32:59 +01:00
5a113a2163 wigle.py update
- CSV format debug
- Update auth method to API Name+key
- Add logs
- Code refactor and cleaning

stil debugging

Signed-off-by: Frédéric <fmatray@users.noreply.github.com>
2025-02-08 17:29:33 +01:00
e82621c80b update
New Features:

Low Battery Shutdown Functionality
Bug Fixes:

Resolved an issue where the main process would become blocked when unable to connect to a device.
2025-02-08 17:52:54 +08:00
1337494e74 bt-tether.py update
- Add a delay before going up to give a change to the NetworkManager to get ready
- now check DNS format

Signed-off-by: Frédéric <fmatray@users.noreply.github.com>
2025-02-08 00:52:57 +01:00
675e275f34 Add web hook to bt-tether.py
Webhook now show Bluetooth, Device and Connection configuration

Signed-off-by: Frédéric <fmatray@users.noreply.github.com>
2025-02-08 00:07:41 +01:00
7a0301b57f bt-tether rework
- Move connexion to NetworkManager autoconnect for more reliability
- Check options formats for several values
- Add a dns option to be able to choose another DNS provider (ex: OpenNIC for no traceability)
- The "BT" show more information (Device Connected/disconnected, Connexion Up/Down)
- Status now show a message on error
- nmcli calls are non-blocking  
- Less logs when disconnected (ex: phone's bluetooth is off) 

Signed-off-by: Frédéric <fmatray@users.noreply.github.com>
2025-02-07 22:34:52 +01:00
4774a983f9 Merge branch 'jayofelony:noai' into noai 2025-02-06 23:49:30 +01:00
83b9077e09 Fix gdrivesync defaults
Signed-off-by: Jeroen Oudshoorn <oudshoorn.jeroen@gmail.com>
2025-02-02 22:22:27 +01:00
ef0680a140 Delete scripts/restore.sh
Signed-off-by: wpa-2 <9049886+wpa-2@users.noreply.github.com>
2025-02-02 19:36:01 +00:00
a54579cf3d Delete scripts/backup.sh
Signed-off-by: wpa-2 <9049886+wpa-2@users.noreply.github.com>
2025-02-02 19:35:51 +00:00
c08f33c2c5 Merge branch 'jayofelony:noai' into noai 2025-02-02 19:35:00 +00:00
fe7e7ec31a Backup and restore edits
Windows and linux files 

Signed-off-by: wpa-2 <9049886+wpa-2@users.noreply.github.com>
2025-02-02 19:34:51 +00:00
10d387afcf Fix gdrivesync.py
Signed-off-by: Jeroen Oudshoorn <oudshoorn.jeroen@gmail.com>
2025-01-29 12:50:02 +01:00
4748b671bc Bug report update
Signed-off-by: Jeroen Oudshoorn <oudshoorn.jeroen@gmail.com>
2025-01-27 12:04:07 +01:00
17f04f7506 Merge pull request #318
Added DNS check to plugins command
2025-01-27 10:24:45 +01:00
76c4888e88 Merge pull request #319
Update gdrivesync.py
2025-01-27 10:23:58 +01:00
d44b8d02b6 Merge remote-tracking branch 'origin/noai' into noai 2025-01-27 10:23:05 +01:00
02454597b2 Version 2.9.5.3
Signed-off-by: Jeroen Oudshoorn <oudshoorn.jeroen@gmail.com>
2025-01-27 10:22:55 +01:00
cb207d4d70 Update gdrivesync.py
Updated backup items

Signed-off-by: findingmoist <128169791+findingmoist@users.noreply.github.com>
2025-01-25 20:06:15 -05:00
7b9150af6b Merge branch 'jayofelony:noai' into noai 2025-01-25 23:02:21 +00:00
8787c1bdd3 Update cmd.py
Signed-off-by: wpa-2 <9049886+wpa-2@users.noreply.github.com>
2025-01-25 23:02:06 +00:00
ec93838b7a Update defaults.toml 2025-01-25 17:26:31 +01:00
f6d5a481bb Removed unused import
Signed-off-by: Jeroen Oudshoorn <oudshoorn.jeroen@gmail.com>
2025-01-24 23:48:38 +01:00
eee3eb962b Small changes on pisugarx.py
Signed-off-by: Jeroen Oudshoorn <oudshoorn.jeroen@gmail.com>
2025-01-24 23:48:25 +01:00
847e9f5908 Enable auto-update by default now.
Signed-off-by: Jeroen Oudshoorn <oudshoorn.jeroen@gmail.com>
2025-01-24 23:47:58 +01:00
af1544535c Merge pull request #316 from wlmh110/noai
Update pisugarx.py
2025-01-24 09:41:50 +01:00
0fbf209881 Update pisugarx.py
Complete the basic information retrieval for the PiSugar power module without the need for additional installation of PiSugar-server:
1. Automatically determine the model of PiSugar in use.
2. Automatically initialize the power module.
3. Retrieve the battery level and charging status.
2025-01-24 15:16:51 +08:00
cf14f3f663 Another fix for bt-tether
Signed-off-by: Jeroen Oudshoorn <oudshoorn.jeroen@gmail.com>
2025-01-21 17:30:06 +01:00
948fe89ce6 Another fix for bt-tether
Signed-off-by: Jeroen Oudshoorn <oudshoorn.jeroen@gmail.com>
2025-01-21 14:37:02 +01:00
89af46c6fc Update ohcapi.py
A more elegant code to test the internet connection

Signed-off-by: Frédéric <fmatray@users.noreply.github.com>
2025-01-20 22:01:10 +01:00
8d1a5babe8 Version 2.9.5.2 will be next
Signed-off-by: Jeroen Oudshoorn <oudshoorn.jeroen@gmail.com>
2025-01-20 21:30:58 +01:00
042d5ba765 Change in defaults for PwnDroid.
Signed-off-by: Jeroen Oudshoorn <oudshoorn.jeroen@gmail.com>
2025-01-20 21:13:17 +01:00
58857058a4 Revert "PwnDroid, new plugin for the companion app on Android to share GPS data from your Android phone."
This reverts commit 0e06d3bd76.
2025-01-20 21:11:40 +01:00
5e6443ae58 Merge remote-tracking branch 'origin/noai' into noai 2025-01-20 21:11:14 +01:00
0e06d3bd76 PwnDroid, new plugin for the companion app on Android to share GPS data from your Android phone.
Signed-off-by: Jeroen Oudshoorn <oudshoorn.jeroen@gmail.com>
2025-01-20 21:10:46 +01:00
64f7c6e1e5 Merge pull request #311 from fmatray/noai
Web redirections for GRID, OHCAPI and Wigle
2025-01-20 19:18:10 +01:00
8442ce93be Update ohcapi.py
version update

Signed-off-by: Frédéric <fmatray@users.noreply.github.com>
2025-01-20 15:43:55 +01:00
730fa7dc8e Update wigle.py
version update

Signed-off-by: Frédéric <fmatray@users.noreply.github.com>
2025-01-20 15:43:35 +01:00
0959140098 Update grid.py
Rediction to https://opwngrid.xyz in the plugins' page

Signed-off-by: Frédéric <fmatray@users.noreply.github.com>
2025-01-20 15:41:49 +01:00
7c4764bff8 Update ohcapi.py
add a web hook to redirect to onlinehashcrack

Signed-off-by: Frédéric <fmatray@users.noreply.github.com>
2025-01-20 15:30:00 +01:00
e179165850 Update wigle.py
Signed-off-by: Frédéric <fmatray@users.noreply.github.com>
2025-01-20 15:28:46 +01:00
0140a1fc97 Update wigle.py
add a redirection to wiggle.net

Signed-off-by: Frédéric <fmatray@users.noreply.github.com>
2025-01-20 15:27:54 +01:00
cb0c6c6e44 Quick release, because of error in bt-tether plugin.
Signed-off-by: Jeroen Oudshoorn <oudshoorn.jeroen@gmail.com>
2025-01-18 10:30:52 +01:00
0ceeb94111 https://github.com/jayofelony/pwnagotchi/issues/300
It needed a comma at the end of the line. Silly me.

Signed-off-by: Jeroen Oudshoorn <oudshoorn.jeroen@gmail.com>
2025-01-18 10:29:41 +01:00
6bf50a36dd Merge remote-tracking branch 'origin/noai' into noai 2025-01-18 10:28:09 +01:00
fb0fda4d0d https://github.com/jayofelony/pwnagotchi/issues/300
Revert, as this is the correct method.

Signed-off-by: Jeroen Oudshoorn <oudshoorn.jeroen@gmail.com>
2025-01-18 10:27:59 +01:00
51f86d0286 Merge pull request #302 from fmatray/noai
Update ohcapi.py
2025-01-16 07:37:10 +01:00
614e844a51 https://github.com/jayofelony/pwnagotchi/issues/300
Signed-off-by: Jeroen Oudshoorn <oudshoorn.jeroen@gmail.com>
2025-01-15 21:58:15 +01:00
fd16b2c94d Update ohcapi.py
catch an exception when internet is not available

Signed-off-by: Frédéric <fmatray@users.noreply.github.com>
2025-01-13 15:32:48 +01:00
a4d5930477 Set ohcapi to false
Signed-off-by: Jeroen Oudshoorn <oudshoorn.jeroen@gmail.com>
2025-01-12 22:36:42 +01:00
30 changed files with 2384 additions and 703 deletions

View File

@ -33,7 +33,8 @@ body:
label: Version
description: What version of our software are you running?
options:
- 2.9.4-2
- 2.9.5.2
- 2.9.5.3
default: 0
validations:
required: true

View File

@ -33,7 +33,8 @@ body:
label: Version
description: What version of our software are you running?
options:
- 2.9.4-2
- 2.9.5.2
- 2.9.5.3
default: 0
validations:
required: true

1
.gitignore vendored
View File

@ -1 +1,2 @@
*.pyc
.vscode

View File

@ -39,4 +39,4 @@ https://pwnagotchi.org
## License
`pwnagotchi` created by [@evilsocket](https://twitter.com/evilsocket) and updated by [us](https://github.com/jayofelony/pwnagotchi/graphs/contributors). It is released under the GPL3 license.
`pwnagotchi` created by [@evilsocket](https://x.com/evilsocket) and updated by [us](https://github.com/jayofelony/pwnagotchi/graphs/contributors). It is released under the GPL3 license.

View File

@ -1 +1 @@
__version__ = '2.9.5'
__version__ = '2.9.5.4'

View File

@ -3,7 +3,7 @@ import argparse
import time
import signal
import sys
import toml
import tomlkit
import requests
import os
import re
@ -14,7 +14,7 @@ from pwnagotchi.google import cmd as google_cmd
from pwnagotchi.plugins import cmd as plugins_cmd
from pwnagotchi import log
from pwnagotchi import fs
from pwnagotchi.utils import DottedTomlEncoder, parse_version as version_to_tuple
from pwnagotchi.utils import parse_version as version_to_tuple
def pwnagotchi_cli():
@ -187,12 +187,14 @@ def pwnagotchi_cli():
if pwn_name == "":
pwn_name = "Pwnagotchi"
print("I shall go by Pwnagotchi from now on!")
pwn_name = f"main.name = \"{pwn_name}\"\n"
pwn_name = (f"[main]\n"
f"name = \"{pwn_name}\"\n")
f.write(pwn_name)
else:
if is_valid_hostname(pwn_name):
print(f"I shall go by {pwn_name} from now on!")
pwn_name = f"main.name = \"{pwn_name}\"\n"
pwn_name = (f"[main]\n"
f"name = \"{pwn_name}\"\n")
f.write(pwn_name)
else:
print("You have chosen an invalid name. Please start over.")
@ -203,7 +205,7 @@ def pwnagotchi_cli():
"Be sure to use digits as your answer.\n\n"
"Amount of networks: ")
if int(pwn_whitelist) > 0:
f.write("main.whitelist = [\n")
f.write("whitelist = [\n")
for x in range(int(pwn_whitelist)):
ssid = input("SSID (Name): ")
bssid = input("BSSID (MAC): ")
@ -215,42 +217,45 @@ def pwnagotchi_cli():
pwn_bluetooth = input("Do you want to enable BT-Tether?\n\n"
"[Y/N] ")
if pwn_bluetooth.lower() in ('y', 'yes'):
f.write("main.plugins.bt-tether.enabled = true\n\n")
f.write("[main.plugins.bt-tether]\n"
"enabled = true\n\n")
pwn_bluetooth_phone_name = input("What name uses your phone, check settings?\n\n")
if pwn_bluetooth_phone_name != "":
f.write(f"main.plugins.bt-tether.phone-name = \"{pwn_bluetooth_phone_name}\"\n")
f.write(f"phone-name = \"{pwn_bluetooth_phone_name}\"\n")
pwn_bluetooth_device = input("What device do you use? android or ios?\n\n"
"Device: ")
if pwn_bluetooth_device != "":
if pwn_bluetooth_device != "android" and pwn_bluetooth_device != "ios":
print("You have chosen an invalid device. Please start over.")
exit()
f.write(f"main.plugins.bt-tether.phone = \"{pwn_bluetooth_device.lower()}\"\n")
f.write(f"phone = \"{pwn_bluetooth_device.lower()}\"\n")
if pwn_bluetooth_device == "android":
f.write("main.plugins.bt-tether.ip = \"192.168.44.44\"\n")
f.write("ip = \"192.168.44.44\"\n")
elif pwn_bluetooth_device == "ios":
f.write("main.plugins.bt-tether.ip = \"172.20.10.6\"\n")
f.write("ip = \"172.20.10.6\"\n")
pwn_bluetooth_mac = input("What is the bluetooth MAC of your device?\n\n"
"MAC: ")
if pwn_bluetooth_mac != "":
f.write(f"main.plugins.bt-tether.mac = \"{pwn_bluetooth_mac}\"\n")
f.write(f"mac = \"{pwn_bluetooth_mac}\"\n")
# set up display settings
pwn_display_enabled = input("Do you want to enable a display?\n\n"
"[Y/N]: ")
if pwn_display_enabled.lower() in ('y', 'yes'):
f.write("ui.display.enabled = true\n")
f.write("[ui.display]\n"
"enabled = true\n")
pwn_display_type = input("What display do you use?\n\n"
"Be sure to check for the correct display type @ \n"
"https://github.com/jayofelony/pwnagotchi/blob/master/pwnagotchi/utils.py#L240-L501\n\n"
"Display type: ")
if pwn_display_type != "":
f.write(f"ui.display.type = \"{pwn_display_type}\"\n")
f.write(f"type = \"{pwn_display_type}\"\n")
pwn_display_invert = input("Do you want to invert the display colors?\n"
"N = Black background\n"
"Y = White background\n\n"
"[Y/N]: ")
if pwn_display_invert.lower() in ('y', 'yes'):
f.write("ui.invert = true\n")
f.write("[ui]\n"
"invert = true\n")
f.close()
if pwn_bluetooth.lower() in ('y', 'yes'):
if pwn_bluetooth_device.lower == "android":
@ -300,7 +305,7 @@ def pwnagotchi_cli():
config = utils.load_config(args)
if args.print_config:
print(toml.dumps(config, encoder=DottedTomlEncoder()))
print(tomlkit.dumps(config))
sys.exit(0)
from pwnagotchi.identity import KeyPair

View File

@ -1,181 +1,257 @@
main.name = "pwnagotchi"
main.lang = "en"
main.whitelist = [
[main]
name = "pwnagotchi"
lang = "en"
iface = "wlan0mon"
mon_start_cmd = "/usr/bin/monstart"
mon_stop_cmd = "/usr/bin/monstop"
mon_max_blind_epochs = 5
no_restart = false
whitelist = [
"EXAMPLE_NETWORK",
"ANOTHER_EXAMPLE_NETWORK",
"fo:od:ba:be:fo:od",
"fo:od:ba"
]
main.confd = "/etc/pwnagotchi/conf.d/"
main.custom_plugin_repos = [
confd = "/etc/pwnagotchi/conf.d/"
custom_plugin_repos = [
"https://github.com/jayofelony/pwnagotchi-torch-plugins/archive/master.zip",
"https://github.com/Sniffleupagus/pwnagotchi_plugins/archive/master.zip",
"https://github.com/NeonLightning/pwny/archive/master.zip",
"https://github.com/marbasec/UPSLite_Plugin_1_3/archive/master.zip",
"https://github.com/wpa-2/Pwnagotchi-Plugins/archive/master.zip"
"https://github.com/wpa-2/Pwnagotchi-Plugins/archive/master.zip",
"https://github.com/cyberartemio/wardriver-pwnagotchi-plugin/archive/main.zip"
]
custom_plugins = "/usr/local/share/pwnagotchi/custom-plugins/"
main.custom_plugins = "/usr/local/share/pwnagotchi/custom-plugins/"
[main.plugins.auto-tune]
enabled = true
main.plugins.auto-tune.enabled = true
[main.plugins.auto_backup]
enabled = false
interval = "daily" # or "hourly", or a number (minutes)
max_tries = 0
backup_location = "/home/pi/"
files = [
"/root/settings.yaml",
"/root/client_secrets.json",
"/root/.api-report.json",
"/root/.ssh",
"/root/.bashrc",
"/root/.profile",
"/home/pi/handshakes",
"/root/peers",
"/etc/pwnagotchi/",
"/usr/local/share/pwnagotchi/custom-plugins",
"/etc/ssh/",
"/home/pi/.bashrc",
"/home/pi/.profile",
"/home/pi/.wpa_sec_uploads"
]
exclude = [ "/etc/pwnagotchi/logs/*"]
commands = [ "tar cf {backup_file} {files}"]
main.plugins.auto-update.enabled = false
main.plugins.auto-update.install = false
main.plugins.auto-update.interval = 1
[main.plugins.auto-update]
enabled = true
install = true
interval = 1
token = "" # Create a personal access token (classic) with scope set to public_repo to use the GitHub API
main.plugins.bt-tether.enabled = false
main.plugins.bt-tether.phone-name = "" # name as shown on the phone i.e. "Pwnagotchi's Phone"
main.plugins.bt-tether.mac = ""
main.plugins.bt-tether.phone = "" # android or ios
main.plugins.bt-tether.ip = "" # 192.168.44.2 android / 172.20.10.2 ios
[main.plugins.bt-tether]
enabled = false
phone-name = "" # name as shown on the phone i.e. "Pwnagotchi's Phone"
mac = ""
phone = "" # android or ios
ip = "" # optional, default : 192.168.44.2 if android or 172.20.10.2 if ios
dns = "8.8.8.8 1.1.1.1" # optional, default (google): "8.8.8.8 1.1.1.1". Consider using anonymous DNS like OpenNic :-)
main.plugins.fix_services.enabled = true
[main.plugins.fix_services]
enabled = true
main.plugins.gdrivesync.enabled = false
main.plugins.gdrivesync.backupfiles = ['']
main.plugins.gdrivesync.backup_folder = "PwnagotchiBackups"
main.plugin.gdrivesync.interval = 1
[main.plugins.cache]
enabled = true
main.plugins.gpio_buttons.enabled = false
[main.plugins.gdrivesync]
enabled = false
backupfiles = [""]
backup_folder = "PwnagotchiBackups"
interval = 1
main.plugins.gps.enabled = false
main.plugins.gps.speed = 19200
main.plugins.gps.device = "/dev/ttyUSB0" # for GPSD: "localhost:2947"
[main.plugins.gpio_buttons]
enabled = false
main.plugins.gps_listener.enabled = false
[main.plugins.gps]
enabled = false
speed = 19200
device = "/dev/ttyUSB0" # for GPSD: "localhost:2947"
main.plugins.grid.enabled = true
main.plugins.grid.report = true
[main.plugins.gps_listener]
enabled = false
main.plugins.logtail.enabled = false
main.plugins.logtail.max-lines = 10000
[main.plugins.grid]
enabled = true
report = true
main.plugins.memtemp.enabled = false
main.plugins.memtemp.scale = "celsius"
main.plugins.memtemp.orientation = "horizontal"
[main.plugins.logtail]
enabled = false
max-lines = 10000
main.plugins.ohcapi.enabled = true
main.plugins.ohcapi.api_key = "sk_your_api_key_here"
main.plugins.ohcapi.receive_email = "yes"
[main.plugins.memtemp]
enabled = false
scale = "celsius"
orientation = "horizontal"
main.plugins.pisugarx.enabled = false
main.plugins.pisugarx.rotation = false
main.plugins.pisugarx.default_display = "percentage"
[main.plugins.ohcapi]
enabled = false
api_key = "sk_your_api_key_here"
receive_email = "yes"
main.plugins.session-stats.enabled = false
main.plugins.session-stats.save_directory = "/var/tmp/pwnagotchi/sessions/"
[main.plugins.pwndroid]
enabled = false
display = false # show coords on display
display_altitude = false # show altitude on display
main.plugins.ups_hat_c.enabled = false
main.plugins.ups_hat_c.label_on = true # show BAT label or just percentage
main.plugins.ups_hat_c.shutdown = 5 # battery percent at which the device will turn off
main.plugins.ups_hat_c.bat_x_coord = 140
main.plugins.ups_hat_c.bat_y_coord = 0
[main.plugins.pisugarx]
enabled = false
rotation = false
default_display = "percentage"
lowpower_shutdown = true
lowpower_shutdown_level = 10 # battery percent at which the device will turn off
max_charge_voltage_protection = false #It will limit the battery voltage to about 80% to extend battery life
main.plugins.ups_lite.enabled = false
main.plugins.ups_lite.shutdown = 2
[main.plugins.pwncrack]
enabled = false
key = ""
main.plugins.webcfg.enabled = true
[main.plugins.session-stats]
enabled = false
save_directory = "/var/tmp/pwnagotchi/sessions/"
main.plugins.webgpsmap.enabled = false
[main.plugins.ups_hat_c]
enabled = false
label_on = true # show BAT label or just percentage
shutdown = 5 # battery percent at which the device will turn off
bat_x_coord = 140
bat_y_coord = 0
main.plugins.wigle.enabled = false
main.plugins.wigle.api_key = ""
main.plugins.wigle.donate = false
[main.plugins.ups_lite]
enabled = false
shutdown = 2
main.plugins.wpa-sec.enabled = false
main.plugins.wpa-sec.api_key = ""
main.plugins.wpa-sec.api_url = "https://wpa-sec.stanev.org"
main.plugins.wpa-sec.download_results = false
main.plugins.wpa-sec.show_pwd = false
[main.plugins.webcfg]
enabled = true
main.iface = "wlan0mon"
main.mon_start_cmd = "/usr/bin/monstart"
main.mon_stop_cmd = "/usr/bin/monstop"
main.mon_max_blind_epochs = 5
main.no_restart = false
[main.plugins.webgpsmap]
enabled = false
main.log.path = "/etc/pwnagotchi/log/pwnagotchi.log"
main.log.path-debug = "/etc/pwnagotchi/log/pwnagotchi-debug.log"
main.log.rotation.enabled = true
main.log.rotation.size = "10M"
[main.plugins.wigle]
enabled = false
api_key = "" # mandatory
cvs_dir = "/tmp" # optionnal, is set, the CVS is written to this directory
donate = false # default: off
timeout = 30 # default: 30
position = [7, 85] # optionnal
personality.advertise = true
personality.deauth = true
personality.associate = true
personality.channels = []
personality.min_rssi = -200
personality.ap_ttl = 120
personality.sta_ttl = 300
personality.recon_time = 30
personality.max_inactive_scale = 2
personality.recon_inactive_multiplier = 2
personality.hop_recon_time = 10
personality.min_recon_time = 5
personality.max_interactions = 3
personality.max_misses_for_recon = 5
personality.excited_num_epochs = 10
personality.bored_num_epochs = 15
personality.sad_num_epochs = 25
personality.bond_encounters_factor = 20000
personality.throttle_a = 0.4
personality.throttle_d = 0.9
[main.plugins.wpa-sec]
enabled = false
api_key = ""
api_url = "https://wpa-sec.stanev.org"
download_results = false
show_pwd = false
ui.invert = false # false = black background, true = white background
ui.cursor = true
ui.fps = 0.0
ui.font.name = "DejaVuSansMono" # for japanese: fonts-japanese-gothic
ui.font.size_offset = 0 # will be added to the font size
[main.log]
path = "/etc/pwnagotchi/log/pwnagotchi.log"
path-debug = "/etc/pwnagotchi/log/pwnagotchi-debug.log"
ui.faces.look_r = "( ⚆_⚆)"
ui.faces.look_l = "(☉_☉ )"
ui.faces.look_r_happy = "( ◕‿◕)"
ui.faces.look_l_happy = "(◕‿◕ )"
ui.faces.sleep = "(⇀‿‿↼)"
ui.faces.sleep2 = "(≖‿‿≖)"
ui.faces.awake = "(◕‿‿◕)"
ui.faces.bored = "(-__-)"
ui.faces.intense = "(°▃▃°)"
ui.faces.cool = "(⌐■_■)"
ui.faces.happy = "(•‿‿•)"
ui.faces.excited = "(ᵔ◡◡ᵔ)"
ui.faces.grateful = "(^‿‿^)"
ui.faces.motivated = "(☼‿‿☼)"
ui.faces.demotivated = "(≖__≖)"
ui.faces.smart = "(✜‿‿✜)"
ui.faces.lonely = "(ب__ب)"
ui.faces.sad = "(╥☁╥ )"
ui.faces.angry = "(-_-')"
ui.faces.friend = "(♥‿‿♥)"
ui.faces.broken = "(☓‿‿☓)"
ui.faces.debug = "(#__#)"
ui.faces.upload = "(1__0)"
ui.faces.upload1 = "(1__1)"
ui.faces.upload2 = "(0__1)"
ui.faces.png = false
ui.faces.position_x = 0
ui.faces.position_y = 34
[main.log.rotation]
enabled = true
size = "10M"
ui.web.enabled = true
ui.web.address = "::" # listening on both ipv4 and ipv6 - switch to 0.0.0.0 to listen on just ipv4
ui.web.auth = false
ui.web.username = "changeme" # if auth is true
ui.web.password = "changeme" # if auth is true
ui.web.origin = ""
ui.web.port = 8080
ui.web.on_frame = ""
[personality]
advertise = true
deauth = true
associate = true
channels = []
min_rssi = -200
ap_ttl = 120
sta_ttl = 300
recon_time = 30
max_inactive_scale = 2
recon_inactive_multiplier = 2
hop_recon_time = 10
min_recon_time = 5
max_interactions = 3
max_misses_for_recon = 5
excited_num_epochs = 10
bored_num_epochs = 15
sad_num_epochs = 25
bond_encounters_factor = 20000
throttle_a = 0.4
throttle_d = 0.9
ui.display.enabled = false
ui.display.rotation = 180
ui.display.type = "waveshare_4"
[ui]
invert = false # false = black background, true = white background
cursor = true
fps = 0.0
bettercap.handshakes = "/home/pi/handshakes"
bettercap.silence = [
[ui.font]
name = "DejaVuSansMono" # for japanese: fonts-japanese-gothic
size_offset = 0 # will be added to the font size
[ui.faces]
look_r = "( ⚆_⚆)"
look_l = "(☉_☉ )"
look_r_happy = "( ◕‿◕)"
look_l_happy = "(◕‿◕ )"
sleep = "(⇀‿‿↼)"
sleep2 = "(≖‿‿≖)"
awake = "(◕‿‿◕)"
bored = "(-__-)"
intense = "(°▃▃°)"
cool = "(⌐■_■)"
happy = "(•‿‿•)"
excited = "(ᵔ◡◡ᵔ)"
grateful = "(^‿‿^)"
motivated = "(☼‿‿☼)"
demotivated = "(≖__≖)"
smart = "(✜‿‿✜)"
lonely = "(ب__ب)"
sad = "(╥☁╥ )"
angry = "(-_-')"
friend = "(♥‿‿♥)"
broken = "(☓‿‿☓)"
debug = "(#__#)"
upload = "(1__0)"
upload1 = "(1__1)"
upload2 = "(0__1)"
png = false
position_x = 0
position_y = 34
[ui.web]
enabled = true
address = "::" # listening on both ipv4 and ipv6 - switch to 0.0.0.0 to listen on just ipv4
auth = false
username = "changeme" # if auth is true
password = "changeme" # if auth is true
origin = ""
port = 8080
on_frame = ""
[ui.display]
enabled = false
rotation = 180
type = "waveshare_4"
[bettercap]
handshakes = "/home/pi/handshakes"
silence = [
"ble.device.new",
"ble.device.lost",
"ble.device.disconnected",
"ble.device.connected",
"ble.device.service.discovered",
"ble.device.characteristic.discovered",
"ble.device.disconnected",
"ble.device.connected",
"ble.connection.timeout",
"wifi.client.new",
"wifi.client.lost",
"wifi.client.probe",
@ -184,17 +260,21 @@ bettercap.silence = [
"mod.started"
]
fs.memory.enabled = true
fs.memory.mounts.log.enabled = true
fs.memory.mounts.log.mount = "/etc/pwnagotchi/log/"
fs.memory.mounts.log.size = "50M"
fs.memory.mounts.log.sync = 60
fs.memory.mounts.log.zram = true
fs.memory.mounts.log.rsync = true
[fs.memory]
enabled = true
fs.memory.mounts.data.enabled = true
fs.memory.mounts.data.mount = "/var/tmp/pwnagotchi"
fs.memory.mounts.data.size = "10M"
fs.memory.mounts.data.sync = 3600
fs.memory.mounts.data.zram = true
fs.memory.mounts.data.rsync = true
[fs.memory.mounts.log]
enabled = true
mount = "/etc/pwnagotchi/log/"
size = "50M"
sync = 60
zram = true
rsync = true
[fs.memory.mounts.data]
enabled = true
mount = "/var/tmp/pwnagotchi"
size = "10M"
sync = 3600
zram = true
rsync = true

View File

@ -268,6 +268,7 @@ def setup_logging(args, config):
requests_log.addHandler(logging.NullHandler())
requests_log.prpagate = False
logging.info("-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- Pwnagotchi Re|Started -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-")

View File

@ -1,10 +1,9 @@
# Handles the commandline stuff
import os
import logging
import glob
import re
import shutil
import socket # <-- Added for DNS check
from fnmatch import fnmatch
from pwnagotchi.utils import download_file, unzip, save_config, parse_version, md5
from pwnagotchi.plugins import default_path
@ -107,20 +106,19 @@ def edit(args, config):
plugin_config = {'main': {'plugins': {plugin: config['main']['plugins'][plugin]}}}
import toml
import tomlkit
from subprocess import call
from tempfile import NamedTemporaryFile
from pwnagotchi.utils import DottedTomlEncoder
new_plugin_config = None
with NamedTemporaryFile(suffix=".tmp", mode='r+t') as tmp:
tmp.write(toml.dumps(plugin_config, encoder=DottedTomlEncoder()))
tmp.write(tomlkit.dumps(plugin_config))
tmp.flush()
rc = call([editor, tmp.name])
if rc != 0:
return rc
tmp.seek(0)
new_plugin_config = toml.load(tmp)
new_plugin_config = tomlkit.load(tmp)
config['main']['plugins'][plugin] = new_plugin_config['main']['plugins'][plugin]
save_config(config, args.user_config)
@ -348,12 +346,34 @@ def _analyse_dir(path):
return results
def _check_internet():
"""
Simple DNS check to verify that we can resolve a common hostname.
Returns True if DNS resolution succeeds, False otherwise.
"""
try:
socket.gethostbyname('google.com')
return True
except:
return False
def update(config):
"""
Updates the database
"""
global SAVE_DIR
if not _check_internet():
logging.error("No internet connection or DNS not working. Please follow these instructions:")
logging.error("https://github.com/jayofelony/pwnagotchi/wiki/Step-2-Connecting")
print("No internet/DNS. Please follow these instructions:")
print("https://github.com/jayofelony/pwnagotchi/wiki/Step-2-Connecting")
return 1
else:
logging.info("Internet detected - Please run sudo pwnagotchi plugins list")
print("Internet detected - Please run sudo pwnagotchi plugins list")
urls = config['main']['custom_plugin_repos']
if not urls:
logging.info('No plugin repositories configured.')
@ -393,3 +413,4 @@ def update(config):
logging.error('Error while updating plugins: %s', ex)
rc = 1
return rc

View File

@ -14,7 +14,7 @@ import pwnagotchi.plugins as plugins
from pwnagotchi.utils import StatusFile, parse_version as version_to_tuple
def check(version, repo, native=True):
def check(version, repo, native=True, token=""):
logging.debug("checking remote version for %s, local is %s" % (repo, version))
info = {
'repo': repo,
@ -25,7 +25,21 @@ def check(version, repo, native=True):
'arch': platform.machine()
}
resp = requests.get("https://api.github.com/repos/%s/releases/latest" % repo)
headers = {}
if token != "":
headers['Authorization'] = f'token {token}'
resp = requests.get(f"https://api.github.com/repos/{repo}/releases/latest", headers=headers)
else:
resp = requests.get(f"https://api.github.com/repos/{repo}/releases/latest")
if resp.status_code != 200:
logging.error(f"[Auto-Update] Failed to get latest release for {repo}: {resp.status_code}")
return info
remaining_requests = resp.headers.get('X-RateLimit-Remaining')
logging.debug(f"[Auto-Update] Requests remaining: {remaining_requests}")
latest = resp.json()
info['available'] = latest_ver = latest['tag_name'].replace('v', '')
is_armhf = info['arch'].startswith('arm')
@ -214,7 +228,7 @@ class AutoUpdate(plugins.Plugin):
]
for repo, local_version, is_native, svc_name in to_check:
info = check(local_version, repo, is_native)
info = check(local_version, repo, is_native, self.options['token'])
if info['url'] is not None:
logging.warning(

View File

@ -0,0 +1,152 @@
import pwnagotchi.plugins as plugins
from pwnagotchi.utils import StatusFile
import logging
import os
import subprocess
import time
import socket
class AutoBackup(plugins.Plugin):
__author__ = 'WPA2'
__version__ = '1.1.3'
__license__ = 'GPL3'
__description__ = 'Backs up files when internet is available, with support for excludes.'
def __init__(self):
self.ready = False
self.tries = 0
# Used to throttle repeated log messages for "backup not due yet"
self.last_not_due_logged = 0
# Store the status file path separately.
self.status_file = '/root/.auto-backup'
self.status = StatusFile(self.status_file)
def on_loaded(self):
required_options = ['files', 'interval', 'backup_location', 'max_tries']
for opt in required_options:
if opt not in self.options or self.options[opt] is None:
logging.error(f"AUTO-BACKUP: Option '{opt}' is not set.")
return
# If no custom command(s) are provided, use the default plain tar command.
# The command includes a placeholder for {excludes} so that if no excludes are set, it will be empty.
if 'commands' not in self.options or not self.options['commands']:
self.options['commands'] = ["tar cf {backup_file} {excludes} {files}"]
self.ready = True
logging.info("AUTO-BACKUP: Successfully loaded.")
def get_interval_seconds(self):
"""
Convert the interval option into seconds.
Supports:
- "daily" for 24 hours,
- "hourly" for 60 minutes,
- or a numeric value (interpreted as minutes).
"""
interval = self.options['interval']
if isinstance(interval, str):
if interval.lower() == "daily":
return 24 * 60 * 60
elif interval.lower() == "hourly":
return 60 * 60
else:
try:
minutes = float(interval)
return minutes * 60
except ValueError:
logging.error("AUTO-BACKUP: Invalid interval format. Defaulting to daily interval.")
return 24 * 60 * 60
elif isinstance(interval, (int, float)):
return float(interval) * 60
else:
logging.error("AUTO-BACKUP: Unrecognized type for interval. Defaulting to daily interval.")
return 24 * 60 * 60
def is_backup_due(self):
"""
Determines if enough time has passed since the last backup.
If the status file does not exist, a backup is due.
"""
interval_sec = self.get_interval_seconds()
try:
last_backup = os.path.getmtime(self.status_file)
except OSError:
# Status file doesn't exist—backup is due.
return True
now = time.time()
return (now - last_backup) >= interval_sec
def on_internet_available(self, agent):
if not self.ready:
return
if self.options['max_tries'] and self.tries >= self.options['max_tries']:
logging.info("AUTO-BACKUP: Maximum tries reached, skipping backup.")
return
if not self.is_backup_due():
now = time.time()
# Log "backup not due" only once every 600 seconds.
if now - self.last_not_due_logged > 600:
logging.info("AUTO-BACKUP: Backup not due yet based on the interval.")
self.last_not_due_logged = now
return
# Only include files/directories that exist to prevent errors.
existing_files = list(filter(lambda f: os.path.exists(f), self.options['files']))
if not existing_files:
logging.warning("AUTO-BACKUP: No files found to backup.")
return
files_to_backup = " ".join(existing_files)
# Build excludes string if configured.
# Use get() so that if 'exclude' is missing or empty, we default to an empty list.
excludes = ""
exclude_list = self.options.get('exclude', [])
if exclude_list:
for pattern in exclude_list:
excludes += f" --exclude='{pattern}'"
# Get the backup location from config.
backup_location = self.options['backup_location']
# Retrieve the global config from agent. If agent.config is callable, call it.
global_config = getattr(agent, 'config', None)
if callable(global_config):
global_config = global_config()
if global_config is None:
global_config = {}
pwnagotchi_name = global_config.get('main', {}).get('name', socket.gethostname())
backup_file = os.path.join(backup_location, f"{pwnagotchi_name}-backup.tar")
try:
display = agent.view()
logging.info("AUTO-BACKUP: Starting backup process...")
display.set('status', 'Backing up ...')
display.update()
# Execute each backup command.
for cmd in self.options['commands']:
formatted_cmd = cmd.format(backup_file=backup_file, files=files_to_backup, excludes=excludes)
logging.info(f"AUTO-BACKUP: Running command: {formatted_cmd}")
process = subprocess.Popen(
formatted_cmd,
shell=True,
stdin=None,
stdout=open("/dev/null", "w"),
stderr=subprocess.STDOUT,
executable="/bin/bash"
)
process.wait()
if process.returncode > 0:
raise OSError(f"Command failed with return code: {process.returncode}")
logging.info(f"AUTO-BACKUP: Backup completed successfully. File created at {backup_file}")
display.set('status', 'Backup done!')
display.update()
self.status.update()
except OSError as os_e:
self.tries += 1
logging.error(f"AUTO-BACKUP: Backup error: {os_e}")
display.set('status', 'Backup failed!')
display.update()

View File

@ -1,93 +1,328 @@
import logging
import subprocess
import re
import time
from flask import abort, render_template_string
import pwnagotchi.plugins as plugins
import pwnagotchi.ui.fonts as fonts
from pwnagotchi.ui.components import LabeledValue
from pwnagotchi.ui.view import BLACK
TEMPLATE = """
{% extends "base.html" %}
{% set active_page = "bt-tether" %}
{% block title %}
{{ title }}
{% endblock %}
{% block meta %}
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, user-scalable=0" />
{% endblock %}
{% block styles %}
{{ super() }}
<style>
#searchText {
width: 100%;
}
table {
table-layout: auto;
width: 100%;
}
table, th, td {
border: 1px solid;
border-collapse: collapse;
}
th, td {
padding: 15px;
text-align: left;
}
@media screen and (max-width:700px) {
table, tr, td {
padding:0;
border:1px solid;
}
table {
border:none;
}
tr:first-child, thead, th {
display:none;
border:none;
}
tr {
float: left;
width: 100%;
margin-bottom: 2em;
}
td {
float: left;
width: 100%;
padding:1em;
}
td::before {
content:attr(data-label);
word-wrap: break-word;
color: white;
border-right:2px solid;
width: 20%;
float:left;
padding:1em;
font-weight: bold;
margin:-1em 1em -1em -1em;
}
}
</style>
{% endblock %}
{% block script %}
var searchInput = document.getElementById("searchText");
searchInput.onkeyup = function() {
var filter, table, tr, td, i, txtValue;
filter = searchInput.value.toUpperCase();
table = document.getElementById("tableOptions");
if (table) {
tr = table.getElementsByTagName("tr");
for (i = 0; i < tr.length; i++) {
td = tr[i].getElementsByTagName("td")[0];
if (td) {
txtValue = td.textContent || td.innerText;
if (txtValue.toUpperCase().indexOf(filter) > -1) {
tr[i].style.display = "";
}else{
tr[i].style.display = "none";
}
}
}
}
}
{% endblock %}
{% block content %}
<input type="text" id="searchText" placeholder="Search for ..." title="Type in a filter">
<table id="tableOptions">
<tr>
<th>Item</th>
<th>Configuration</th>
</tr>
<tr>
<td data-label="bluetooth">Bluetooth</td>
<td>{{bluetooth|safe}}</td>
</tr>
<tr>
<td data-label="device">Device</td>
<td>{{device|safe}}</td>
</tr>
<tr>
<td data-label="connection">Connection</td>
<td>{{connection|safe}}</td>
</tr>
</table>
{% endblock %}
"""
# We all love crazy regex patterns
MAC_PTTRN = r"^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})$"
IP_PTTRN = r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$"
DNS_PTTRN = r"^\s*((\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\s*[ ,;]\s*)+((\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\s*[ ,;]?\s*)$"
class BTTether(plugins.Plugin):
__author__ = 'Jayofelony'
__version__ = '1.2'
__license__ = 'GPL3'
__description__ = 'A new BT-Tether plugin'
__author__ = "Jayofelony, modified my fmatray"
__version__ = "1.4"
__license__ = "GPL3"
__description__ = "A new BT-Tether plugin"
def __init__(self):
self.ready = False
self.options = dict()
self.status = '-'
self.phone_name = None
self.mac = None
@staticmethod
def exec_cmd(cmd, args, pattern=None):
try:
result = subprocess.run([cmd] + args, check=True, capture_output=True, text=True)
if pattern:
return result.stdout.find(pattern)
return result
except Exception as exp:
logging.error(f"[BT-Tether] Error with {cmd}")
logging.error(f"[BT-Tether] Exception : {exp}")
raise exp
def bluetoothctl(self, args, pattern=None):
return self.exec_cmd("bluetoothctl", args, pattern)
def nmcli(self, args, pattern=None):
return self.exec_cmd("nmcli", args, pattern)
def on_loaded(self):
logging.info("[BT-Tether] plugin loaded.")
def on_config_changed(self, config):
if any(self.options[key] == '' for key in ['phone', 'phone-name', 'ip', 'mac']):
self.ready = False
ip = self.options['ip']
mac = self.options['mac']
phone_name = self.options['phone-name'] + ' Network'
if self.options['phone'].lower() == 'android':
address = f'{ip}'
gateway = '192.168.44.1'
elif self.options['phone'].lower() == 'ios':
address = f'{ip}'
gateway = '172.20.10.1'
if "phone-name" not in self.options:
logging.error("[BT-Tether] Phone name not provided")
return
if not ("mac" in self.options and re.match(MAC_PTTRN, self.options["mac"])):
logging.error("[BT-Tether] Error with mac address")
return
if not ("phone" in self.options and self.options["phone"].lower() in ["android", "ios"]):
logging.error("[BT-Tether] Phone type not supported")
return
if self.options["phone"].lower() == "android":
address = self.options.get("ip", "192.168.44.2")
gateway = "192.168.44.1"
elif self.options["phone"].lower() == "ios":
address = self.options.get("ip", "172.20.10.2")
gateway = "172.20.10.1"
if not re.match(IP_PTTRN, address):
logging.error(f"[BT-Tether] IP error: {address}")
return
self.phone_name = self.options["phone-name"] + " Network"
self.mac = self.options["mac"]
dns = self.options.get("dns", "8.8.8.8 1.1.1.1")
if not re.match(DNS_PTTRN, dns):
if dns == "":
logging.error(f"[BT-Tether] Empty DNS setting")
else:
logging.error("[BT-Tether] Phone type not supported.")
logging.error(f"[BT-Tether] Wrong DNS setting: '{dns}'")
return
dns = re.sub("[\s,;]+", " ", dns).strip() # DNS cleaning
try:
# Configure connection. Metric is set to 200 to prefer connection over USB
self.nmcli(
[
"connection", "modify", f"{self.phone_name}",
"connection.type", "bluetooth",
"bluetooth.type", "panu",
"bluetooth.bdaddr", f"{self.mac}",
"connection.autoconnect", "yes",
"connection.autoconnect-retries", "0",
"ipv4.method", "manual",
"ipv4.dns", f"{dns}",
"ipv4.addresses", f"{address}/24",
"ipv4.gateway", f"{gateway}",
"ipv4.route-metric", "200",
]
)
# Configure Device to autoconnect
self.nmcli([
"device", "set", f"{self.mac}",
"autoconnect", "yes",
"managed", "yes"
])
self.nmcli(["connection", "reload"])
self.ready = True
logging.info(f"[BT-Tether] Connection {self.phone_name} configured")
except Exception as e:
logging.error(f"[BT-Tether] Error while configuring: {e}")
return
try:
subprocess.run([
'nmcli', 'connection', 'modify', f'{phone_name}',
'connection.type', 'bluetooth',
'bluetooth.type', 'panu',
'bluetooth.bdaddr', f'{mac}',
'ipv4.method', 'manual',
'ipv4.dns', '8.8.8.8;1.1.1.1;'
'ipv4.addresses', f'{address}',
'ipv4.gateway', f'{gateway}',
'ipv4.route-metric', '100'
], check=True)
subprocess.run(['nmcli', 'connection', 'reload'], check=True)
subprocess.run(['nmcli', 'connection', 'up', f'{phone_name}'], check=True)
time.sleep(5) # Give some delay to configure before going up
self.nmcli(["connection", "up", f"{self.phone_name}"])
except Exception as e:
logging.debug(f"[BT-Tether] Failed to connect to device: {e}")
logging.error(f"[BT-Tether] Failed to connect to device: have you enabled bluetooth tethering on your phone?")
self.ready = True
logging.error(f"[BT-Tether] Failed to connect to device: {e}")
logging.error(
f"[BT-Tether] Failed to connect to device: have you enabled bluetooth tethering on your phone?"
)
def on_ready(self, agent):
if any(self.options[key] == '' for key in ['phone', 'phone-name', 'ip', 'mac']):
self.ready = False
self.ready = True
try:
logging.info(f"[BT-Tether] Disabling bettercap's BLE module")
agent.run("ble.recon off", verbose_errors=False)
except Exception as e:
logging.info(f"[BT-Tether] Bettercap BLE was already off.")
def on_unload(self, ui):
with ui._lock:
ui.remove_element("bluetooth")
try:
self.nmcli(["connection", "down", f"{self.phone_name}"])
except Exception as e:
logging.error(f"[BT-Tether] Failed to disconnect from device: {e}")
def on_ui_setup(self, ui):
with ui._lock:
ui.add_element('bluetooth', LabeledValue(color=BLACK, label='BT', value='-',
ui.add_element(
"bluetooth",
LabeledValue(
color=BLACK,
label="BT",
value="-",
position=(ui.width() / 2 - 10, 0),
label_font=fonts.Bold, text_font=fonts.Medium))
label_font=fonts.Bold,
text_font=fonts.Medium,
),
)
def on_ui_update(self, ui):
if self.ready:
phone_name = self.options['phone-name'] + ' Network'
if (subprocess.run(['bluetoothctl', 'info'], capture_output=True, text=True)).stdout.find('Connected: yes') != -1:
self.status = 'C'
else:
self.status = '-'
try:
subprocess.run(['nmcli', 'connection', 'up', f'{phone_name}'], check=True)
except Exception as e:
logging.debug(f"[BT-Tether] Failed to connect to device: {e}")
logging.error(f"[BT-Tether] Failed to connect to device: have you enabled bluetooth tethering on your phone?")
ui.set('bluetooth', self.status)
if not self.ready:
return
def on_unload(self, ui):
phone_name = self.options['phone-name'] + ' Network'
with ui._lock:
ui.remove_element('bluetooth')
status = ""
try:
if (subprocess.run(['bluetoothctl', 'info'], capture_output=True, text=True)).stdout.find('Connected: yes') != -1:
subprocess.run(['nmcli', 'connection', 'down', f'{phone_name}'], check=True)
logging.info(f"[BT-Tether] Disconnected from device with name: {phone_name}")
# Checking connection
if (
self.nmcli(["-w", "0", "-g", "GENERAL.STATE", "connection", "show", self.phone_name],
"activated",
)
!= -1
):
ui.set("bluetooth", "U")
return
else:
logging.info(f"[BT-Tether] Device with name {phone_name} is not connected, not disconnecting")
ui.set("bluetooth", "D")
status = "BT Conn. down"
# Checking device
if (
self.nmcli(
["-w", "0", "-g", "GENERAL.STATE", "device", "show", self.mac],
"(connected)",
)
!= -1
):
ui.set("bluetooth", "C")
status += "\nBT dev conn."
else:
ui.set("bluetooth", "-")
status += "\nBT dev disconn."
ui.set("status", status)
except Exception as e:
logging.error(f"[BT-Tether] Failed to disconnect from device: {e}")
logging.error(f"[BT-Tether] Error on update: {e}")
def on_webhook(self, path, request):
if not self.ready:
return """<html>
<head><title>BT-tether: Error</title></head>
<body><code>Plugin not ready</code></body>
</html>"""
if path == "/" or not path:
try:
bluetooth = self.bluetoothctl(["info", self.mac])
bluetooth = bluetooth.stdout.replace("\n", "<br>")
except Exception as e:
bluetooth = "Error while checking bluetoothctl"
try:
device = self.nmcli(["-w", "0", "device", "show", self.mac])
device = device.stdout.replace("\n", "<br>")
except Exception as e:
device = "Error while checking nmcli device"
try:
connection = self.nmcli(["-w", "0", "connection", "show", self.phone_name])
connection = connection.stdout.replace("\n", "<br>")
except Exception as e:
connection = "Error while checking nmcli connection"
logging.debug(device)
return render_template_string(
TEMPLATE,
title="BT-Tether",
bluetooth=bluetooth,
device=device,
connection=connection,
)
abort(404)

View File

@ -0,0 +1,119 @@
import logging
import json
import os
import re
import pathlib
import pwnagotchi.plugins as plugins
from datetime import datetime, UTC
from threading import Lock
def read_ap_cache(cache_dir, file):
cache_filename = os.path.basename(re.sub(r"\.(pcap|gps\.json|geo\.json)$", ".cache", file))
cache_filename = os.path.join(cache_dir, cache_filename)
if not os.path.exists(cache_filename):
logging.info("Cache not exist")
return None
try:
with open(cache_filename, "r") as f:
return json.load(f)
except Exception as e:
logging.info(f"Exception {e}")
return None
class Cache(plugins.Plugin):
__author__ = "fmatray"
__version__ = "1.0.0"
__license__ = "GPL3"
__description__ = "A simple plugin to cache AP informations"
def __init__(self):
self.options = dict()
self.ready = False
self.lock = Lock()
def on_loaded(self):
logging.info("[CACHE] plugin loaded.")
def on_config_changed(self, config):
try:
handshake_dir = config["bettercap"].get("handshakes")
self.cache_dir = os.path.join(handshake_dir, "cache")
os.makedirs(self.cache_dir, exist_ok=True)
except Exception:
logging.info(f"[CACHE] Cannot access to the cache directory")
return
self.last_clean = datetime.now(tz=UTC)
self.ready = True
logging.info(f"[CACHE] Cache plugin configured")
self.clean_ap_cache()
def on_unload(self, ui):
self.clean_ap_cache()
def clean_ap_cache(self):
if not self.ready:
return
with self.lock:
ctime = datetime.now(tz=UTC)
cache_to_delete = list()
for cache_file in pathlib.Path(self.cache_dir).glob("*.apcache"):
try:
mtime = datetime.fromtimestamp(cache_file.lstat().st_mtime, tz=UTC)
if (ctime - mtime).total_seconds() > 60 * 5:
cache_to_delete.append(cache_file)
except FileNotFoundError:
pass
if cache_to_delete:
logging.info(f"[CACHE] Cleaning {len(cache_to_delete)} files")
for cache_file in cache_to_delete:
try:
cache_file.unlink()
except FileNotFoundError as e:
pass
def write_ap_cache(self, access_point):
with self.lock:
try:
mac = access_point["mac"].replace(":", "")
hostname = re.sub(r"[^a-zA-Z0-9]", "", access_point["hostname"])
except KeyError:
return
cache_file = os.path.join(self.cache_dir, f"{hostname}_{mac}.apcache")
try:
with open(cache_file, "w") as f:
json.dump(access_point, f)
except Exception as e:
logging.error(f"[CACHE] Cannot write {cache_file}: {e}")
pass
def on_wifi_update(self, agent, access_points):
if self.ready:
for ap in filter(lambda ap: ap["hostname"] not in ["", "<hidden>"], access_points):
self.write_ap_cache(ap)
def on_unfiltered_ap_list(self, agent, aps):
if self.ready:
for ap in filter(lambda ap: ap["hostname"] not in ["", "<hidden>"], aps):
self.write_ap_cache(ap)
def on_association(self, agent, access_point):
if self.ready:
self.write_ap_cache(access_point)
def on_deauthentication(self, agent, access_point, client_station):
if self.ready:
self.write_ap_cache(access_point)
def on_handshake(self, agent, filename, access_point, client_station):
if self.ready:
self.write_ap_cache(access_point)
def on_ui_update(self, ui):
if not self.ready:
return
current_time = datetime.now(tz=UTC)
if (current_time - self.last_clean).total_seconds() > 60:
self.clean_ap_cache()
self.last_clean = current_time

View File

@ -13,8 +13,8 @@ import zipfile
class GdriveSync(plugins.Plugin):
__author__ = '@jayofelony'
__version__ = '1.2'
__author__ = '@jayofelony & Moist'
__version__ = '1.4'
__license__ = 'GPL3'
__description__ = 'A plugin to backup various pwnagotchi files and folders to Google Drive. Once every hour from loading plugin.'
@ -26,12 +26,15 @@ class GdriveSync(plugins.Plugin):
self.status = StatusFile('/root/.gdrive-backup')
self.backup = True
self.backupfiles = [
'/root/brain.nn',
'/root/brain.json',
'/root/.api-report.json',
'/root/handshakes',
'/home/pi/handshakes',
'/root/peers',
'/etc/pwnagotchi'
'/etc/pwnagotchi',
'.etc/profile/',
'/usr/local/share/pwnagotchi/custom-plugins',
'/boot/firmware/config.txt',
'/boot/firmware/cmdline.txt'
]
def on_loaded(self):
@ -168,7 +171,7 @@ class GdriveSync(plugins.Plugin):
"""
self.internet = True
def on_handshake(self, agent):
def on_handshake(self, agent, filename, access_point, client_station):
display = agent.view()
if not self.ready and not self.internet:
return

View File

@ -44,7 +44,7 @@ def parse_pcap(filename):
class Grid(plugins.Plugin):
__author__ = 'evilsocket@gmail.com'
__version__ = '1.0.1'
__version__ = '1.1.0'
__license__ = 'GPL3'
__description__ = 'This plugin signals the unit cryptographic identity and list of pwned networks and list of pwned ' \
'networks to opwngrid.xyz '
@ -69,6 +69,11 @@ class Grid(plugins.Plugin):
def on_loaded(self):
logging.info("grid plugin loaded.")
def on_webhook(self, path, request):
from flask import make_response, redirect
response = make_response(redirect("https://opwngrid.xyz", code=302))
return response
def set_reported(self, reported, net_id):
if net_id not in reported:
reported.append(net_id)

View File

@ -10,7 +10,7 @@ from json.decoder import JSONDecodeError
class ohcapi(plugins.Plugin):
__author__ = 'Rohan Dayaram'
__version__ = '1.0.3'
__version__ = '1.1.0'
__license__ = 'GPL3'
__description__ = 'Uploads WPA/WPA2 handshakes to OnlineHashCrack.com using the new API (V2), no dashboard.'
@ -45,6 +45,11 @@ class ohcapi(plugins.Plugin):
self.ready = True
logging.info("OHC NewAPI: Plugin loaded and ready.")
def on_webhook(self, path, request):
from flask import make_response, redirect
response = make_response(redirect("https://www.onlinehashcrack.com", code=302))
return response
def on_internet_available(self, agent):
"""
Called once when the internet becomes available.
@ -70,15 +75,16 @@ class ohcapi(plugins.Plugin):
return
# Check if the internet is still available by pinging Google
self.internet_active = False
try:
response = requests.get('https://www.google.com', timeout=5)
if response.status_code == 200:
self.internet_active = True
else:
self.internet_active = False
except requests.ConnectionError:
return
current_time = time.time()
if current_time - self.last_run >= self.options['sleep']:
if self.internet_active and current_time - self.last_run >= self.options['sleep']:
self._run_tasks(agent)
self.last_run = current_time

View File

@ -6,9 +6,530 @@ import pwnagotchi.ui.fonts as fonts
import pwnagotchi.plugins as plugins
import pwnagotchi
import time
from pisugar import *
import smbus
from flask import abort
from flask import render_template_string
from collections import deque
import threading
PiSugar_addresses = {
"PiSugar2": 0x75, # PiSugar2\2Plus
"PiSugar3": 0x57, # PiSugar3\3Plus
"PiSugar2 RTC": 0x32 # PiSugar2\2Plus RTC
}
# Use the same battery level curve as pisugar-power-manager
curve5312 = [
(4.10, 100.0),
(4.05, 95.0),
(3.90, 88.0),
(3.80, 77.0),
(3.70, 65.0),
(3.62, 55.0),
(3.58, 49.0),
(3.49, 25.6),
(3.32, 4.5),
(3.1, 0.0),
]
curve5209 = [
(4.16, 100.0),
(4.05, 95.0),
(4.00, 80.0),
(3.92, 65.0),
(3.86, 40.0),
(3.79, 25.5),
(3.66, 10.0),
(3.52, 6.5),
(3.49, 3.2),
(3.1, 0.0),
]
class PiSugarServer:
def __init__(self):
"""
PiSugar initialization, if unable to connect to any version of PiSugar, return false
"""
self._bus = smbus.SMBus(1)
self.ready = False
self.model = None
self.i2creg = []
self.address = 0
self.battery_voltage = 0.00
self.voltage_history = deque(maxlen=10)
self.battery_level = 0
self.battery_charging = 0
self.temperature = 0
self.power_plugged = False
self.allow_charging = True
self.lowpower_shutdown = False
self.lowpower_shutdown_level = 10
self.max_charge_voltage_protection = False
self.max_protection_level=80
# Start the device connection in a background thread
self.connection_thread = threading.Thread(
target=self._connect_device, daemon=True)
self.connection_thread.start()
def _connect_device(self):
"""
Attempt to connect to the PiSugar device in a background thread.
"""
while self.model is None:
if self.check_device(PiSugar_addresses["PiSugar2"]) is not None:
self.address = PiSugar_addresses["PiSugar2"]
if self.check_device(PiSugar_addresses["PiSugar2"], 0xC2) != 0:
self.model = "PiSugar2Plus"
else:
self.model = "PiSugar2"
self.device_init()
elif self.check_device(PiSugar_addresses["PiSugar3"]) is not None:
self.model = 'PiSugar3'
self.address = PiSugar_addresses["PiSugar3"]
self.device_init()
else:
self.model = None
logging.info(
"No PiSugar device was found. Please check if the PiSugar device is powered on."
)
time.sleep(5)
logging.info(f"{self.model} is connected")
# Once connected, start the timer
self.start_timer()
while len(self.i2creg) < 256:
time.sleep(1)
self.ready = True
logging.info(f"{self.model} is ready")
def start_timer(self):
# Create a thread to execute the timer function
timer_thread = threading.Thread(target=self.update_value)
timer_thread.daemon = True # Set as a daemon thread, automatically ends when the main program exits
timer_thread.start()
def update_value(self):
"""Update PiSugar status every three seconds, including triggering auto shutdown"""
while True:
try:
if self.model == 'PiSugar2' or self.model == 'PiSugar2Plus':
self.set_battery_notallow_charging() # Temporarily disable charging to get accurate battery voltage
time.sleep(0.05)
self.i2creg = []
for i in range(0, 256, 32):
# Calculate the starting register address for the current read
current_register = 0 + i
# Calculate the length of the current read
current_length = min(32, 256 - i)
# Read data block
chunk = self._bus.read_i2c_block_data(
self.address, current_register, current_length)
# Add the read data block to the result list
self.i2creg.extend(chunk)
time.sleep(0.1)
logging.debug(f"Data length: {len(self.i2creg)}")
logging.debug(f"Data: {self.i2creg}")
if self.model == 'PiSugar3':
low = self.i2creg[0x23]
high = self.i2creg[0x22]
self.battery_voltage = (((high << 8) + low) / 1000)
self.temperature = self.i2creg[0x04] - 40
ctr1 = self.i2creg[0x02] # Read control register 1
self.power_plugged = (ctr1 & (1 << 7)) != 0 # Check if power is plugged in
self.allow_charging = (ctr1 & (1 << 6)) != 0 # Check if charging is allowed
if self.max_charge_voltage_protection:
self._bus.write_byte_data(
self.address, 0x0B, 0x29) # Disable write protection
self._bus.write_byte_data(self.address, 0x20, self._bus.read_byte_data(
self.address, 0x20) | 0b10000000)
self._bus.write_byte_data(
self.address, 0x0B, 0x00) # Enable write protection
else:
self._bus.write_byte_data(
self.address, 0x0B, 0x29) # Disable write protection
self._bus.write_byte_data(self.address, 0x20, self._bus.read_byte_data(
self.address, 0x20) & 0b01111111)
self._bus.write_byte_data(
self.address, 0x0B, 0x00) # Enable write protection
elif self.model == 'PiSugar2':
high = self.i2creg[0xa3]
low = self.i2creg[0xa2]
self.battery_voltage = (2600.0 - (((high | 0b11000000) << 8) + low) * 0.26855) / \
1000.0 if high & 0x20 else (
2600.0 + (((high & 0x1f) << 8) + low) * 0.26855) / 1000.0
self.power_plugged = (self.i2creg[0x55] & 0b00010000) != 0
if self.max_charge_voltage_protection:
self.voltage_history.append(self.battery_voltage)
self.battery_level = self.convert_battery_voltage_to_level()
if (self.battery_level) > self.max_protection_level:
self.set_battery_notallow_charging()
else:
self.set_battery_allow_charging()
else:
self.set_battery_allow_charging()
elif self.model == 'PiSugar2Plus':
low = self.i2creg[0xd0]
high = self.i2creg[0xd1]
self.battery_voltage = (
(((high & 0b00111111) << 8) + low) * 0.26855 + 2600.0)/1000
self.power_plugged = self.i2creg[0xdd] == 0x1f
if self.max_charge_voltage_protection:
self.voltage_history.append(self.battery_voltage)
self.battery_level = self.convert_battery_voltage_to_level()
if (self.battery_level) > self.max_protection_level:
self.set_battery_notallow_charging()
else:
self.set_battery_allow_charging()
else:
self.set_battery_allow_charging()
self.voltage_history.append(self.battery_voltage)
self.battery_level = self.convert_battery_voltage_to_level()
if self.lowpower_shutdown:
if self.battery_level < self.lowpower_shutdown_level:
logging.info("[PiSugarX] low power shutdown now.")
self.shutdown()
pwnagotchi.shutdown()
time.sleep(3)
except Exception as e:
logging.error(f"read error{e}")
time.sleep(3)
def shutdown(self):
# logging.info("[PiSugarX] PiSugar set shutdown .")
if self.model == 'PiSugar3':
# Shutdown the power after 10 seconds
self._bus.write_byte_data(self.address, 0x0B, 0x29) # Disable write protection
self._bus.write_byte_data(self.address, 0x09, 10)
self._bus.write_byte_data(self.address, 0x02, self._bus.read_byte_data(
self.address, 0x02) & 0b11011111)
self._bus.write_byte_data(self.address, 0x0B, 0x00) # Enable write protection
logging.info("[PiSugarX] PiSugar shutdown in 10s.")
elif self.model == 'PiSugar2':
pass
elif self.model == 'PiSugar2Plus':
pass
def check_device(self, address, reg=0):
"""Check if a device is present at the specified address"""
try:
return self._bus.read_byte_data(address, reg)
except OSError as e:
logging.debug(f"Device not found at address {address}: {e}")
return None
def device_init(self):
if self.model == "PiSugar2Plus":
'''Initialize GPIO'''
self._bus.write_byte_data(self.address, 0x52, self._bus.read_byte_data(
self.address, 0x52) | 0b00000010)
self._bus.write_byte_data(self.address, 0x54, self._bus.read_byte_data(
self.address, 0x54) | 0b00000010)
self._bus.write_byte_data(self.address, 0x52, self._bus.read_byte_data(
self.address, 0x52) | 0b00000100)
self._bus.write_byte_data(self.address, 0x29, self._bus.read_byte_data(
self.address, 0x29) & 0b10111111)
self._bus.write_byte_data(self.address, 0x52, self._bus.read_byte_data(
self.address, 0x52) & 0b10011111 | 0b01000000)
self._bus.write_byte_data(self.address, 0xc2, self._bus.read_byte_data(
self.address, 0xc2) | 0b00010000)
logging.debug(f"PiSugar2Plus GPIO initialization complete")
'''Init boost intensity, 0x3f*50ma, 3A'''
self._bus.write_byte_data(self.address, 0x30, self._bus.read_byte_data(
self.address, 0x30) & 0b11000000 | 0x3f)
logging.debug(f"PiSugar2Plus current setting complete")
elif self.model == "PiSugar2":
'''Initialize GPIO'''
self._bus.write_byte_data(self.address, 0x51, (self._bus.read_byte_data(
self.address, 0x51) & 0b11110011) | 0b00000100)
self._bus.write_byte_data(self.address, 0x53, self._bus.read_byte_data(
self.address, 0x53) | 0b00000010)
self._bus.write_byte_data(self.address, 0x51, (self._bus.read_byte_data(
self.address, 0x51) & 0b11001111) | 0b00010000)
self._bus.write_byte_data(self.address, 0x26, self._bus.read_byte_data(
self.address, 0x26) & 0b10110000)
self._bus.write_byte_data(self.address, 0x52, (self._bus.read_byte_data(
self.address, 0x52) & 0b11110011) | 0b00000100)
self._bus.write_byte_data(self.address, 0x53, (self._bus.read_byte_data(
self.address, 0x53) & 0b11101111) | 0b00010000)
logging.debug(f"PiSugar2 GPIO initialization complete")
pass
def convert_battery_voltage_to_level(self):
"""
Convert battery voltage to battery percentage.
:param voltage: Current battery voltage
:param curve: Battery threshold curve, format [(voltage1, percentage1), (voltage2, percentage2), ...]
:return: Battery percentage
"""
if self.model == "PiSugar2Plus":
curve = curve5312
elif self.model == "PiSugar3Plus":
curve = curve5312
elif self.model == "PiSugar2":
curve = curve5209
elif self.model == "PiSugar3":
curve = curve5312
# Add the current voltage to the history
# If the history is less than 5 entries, return the average directly (to avoid truncation with no valid data)
if len(self.voltage_history) < 5:
avg_voltage = sum(self.voltage_history) / len(self.voltage_history)
else:
# Sort and remove the highest 2 and lowest 2
sorted_history = sorted(self.voltage_history)
trimmed_history = sorted_history[2:-2] # Remove the first two and last two
avg_voltage = sum(trimmed_history) / len(trimmed_history) # Calculate truncated mean
# Traverse each segment of the battery curve
for (v1, p1), (v2, p2) in zip(curve, curve[1:]):
# If the voltage is within the current interval
if v2 <= avg_voltage <= v1:
# Use linear interpolation to calculate the percentage
return p2 + (p1 - p2) * (avg_voltage - v2) / (v1 - v2)
# If the voltage is out of the curve range, return the lowest or highest percentage
return curve[-1][1] if avg_voltage < curve[-1][0] else curve[0][1]
def get_version(self):
"""
Get the firmware version of the PiSugar3.
If not PiSugar3, return None
:return: Version string or None
"""
if self.model == 'PiSugar3':
try:
return bytes(self.i2creg[0xe2:0xee]).decode('ascii')
except OSError as e:
logging.error(f"Failed to read version from PiSugar3: {e}")
return None
return None
def get_model(self):
"""
Get the model of the PiSugar hardware.
:return: Model string.
"""
return self.model
def get_battery_level(self):
"""
Get the current battery level in percentage.
:return: Battery level as a percentage (0-100).
"""
return self.battery_level
def get_battery_voltage(self):
"""
Get the current battery voltage.
:return: Battery voltage in volts.
"""
return self.battery_voltage
def get_battery_current(self):
"""
Get the current battery current.
:return: Battery current in amperes.
"""
pass
def get_battery_allow_charging(self):
"""
Check if battery charging is allowed.
:return: True if charging is allowed, False otherwise.
"""
return self.allow_charging
def set_battery_allow_charging(self):
if self.model == 'PiSugar3':
pass
elif self.model == 'PiSugar2':
# Disable gpio2 output
self._bus.write_byte_data(self.address, 0x54, self._bus.read_byte_data(
self.address, 0x54) & 0b11111011)
# Enable charging
self._bus.write_byte_data(self.address, 0x55, self._bus.read_byte_data(
self.address, 0x55) & 0b11111011)
# Enable gpio2 output
self._bus.write_byte_data(self.address, 0x54, self._bus.read_byte_data(
self.address, 0x54) | 0b00000100)
elif self.model == 'PiSugar2Plus':
# Disable gpio2 output
self._bus.write_byte_data(self.address, 0x56, self._bus.read_byte_data(
self.address, 0x56) & 0b11111011)
# Enable charging
self._bus.write_byte_data(self.address, 0x58, self._bus.read_byte_data(
self.address, 0x58) & 0b11111011)
# Enable gpio2 output
self._bus.write_byte_data(self.address, 0x56, self._bus.read_byte_data(
self.address, 0x56) | 0b00000100)
return
def set_battery_notallow_charging(self):
if self.model == 'PiSugar3':
pass
elif self.model == 'PiSugar2':
# Disable gpio2 output
self._bus.write_byte_data(self.address, 0x54, self._bus.read_byte_data(
self.address, 0x54) & 0b11111011)
# Disable charging
self._bus.write_byte_data(self.address, 0x55, self._bus.read_byte_data(
self.address, 0x55) | 0b00000100)
# Enable gpio2 output
self._bus.write_byte_data(self.address, 0x54, self._bus.read_byte_data(
self.address, 0x54) | 0b00000100)
elif self.model == 'PiSugar2Plus':
# Disable gpio2 output
self._bus.write_byte_data(self.address, 0x56, self._bus.read_byte_data(
self.address, 0x56) & 0b11111011)
# Disable charging
self._bus.write_byte_data(self.address, 0x58, self._bus.read_byte_data(
self.address, 0x58) | 0b00000100)
# Enable gpio2 output
self._bus.write_byte_data(self.address, 0x56, self._bus.read_byte_data(
self.address, 0x56) | 0b00000100)
return
def get_battery_charging_range(self):
"""
Get the battery charging range.
:return: Charging range string.
"""
pass
def get_battery_full_charge_duration(self):
"""
Get the duration of keeping the battery charging when full.
:return: Duration in seconds.
"""
pass
def get_battery_safe_shutdown_level(self):
"""
Get the safe shutdown level for the battery.
:return: Safe shutdown level as a percentage.
"""
pass
def get_battery_safe_shutdown_delay(self):
"""
Get the safe shutdown delay.
:return: Delay in seconds.
"""
pass
def get_battery_auto_power_on(self):
"""
Check if auto power on is enabled.
:return: True if enabled, False otherwise.
"""
pass
def get_battery_soft_poweroff(self):
"""
Check if soft power off is enabled.
:return: True if enabled, False otherwise.
"""
pass
def get_system_time(self):
"""
Get the system time.
:return: System time string.
"""
pass
def get_rtc_adjust_ppm(self):
"""
Get the RTC adjust PPM.
:return: RTC adjust PPM value.
"""
pass
def get_rtc_alarm_repeat(self):
"""
Get the RTC alarm repeat setting.
:return: RTC alarm repeat string.
"""
pass
def get_tap_enable(self, tap):
"""
Check if a specific tap (single, double, long) is enabled.
:param tap: Type of tap ('single', 'double', 'long').
:return: True if enabled, False otherwise.
"""
pass
def get_tap_shell(self, tap):
"""
Get the shell command associated with a specific tap.
:param tap: Type of tap ('single', 'double', 'long').
:return: Shell command string.
"""
pass
def get_anti_mistouch(self):
"""
Check if anti-mistouch protection is enabled.
:return: True if enabled, False otherwise.
"""
pass
def get_temperature(self):
"""
Get the current temperature.
:return: Temperature in degrees Celsius.
"""
return self.temperature
def get_battery_power_plugged(self):
"""
Check if the battery is plugged in.
:return: True if plugged in, False otherwise.
"""
return self.power_plugged
def get_battery_charging(self):
"""
Check if the battery is currently charging.
:return: True if charging, False otherwise.
"""
pass
def rtc_web(self):
"""
Synchronize RTC with web time.
"""
pass
class PiSugar(plugins.Plugin):
__author__ = "jayofelony"
@ -22,15 +543,25 @@ class PiSugar(plugins.Plugin):
def __init__(self):
self._agent = None
self.is_new_model = False
self.options = dict()
"""
self.options = {
'enabled': True,
'rotation': False,
'default_display': 'percentage',
'lowpower_shutdown': True,
'lowpower_shutdown_level': 10,
'max_charge_voltage_protection': True
}
"""
self.ps = None
# logging.debug(f"[PiSugarX] {self.options}")
try:
conn, event_conn = connect_tcp()
self.ps = PiSugarServer(conn, event_conn)
self.ps = PiSugarServer()
except Exception as e:
# Log at debug to avoid clutter since it might be a false positive
logging.debug("[PiSugarX] Unable to establish connection: %s", repr(e))
logging.debug(
"[PiSugarX] Unable to establish connection: %s", repr(e))
self.ready = False
self.lasttemp = 69
@ -49,7 +580,8 @@ class PiSugar(plugins.Plugin):
try:
return func()
except Exception as e:
logging.debug("[PiSugarX] Failed to get data using %s: %s", func.__name__, e)
logging.debug(
"[PiSugarX] Failed to get data using %s: %s", func.__name__, e)
return default
def on_loaded(self):
@ -60,20 +592,25 @@ class PiSugar(plugins.Plugin):
valid_displays = ['voltage', 'percentage', 'temp']
if self.default_display not in valid_displays:
logging.warning(f"[PiSugarX] Invalid default_display '{self.default_display}'. Using 'voltage'.")
logging.warning(
f"[PiSugarX] Invalid default_display '{self.default_display}'. Using 'voltage'.")
self.default_display = 'voltage'
logging.info(f"[PiSugarX] Rotation is {'enabled' if self.rotation_enabled else 'disabled'}.")
logging.info(f"[PiSugarX] Default display (when rotation disabled): {self.default_display}")
logging.info(
f"[PiSugarX] Rotation is {'enabled' if self.rotation_enabled else 'disabled'}.")
logging.info(
f"[PiSugarX] Default display (when rotation disabled): {self.default_display}")
self.ps.lowpower_shutdown = self.options['lowpower_shutdown']
self.ps.lowpower_shutdown_level = self.options['lowpower_shutdown_level']
self.ps.max_charge_voltage_protection = self.options['max_charge_voltage_protection']
def on_ready(self, agent):
self.ready = True
self._agent = agent
led_amount = self.safe_get(self.ps.get_battery_led_amount, default=0)
if led_amount == 2:
self.is_new_model = True
else:
self.is_new_model = False
try:
self.ready = self.ps.ready
except Exception as e:
# Log at debug to avoid clutter since it might be a false positive
logging.warning(f"[PiSugarX] {e}")
def on_internet_available(self, agent):
self._agent = agent
@ -87,31 +624,52 @@ class PiSugar(plugins.Plugin):
try:
if request.method == "GET":
if path == "/" or not path:
version = self.safe_get(self.ps.get_version, default='Unknown')
version = self.safe_get(
self.ps.get_version, default='Unknown')
model = self.safe_get(self.ps.get_model, default='Unknown')
battery_level = self.safe_get(self.ps.get_battery_level, default='N/A')
battery_voltage = self.safe_get(self.ps.get_battery_voltage, default='N/A')
battery_current = self.safe_get(self.ps.get_battery_current, default='N/A')
battery_led_amount = self.safe_get(self.ps.get_battery_led_amount, default='N/A') if model == 'Pisugar 2' else 'Not supported'
battery_allow_charging = self.safe_get(self.ps.get_battery_allow_charging, default=False)
battery_charging_range = self.safe_get(self.ps.get_battery_charging_range, default='N/A') if self.is_new_model or model == 'Pisugar 3' else 'Not supported'
battery_full_charge_duration = getattr(self.ps, 'get_battery_full_charge_duration', lambda: 'N/A')()
safe_shutdown_level = self.safe_get(self.ps.get_battery_safe_shutdown_level, default=None)
battery_level = self.safe_get(
self.ps.get_battery_level, default='N/A')
battery_voltage = self.safe_get(
self.ps.get_battery_voltage, default='N/A')
battery_current = self.safe_get(
self.ps.get_battery_current, default='N/A')
battery_allow_charging = self.safe_get(
self.ps.get_battery_allow_charging, default=False)
battery_charging_range = self.safe_get(
self.ps.get_battery_charging_range, default='N/A')
battery_full_charge_duration = getattr(
self.ps, 'get_battery_full_charge_duration', lambda: 'N/A')()
safe_shutdown_level = self.safe_get(
self.ps.get_battery_safe_shutdown_level, default=None)
battery_safe_shutdown_level = f"{safe_shutdown_level}%" if safe_shutdown_level is not None else 'Not set'
battery_safe_shutdown_delay = self.safe_get(self.ps.get_battery_safe_shutdown_delay, default='N/A')
battery_auto_power_on = self.safe_get(self.ps.get_battery_auto_power_on, default=False)
battery_soft_poweroff = self.safe_get(self.ps.get_battery_soft_poweroff, default=False) if model == 'Pisugar 3' else False
system_time = self.safe_get(self.ps.get_system_time, default='N/A')
rtc_adjust_ppm = self.safe_get(self.ps.get_rtc_adjust_ppm, default='Not supported') if model == 'Pisugar 3' else 'Not supported'
rtc_alarm_repeat = self.safe_get(self.ps.get_rtc_alarm_repeat, default='N/A')
single_tap_enabled = self.safe_get(lambda: self.ps.get_tap_enable(tap='single'), default=False)
double_tap_enabled = self.safe_get(lambda: self.ps.get_tap_enable(tap='double'), default=False)
long_tap_enabled = self.safe_get(lambda: self.ps.get_tap_enable(tap='long'), default=False)
single_tap_shell = self.safe_get(lambda: self.ps.get_tap_shell(tap='single'), default='N/A')
double_tap_shell = self.safe_get(lambda: self.ps.get_tap_shell(tap='double'), default='N/A')
long_tap_shell = self.safe_get(lambda: self.ps.get_tap_shell(tap='long'), default='N/A')
anti_mistouch = self.safe_get(self.ps.get_anti_mistouch, default=False) if model == 'Pisugar 3' else False
temperature = self.safe_get(self.ps.get_temperature, default='N/A')
battery_safe_shutdown_delay = self.safe_get(
self.ps.get_battery_safe_shutdown_delay, default='N/A')
battery_auto_power_on = self.safe_get(
self.ps.get_battery_auto_power_on, default=False)
battery_soft_poweroff = self.safe_get(
self.ps.get_battery_soft_poweroff, default=False) if model == 'Pisugar 3' else False
system_time = self.safe_get(
self.ps.get_system_time, default='N/A')
rtc_adjust_ppm = self.safe_get(
self.ps.get_rtc_adjust_ppm, default='Not supported') if model == 'Pisugar 3' else 'Not supported'
rtc_alarm_repeat = self.safe_get(
self.ps.get_rtc_alarm_repeat, default='N/A')
single_tap_enabled = self.safe_get(
lambda: self.ps.get_tap_enable(tap='single'), default=False)
double_tap_enabled = self.safe_get(
lambda: self.ps.get_tap_enable(tap='double'), default=False)
long_tap_enabled = self.safe_get(
lambda: self.ps.get_tap_enable(tap='long'), default=False)
single_tap_shell = self.safe_get(
lambda: self.ps.get_tap_shell(tap='single'), default='N/A')
double_tap_shell = self.safe_get(
lambda: self.ps.get_tap_shell(tap='double'), default='N/A')
long_tap_shell = self.safe_get(
lambda: self.ps.get_tap_shell(tap='long'), default='N/A')
anti_mistouch = self.safe_get(
self.ps.get_anti_mistouch, default=False) if model == 'Pisugar 3' else False
temperature = self.safe_get(
self.ps.get_temperature, default='N/A')
ret = '''
<!DOCTYPE html>
@ -172,8 +730,7 @@ class PiSugar(plugins.Plugin):
<tr><td>Battery Level</td><td>{battery_level}%</td></tr>
<tr><td>Battery Voltage</td><td>{battery_voltage}V</td></tr>
<tr><td>Battery Current</td><td>{battery_current}A</td></tr>
<tr><td>Battery LED Amount</td><td>{battery_led_amount}</td></tr>
<tr><td>Battery Allow Charging</td><td>{"Yes" if battery_allow_charging and self.is_new_model else "No"}</td></tr>
<tr><td>Battery Allow Charging</td><td>{"Yes" if battery_allow_charging else "No"}</td></tr>
<tr><td>Battery Charging Range</td><td>{battery_charging_range}</td></tr>
<tr><td>Duration of Keep Charging When Full</td><td>{battery_full_charge_duration} seconds</td></tr>
<tr><td>Battery Safe Shutdown Level</td><td>{battery_safe_shutdown_level}</td></tr>
@ -240,13 +797,25 @@ class PiSugar(plugins.Plugin):
# Make sure "bat" is in the UI state (guard to prevent KeyError)
if 'bat' not in ui._state._state:
return
try:
self.ready = self.ps.ready
except Exception as e:
# Log at debug to avoid clutter since it might be a false positive
logging.warning(f"[PiSugarX] {e}")
if self.ready:
capacity = self.safe_get(self.ps.get_battery_level, default=0)
voltage = self.safe_get(self.ps.get_battery_voltage, default=0.00)
temp = self.safe_get(self.ps.get_temperature, default=0)
else:
capacity = 0
voltage = 0.00
temp = 0
logging.info(f"[PiSugarX] PiSugar is not ready")
# Check if battery is plugged in
battery_plugged = self.safe_get(self.ps.get_battery_power_plugged, default=False)
battery_plugged = self.safe_get(
self.ps.get_battery_power_plugged, default=False)
if battery_plugged:
# If plugged in, display "CHG"
@ -275,12 +844,3 @@ class PiSugar(plugins.Plugin):
ui.set('bat', f"{capacity:.0f}%")
elif self.default_display == 'temp':
ui.set('bat', f"{temp}°C")
charging = self.safe_get(self.ps.get_battery_charging, default=None)
safe_shutdown_level = self.safe_get(self.ps.get_battery_safe_shutdown_level, default=0)
if charging is not None:
if capacity <= safe_shutdown_level:
logging.info(
f"[PiSugarX] Empty battery (<= {safe_shutdown_level}%): shutting down"
)
ui.update(force=True, new_data={"status": "Battery exhausted, bye ..."})

View File

@ -0,0 +1,81 @@
import time
import os
import subprocess
import requests
import logging
import socket
from pwnagotchi.plugins import Plugin
class UploadConvertPlugin(Plugin):
__author__ = 'Terminatoror'
__version__ = '1.0.0'
__license__ = 'GPL3'
__description__ = 'Converts .pcap files to .hc22000 and uploads them to pwncrack.org when internet is available.'
def __init__(self):
self.server_url = 'http://pwncrack.org/upload_handshake' # Leave this as is
self.potfile_url = 'http://pwncrack.org/download_potfile_script' # Leave this as is
self.timewait = 600
self.last_run_time = 0
self.options = dict()
def on_loaded(self):
logging.info('[pwncrack] loading')
def on_config_changed(self, config):
self.handshake_dir = config["bettercap"].get("handshakes")
self.key = self.options.get('key', "") # Change this to your key
self.whitelist = config["main"].get("whitelist", [])
self.combined_file = os.path.join(self.handshake_dir, 'combined.hc22000')
self.potfile_path = os.path.join(self.handshake_dir, 'cracked.pwncrack.potfile')
def on_internet_available(self, agent):
current_time = time.time()
remaining_wait_time = self.timewait - (current_time - self.last_run_time)
if remaining_wait_time > 0:
logging.debug(f"[pwncrack] Waiting {remaining_wait_time:.1f} more seconds before next run.")
return
self.last_run_time = current_time
logging.info(f"[pwncrack] Running upload process. Key: {self.key}, waiting: {self.timewait} seconds.")
try:
self._convert_and_upload()
self._download_potfile()
except Exception as e:
logging.error(f"[pwncrack] Error occurred during upload process: {e}", exc_info=True)
def _convert_and_upload(self):
# Convert all .pcap files to .hc22000, excluding files matching whitelist items
pcap_files = [f for f in os.listdir(self.handshake_dir)
if f.endswith('.pcap') and not any(item in f for item in self.whitelist)]
if pcap_files:
for pcap_file in pcap_files:
subprocess.run(['hcxpcapngtool', '-o', self.combined_file, os.path.join(self.handshake_dir, pcap_file)])
# Ensure the combined file is created
if not os.path.exists(self.combined_file):
open(self.combined_file, 'w').close()
# Upload the combined .hc22000 file
with open(self.combined_file, 'rb') as file:
files = {'handshake': file}
data = {'key': self.key}
response = requests.post(self.server_url, files=files, data=data)
# Log the response
logging.info(f"[pwncrack] Upload response: {response.json()}")
os.remove(self.combined_file) # Remove the combined.hc22000 file
else:
logging.info("[pwncrack] No .pcap files found to convert (or all files are whitelisted).")
def _download_potfile(self):
response = requests.get(self.potfile_url, params={'key': self.key})
if response.status_code == 200:
with open(self.potfile_path, 'w') as file:
file.write(response.text)
logging.info(f"[pwncrack] Potfile downloaded to {self.potfile_path}")
else:
logging.error(f"[pwncrack] Failed to download potfile: {response.status_code}")
logging.error(f"[pwncrack] {response.json()}") # Log the error message from the server
def on_unload(self, ui):
logging.info('[pwncrack] unloading')

View File

@ -4,207 +4,368 @@ import json
import csv
import requests
import pwnagotchi
from io import StringIO
from datetime import datetime
from pwnagotchi.utils import WifiInfo, FieldNotFoundError, extract_from_pcap, StatusFile, remove_whitelisted
import re
from glob import glob
from threading import Lock
from io import StringIO
from datetime import datetime, UTC
from dataclasses import dataclass
from flask import make_response, redirect
from pwnagotchi.utils import (
WifiInfo,
FieldNotFoundError,
extract_from_pcap,
StatusFile,
remove_whitelisted,
)
from pwnagotchi import plugins
from pwnagotchi.plugins.default.cache import read_ap_cache
from pwnagotchi._version import __version__ as __pwnagotchi_version__
import pwnagotchi.ui.fonts as fonts
from pwnagotchi.ui.components import Text
from pwnagotchi.ui.view import BLACK
def _extract_gps_data(path):
"""
Extract data from gps-file
return json-obj
"""
try:
if path.endswith('.geo.json'):
with open(path, 'r') as json_file:
tempJson = json.load(json_file)
d = datetime.utcfromtimestamp(int(tempJson["ts"]))
return {"Latitude": tempJson["location"]["lat"],
"Longitude": tempJson["location"]["lng"],
"Altitude": 10,
"Accuracy": tempJson["accuracy"],
"Updated": d.strftime('%Y-%m-%dT%H:%M:%S.%f')}
else:
with open(path, 'r') as json_file:
return json.load(json_file)
except OSError as os_err:
raise os_err
except json.JSONDecodeError as json_err:
raise json_err
from scapy.all import Scapy_Exception
def _format_auth(data):
out = ""
for auth in data:
out = f"{out}[{auth}]"
return [f"{auth}" for auth in data]
@dataclass
class WigleStatistics:
ready: bool = False
username: str = None
rank: int = None
monthrank: int = None
discoveredwiFi: int = None
last: str = None
groupID: str = None
groupname: str = None
grouprank: int = None
def update_user(self, json_res):
self.ready = True
self.username = json_res["user"]
self.rank = json_res["rank"]
self.monthrank = json_res["monthRank"]
self.discoveredwiFi = json_res["statistics"]["discoveredWiFi"]
last = json_res["statistics"]["last"]
self.last = f"{last[6:8]}/{last[4:6]}/{last[0:4]}"
def _transform_wigle_entry(gps_data, pcap_data, plugin_version):
"""
Transform to wigle entry in file
"""
dummy = StringIO()
# write kismet header
dummy.write(f"WigleWifi-1.6,appRelease={plugin_version},model=pwnagotchi,release={__pwnagotchi_version__},"
f"device={pwnagotchi.name()},display=kismet,board=RaspberryPi,brand=pwnagotchi,star=Sol,body=3,subBody=0\n")
dummy.write(
"MAC,SSID,AuthMode,FirstSeen,Channel,RSSI,CurrentLatitude,CurrentLongitude,AltitudeMeters,AccuracyMeters,Type\n")
def update_user_group(self, json_res):
self.groupID = json_res["groupId"]
self.groupname = json_res["groupName"]
writer = csv.writer(dummy, delimiter=",", quoting=csv.QUOTE_NONE, escapechar="\\")
writer.writerow([
pcap_data[WifiInfo.BSSID],
pcap_data[WifiInfo.ESSID],
_format_auth(pcap_data[WifiInfo.ENCRYPTION]),
datetime.strptime(gps_data['Updated'].rsplit('.')[0],
"%Y-%m-%dT%H:%M:%S").strftime('%Y-%m-%d %H:%M:%S'),
pcap_data[WifiInfo.CHANNEL],
pcap_data[WifiInfo.RSSI],
gps_data['Latitude'],
gps_data['Longitude'],
gps_data['Altitude'],
gps_data['Accuracy'],
'WIFI'])
return dummy.getvalue()
def _send_to_wigle(lines, api_key, donate=True, timeout=30):
"""
Uploads the file to wigle-net
"""
dummy = StringIO()
for line in lines:
dummy.write(f"{line}")
dummy.seek(0)
headers = {"Authorization": f"Basic {api_key}",
"Accept": "application/json",
"HTTP_USER_AGENT": "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:15.0) Gecko/20100101 Firefox/15.0.1"}
data = {"donate": "on" if donate else "false"}
payload = {"file": (pwnagotchi.name() + ".csv", dummy, "multipart/form-data", {"Expires": "0"})}
try:
res = requests.post('https://api.wigle.net/api/v2/file/upload',
data=data,
headers=headers,
files=payload,
timeout=timeout)
json_res = res.json()
if not json_res['success']:
raise requests.exceptions.RequestException(json_res['message'])
except requests.exceptions.RequestException as re_e:
raise re_e
def update_group(self, json_res):
rank = 1
for group in json_res["groups"]:
if group["groupId"] == self.groupID:
self.grouprank = rank
rank += 1
class Wigle(plugins.Plugin):
__author__ = "Dadav and updated by Jayofelony"
__version__ = "3.0.1"
__author__ = "Dadav and updated by Jayofelony and fmatray"
__version__ = "4.1.0"
__license__ = "GPL3"
__description__ = "This plugin automatically uploads collected WiFi to wigle.net"
LABEL_SPACING = 0
def __init__(self):
self.ready = False
self.report = StatusFile('/root/.wigle_uploads', data_format='json')
self.report = None
self.skip = list()
self.lock = Lock()
self.options = dict()
self.statistics = WigleStatistics()
self.last_stat = datetime.now(tz=UTC)
self.ui_counter = 0
def on_loaded(self):
if 'api_key' not in self.options or ('api_key' in self.options and self.options['api_key'] is None):
logging.debug("WIGLE: api_key isn't set. Can't upload to wigle.net")
logging.info("[WIGLE] plugin loaded.")
def on_config_changed(self, config):
self.api_key = self.options.get("api_key", None)
if not self.api_key:
logging.info("[WIGLE] api_key must be set.")
return
if 'donate' not in self.options:
self.options['donate'] = False
self.donate = self.options.get("donate", False)
self.handshake_dir = config["bettercap"].get("handshakes")
report_filename = os.path.join(self.handshake_dir, ".wigle_uploads")
self.report = StatusFile(report_filename, data_format="json")
self.cache_dir = os.path.join(self.handshake_dir, "cache")
self.cvs_dir = self.options.get("cvs_dir", None)
self.whitelist = config["main"].get("whitelist", [])
self.timeout = self.options.get("timeout", 30)
self.position = self.options.get("position", (10, 10))
self.ready = True
logging.info("WIGLE: ready")
logging.info("[WIGLE] Ready for wardriving!!!")
self.get_statistics(force=True)
def on_internet_available(self, agent):
"""
Called when there's internet connectivity
"""
if not self.ready or self.lock.locked():
return
def on_webhook(self, path, request):
return make_response(redirect("https://www.wigle.net/", code=302))
from scapy.all import Scapy_Exception
def get_new_gps_files(self, reported):
all_gps_files = glob(os.path.join(self.handshake_dir, "*.gps.json"))
all_gps_files += glob(os.path.join(self.handshake_dir, "*.geo.json"))
all_gps_files = remove_whitelisted(all_gps_files, self.whitelist)
return set(all_gps_files) - set(reported) - set(self.skip)
config = agent.config()
display = agent.view()
reported = self.report.data_field_or('reported', default=list())
handshake_dir = config['bettercap']['handshakes']
all_files = os.listdir(handshake_dir)
all_gps_files = [os.path.join(handshake_dir, filename)
for filename in all_files
if filename.endswith('.gps.json') or filename.endswith('.geo.json')]
all_gps_files = remove_whitelisted(all_gps_files, config['main']['whitelist'])
new_gps_files = set(all_gps_files) - set(reported) - set(self.skip)
if new_gps_files:
logging.info("WIGLE: Internet connectivity detected. Uploading new handshakes to wigle.net")
csv_entries = list()
no_err_entries = list()
for gps_file in new_gps_files:
if gps_file.endswith('.gps.json'):
pcap_filename = gps_file.replace('.gps.json', '.pcap')
if gps_file.endswith('.geo.json'):
pcap_filename = gps_file.replace('.geo.json', '.pcap')
@staticmethod
def get_pcap_filename(gps_file):
pcap_filename = re.sub(r"\.(geo|gps)\.json$", ".pcap", gps_file)
if not os.path.exists(pcap_filename):
logging.debug("WIGLE: Can't find pcap for %s", gps_file)
self.skip.append(gps_file)
continue
logging.debug("[WIGLE] Can't find pcap for %s", gps_file)
return None
return pcap_filename
@staticmethod
def extract_gps_data(path):
"""
Extract data from gps-file
return json-obj
"""
try:
gps_data = _extract_gps_data(gps_file)
except OSError as os_err:
logging.debug("WIGLE: %s", os_err)
self.skip.append(gps_file)
continue
except json.JSONDecodeError as json_err:
logging.debug("WIGLE: %s", json_err)
self.skip.append(gps_file)
continue
if gps_data['Latitude'] == 0 and gps_data['Longitude'] == 0:
logging.debug("WIGLE: Not enough gps-information for %s. Trying again next time.", gps_file)
self.skip.append(gps_file)
continue
if path.endswith(".geo.json"):
with open(path, "r") as json_file:
tempJson = json.load(json_file)
d = datetime.fromtimestamp(int(tempJson["ts"]), tz=UTC)
return {
"Latitude": tempJson["location"]["lat"],
"Longitude": tempJson["location"]["lng"],
"Altitude": 10,
"Accuracy": tempJson["accuracy"],
"Updated": d.strftime("%Y-%m-%dT%H:%M:%S.%f"),
}
with open(path, "r") as json_file:
return json.load(json_file)
except (OSError, json.JSONDecodeError) as exp:
raise exp
def get_gps_data(self, gps_file):
try:
pcap_data = extract_from_pcap(pcap_filename, [WifiInfo.BSSID,
gps_data = self.extract_gps_data(gps_file)
except (OSError, json.JSONDecodeError) as exp:
logging.debug(f"[WIGLE] Error while extracting GPS data: {exp}")
return None
if gps_data["Latitude"] == 0 and gps_data["Longitude"] == 0:
logging.debug(f"[WIGLE] Not enough gps data for {gps_file}. Next time.")
return None
return gps_data
def get_pcap_data(self, pcap_filename):
try:
if cache := read_ap_cache(self.cache_dir, self.pcap_filename):
logging.info(f"[WIGLE] Using cache for {pcap_filename}")
return {
WifiInfo.BSSID: cache["mac"],
WifiInfo.ESSID: cache["hostname"],
WifiInfo.ENCRYPTION: cache["encryption"],
WifiInfo.CHANNEL: cache["channel"],
WifiInfo.FREQUENCY: cache["frequency"],
WifiInfo.RSSI: cache["rssi"],
}
except (AttributeError, KeyError):
pass
try:
pcap_data = extract_from_pcap(
pcap_filename,
[
WifiInfo.BSSID,
WifiInfo.ESSID,
WifiInfo.ENCRYPTION,
WifiInfo.CHANNEL,
WifiInfo.RSSI])
WifiInfo.FREQUENCY,
WifiInfo.RSSI,
],
)
logging.debug(f"[WIGLE] PCAP data for {pcap_filename}: {pcap_data}")
except FieldNotFoundError:
logging.debug("WIGLE: Could not extract all information. Skip %s", gps_file)
self.skip.append(gps_file)
continue
logging.debug(f"[WIGLE] Cannot extract all data: {pcap_filename} (skipped)")
return None
except Scapy_Exception as sc_e:
logging.debug("WIGLE: %s", sc_e)
self.skip.append(gps_file)
continue
new_entry = _transform_wigle_entry(gps_data, pcap_data, self.__version__)
csv_entries.append(new_entry)
no_err_entries.append(gps_file)
if csv_entries:
display.on_uploading('wigle.net')
logging.debug(f"[WIGLE] {sc_e}")
return None
return pcap_data
def generate_csv(self, data):
date = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"{pwnagotchi.name()}_{date}.csv"
content = StringIO()
# write kismet header + header
content.write(
f"WigleWifi-1.6,appRelease={self.__version__},model=pwnagotchi,release={__pwnagotchi_version__},"
f"device={pwnagotchi.name()},display=kismet,board=RaspberryPi,brand=pwnagotchi,star=Sol,body=3,subBody=0\n"
f"MAC,SSID,AuthMode,FirstSeen,Channel,Frequency,RSSI,CurrentLatitude,CurrentLongitude,AltitudeMeters,AccuracyMeters,RCOIs,MfgrId,Type\n"
)
writer = csv.writer(content, delimiter=",", quoting=csv.QUOTE_NONE, escapechar="\\")
for gps_data, pcap_data in data: # write WIFIs
try:
_send_to_wigle(csv_entries, self.options['api_key'], donate=self.options['donate'])
reported += no_err_entries
self.report.update(data={'reported': reported})
logging.info("WIGLE: Successfully uploaded %d files", len(no_err_entries))
except requests.exceptions.RequestException as re_e:
self.skip += no_err_entries
logging.debug("WIGLE: Got an exception while uploading %s", re_e)
except OSError as os_e:
self.skip += no_err_entries
logging.debug("WIGLE: Got the following error: %s", os_e)
timestamp = datetime.strptime(
gps_data["Updated"].rsplit(".")[0], "%Y-%m-%dT%H:%M:%S"
).strftime("%Y-%m-%d %H:%M:%S")
except ValueError:
timestamp = datetime.strptime(
gps_data["Updated"].rsplit(".")[0], "%Y-%m-%d %H:%M:%S"
).strftime("%Y-%m-%d %H:%M:%S")
writer.writerow(
[
pcap_data[WifiInfo.BSSID],
pcap_data[WifiInfo.ESSID],
f"[{']['.join(pcap_data[WifiInfo.ENCRYPTION])}]",
timestamp,
pcap_data[WifiInfo.CHANNEL],
pcap_data[WifiInfo.FREQUENCY],
pcap_data[WifiInfo.RSSI],
gps_data["Latitude"],
gps_data["Longitude"],
gps_data["Altitude"],
gps_data["Accuracy"],
"", # RCOIs to populate
"", # MfgrId always empty
"WIFI",
]
)
content.seek(0)
return filename, content
def save_to_file(self, cvs_filename, cvs_content):
if not self.cvs_dir:
return
filename = os.path.join(self.cvs_dir, cvs_filename)
logging.info(f"[WIGLE] Saving to file {filename}")
try:
with open(filename, mode="w") as f:
f.write(cvs_content.getvalue())
except Exception as exp:
logging.error(f"[WIGLE] Error while writing CSV file(skipping): {exp}")
def post_wigle(self, reported, cvs_filename, cvs_content, no_err_entries):
try:
json_res = requests.post(
"https://api.wigle.net/api/v2/file/upload",
headers={
"Authorization": f"Basic {self.api_key}",
"Accept": "application/json",
},
data={"donate": "on" if self.donate else "false"},
files=dict(file=(cvs_filename, cvs_content, "text/csv")),
timeout=self.timeout,
).json()
if not json_res["success"]:
raise requests.exceptions.RequestException(json_res["message"])
reported += no_err_entries
self.report.update(data={"reported": reported})
logging.info(f"[WIGLE] Successfully uploaded {len(no_err_entries)} wifis")
except (requests.exceptions.RequestException, OSError) as exp:
self.skip += no_err_entries
logging.debug(f"[WIGLE] Exception while uploading: {exp}")
def upload_new_handshakes(self, reported, new_gps_files, agent):
logging.info("[WIGLE] Uploading new handshakes to wigle.net")
csv_entries, no_err_entries = list(), list()
for gps_file in new_gps_files:
logging.info(f"[WIGLE] Processing {os.path.basename(gps_file)}")
if (
(pcap_filename := self.get_pcap_filename(gps_file))
and (gps_data := self.get_gps_data(gps_file))
and (pcap_data := self.get_pcap_data(pcap_filename))
):
csv_entries.append((gps_data, pcap_data))
no_err_entries.append(gps_file)
else:
self.skip.append(gps_file)
logging.info(f"[WIGLE] Wifi to upload: {len(csv_entries)}")
if csv_entries:
cvs_filename, cvs_content = self.generate_csv(csv_entries)
self.save_to_file(cvs_filename, cvs_content)
display = agent.view()
display.on_uploading("wigle.net")
self.post_wigle(reported, cvs_filename, cvs_content, no_err_entries)
display.on_normal()
def request_statistics(self, url):
try:
return requests.get(
url,
headers={
"Authorization": f"Basic {self.api_key}",
"Accept": "application/json",
},
timeout=self.timeout,
).json()
except (requests.exceptions.RequestException, OSError) as exp:
return None
def get_user_statistics(self):
json_res = self.request_statistics(
"https://api.wigle.net/api/v2/stats/user",
)
if json_res and json_res["success"]:
self.statistics.update_user(json_res)
def get_usergroup_statistics(self):
if not self.statistics.username or self.statistics.groupID:
return
url = f"https://api.wigle.net/api/v2/group/groupForUser/{self.statistics.username}"
if json_res := self.request_statistics(url):
self.statistics.update_user_group(json_res)
def get_group_statistics(self):
if not self.statistics.groupID:
return
json_res = self.request_statistics("https://api.wigle.net/api/v2/stats/group")
if json_res and json_res["success"]:
self.statistics.update_group(json_res)
def get_statistics(self, force=False):
if force or (datetime.now(tz=UTC) - self.last_stat).total_seconds() > 30:
self.last_stat = datetime.now(tz=UTC)
self.get_user_statistics()
self.get_usergroup_statistics()
self.get_group_statistics()
def on_internet_available(self, agent):
if not self.ready:
return
with self.lock:
reported = self.report.data_field_or("reported", default=list())
if new_gps_files := self.get_new_gps_files(reported):
self.upload_new_handshakes(reported, new_gps_files, agent)
else:
self.get_statistics()
def on_ui_setup(self, ui):
with ui._lock:
ui.add_element(
"wigle",
Text(value="-", position=self.position, font=fonts.Small, color=BLACK),
)
def on_unload(self, ui):
with ui._lock:
try:
ui.remove_element("wigle")
except KeyError:
pass
def on_ui_update(self, ui):
with ui._lock:
if not (self.ready and self.statistics.ready):
ui.set("wigle", "We Will Wait Wigle")
return
msg = "-"
self.ui_counter = (self.ui_counter + 1) % 6
if self.ui_counter == 0:
msg = f"User:{self.statistics.username}"
if self.ui_counter == 1:
msg = f"Rank:{self.statistics.rank} Month:{self.statistics.monthrank}"
elif self.ui_counter == 2:
msg = f"{self.statistics.discoveredwiFi} discovered WiFis"
elif self.ui_counter == 3:
msg = f"Last upl.:{self.statistics.last}"
elif self.ui_counter == 4:
msg = f"Grp:{self.statistics.groupname}"
elif self.ui_counter == 5:
msg = f"Grp rank:{self.statistics.grouprank}"
ui.set("wigle", msg)

View File

@ -1,7 +1,6 @@
import os
import logging
import requests
import subprocess
from datetime import datetime
from threading import Lock
from pwnagotchi.utils import StatusFile, remove_whitelisted

View File

@ -319,7 +319,7 @@ class Display(View):
def image(self):
img = None
if self._canvas is not None:
img = self._canvas if self._rotation == 0 else self._canvas.rotate(-self._rotation)
img = self._canvas if self._rotation == 0 else self._canvas.rotate(-self._rotation, expand=True)
return img
def _render_thread(self):
@ -338,7 +338,7 @@ class Display(View):
logging.error("%s" % e)
if self._enabled:
self._canvas = (img if self._rotation == 0 else img.rotate(self._rotation))
self._canvas = (img if self._rotation == 0 else img.rotate(self._rotation, expand=True))
if self._implementation is not None:
self._canvas_next = self._canvas
self._canvas_next_event.set()

View File

@ -7,6 +7,7 @@ from pwnagotchi.ui.hw.base import DisplayImpl
class DummyDisplay(DisplayImpl):
def __init__(self, config):
super(DummyDisplay, self).__init__(config, 'DummyDisplay')
self._display = self
def layout(self):
width = 480 if 'width' not in self.config else self.config['width']
@ -25,7 +26,7 @@ class DummyDisplay(DisplayImpl):
self._layout['friend_name'] = (int(width/12), int(height/10))
self._layout['shakes'] = (0, height-int(height/25))
self._layout['mode'] = (width-int(width/8), height - int (height/25))
lw, lh = fonts.Small.getbbox("W")
lw, lh = fonts.Small.getsize("W")
self._layout['status'] = {
'pos': (int(width/48), int(height/3)),
'font': fonts.status_font(fonts.Small),

View File

@ -49,8 +49,14 @@ class View(object):
self._voice = Voice(lang=config['main']['lang'])
self._implementation = impl
self._layout = impl.layout()
self._rotation = config['ui']['display'].get('rotation',0)
if (self._rotation/90)%2 == 0:
self._width = self._layout['width']
self._height = self._layout['height']
else:
self._width = self._layout['height']
self._height = self._layout['width']
self._state = State(state={
'channel': LabeledValue(color=BLACK, label='CH', value='00', position=self._layout['channel'],
label_font=fonts.Bold,

View File

@ -5,61 +5,13 @@ import subprocess
import json
import shutil
import toml
import sys
import re
import tomlkit
from toml.encoder import TomlEncoder, _dump_str
from zipfile import ZipFile
from datetime import datetime
from enum import Enum
class DottedTomlEncoder(TomlEncoder):
"""
Dumps the toml into the dotted-key format
"""
def __init__(self, _dict=dict):
super(DottedTomlEncoder, self).__init__(_dict)
def dump_list(self, v):
retval = "["
# 1 line if its just 1 item; therefore no newline
if len(v) > 1:
retval += "\n"
for u in v:
retval += " " + str(self.dump_value(u)) + ",\n"
# 1 line if its just 1 item; remove newline
if len(v) <= 1:
retval = retval.rstrip("\n")
retval += "]"
return retval
def dump_sections(self, o, sup):
retstr = ""
pre = ""
if sup:
pre = sup + "."
for section, value in o.items():
section = str(section)
qsection = section
if not re.match(r'^[A-Za-z0-9_-]+$', section):
qsection = _dump_str(section)
if value is not None:
if isinstance(value, dict):
toadd, _ = self.dump_sections(value, pre + qsection)
retstr += toadd
# separte sections
if not retstr.endswith('\n\n'):
retstr += '\n'
else:
retstr += (pre + qsection + " = " + str(self.dump_value(value)) + '\n')
return retstr, self._dict()
def parse_version(version):
"""
Converts a version str to tuple, so that versions can be compared
@ -150,7 +102,8 @@ def keys_to_str(data):
def save_config(config, target):
with open(target, 'wt') as fp:
fp.write(toml.dumps(config, encoder=DottedTomlEncoder()))
fp.write(tomlkit.dumps(config))
#fp.write(toml.dumps(config, encoder=DottedTomlEncoder()))
return True
@ -198,9 +151,33 @@ def load_config(args):
print("!!! file in %s is different than release defaults, overwriting !!!" % args.config)
shutil.copy(ref_defaults_file, args.config)
def load_toml_file(filename):
"""Load toml data from a file. Use toml for dotted, tomlkit for nice formatted"""
with open(filename) as fp:
text = fp.read()
# look for "[main]". if not there, then load
# dotted toml with toml instead of tomlkit
if text.find("[main]") != -1:
return tomlkit.loads(text)
else:
print("Converting dotted toml %s: %s" % (filename, text[0:100]))
import toml
data = toml.loads(text)
# save original as a backup
try:
backup = filename + ".ORIG"
os.rename(filename, backup)
with open(filename, "w") as fp2:
tomlkit.dump(data, fp2)
print("Converted to new format. Original saved at %s" % backup)
except Exception as e:
print("Unable to convert %s to new format: %s" % (backup, e))
return data
# load the defaults
with open(args.config) as fp:
config = toml.load(fp)
config = load_toml_file(args.config)
# load the user config
try:
@ -216,10 +193,10 @@ def load_config(args):
# convert int/float keys to str
user_config = keys_to_str(user_config)
# convert to toml but use loaded yaml
toml.dump(user_config, toml_file)
# toml.dump(user_config, toml_file)
tomlkit.dump(user_config, toml_file)
elif os.path.exists(args.user_config):
with open(args.user_config) as toml_file:
user_config = toml.load(toml_file)
user_config = load_toml_file(args.user_config)
if user_config:
config = merge_config(user_config, config)
@ -232,8 +209,7 @@ def load_config(args):
if dropin and os.path.isdir(dropin):
dropin += '*.toml' if dropin.endswith('/') else '/*.toml' # only toml here; yaml is no more
for conf in glob.glob(dropin):
with open(conf) as toml_file:
additional_config = toml.load(toml_file)
additional_config = load_toml_file(conf)
config = merge_config(additional_config, config)
# the very first step is to normalize the display name, so we don't need dozens of if/elif around
@ -564,7 +540,8 @@ class WifiInfo(Enum):
ESSID = 1
ENCRYPTION = 2
CHANNEL = 3
RSSI = 4
FREQUENCY = 4
RSSI = 5
class FieldNotFoundError(Exception):
@ -594,9 +571,6 @@ def extract_from_pcap(path, fields):
"""
results = dict()
for field in fields:
if not isinstance(field, WifiInfo):
raise TypeError("Invalid field")
subtypes = set()
if field == WifiInfo.BSSID:
@ -606,8 +580,7 @@ def extract_from_pcap(path, fields):
packets = sniff(offline=path, filter=bpf_filter)
try:
for packet in packets:
if packet.haslayer(Dot11Beacon):
if hasattr(packet[Dot11], 'addr3'):
if packet.haslayer(Dot11Beacon) and hasattr(packet[Dot11], 'addr3'):
results[field] = packet[Dot11].addr3
break
else: # magic
@ -654,6 +627,14 @@ def extract_from_pcap(path, fields):
results[field] = freq_to_channel(packets[0][RadioTap].ChannelFrequency)
except Exception:
raise FieldNotFoundError("Could not find field [CHANNEL]")
elif field == WifiInfo.FREQUENCY:
from scapy.layers.dot11 import sniff, RadioTap
from pwnagotchi.mesh.wifi import freq_to_channel
packets = sniff(offline=path, count=1)
try:
results[field] = packets[0][RadioTap].ChannelFrequency
except Exception:
raise FieldNotFoundError("Could not find field [FREQUENCY]")
elif field == WifiInfo.RSSI:
from scapy.layers.dot11 import sniff, RadioTap
from pwnagotchi.mesh.wifi import freq_to_channel
@ -662,7 +643,8 @@ def extract_from_pcap(path, fields):
results[field] = packets[0][RadioTap].dBm_AntSignal
except Exception:
raise FieldNotFoundError("Could not find field [RSSI]")
else:
raise TypeError("Invalid field")
return results

View File

@ -27,11 +27,19 @@ class Voice:
self._('Hack the Planet!'),
self._('No more mister Wi-Fi!!'),
self._('Pretty fly 4 a Wi-Fi!'),
self._('Good Pwning!'), # Battlestar Galactica
self._('Ensign, Engage!'), # Star trek
self._('Free your Wi-Fi!'), # Matrix
self._('Chevron Seven, locked.'), # Stargate
self._('May the Wi-fi be with you'), # Star wars
])
def on_keys_generation(self):
return random.choice([
self._('Generating keys, do not turn off ...')])
self._('Generating keys, do not turn off ...'),
self._('Are you the keymaster?'), # Ghostbusters
self._('I am the keymaster!'), # Ghostbusters
])
def on_normal(self):
return random.choice([
@ -44,7 +52,6 @@ class Voice:
def on_reading_logs(self, lines_so_far=0):
if lines_so_far == 0:
return self._('Reading last session logs ...')
else:
return self._('Read {lines_so_far} log lines so far ...').format(lines_so_far=lines_so_far)
def on_bored(self):
@ -53,7 +60,11 @@ class Voice:
self._('Let\'s go for a walk!')])
def on_motivated(self, reward):
return self._('This is the best day of my life!')
return random.choice([
self._('This is the best day of my life!'),
self._('All your base are belong to us'),
self._('Fascinating!'), # Star trek
])
def on_demotivated(self, reward):
return self._('Shitty day :/')
@ -63,6 +74,8 @@ class Voice:
self._('I\'m extremely bored ...'),
self._('I\'m very sad ...'),
self._('I\'m sad'),
self._('I\'m so happy ...'), # Marvin in H2G2
self._('Life? Don\'t talk to me about life.'), # Also Marvin in H2G2
'...'])
def on_angry(self):
@ -78,13 +91,13 @@ class Voice:
self._('I pwn therefore I am.'),
self._('So many networks!!!'),
self._('I\'m having so much fun!'),
self._('It\'s a Wi-Fi system! I know this!'), # Jurassic park
self._('My crime is that of curiosity ...')])
def on_new_peer(self, peer):
if peer.first_encounter():
return random.choice([
self._('Hello {name}! Nice to meet you.').format(name=peer.name())])
else:
return random.choice([
self._('Yo {name}! Sup?').format(name=peer.name()),
self._('Hey {name} how are you doing?').format(name=peer.name()),
@ -104,19 +117,23 @@ class Voice:
def on_grateful(self):
return random.choice([
self._('Good friends are a blessing!'),
self._('I love my friends!')])
self._('I love my friends!')
])
def on_lonely(self):
return random.choice([
self._('Nobody wants to play with me ...'),
self._('I feel so alone ...'),
self._('Let\'s find friends'),
self._('Where\'s everybody?!')])
def on_napping(self, secs):
return random.choice([
self._('Napping for {secs}s ...').format(secs=secs),
self._('Zzzzz'),
self._('ZzzZzzz ({secs}s)').format(secs=secs)])
self._('Snoring ...'),
self._('ZzzZzzz ({secs}s)').format(secs=secs),
])
def on_shutdown(self):
return random.choice([
@ -124,12 +141,17 @@ class Voice:
self._('Zzz')])
def on_awakening(self):
return random.choice(['...', '!'])
return random.choice([
'...',
'!',
'Hello World!',
self._('I dreamed of electric sheep'),
])
def on_waiting(self, secs):
return random.choice([
self._('Waiting for {secs}s ...').format(secs=secs),
'...',
self._('Waiting for {secs}s ...').format(secs=secs),
self._('Looking around ({secs}s)').format(secs=secs)])
def on_assoc(self, ap):
@ -138,12 +160,16 @@ class Voice:
return random.choice([
self._('Hey {what} let\'s be friends!').format(what=what),
self._('Associating to {what}').format(what=what),
self._('Yo {what}!').format(what=what)])
self._('Yo {what}!').format(what=what),
self._('Rise and Shine Mr. {what}!').format(what=what), # Half Life
])
def on_deauth(self, sta):
return random.choice([
self._('Just decided that {mac} needs no WiFi!').format(mac=sta['mac']),
self._('Just decided that {mac} needs no Wi-Fi!').format(mac=sta['mac']),
self._('Deauthenticating {mac}').format(mac=sta['mac']),
self._('No more Wi-Fi for {mac}').format(mac=sta['mac']),
self._('It\'s a trap! {mac}').format(mac=sta['mac']), # Star wars
self._('Kickbanning {mac}!').format(mac=sta['mac'])])
def on_handshakes(self, new_shakes):
@ -155,10 +181,19 @@ class Voice:
return self._('You have {count} new message{plural}!').format(count=count, plural=s)
def on_rebooting(self):
return self._("Oops, something went wrong ... Rebooting ...")
return random.choice([
self._("Oops, something went wrong ... Rebooting ..."),
self._("Have you tried turning it off and on again?"), # The IT crew
self._("I\'m afraid Dave"), # 2001 Space Odyssey
self._("I\'m dead, Jim!"), # Star Trek
self._("I have a bad feeling about this"), # Star wars
])
def on_uploading(self, to):
return self._("Uploading data to {to} ...").format(to=to)
return random.choice([
self._("Uploading data to {to} ...").format(to=to),
self._("Beam me up to {to}").format(to=to),
])
def on_downloading(self, name):
return self._("Downloading from {name} ...").format(name=name)

View File

@ -9,7 +9,7 @@ dependencies = [
"PyYAML", "dbus-python", "file-read-backwards", "flask", "flask-cors",
"flask-wtf", "gast", "gpiozero", "inky", "numpy", "pycryptodome", "pydrive2", "python-dateutil",
"requests", "rpi-lgpio", "rpi_hardware_pwm", "scapy", "setuptools", "shimmy", "smbus", "smbus2",
"spidev", "toml", "tweepy", "websockets", "pisugar",
"spidev", "tomlkit", "toml", "tweepy", "websockets", "pisugar",
]
requires-python = ">=3.11"

142
scripts/BR.bat Normal file
View File

@ -0,0 +1,142 @@
@echo off
setlocal
REM ---------------------------------------------------------------------------
REM A simple .bat script that handles two subcommands: "backup" and "restore"
REM USAGE:
REM backup-restore.bat backup [ -n HOST ] [ -u USER ] [ -o OUTPUT ]
REM backup-restore.bat restore [ -n HOST ] [ -u USER ] [ -b BACKUP_FILE ]
REM ---------------------------------------------------------------------------
set "LOGFILE=%~dp0BR_GPT.log"
echo [INFO] Script started at %DATE% %TIME% > "%LOGFILE%"
REM ---------------------------------------------------------------------------
REM Check for Arguments
if "%~1"=="" goto usage
set "SUBCOMMAND=%~1"
shift
REM Default Values
set "UNIT_HOSTNAME=10.0.0.2"
set "UNIT_USERNAME=pi"
set "OUTPUT="
set "BACKUP_FILE="
:parse_args
if "%~1"=="" goto done_args
if "%~1"=="-n" (
set "UNIT_HOSTNAME=%~2"
shift
shift
goto parse_args
)
if "%~1"=="-u" (
set "UNIT_USERNAME=%~2"
shift
shift
goto parse_args
)
if "%SUBCOMMAND%"=="backup" (
if "%~1"=="-o" (
set "OUTPUT=%~2"
shift
shift
goto parse_args
)
)
if "%SUBCOMMAND%"=="restore" (
if "%~1"=="-b" (
set "BACKUP_FILE=%~2"
shift
shift
goto parse_args
)
)
echo [ERROR] Unknown or invalid argument: %~1 >> "%LOGFILE%"
goto usage
:done_args
REM ---------------------------------------------------------------------------
REM Subcommand Routing
if "%SUBCOMMAND%"=="backup" goto do_backup
if "%SUBCOMMAND%"=="restore" goto do_restore
goto usage
REM ===========================================================================
:do_backup
if "%OUTPUT%"=="" (
set /a RAND=%random%
set "OUTPUT=%UNIT_HOSTNAME%-backup-%RAND%.tgz"
)
echo [INFO] Checking connectivity to %UNIT_HOSTNAME% >> "%LOGFILE%"
ping -n 1 %UNIT_HOSTNAME% >nul 2>&1
if errorlevel 1 (
echo [ERROR] Device %UNIT_HOSTNAME% is unreachable. >> "%LOGFILE%"
exit /b 1
)
echo [INFO] Backing up %UNIT_HOSTNAME% to %OUTPUT% ... >> "%LOGFILE%"
set "FILES_TO_BACKUP=/root/settings.yaml /root/client_secrets.json /root/.ssh /root/.api-report.json /root/.bashrc /root/.profile /home/pi/handshakes /root/peers /etc/pwnagotchi/ /usr/local/share/pwnagotchi/custom-plugins /etc/ssh/ /home/pi/.bashrc /home/pi/.profile /home/pi/.wpa_sec_uploads"
ssh %UNIT_USERNAME%@%UNIT_HOSTNAME% "sudo tar -cf - %FILES_TO_BACKUP% | gzip -9" > "%OUTPUT%" 2>> "%LOGFILE%"
if errorlevel 1 (
echo [ERROR] Backup failed! >> "%LOGFILE%"
exit /b 1
)
echo [INFO] Backup completed successfully: %OUTPUT% >> "%LOGFILE%"
exit /b 0
REM ===========================================================================
:do_restore
if "%BACKUP_FILE%"=="" (
for /f "delims=" %%A in ('dir /b /a-d /O-D "%UNIT_HOSTNAME%-backup-*.tgz" 2^>nul') do (
set "BACKUP_FILE=%%A"
goto found_file
)
echo [ERROR] No backup file found. >> "%LOGFILE%"
exit /b 1
:found_file
echo [INFO] Found backup file: %BACKUP_FILE% >> "%LOGFILE%"
)
echo [INFO] Checking connectivity to %UNIT_HOSTNAME% >> "%LOGFILE%"
ping -n 1 %UNIT_HOSTNAME% > nul 2>&1
if errorlevel 1 (
echo [ERROR] Device %UNIT_HOSTNAME% is unreachable. >> "%LOGFILE%"
exit /b 1
)
echo [INFO] Copying backup file to remote device... >> "%LOGFILE%"
scp "%BACKUP_FILE%" %UNIT_USERNAME%@%UNIT_HOSTNAME%:/tmp/ 2>> "%LOGFILE%"
if errorlevel 1 (
echo [ERROR] Failed to copy backup file. >> "%LOGFILE%"
exit /b 1
)
echo [INFO] Restoring %BACKUP_FILE% on %UNIT_HOSTNAME% >> "%LOGFILE%"
ssh %UNIT_USERNAME%@%UNIT_HOSTNAME% "sudo tar xzvf /tmp/%BACKUP_FILE% -C /" 2>> "%LOGFILE%"
if errorlevel 1 (
echo [ERROR] Restore failed! >> "%LOGFILE%"
exit /b 1
)
echo [INFO] Restore completed successfully. >> "%LOGFILE%"
exit /b 0
REM ===========================================================================
:usage
echo Usage:
echo BR_GPT.bat backup [ -n HOST ] [ -u USER ] [ -o OUTPUT ]
echo BR_GPT.bat restore [ -n HOST ] [ -u USER ] [ -b BACKUP_FILE ]
exit /b 1

197
scripts/BR.sh Normal file
View File

@ -0,0 +1,197 @@
#!/usr/bin/env bash
#
# A Combined backup/restore script for linux
# Usage:
# backup-restore.sh backup [ -n HOST ] [ -u USER ] [ -o OUTPUT ]
# backup-restore.sh restore [ -n HOST ] [ -u USER ] [ -b BACKUP_FILE ]
#
###############################################################################
# GLOBAL USAGE
###############################################################################
usage() {
echo "Usage:"
echo " $0 backup [ -n HOST ] [ -u USER ] [ -o OUTPUT ]"
echo " $0 restore [ -n HOST ] [ -u USER ] [ -b BACKUP_FILE ]"
echo
echo "Subcommands:"
echo " backup Backup files from the remote device."
echo " restore Restore files to the remote device."
echo
echo "Common Options:"
echo " -n HOST Hostname or IP of remote device (default: 10.0.0.2)"
echo " -u USER Username for SSH (default: pi)"
echo
echo "Options for 'backup':"
echo " -o OUTPUT Path/name of the output archive (default: \$HOST-backup-\$(date +%s).tgz)"
echo
echo "Options for 'restore':"
echo " -b BACKUP Path to the local backup archive to restore."
echo
echo "Examples:"
echo " $0 backup"
echo " $0 backup -n 10.0.0.2 -u pi -o my-backup.tgz"
echo " $0 restore -b 10.0.0.2-backup-123456789.tgz"
echo
}
###############################################################################
# BACKUP FUNCTION
###############################################################################
do_backup() {
# Defaults
UNIT_HOSTNAME="10.0.0.2"
UNIT_USERNAME="pi"
OUTPUT=""
# Parse arguments specific to backup
while getopts "hn:u:o:" arg; do
case $arg in
h)
usage
exit 0
;;
n)
UNIT_HOSTNAME=$OPTARG
;;
u)
UNIT_USERNAME=$OPTARG
;;
o)
OUTPUT=$OPTARG
;;
*)
usage
exit 1
;;
esac
done
# If OUTPUT was not specified, set a default
if [[ -z "$OUTPUT" ]]; then
OUTPUT="${UNIT_HOSTNAME}-backup-$(date +%s).tgz"
fi
# List of files/directories to backup
# (the same list you used in your backup.sh script)
FILES_TO_BACKUP="
/root/settings.yaml
/root/client_secrets.json
/root/.api-report.json
/root/.ssh
/root/.bashrc
/root/.profile
/home/pi/handshakes
/root/peers
/etc/pwnagotchi/
/usr/local/share/pwnagotchi/custom-plugins
/etc/ssh/
/home/pi/.bashrc
/home/pi/.profile
/home/pi/.wpa_sec_uploads
"
# Convert multiline variable into a space-separated list
FILES_TO_BACKUP=$(echo "$FILES_TO_BACKUP" | tr '\n' ' ')
echo "@ Checking connectivity to $UNIT_HOSTNAME ..."
if ! ping -c 1 "$UNIT_HOSTNAME" &>/dev/null; then
echo "@ unit ${UNIT_HOSTNAME} can't be reached, check network or USB interface IP."
exit 1
fi
echo "@ Backing up ${UNIT_HOSTNAME} to ${OUTPUT} ..."
# -n => do not read from stdin
ssh -n "${UNIT_USERNAME}@${UNIT_HOSTNAME}" \
"sudo tar -cf - ${FILES_TO_BACKUP}" \
| gzip -9 > "${OUTPUT}"
echo "@ Backup finished. Archive: ${OUTPUT}"
}
###############################################################################
# RESTORE FUNCTION
###############################################################################
do_restore() {
# Defaults
UNIT_HOSTNAME="10.0.0.2"
UNIT_USERNAME="pi"
BACKUP_FILE=""
# Parse arguments specific to restore
while getopts "hn:u:b:" arg; do
case $arg in
h)
usage
exit 0
;;
n)
UNIT_HOSTNAME=$OPTARG
;;
u)
UNIT_USERNAME=$OPTARG
;;
b)
BACKUP_FILE=$OPTARG
;;
*)
usage
exit 1
;;
esac
done
# If no backup file given, try to find the latest
if [[ -z "$BACKUP_FILE" ]]; then
BACKUP_FILE=$(ls -rt "${UNIT_HOSTNAME}"-backup-*.tgz 2>/dev/null | tail -n1)
if [[ -z "$BACKUP_FILE" ]]; then
echo "@ Can't find backup file. Please specify one with '-b'."
exit 1
fi
echo "@ Found backup file: $BACKUP_FILE"
echo -n "@ Continue restoring this file? (y/n) "
read -r CONTINUE
CONTINUE=$(echo "${CONTINUE}" | tr "[:upper:]" "[:lower:]")
if [[ "${CONTINUE}" != "y" ]]; then
exit 1
fi
fi
echo "@ Checking connectivity to $UNIT_HOSTNAME ..."
if ! ping -c 1 "$UNIT_HOSTNAME" &>/dev/null; then
echo "@ unit ${UNIT_HOSTNAME} can't be reached, make sure it's connected and a static IP assigned."
exit 1
fi
echo "@ Restoring $BACKUP_FILE to $UNIT_HOSTNAME ..."
cat "${BACKUP_FILE}" | ssh "${UNIT_USERNAME}@${UNIT_HOSTNAME}" "sudo tar xzv -C /"
echo "@ Restore finished."
}
###############################################################################
# MAIN ENTRY POINT
###############################################################################
# We expect the first argument to be either 'backup', 'restore', or '-h'.
SUBCOMMAND=$1
if [[ -z "$SUBCOMMAND" ]]; then
usage
exit 1
fi
shift # Shift past subcommand
case "$SUBCOMMAND" in
backup)
do_backup "$@"
;;
restore)
do_restore "$@"
;;
-h|--help)
usage
exit 0
;;
*)
usage
exit 1
;;
esac

View File

@ -1,67 +0,0 @@
#!/bin/sh
usage() {
echo "Usage: backup.sh [-honu] [-h] [-u user] [-n host name or ip] [-o output]"
}
while getopts "ho:n:u:" arg; do
case $arg in
h)
usage
exit
;;
n)
UNIT_HOSTNAME=$OPTARG
;;
o)
OUTPUT=$OPTARG
;;
u)
UNIT_USERNAME=$OPTARG
;;
*)
usage
exit 1
esac
done
# name of the ethernet gadget interface on the host
UNIT_HOSTNAME=${UNIT_HOSTNAME:-10.0.0.2}
# output backup tgz file
OUTPUT=${OUTPUT:-${UNIT_HOSTNAME}-backup-$(date +%s).tgz}
# username to use for ssh
UNIT_USERNAME=${UNIT_USERNAME:-pi}
# what to backup
FILES_TO_BACKUP="/boot/firmware/cmdline.txt \
/boot/firmware/config.txt \
/root/settings.yaml \
/root/client_secrets.json \
/root/.api-report.json \
/root/.ssh \
/root/.bashrc \
/root/.profile \
/home/pi/handshakes \
/root/peers \
/etc/modprobe.d/g_ether.conf \
/etc/pwnagotchi/ \
/etc/ssh/ \
/etc/pwnagotchi/log/pwnagotchi.log \
/etc/pwnagotchi/log/pwnagotchi*.gz \
/home/pi/.ssh \
/home/pi/.bashrc \
/home/pi/.profile \
/root/.api-report.json \
/root/.auto-update \
/root/.bt-tether* \
/root/.ohc_uploads \
/root/.wigle_uploads \
/home/pi/.wpa_sec_uploads"
ping -c 1 "${UNIT_HOSTNAME}" > /dev/null 2>&1 || {
echo "@ unit ${UNIT_HOSTNAME} can't be reached, make sure it's connected and a static IP assigned to the USB interface."
exit 1
}
echo "@ backing up $UNIT_HOSTNAME to $OUTPUT ..."
# shellcheck disable=SC2029
ssh "${UNIT_USERNAME}@${UNIT_HOSTNAME}" "sudo find ${FILES_TO_BACKUP} -type f -print0 | xargs -0 sudo tar cv" | gzip -9 > "$OUTPUT"

View File

@ -1,60 +0,0 @@
#!/bin/sh
usage() {
echo "Usage: restore.sh [-bhnu] [-h] [-b backup name] [-n host name] [-u user name]"
}
while getopts "hb:n:u:" arg; do
case $arg in
b)
BACKUP=$OPTARG
;;
h)
usage
exit
;;
n)
UNIT_HOSTNAME=$OPTARG
;;
u)
UNIT_USERNAME=$OPTARG
;;
*)
usage
exit 1
esac
done
# name of the ethernet gadget interface on the host
UNIT_HOSTNAME=${UNIT_HOSTNAME:-10.0.0.2}
# output backup tgz file
# shellcheck disable=SC2086
if [ -z $BACKUP ]; then
# shellcheck disable=SC2012
BACKUP=$(ls -rt "${UNIT_HOSTNAME}"-backup-*.tgz 2>/dev/null | tail -n1)
if [ -z "$BACKUP" ]; then
echo "@ Can't find backup file. Please specify one with '-b'"
exit 1
fi
echo "@ Found backup file:"
# shellcheck disable=SC2028
echo "\t${BACKUP}"
# shellcheck disable=SC2039
echo -n "@ continue restroring this file? (y/n) "
# shellcheck disable=SC2162
read CONTINUE
CONTINUE=$(echo "${CONTINUE}" | tr "[:upper:]" "[:lower:]")
if [ "${CONTINUE}" != "y" ]; then
exit 1
fi
fi
# username to use for ssh
UNIT_USERNAME=${UNIT_USERNAME:-pi}
ping -c 1 "${UNIT_HOSTNAME}" > /dev/null 2>&1 || {
echo "@ unit ${UNIT_HOSTNAME} can't be reached, make sure it's connected and a static IP assigned to the USB interface."
exit 1
}
echo "@ restoring $BACKUP to $UNIT_HOSTNAME ..."
# shellcheck disable=SC2002
cat "${BACKUP}" | ssh "${UNIT_USERNAME}@${UNIT_HOSTNAME}" "sudo tar xzv -C /"