mirror of
https://github.com/jayofelony/pwnagotchi.git
synced 2025-07-01 18:37:27 -04:00
@ -9,6 +9,7 @@ from pydrive2.auth import GoogleAuth
|
|||||||
from pydrive2.drive import GoogleDrive
|
from pydrive2.drive import GoogleDrive
|
||||||
from threading import Lock
|
from threading import Lock
|
||||||
from pwnagotchi.utils import StatusFile, parse_version as version_to_tuple
|
from pwnagotchi.utils import StatusFile, parse_version as version_to_tuple
|
||||||
|
import zipfile
|
||||||
|
|
||||||
|
|
||||||
class GdriveSync(plugins.Plugin):
|
class GdriveSync(plugins.Plugin):
|
||||||
@ -66,54 +67,59 @@ class GdriveSync(plugins.Plugin):
|
|||||||
# if backup file does not exist, we will check for backup folder on gdrive.
|
# if backup file does not exist, we will check for backup folder on gdrive.
|
||||||
if not self.backup:
|
if not self.backup:
|
||||||
# Use self.options['backup_folder'] as the folder ID where backups are stored
|
# Use self.options['backup_folder'] as the folder ID where backups are stored
|
||||||
backup_folder, root_folder, pwnagotchi_folder = self.create_folder_if_not_exists(self.options['backup_folder'])
|
backup_folder = self.create_folder_if_not_exists(self.options['backup_folder'])
|
||||||
|
|
||||||
# Continue with the rest of the code using backup_folder_id
|
# Continue with the rest of the code using backup_folder_id
|
||||||
root_file_list = self.drive.ListFile({'q': f"'{root_folder}' in parents and mimeType = 'application/vnd.google-apps.folder' and trashed=false"}).GetList()
|
backup_folder_file_list = self.drive.ListFile({'q': f"'{backup_folder}' and mimeType = 'application/vnd.google-apps.folder' and trashed=false"}).GetList()
|
||||||
pwnagotchi_file_list = self.drive.ListFile({'q': f"'{pwnagotchi_folder}' in parents and mimeType = 'application/vnd.google-apps.folder' and trashed=false"}).GetList()
|
if not backup_folder_file_list:
|
||||||
|
|
||||||
if not (root_file_list or pwnagotchi_file_list):
|
|
||||||
# Handle the case where no files were found
|
# Handle the case where no files were found
|
||||||
# logging.warning(f"[gDriveSync] No files found in the folder with ID {root_file_list} and {pwnagotchi_file_list}")
|
# logging.warning(f"[gDriveSync] No files found in the folder with ID {root_file_list} and {pwnagotchi_file_list}")
|
||||||
if self.options['backupfiles'] is not None:
|
if self.options['backupfiles'] is not None:
|
||||||
self.backupfiles = self.backupfiles + self.options['backupfiles']
|
self.backupfiles = self.backupfiles + self.options['backupfiles']
|
||||||
self.backup_files(self.backupfiles, '/backup')
|
self.backup_files(self.backupfiles, '/backup')
|
||||||
|
|
||||||
self.upload_to_gdrive('/backup', self.options['backup_folder'], root_folder, pwnagotchi_folder)
|
self.upload_to_gdrive('/backup', self.options['backup_folder'])
|
||||||
self.backup = True
|
self.backup = True
|
||||||
|
|
||||||
# Specify the local backup path
|
# Specify the local backup path
|
||||||
local_backup_path = '/'
|
local_backup_path = '/home/pi'
|
||||||
|
|
||||||
# Create the local backup directory if it doesn't exist
|
# Create the local backup directory if it doesn't exist
|
||||||
os.makedirs(local_backup_path, exist_ok=True)
|
os.makedirs(local_backup_path, exist_ok=True)
|
||||||
|
|
||||||
# Download each file in the /root folder
|
# Download the zip archive from Google Drive
|
||||||
for root_file in root_file_list:
|
zip_file_id = self.get_latest_backup_file_id(self.options['backup_folder'])
|
||||||
local_file_path = os.path.join(local_backup_path, root_file['title'])
|
if zip_file_id:
|
||||||
root_file.GetContentFile(local_file_path)
|
zip_file = self.drive.CreateFile({'id': zip_file_id})
|
||||||
logging.info(f"[gDriveSync] Downloaded {root_file['title']} from Google Drive")
|
zip_file.GetContentFile(os.path.join(local_backup_path, 'backup.zip'))
|
||||||
|
logging.info("[gDriveSync] Downloaded backup.zip from Google Drive")
|
||||||
|
|
||||||
# Download each file in the /etc/pwnagotchi folder
|
# Extract the zip archive to the root directory
|
||||||
for pwnagotchi_file in pwnagotchi_file_list:
|
with zipfile.ZipFile(os.path.join(local_backup_path, 'backup.zip'), 'r') as zip_ref:
|
||||||
local_file_path = os.path.join(local_backup_path, pwnagotchi_file['title'])
|
zip_ref.extractall('/')
|
||||||
pwnagotchi_file.GetContentFile(local_file_path)
|
|
||||||
logging.info(f"[gDriveSync] Downloaded {pwnagotchi_file['title']} from Google Drive")
|
|
||||||
|
|
||||||
# Optionally, you can use the downloaded files as needed
|
self.status.update()
|
||||||
# For example, you can copy them to the corresponding directories
|
# Reboot so we can start opwngrid with the backup id
|
||||||
self.backup = True
|
pwnagotchi.reboot()
|
||||||
self.status.update()
|
|
||||||
# reboot so we can start opwngrid with backup id
|
|
||||||
pwnagotchi.reboot()
|
|
||||||
|
|
||||||
# all set, gdriveSync is ready to run
|
# all set, gdriveSync is ready to run
|
||||||
self.ready = True
|
self.ready = True
|
||||||
logging.info("[gdrivesync] loaded")
|
logging.info("[gdrivesync] loaded")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.error(f"Error: {e}")
|
logging.error(f"Error: {e}")
|
||||||
self.ready = False
|
self.ready = False
|
||||||
|
|
||||||
|
def get_latest_backup_file_id(self, backup_folder_id):
|
||||||
|
# Retrieve the latest backup file in the Google Drive folder
|
||||||
|
file_list = self.drive.ListFile({'q': f"'{backup_folder_id}' in parents and trashed=false"}).GetList()
|
||||||
|
|
||||||
|
if file_list:
|
||||||
|
# Sort the files by creation date in descending order
|
||||||
|
latest_backup = max(file_list, key=lambda file: file['createdDate'])
|
||||||
|
return latest_backup['id']
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
|
||||||
def get_folder_id_by_name(self, drive, folder_name, parent_folder_id=None):
|
def get_folder_id_by_name(self, drive, folder_name, parent_folder_id=None):
|
||||||
query = "mimeType='application/vnd.google-apps.folder' and trashed=false"
|
query = "mimeType='application/vnd.google-apps.folder' and trashed=false"
|
||||||
if parent_folder_id:
|
if parent_folder_id:
|
||||||
@ -137,27 +143,7 @@ class GdriveSync(plugins.Plugin):
|
|||||||
backup_folder_id = backup_folder['id']
|
backup_folder_id = backup_folder['id']
|
||||||
logging.info(f"[gDriveSync] Created folder '{backup_folder_name}' with ID: {backup_folder_id}")
|
logging.info(f"[gDriveSync] Created folder '{backup_folder_name}' with ID: {backup_folder_id}")
|
||||||
|
|
||||||
# Now, try to retrieve or create /*BACKUP_FOLDER*/root
|
return backup_folder_id
|
||||||
root_folder_id = self.get_or_create_subfolder('root', backup_folder_id)
|
|
||||||
|
|
||||||
# Now, try to retrieve or create /*BACKUP_FOLDER*/etc
|
|
||||||
etc_folder_id = self.get_or_create_subfolder('etc', backup_folder_id)
|
|
||||||
|
|
||||||
# Now, try to retrieve or create /*BACKUP_FOLDER*/etc/pwnagotchi
|
|
||||||
pwnagotchi_folder_id = self.get_or_create_subfolder('pwnagotchi', etc_folder_id)
|
|
||||||
|
|
||||||
return backup_folder_id, root_folder_id, pwnagotchi_folder_id # Return the IDs of both root and pwnagotchi folders
|
|
||||||
else:
|
|
||||||
# If found, also try to retrieve or create /*BACKUP_FOLDER*/root
|
|
||||||
root_folder_id = self.get_or_create_subfolder('root', backup_folder_id)
|
|
||||||
|
|
||||||
# Also, try to retrieve or create /*BACKUP_FOLDER*/etc
|
|
||||||
etc_folder_id = self.get_or_create_subfolder('etc', backup_folder_id)
|
|
||||||
|
|
||||||
# Also, try to retrieve or create /*BACKUP_FOLDER*/etc/pwnagotchi
|
|
||||||
pwnagotchi_folder_id = self.get_or_create_subfolder('pwnagotchi', etc_folder_id)
|
|
||||||
|
|
||||||
return backup_folder_id, root_folder_id, pwnagotchi_folder_id # Return the IDs of both root and pwnagotchi folders
|
|
||||||
|
|
||||||
def get_or_create_subfolder(self, subfolder_name, parent_folder_id):
|
def get_or_create_subfolder(self, subfolder_name, parent_folder_id):
|
||||||
# Try to retrieve the subfolder
|
# Try to retrieve the subfolder
|
||||||
@ -202,7 +188,20 @@ class GdriveSync(plugins.Plugin):
|
|||||||
self.backupfiles = self.backupfiles + self.options['backupfiles']
|
self.backupfiles = self.backupfiles + self.options['backupfiles']
|
||||||
self.backup_files(self.backupfiles, '/backup')
|
self.backup_files(self.backupfiles, '/backup')
|
||||||
|
|
||||||
self.upload_to_gdrive('/backup', self.options['backup_folder'])
|
# Create a zip archive of the /backup folder
|
||||||
|
zip_file_path = os.path.join('/home/pi', 'backup.zip')
|
||||||
|
with zipfile.ZipFile(zip_file_path, 'w') as zip_ref:
|
||||||
|
for root, dirs, files in os.walk('/backup'):
|
||||||
|
for file in files:
|
||||||
|
file_path = os.path.join(root, file)
|
||||||
|
arcname = os.path.relpath(file_path, '/backup')
|
||||||
|
zip_ref.write(file_path, arcname=arcname)
|
||||||
|
|
||||||
|
# Upload the zip archive to Google Drive
|
||||||
|
self.upload_to_gdrive(zip_file_path, self.options['backup_folder'])
|
||||||
|
|
||||||
|
# Cleanup the local zip file
|
||||||
|
os.remove(zip_file_path)
|
||||||
self.status.update()
|
self.status.update()
|
||||||
display = agent.view()
|
display = agent.view()
|
||||||
display.update(force=True, new_data={'Backing up to gdrive ...'})
|
display.update(force=True, new_data={'Backing up to gdrive ...'})
|
||||||
@ -226,32 +225,20 @@ class GdriveSync(plugins.Plugin):
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.error(f"[gDriveSync] Error during backup_path: {e}")
|
logging.error(f"[gDriveSync] Error during backup_path: {e}")
|
||||||
|
|
||||||
def upload_to_gdrive(self, backup_path, gdrive_folder, root_folder_id, pwnagotchi_folder_id):
|
def upload_to_gdrive(self, backup_path, gdrive_folder):
|
||||||
try:
|
try:
|
||||||
# Upload files to the /root folder
|
# Upload zip-file to google drive
|
||||||
if root_folder_id is not None:
|
# Create a GoogleDriveFile instance for the zip file
|
||||||
root_folder = self.drive.CreateFile({'id': root_folder_id})
|
zip_file = self.drive.CreateFile({'title': 'backup.zip', 'parents': [{'id': gdrive_folder}]})
|
||||||
for root, dirs, files in os.walk('/root'):
|
|
||||||
for filename in files:
|
|
||||||
file_path = os.path.join(root, filename)
|
|
||||||
gdrive_file = self.drive.CreateFile({'title': filename, 'parents': [{'id': root_folder['id']}]})
|
|
||||||
gdrive_file.Upload()
|
|
||||||
logging.info(f"[gDriveSync] Uploaded {file_path} to Google Drive")
|
|
||||||
|
|
||||||
# Upload files to the /etc/pwnagotchi folder
|
# Set the content of the file to the zip file
|
||||||
if pwnagotchi_folder_id is not None:
|
zip_file.SetContentFile(backup_path)
|
||||||
pwnagotchi_folder = self.drive.CreateFile({'id': pwnagotchi_folder_id})
|
|
||||||
for root, dirs, files in os.walk('/etc/pwnagotchi'):
|
|
||||||
for filename in files:
|
|
||||||
file_path = os.path.join(root, filename)
|
|
||||||
gdrive_file = self.drive.CreateFile(
|
|
||||||
{'title': filename, 'parents': [{'id': pwnagotchi_folder['id']}]})
|
|
||||||
gdrive_file.Upload()
|
|
||||||
logging.info(f"[gDriveSync] Uploaded {file_path} to Google Drive")
|
|
||||||
|
|
||||||
|
# Upload the file to Google Drive
|
||||||
|
zip_file.Upload()
|
||||||
logging.info(f"[gDriveSync] Backup uploaded to Google Drive")
|
logging.info(f"[gDriveSync] Backup uploaded to Google Drive")
|
||||||
except pydrive2.files.ApiRequestError as api_error:
|
except pydrive2.files.ApiRequestError as api_error:
|
||||||
self.handle_upload_error(api_error, backup_path, gdrive_folder, root_folder_id, pwnagotchi_folder_id)
|
self.handle_upload_error(api_error, backup_path, gdrive_folder)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.error(f"[gDriveSync] Error during upload_to_gdrive: {e}")
|
logging.error(f"[gDriveSync] Error during upload_to_gdrive: {e}")
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user