Skip to content

Commit

Permalink
changed to aiogoogle
Browse files Browse the repository at this point in the history
  • Loading branch information
GDjkhp committed Dec 17, 2024
1 parent 9538b8f commit 7930a19
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 147 deletions.
251 changes: 131 additions & 120 deletions api_gdrive.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,23 @@
import os
import glob
import json
import mimetypes
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
from aiogoogle import Aiogoogle

class DriveUploader:
class AsyncDriveUploader:
def __init__(self, credentials_path):
"""
Initialize Google Drive service
Initialize Aiogoogle with user credentials
Args:
credentials_path (str): Path to OAuth 2.0 credentials file
"""
self.creds = Credentials.from_authorized_user_file(
credentials_path,
['https://www.googleapis.com/auth/drive.file']
)
self.drive_service = build('drive', 'v3', credentials=self.creds)
with open(credentials_path, "r", encoding="utf-8") as json_file:
creds_data = json.load(json_file)
self.user = {"access_token": creds_data['token'], "refresh_token": creds_data['refresh_token']}
self.client = {"client_id": creds_data['client_id'], "client_secret": creds_data['client_secret'], "scopes": creds_data['scopes']}

def get_or_create_folder(self, folder_name, parent_folder_id=None):
async def get_or_create_folder(self, folder_name, parent_folder_id=None):
"""
Find or create a folder in Google Drive.
Expand All @@ -30,40 +28,48 @@ def get_or_create_folder(self, folder_name, parent_folder_id=None):
Returns:
str: Folder ID
"""
# Build query to find folder
query = [f"name='{folder_name}'", "mimeType='application/vnd.google-apps.folder'"]
if parent_folder_id:
query.append(f"'{parent_folder_id}' in parents")

results = self.drive_service.files().list(
q=' and '.join(query),
spaces='drive'
).execute()

folders = results.get('files', [])

# If folder exists, return its ID
if folders:
return folders[0]['id']

# If folder doesn't exist, create it
folder_metadata = {
'name': folder_name,
'mimeType': 'application/vnd.google-apps.folder'
}

# Add parent if specified
if parent_folder_id:
folder_metadata['parents'] = [parent_folder_id]

folder = self.drive_service.files().create(
body=folder_metadata,
fields='id'
).execute()

return folder['id']
async with Aiogoogle(user_creds=self.user, client_creds=self.client) as aiogoogle:
drive_v3 = await aiogoogle.discover('drive', 'v3')

# Build query to find folder
query = [f"name='{folder_name}'", "mimeType='application/vnd.google-apps.folder'"]
if parent_folder_id:
query.append(f"'{parent_folder_id}' in parents")

# Search for existing folder
results = await aiogoogle.as_user(
drive_v3.files.list(
q=' and '.join(query),
spaces='drive'
)
)

folders = results.get('files', [])

# If folder exists, return its ID
if folders:
return folders[0]['id']

# If folder doesn't exist, create it
folder_metadata = {
'name': folder_name,
'mimeType': 'application/vnd.google-apps.folder'
}

# Add parent if specified
if parent_folder_id:
folder_metadata['parents'] = [parent_folder_id]

folder = await aiogoogle.as_user(
drive_v3.files.create(
json=folder_metadata,
fields='id'
)
)

return folder['id']

def make_public_with_link(self, file_or_folder_id):
async def make_public_with_link(self, file_or_folder_id):
"""
Make a file or folder publicly accessible and get a shareable link.
Expand All @@ -73,26 +79,33 @@ def make_public_with_link(self, file_or_folder_id):
Returns:
str: Public sharing link
"""
try:
# Create a public permission
self.drive_service.permissions().create(
fileId=file_or_folder_id,
body={'type': 'anyone', 'role': 'reader'}
).execute()
async with Aiogoogle(user_creds=self.user, client_creds=self.client) as aiogoogle:
drive_v3 = await aiogoogle.discover('drive', 'v3')

# Get the file/folder details to retrieve the web view link
file_metadata = self.drive_service.files().get(
fileId=file_or_folder_id,
fields='webViewLink'
).execute()
try:
# Create a public permission
await aiogoogle.as_user(
drive_v3.permissions.create(
fileId=file_or_folder_id,
json={'type': 'anyone', 'role': 'reader'}
)
)

# Get the file/folder details to retrieve the web view link
file_metadata = await aiogoogle.as_user(
drive_v3.files.get(
fileId=file_or_folder_id,
fields='webViewLink'
)
)

return file_metadata.get('webViewLink', None)

return file_metadata.get('webViewLink', None)

except Exception as e:
print(f"Error making file/folder public: {e}")
return None
except Exception as e:
print(f"Error making file/folder public: {e}")
return None

def batch_upload(self, paths, folder_in_drive='Uploaded', make_public=False, recursive=False):
async def batch_upload(self, paths, folder_in_drive='Uploaded', make_public=False, recursive=False):
"""
Batch upload multiple files and folders.
Expand All @@ -106,7 +119,7 @@ def batch_upload(self, paths, folder_in_drive='Uploaded', make_public=False, rec
list: List of upload results
"""
# Get or create the root folder in Drive
root_folder_id = self.get_or_create_folder(folder_in_drive)
root_folder_id = await self.get_or_create_folder(folder_in_drive)

# Results storage
upload_results = []
Expand All @@ -126,7 +139,7 @@ def batch_upload(self, paths, folder_in_drive='Uploaded', make_public=False, rec
# Determine upload method based on path type
if os.path.isfile(matched_path):
# Upload single file
result = self._upload_single_file(
result = await self._upload_single_file(
matched_path,
root_folder_id,
make_public
Expand All @@ -136,14 +149,14 @@ def batch_upload(self, paths, folder_in_drive='Uploaded', make_public=False, rec
elif os.path.isdir(matched_path):
# Upload folder
if recursive:
result = self._upload_folder(
result = await self._upload_folder(
matched_path,
root_folder_id,
make_public
)
else:
# Non-recursive folder upload (only files in root)
result = self._upload_non_recursive_folder(
result = await self._upload_non_recursive_folder(
matched_path,
root_folder_id,
make_public
Expand All @@ -155,7 +168,7 @@ def batch_upload(self, paths, folder_in_drive='Uploaded', make_public=False, rec

return upload_results

def _upload_single_file(self, file_path, parent_folder_id=None, make_public=False):
async def _upload_single_file(self, file_path, parent_folder_id=None, make_public=False):
"""
Upload a single file to Google Drive.
Expand All @@ -167,54 +180,52 @@ def _upload_single_file(self, file_path, parent_folder_id=None, make_public=Fals
Returns:
dict: Information about uploaded file
"""
# Get file size
file_size = os.path.getsize(file_path)
async with Aiogoogle(user_creds=self.user, client_creds=self.client) as aiogoogle:
drive_v3 = await aiogoogle.discover('drive', 'v3')

# Get file size
file_size = os.path.getsize(file_path)

# Prepare file metadata
file_metadata = {
'name': os.path.basename(file_path)
}

# Add parent folder if specified
if parent_folder_id:
file_metadata['parents'] = [parent_folder_id]

# Detect MIME type
mime_type, _ = mimetypes.guess_type(file_path)
if mime_type is None:
mime_type = 'application/octet-stream'

# Create media upload
media = MediaFileUpload(
file_path,
mimetype=mime_type,
resumable=True
)

# Upload the file
print(file_path)
file = self.drive_service.files().create(
body=file_metadata,
media_body=media,
fields='id,webViewLink'
).execute()

# Make file public if requested
public_link = None
if make_public:
public_link = self.make_public_with_link(file['id'])

return {
'type': 'file',
'name': os.path.basename(file_path),
'full_path': file_path,
'id': file['id'],
'size_bytes': file_size,
'size_human_readable': self._format_file_size(file_size),
'link': public_link or file.get('webViewLink')
}
# Prepare file metadata
file_metadata = {
'name': os.path.basename(file_path)
}

# Add parent folder if specified
if parent_folder_id:
file_metadata['parents'] = [parent_folder_id]

# Detect MIME type
mime_type, _ = mimetypes.guess_type(file_path)
if mime_type is None:
mime_type = 'application/octet-stream'

# Upload the file
print(file_path)
file = await aiogoogle.as_user(
drive_v3.files.create(
json=file_metadata,
upload_file=file_path,
fields='id,webViewLink'
)
)

# Make file public if requested
public_link = None
if make_public:
public_link = await self.make_public_with_link(file['id'])

return {
'type': 'file',
'name': os.path.basename(file_path),
'full_path': file_path,
'id': file['id'],
'size_bytes': file_size,
'size_human_readable': self._format_file_size(file_size),
'link': public_link or file.get('webViewLink')
}

def _upload_folder(self, local_path, parent_folder_id=None, make_public=False):
async def _upload_folder(self, local_path, parent_folder_id=None, make_public=False):
"""
Recursively upload a local folder to Google Drive.
Expand All @@ -230,15 +241,15 @@ def _upload_folder(self, local_path, parent_folder_id=None, make_public=False):
folder_name = os.path.basename(local_path)

# Create the folder in Google Drive
current_folder_id = self.get_or_create_folder(
current_folder_id = await self.get_or_create_folder(
folder_name,
parent_folder_id
)

# Make folder public if requested
public_link = None
if make_public:
public_link = self.make_public_with_link(current_folder_id)
public_link = await self.make_public_with_link(current_folder_id)

# Track uploaded files
uploaded_files = []
Expand All @@ -250,15 +261,15 @@ def _upload_folder(self, local_path, parent_folder_id=None, make_public=False):

# Recursively upload files and subdirectories
if os.path.isfile(local_item_path):
file_result = self._upload_single_file(
file_result = await self._upload_single_file(
local_item_path,
current_folder_id,
make_public
)
total_size += file_result.get('size_bytes', 0)
uploaded_files.append(file_result)
elif os.path.isdir(local_item_path):
subfolder_result = self._upload_folder(
subfolder_result = await self._upload_folder(
local_item_path,
current_folder_id,
make_public
Expand All @@ -277,7 +288,7 @@ def _upload_folder(self, local_path, parent_folder_id=None, make_public=False):
'files': uploaded_files
}

def _upload_non_recursive_folder(self, local_path, parent_folder_id=None, make_public=False):
async def _upload_non_recursive_folder(self, local_path, parent_folder_id=None, make_public=False):
"""
Upload only files in the root of a folder (non-recursive).
Expand All @@ -293,15 +304,15 @@ def _upload_non_recursive_folder(self, local_path, parent_folder_id=None, make_p
folder_name = os.path.basename(local_path)

# Create the folder in Google Drive
current_folder_id = self.get_or_create_folder(
current_folder_id = await self.get_or_create_folder(
folder_name,
parent_folder_id
)

# Make folder public if requested
public_link = None
if make_public:
public_link = self.make_public_with_link(current_folder_id)
public_link = await self.make_public_with_link(current_folder_id)

# Track uploaded files
uploaded_files = []
Expand All @@ -313,7 +324,7 @@ def _upload_non_recursive_folder(self, local_path, parent_folder_id=None, make_p

# Upload only files, skip subdirectories
if os.path.isfile(local_item_path):
file_result = self._upload_single_file(
file_result = await self._upload_single_file(
local_item_path,
current_folder_id,
make_public
Expand Down
Loading

0 comments on commit 7930a19

Please sign in to comment.