Update drive.py

This commit is contained in:
Yuvi9587
2025-06-07 02:56:55 +01:00
parent 0940bdb8dd
commit 9701abde5f

107
drive.py
View File

@@ -1,6 +1,6 @@
from mega import Mega # Correctly import the Mega class from mega import Mega
import os import os
import requests # Added import for requests.exceptions import requests
import traceback import traceback
from urllib .parse import urlparse ,urlunparse ,parse_qs ,urlencode from urllib .parse import urlparse ,urlunparse ,parse_qs ,urlencode
@@ -21,50 +21,50 @@ def download_mega_file(mega_link, download_path=".", logger_func=print):
logger_func (callable, optional): Function to use for logging. Defaults to print. logger_func (callable, optional): Function to use for logging. Defaults to print.
""" """
logger_func ("Initializing Mega client...") logger_func ("Initializing Mega client...")
mega_client = Mega() # Instantiate the imported Mega class mega_client =Mega ()
# For public links, login is usually not required.
# If you needed to log in for private files:
# m = mega.login("your_email@example.com", "your_password")
# Or anonymous login for some operations:
m = mega_client.login() # Call login on the instance m =mega_client .login ()
logger_func (f"Attempting to download from: {mega_link }") logger_func (f"Attempting to download from: {mega_link }")
try : try :
# Ensure the download path exists
if not os .path .exists (download_path ): if not os .path .exists (download_path ):
logger_func (f"Download path '{download_path }' does not exist. Creating it...") logger_func (f"Download path '{download_path }' does not exist. Creating it...")
os .makedirs (download_path ,exist_ok =True ) os .makedirs (download_path ,exist_ok =True )
# It's often better to let mega.py handle finding the file from the URL
# and then downloading it.
# The `download_url` method is convenient for public links.
logger_func (f"Starting download to '{download_path }'...") logger_func (f"Starting download to '{download_path }'...")
# mega.py's download_url will download the file to the specified path
# and name it appropriately.
file_handle = m.download_url(mega_link, dest_path=download_path) # m is the logged-in client file_handle =m .download_url (mega_link ,dest_path =download_path )
if file_handle : if file_handle :
# The download_url method itself handles the file saving.
# The return value 'file_handle' might be the filename or a handle,
# depending on the library version and specifics.
# For modern mega.py, it often returns the local path to the downloaded file.
logger_func(f"File downloaded successfully! (Handle: {file_handle})")
# If you need the exact filename it was saved as:
# You might need to inspect what `download_url` returns or list dir contents
# if the filename isn't directly returned in a usable way.
# Often, the library names it based on the Mega file name.
# For this example, we'll assume it's handled and saved in download_path.
# To get the actual filename (as mega.py might rename it)
# We can list the directory and find the newest file if only one download happened
# Or, more robustly, use the information from `m.get_public_url_info(mega_link)`
# to get the expected filename. logger_func (f"File downloaded successfully! (Handle: {file_handle })")
try : try :
info =m .get_public_url_info (mega_link ) info =m .get_public_url_info (mega_link )
filename = info.get('name', 'downloaded_file') # Default if name not found filename =info .get ('name','downloaded_file')
full_download_path =os .path .join (download_path ,filename ) full_download_path =os .path .join (download_path ,filename )
logger_func (f"Downloaded file should be at: {full_download_path }") logger_func (f"Downloaded file should be at: {full_download_path }")
if not os .path .exists (full_download_path ): if not os .path .exists (full_download_path ):
@@ -79,9 +79,9 @@ def download_mega_file(mega_link, download_path=".", logger_func=print):
logger_func (f"Error: Permission denied to write to '{download_path }'. Please check permissions.") logger_func (f"Error: Permission denied to write to '{download_path }'. Please check permissions.")
except FileNotFoundError : except FileNotFoundError :
logger_func (f"Error: The specified download path '{download_path }' is invalid or a component was not found.") logger_func (f"Error: The specified download path '{download_path }' is invalid or a component was not found.")
except ConnectionError as e: # More specific requests exception except ConnectionError as e :
logger_func (f"Error: Connection problem. {e }") logger_func (f"Error: Connection problem. {e }")
except requests.exceptions.RequestException as e: # Catch other requests-related errors except requests .exceptions .RequestException as e :
logger_func (f"Error during request to Mega: {e }") logger_func (f"Error during request to Mega: {e }")
except Exception as e : except Exception as e :
logger_func (f"An error occurred during Mega download: {e }") logger_func (f"An error occurred during Mega download: {e }")
@@ -109,23 +109,23 @@ def download_gdrive_file(gdrive_link, download_path=".", logger_func=print):
os .makedirs (download_path ,exist_ok =True ) os .makedirs (download_path ,exist_ok =True )
logger_func (f"Starting Google Drive download to '{download_path }'...") logger_func (f"Starting Google Drive download to '{download_path }'...")
# gdown.download returns the path to the downloaded file
output_file_path =gdown .download (gdrive_link ,output =download_path ,quiet =False ,fuzzy =True ) output_file_path =gdown .download (gdrive_link ,output =download_path ,quiet =False ,fuzzy =True )
if output_file_path and os .path .exists (os .path .join (download_path ,os .path .basename (output_file_path ))): if output_file_path and os .path .exists (os .path .join (download_path ,os .path .basename (output_file_path ))):
logger_func (f"✅ Google Drive file downloaded successfully: {output_file_path }") logger_func (f"✅ Google Drive file downloaded successfully: {output_file_path }")
elif output_file_path: # gdown might return a path but if download_path was a dir, it might be just the filename elif output_file_path :
full_path_check =os .path .join (download_path ,output_file_path ) full_path_check =os .path .join (download_path ,output_file_path )
if os .path .exists (full_path_check ): if os .path .exists (full_path_check ):
logger_func (f"✅ Google Drive file downloaded successfully: {full_path_check }") logger_func (f"✅ Google Drive file downloaded successfully: {full_path_check }")
else : else :
logger_func (f"⚠️ Google Drive download finished, gdown returned '{output_file_path }', but file not found at expected location.") logger_func (f"⚠️ Google Drive download finished, gdown returned '{output_file_path }', but file not found at expected location.")
logger_func (f" Please check '{download_path }' for the downloaded file, it might have a different name than expected by gdown's return.") logger_func (f" Please check '{download_path }' for the downloaded file, it might have a different name than expected by gdown's return.")
# As a fallback, list files in dir if only one was expected
files_in_dest =[f for f in os .listdir (download_path )if os .path .isfile (os .path .join (download_path ,f ))] files_in_dest =[f for f in os .listdir (download_path )if os .path .isfile (os .path .join (download_path ,f ))]
if len (files_in_dest )==1 : if len (files_in_dest )==1 :
logger_func (f" Found one file in destination: {os .path .join (download_path ,files_in_dest [0 ])}. Assuming this is it.") logger_func (f" Found one file in destination: {os .path .join (download_path ,files_in_dest [0 ])}. Assuming this is it.")
elif len(files_in_dest) > 1 and output_file_path in files_in_dest: # if gdown returned just filename elif len (files_in_dest )>1 and output_file_path in files_in_dest :
logger_func (f" Confirmed file '{output_file_path }' exists in '{download_path }'.") logger_func (f" Confirmed file '{output_file_path }' exists in '{download_path }'.")
else : else :
raise Exception (f"gdown download failed or file not found. Returned: {output_file_path }") raise Exception (f"gdown download failed or file not found. Returned: {output_file_path }")
@@ -162,10 +162,10 @@ def download_dropbox_file(dropbox_link, download_path=".", logger_func=print):
""" """
logger_func (f"Attempting to download from Dropbox: {dropbox_link }") logger_func (f"Attempting to download from Dropbox: {dropbox_link }")
# Modify URL for direct download
parsed_url =urlparse (dropbox_link ) parsed_url =urlparse (dropbox_link )
query_params =parse_qs (parsed_url .query ) query_params =parse_qs (parsed_url .query )
query_params['dl'] = ['1'] # Set dl=1 for direct download query_params ['dl']=['1']
new_query =urlencode (query_params ,doseq =True ) new_query =urlencode (query_params ,doseq =True )
direct_download_url =urlunparse (parsed_url ._replace (query =new_query )) direct_download_url =urlunparse (parsed_url ._replace (query =new_query ))
@@ -179,7 +179,7 @@ def download_dropbox_file(dropbox_link, download_path=".", logger_func=print):
with requests .get (direct_download_url ,stream =True ,allow_redirects =True ,timeout =(10 ,300 ))as r : with requests .get (direct_download_url ,stream =True ,allow_redirects =True ,timeout =(10 ,300 ))as r :
r .raise_for_status () r .raise_for_status ()
filename =_get_filename_from_headers (r .headers )or os .path .basename (urlparse (dropbox_link ).path )or "dropbox_downloaded_file" filename =_get_filename_from_headers (r .headers )or os .path .basename (urlparse (dropbox_link ).path )or "dropbox_downloaded_file"
# Sanitize filename (basic)
filename =re .sub (r'[<>:"/\\|?*]','_',filename ) filename =re .sub (r'[<>:"/\\|?*]','_',filename )
full_save_path =os .path .join (download_path ,filename ) full_save_path =os .path .join (download_path ,filename )
logger_func (f"Starting Dropbox download of '{filename }' to '{full_save_path }'...") logger_func (f"Starting Dropbox download of '{filename }' to '{full_save_path }'...")
@@ -193,36 +193,19 @@ def download_dropbox_file(dropbox_link, download_path=".", logger_func=print):
raise raise
if __name__ =="__main__": if __name__ =="__main__":
# --- IMPORTANT ---
# Replace this with an ACTUAL PUBLIC MEGA.NZ FILE LINK to test.
# Using the specific link you provided:
mega_file_link ="https://mega.nz/file/03oRjBQT#Tcbp5sQVIyPbdmv8sLgbb9Lf9AZvZLdKRSQiuXkNW0k" mega_file_link ="https://mega.nz/file/03oRjBQT#Tcbp5sQVIyPbdmv8sLgbb9Lf9AZvZLdKRSQiuXkNW0k"
if not mega_file_link .startswith ("https://mega.nz/file/"): if not mega_file_link .startswith ("https://mega.nz/file/"):
print("Invalid Mega file link format. It should start with 'https://mega.nz/file/'.") # Or use logger_func if testing outside main print ("Invalid Mega file link format. It should start with 'https://mega.nz/file/'.")
else : else :
# Specify your desired download directory
# If you want to download to a specific folder, e.g., "MegaDownloads" in your user's Downloads folder:
# user_downloads_path = os.path.expanduser("~/Downloads")
# download_directory = os.path.join(user_downloads_path, "MegaDownloads")
# For this example, we'll download to a "mega_downloads" subfolder in the script's directory
script_dir =os .path .dirname (os .path .abspath (__file__ )) script_dir =os .path .dirname (os .path .abspath (__file__ ))
download_directory =os .path .join (script_dir ,"mega_downloads") download_directory =os .path .join (script_dir ,"mega_downloads")
print(f"Files will be downloaded to: {download_directory}") # Or use logger_func print (f"Files will be downloaded to: {download_directory }")
download_mega_file (mega_file_link ,download_directory ,logger_func =print ) download_mega_file (mega_file_link ,download_directory ,logger_func =print )
# Test Google Drive
# gdrive_test_link = "YOUR_PUBLIC_GOOGLE_DRIVE_FILE_LINK_HERE" # Replace with a real link
# if gdrive_test_link and gdrive_test_link != "YOUR_PUBLIC_GOOGLE_DRIVE_FILE_LINK_HERE":
# gdrive_download_dir = os.path.join(script_dir, "gdrive_downloads")
# print(f"\nGoogle Drive files will be downloaded to: {gdrive_download_dir}")
# download_gdrive_file(gdrive_test_link, gdrive_download_dir, logger_func=print)
#
# # Test Dropbox
# dropbox_test_link = "YOUR_PUBLIC_DROPBOX_FILE_LINK_HERE" # Replace with a real link
# if dropbox_test_link and dropbox_test_link != "YOUR_PUBLIC_DROPBOX_FILE_LINK_HERE":
# dropbox_download_dir = os.path.join(script_dir, "dropbox_downloads")
# print(f"\nDropbox files will be downloaded to: {dropbox_download_dir}")
# download_dropbox_file(dropbox_test_link, dropbox_download_dir, logger_func=print)