mirror of
https://github.com/jayofelony/pwnagotchi.git
synced 2025-07-01 18:37:27 -04:00
@ -98,6 +98,7 @@ def do_auto_mode(agent):
|
||||
if __name__ == '__main__':
|
||||
parser = argparse.ArgumentParser()
|
||||
parser = plugins_cmd.add_parsers(parser)
|
||||
parser = google_cmd.add_parsers(parser)
|
||||
|
||||
parser.add_argument('-C', '--config', action='store', dest='config', default='/etc/pwnagotchi/default.toml',
|
||||
help='Main configuration file.')
|
||||
|
60
pwnagotchi/google/cmd.py
Normal file
60
pwnagotchi/google/cmd.py
Normal file
@ -0,0 +1,60 @@
|
||||
# Handles the commandline stuff
|
||||
|
||||
import os
|
||||
import logging
|
||||
import glob
|
||||
import re
|
||||
import shutil
|
||||
from fnmatch import fnmatch
|
||||
from pwnagotchi.utils import download_file, unzip, save_config, parse_version, md5
|
||||
from pwnagotchi.plugins import default_path
|
||||
|
||||
|
||||
def add_parsers(parser):
|
||||
"""
|
||||
Adds the plugins subcommand to a given argparse.ArgumentParser
|
||||
"""
|
||||
subparsers = parser.add_subparsers()
|
||||
# pwnagotchi google
|
||||
parser_plugins = subparsers.add_parser('google')
|
||||
plugin_subparsers = parser_plugins.add_subparsers(dest='googlecmd')
|
||||
|
||||
# pwnagotchi plugins search
|
||||
parser_plugins_search = plugin_subparsers.add_parser('search', help='Search for pwnagotchi plugins')
|
||||
parser_plugins_search.add_argument('pattern', type=str, help="Search expression (wildcards allowed)")
|
||||
|
||||
return parser
|
||||
|
||||
|
||||
def used_plugin_cmd(args):
|
||||
"""
|
||||
Checks if the plugins subcommand was used
|
||||
"""
|
||||
return hasattr(args, 'plugincmd')
|
||||
|
||||
|
||||
def handle_cmd(args, config):
|
||||
"""
|
||||
Parses the arguments and does the thing the user wants
|
||||
"""
|
||||
if args.plugincmd == 'update':
|
||||
return update(config)
|
||||
elif args.plugincmd == 'search':
|
||||
args.installed = True # also search in installed plugins
|
||||
return list_plugins(args, config, args.pattern)
|
||||
elif args.plugincmd == 'install':
|
||||
return install(args, config)
|
||||
elif args.plugincmd == 'uninstall':
|
||||
return uninstall(args, config)
|
||||
elif args.plugincmd == 'list':
|
||||
return list_plugins(args, config)
|
||||
elif args.plugincmd == 'enable':
|
||||
return enable(args, config)
|
||||
elif args.plugincmd == 'disable':
|
||||
return disable(args, config)
|
||||
elif args.plugincmd == 'upgrade':
|
||||
return upgrade(args, config, args.pattern)
|
||||
elif args.plugincmd == 'edit':
|
||||
return edit(args, config)
|
||||
|
||||
raise NotImplementedError()
|
178
pwnagotchi/plugins/default/gdrivesync.py
Normal file
178
pwnagotchi/plugins/default/gdrivesync.py
Normal file
@ -0,0 +1,178 @@
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
import time
|
||||
import pwnagotchi.plugins as plugins
|
||||
import pwnagotchi
|
||||
import pydrive2
|
||||
from pydrive2.auth import GoogleAuth
|
||||
from pydrive2.drive import GoogleDrive
|
||||
|
||||
|
||||
class GdriveSync(plugins.Plugin):
|
||||
__author__ = '@jayofelony'
|
||||
__version__ = '1.0'
|
||||
__license__ = 'GPL3'
|
||||
__description__ = 'A plugin to backup various pwnagotchi files and folders to Google Drive. Once every hour from loading plugin.'
|
||||
__dependencies__ = {
|
||||
'pip': ['pydrive2']
|
||||
}
|
||||
|
||||
def __init__(self):
|
||||
self.internet = False
|
||||
self.ready = False
|
||||
self.drive = None
|
||||
self.last_upload_timestamp = time.time()
|
||||
self.backup = True
|
||||
self.backupfiles = [
|
||||
'/root/brain.nn',
|
||||
'/root/brain.json',
|
||||
'/root/.api-report.json',
|
||||
'/root/handshakes',
|
||||
'/root/peers',
|
||||
'/etc/pwnagotchi'
|
||||
]
|
||||
|
||||
# Function to get the folder ID by its name
|
||||
def get_folder_id_by_name(self, drive, folder_name):
|
||||
file_list = drive.ListFile({'q': "mimeType='application/vnd.google-apps.folder' and trashed=false"}).GetList()
|
||||
for file in file_list:
|
||||
if file['title'] == folder_name:
|
||||
return file['id']
|
||||
return None
|
||||
|
||||
def on_loaded(self):
|
||||
# client_secrets.json needs to be in /root
|
||||
if not os.path.exists("/root/client_secrets.json"):
|
||||
logging.error("client_secrets.json not found in /root. Please RTFM!")
|
||||
return
|
||||
# backup file, so we know if there has been a backup made at least once before.
|
||||
if not os.path.exists("/root/.gdrive-backup"):
|
||||
self.backup = False
|
||||
|
||||
try:
|
||||
gauth = GoogleAuth()
|
||||
|
||||
gauth.Authorize()
|
||||
|
||||
# Create GoogleDrive instance
|
||||
self.drive = GoogleDrive(gauth)
|
||||
|
||||
# if backup file does not exist, we will check for PwnagotchiBackups on gdrive.
|
||||
if not self.backup:
|
||||
# Assume 'PwnagotchiBackups' is the folder ID where backups are stored
|
||||
backup_folder_id = 'PwnagotchiBackups'
|
||||
|
||||
# Get the list of files in the folder
|
||||
file_list = self.drive.ListFile({'q': f"'{self.get_folder_id_by_name(self.drive, backup_folder_id)}"
|
||||
f"' in parents and trashed=false"}).GetList()
|
||||
if not file_list:
|
||||
# Handle the case where no files were found
|
||||
logging.warning(f"[gDriveSync] No files found in the folder with ID {backup_folder_id}")
|
||||
if self.config['backupfiles'] is not None:
|
||||
self.backupfiles = self.backupfiles + self.config['backupfiles']
|
||||
self.backup_files(self.backupfiles, '/backup')
|
||||
|
||||
self.upload_to_gdrive('/backup', 'PwnagotchiBackups')
|
||||
self.backup = True
|
||||
|
||||
# Specify the local backup path
|
||||
local_backup_path = '/'
|
||||
|
||||
# Create the local backup directory if it doesn't exist
|
||||
os.makedirs(local_backup_path, exist_ok=True)
|
||||
|
||||
# Download each file in the folder
|
||||
for file in file_list:
|
||||
local_file_path = os.path.join(local_backup_path, file['title'])
|
||||
file.GetContentFile(local_file_path)
|
||||
logging.info(f"[gDriveSync] Downloaded {file['title']} from Google Drive")
|
||||
|
||||
# Optionally, you can use the downloaded files as needed
|
||||
# For example, you can copy them to the corresponding directories
|
||||
self.backup = True
|
||||
with open("/root/.gdrive-backup", "w").close():
|
||||
pass # Create an empty file
|
||||
pwnagotchi.reboot()
|
||||
|
||||
# all set, gdriveSync is ready to run
|
||||
self.ready = True
|
||||
logging.info("[gdrivesync] loaded")
|
||||
except Exception as e:
|
||||
logging.error(f"Error: {e}")
|
||||
self.ready = False
|
||||
|
||||
def on_unload(self, ui):
|
||||
logging.info("[gdrivesync] unloaded")
|
||||
|
||||
def on_internet_available(self, agent):
|
||||
self.internet = True
|
||||
|
||||
def on_handshake(self, agent):
|
||||
if not self.ready:
|
||||
return
|
||||
try:
|
||||
if self.internet:
|
||||
current_timestamp = time.time()
|
||||
# Check if an hour has passed since the last upload
|
||||
if current_timestamp - self.last_upload_timestamp >= 3600:
|
||||
self.last_upload_timestamp = current_timestamp
|
||||
logging.info("[gdrivesync] new handshake captured, backing up to gdrive")
|
||||
if self.config['backupfiles'] is not None:
|
||||
self.backupfiles = self.backupfiles + self.config['backupfiles']
|
||||
self.backup_files(self.backupfiles, '/backup')
|
||||
|
||||
self.upload_to_gdrive('/backup', 'PwnagotchiBackups')
|
||||
display = agent.view()
|
||||
display.update(force=True, new_data={'Backing up to gdrive ...'})
|
||||
except Exception as e:
|
||||
logging.error(f"Error during handshake processing: {e}")
|
||||
|
||||
def backup_files(self, paths, dest_path):
|
||||
for src_path in paths:
|
||||
self.backup_path(src_path, dest_path)
|
||||
|
||||
def backup_path(self, src_path, dest_path):
|
||||
try:
|
||||
if os.path.exists(src_path):
|
||||
dest = os.path.join(dest_path, os.path.basename(src_path))
|
||||
if os.path.isdir(src_path):
|
||||
shutil.copytree(src_path, dest)
|
||||
else:
|
||||
shutil.copy2(src_path, dest)
|
||||
except Exception as e:
|
||||
logging.error(f"Error during backup_path: {e}")
|
||||
|
||||
def upload_to_gdrive(self, backup_path, gdrive_folder):
|
||||
try:
|
||||
existing_folder = self.get_folder_id_by_name(self.drive, gdrive_folder)
|
||||
if existing_folder:
|
||||
folder = self.drive.CreateFile({'id': existing_folder})
|
||||
else:
|
||||
# Create a folder on Google Drive if it doesn't exist
|
||||
folder = self.drive.CreateFile({'title': gdrive_folder, 'mimeType': 'application/vnd.google-apps.folder'})
|
||||
folder.Upload()
|
||||
|
||||
# Upload files to the created folder
|
||||
for root, dirs, files in os.walk(backup_path):
|
||||
for filename in files:
|
||||
file_path = os.path.join(root, filename)
|
||||
gdrive_file = self.drive.CreateFile({'title': filename, 'parents': [{'id': folder['id']}]})
|
||||
gdrive_file.Upload()
|
||||
logging.info(f"[gDriveSync] Uploaded {file_path} to Google Drive")
|
||||
|
||||
logging.info(f"[gDriveSync] Backup uploaded to Google Drive")
|
||||
except pydrive2.files.ApiRequestError as api_error:
|
||||
self.handle_upload_error(api_error, backup_path, gdrive_folder)
|
||||
|
||||
except Exception as e:
|
||||
logging.error(f"Error during upload_to_gdrive: {e}")
|
||||
|
||||
def handle_upload_error(self, api_error, backup_path, gdrive_folder):
|
||||
if 'Rate Limit Exceeded' in str(api_error):
|
||||
logging.warning("[gDriveSync] Rate limit exceeded. Waiting for some time before retrying...")
|
||||
# We set to 100 seconds, because there is a limit 20k requests per 100s per user
|
||||
time.sleep(100) # You can adjust the sleep duration based on your needs
|
||||
self.upload_to_gdrive(backup_path, gdrive_folder)
|
||||
else:
|
||||
logging.error(f"[gDriveSync] API Request Error: {api_error}")
|
@ -479,11 +479,13 @@ INDEX = """
|
||||
{% endblock %}
|
||||
"""
|
||||
|
||||
|
||||
def serializer(obj):
|
||||
if isinstance(obj, set):
|
||||
return list(obj)
|
||||
raise TypeError
|
||||
|
||||
|
||||
class WebConfig(plugins.Plugin):
|
||||
__author__ = '33197631+dadav@users.noreply.github.com'
|
||||
__version__ = '1.0.0'
|
||||
@ -513,7 +515,6 @@ class WebConfig(plugins.Plugin):
|
||||
"""
|
||||
logging.info("webcfg: Plugin loaded.")
|
||||
|
||||
|
||||
def on_webhook(self, path, request):
|
||||
"""
|
||||
Serves the current configuration
|
||||
@ -532,7 +533,7 @@ class WebConfig(plugins.Plugin):
|
||||
elif request.method == "POST":
|
||||
if path == "save-config":
|
||||
try:
|
||||
save_config(request.get_json(), '/etc/pwnagotchi/config.toml') # test
|
||||
save_config(request.get_json(), '/etc/pwnagotchi/config.toml') # test
|
||||
_thread.start_new_thread(restart, (self.mode,))
|
||||
return "success"
|
||||
except Exception as ex:
|
||||
@ -547,7 +548,7 @@ class WebConfig(plugins.Plugin):
|
||||
self._agent._config = merge_config(request.get_json(), self._agent._config)
|
||||
logging.debug(" Agent CONFIG:\n%s" % repr(self._agent._config))
|
||||
logging.debug(" Updated CONFIG:\n%s" % request.get_json())
|
||||
save_config(request.get_json(), '/etc/pwnagotchi/config.toml') # test
|
||||
save_config(request.get_json(), '/etc/pwnagotchi/config.toml') # test
|
||||
return "success"
|
||||
except Exception as ex:
|
||||
logging.error("[webcfg mergesave] %s" % ex)
|
||||
|
Reference in New Issue
Block a user