Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add get files method #22

Open
wants to merge 36 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
88f731f
Initial SBG implementation
golharam Sep 4, 2024
fbee93a
Add download_file method
golharam Sep 4, 2024
fa8ede2
Return name of file downloaded
golharam Sep 4, 2024
666df31
Add flexibility to download_file
golharam Sep 4, 2024
69adb23
Add export_file method
golharam Sep 4, 2024
be0359c
Get a few file more methods to base class that need implementations a…
golharam Sep 4, 2024
e9a57a9
Merge branch 'main' into add_get_files_method
golharam Sep 9, 2024
db573d6
Implement Arvados::export_file
golharam Sep 9, 2024
8bba3e3
Add check for output_uuid
golharam Sep 12, 2024
5b8cc4d
Filling out methods to support get_files
golharam Oct 25, 2024
d4c7884
Make filter parameter on get_files default to None
golharam Oct 25, 2024
7e85336
Comment out get_file and get_file_name from base class
golharam Oct 25, 2024
90bf7fd
Fix arv to api
golharam Oct 25, 2024
81d1b52
Add debug logging statements
golharam Oct 25, 2024
a2d4f37
Append filter critera to search in get_files
golharam Oct 25, 2024
d848b38
Add code to add suffix filter criteria to filter parameter
golharam Oct 25, 2024
0ebf63e
Add code to add suffix filter criteria to filter parameter
golharam Oct 25, 2024
59f7d4a
Check for filter criteria in get_files
golharam Oct 25, 2024
900ffbf
Explicitly get file_name for logging/debugging
golharam Oct 25, 2024
8149cf2
rework filter criteria
golharam Oct 25, 2024
1a98022
remove white space
golharam Oct 25, 2024
1fbea12
Make filter a named parameter
golharam Oct 25, 2024
6523e0d
Fix call to stream_name()
golharam Oct 25, 2024
dcbb278
Add logging statements
golharam Oct 25, 2024
252cc27
Use file.name instead of file.stream_name()
golharam Oct 25, 2024
0d3791f
Fix return value
golharam Oct 25, 2024
3e7b898
Merge in main
golharam Nov 11, 2024
963998c
Add smartopen to requirements.txt
golharam Nov 11, 2024
784417c
Add add_user_to_project method
golharam Nov 12, 2024
6ae8304
Handle case-sensitivity in SBG
golharam Nov 12, 2024
a4564a8
Make user lowercase
golharam Nov 12, 2024
942a8ad
Update ArvadosPlatform::get_user
golharam Nov 12, 2024
03f2444
Fix reference to api
golharam Nov 13, 2024
cf97911
Check if filter is None before accessing
golharam Nov 13, 2024
248da09
Make arvados get_user a bit more efficient
golharam Nov 13, 2024
4c8ca8b
Group functions
golharam Nov 13, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
chardet
arvados-python-client
sevenbridges-python
smart_open
161 changes: 158 additions & 3 deletions src/cwl_platform/arvados_platform.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import chardet

import googleapiclient
import smart_open

import arvados
from .base_platform import Platform
Expand Down Expand Up @@ -70,24 +71,74 @@ def __init__(self, name):
self.keep_client = None
self.logger = logging.getLogger(__name__)

def _get_files_list_in_collection(self, collection_uuid, subdirectory_path=None):
def _find_collection_key(self, collection_path):
    """
    Split a collection path of the form ``collection/key`` into its two parts.

    An optional ``keep:`` prefix is removed first, e.g.
    ``keep:collection_uuid/path/to/file.txt`` yields
    ``("collection_uuid", "path/to/file.txt")``.

    :param collection_path: Collection path, optionally prefixed with ``keep:``
    :return: Tuple of (collection_uuid, key); key is "" when nothing follows
        the collection identifier
    """
    # The parameter used to be named project_uuid while the body read
    # collection_path, so every call failed before doing any work.
    if collection_path.startswith('keep:'):
        collection_path = collection_path[5:]
    # Split on the first '/' only: everything after it is the key.
    collection_uuid, _, key = collection_path.partition('/')
    return collection_uuid, key

def _get_collection(self, filter):
    '''
    Get a list of collection objects matching filter criteria from Arvados

    Results are requested in pages of up to 1000 items, using the number of
    items collected so far as the offset of the next request.

    :param filter: List of filters to search for collection, e.g. [ ["owner_uuid", "=", project_uuid], ["name", "=", collName] ]
    :return: List of matching collection objects (empty when nothing matches)
    '''
    page_size = 1000
    collections = []
    while True:
        search_result = self.api.collections().list(
            filters=filter, offset=len(collections), limit=page_size).execute()
        items = search_result['items']
        collections += items
        # Also stop on an empty page: if the reported total is larger than what
        # the server actually hands back, looping on the count alone never ends.
        if not items or len(collections) >= search_result['items_available']:
            return collections

def _get_files_list_in_collection(self, collection_uuid, subdirectory_path=None, filter=None):
    '''
    Get list of files in collection, if subdirectory_path is provided, return only files in that subdirectory.

    When subdirectory_path is given it takes precedence and filter is not applied.
    Otherwise a file is returned when it satisfies at least one of the supplied
    'name', 'prefix' or 'suffix' criteria; when none of those keys is supplied,
    every file in the collection is returned.

    :param collection_uuid: uuid of the collection
    :param subdirectory_path: subdirectory path to filter files in the collection
    :param filter: Dictionary containing filter criteria
        {
            'name': 'file_name',
            'prefix': 'file_prefix',
            'suffix': 'file_suffix',
            'recursive': True/False   # not evaluated by this method
        }
    :return: list of files in the collection
    '''
    the_col = arvados.collection.CollectionReader(manifest_locator_or_text=collection_uuid)
    file_list = the_col.all_files()
    if subdirectory_path:
        return [fl for fl in file_list if os.path.basename(fl.stream_name()) == subdirectory_path]

    # Only these keys constrain the file name. A filter carrying just other keys
    # (e.g. 'folder' or 'recursive') used to match no branch and silently
    # produced an empty list; it now means "no file-name restriction".
    criteria = {}
    if filter:
        criteria = {key: filter[key] for key in ('name', 'prefix', 'suffix') if key in filter}
    if not criteria:
        return list(file_list)

    files = []
    for file in file_list:
        file_name = file.name
        self.logger.debug("Checking %s for filter criteria", file_name)
        matches = (
            ('name' in criteria and criteria['name'] == file_name)
            or ('prefix' in criteria and file_name.startswith(criteria['prefix']))
            or ('suffix' in criteria and file_name.endswith(criteria['suffix']))
        )
        if matches:
            files.append(file)
    return files

def _load_cwl_output(self, task: ArvadosTask):
'''
Load CWL output from task
'''
if not task.container_request['output_uuid']:
return None
cwl_output_collection = arvados.collection.Collection(task.container_request['output_uuid'],
api_client=self.api,
keep_client=self.keep_client)
Expand All @@ -99,6 +150,32 @@ def _load_cwl_output(self, task: ArvadosTask):
cwl_output = json.load(cwl_output_file)
return cwl_output

def add_user_to_project(self, user, project, permission):
    '''
    Add a user to a project

    Creates a "permission" link whose tail is the user and whose head is the project.

    :param user: User to add to project; its "uuid" entry is used
    :param project: Project to add user to; its "uuid" entry is used
    :param permission: Permission level to grant user
        {'admin', 'write', 'read', 'manage'}
    :return: None
    :raises ValueError: if permission is not one of the supported levels
    '''
    # 'manage' is documented but was previously unhandled, which left the
    # permission name unbound and crashed before the link was created.
    permission_names = {
        'admin': 'can_manage',
        'manage': 'can_manage',
        'write': 'can_write',
        'read': 'can_read',
    }
    try:
        aPermission = permission_names[permission]
    except KeyError:
        raise ValueError(
            f"Unsupported permission '{permission}'; expected one of {sorted(permission_names)}"
        ) from None

    self.api.links().create(body={"link": {
        "link_class": "permission",
        "name": aPermission,
        "tail_uuid": user["uuid"],
        "head_uuid": project["uuid"]
    }}).execute()

def connect(self, **kwargs):
''' Connect to Arvados '''
self.api = arvados.api_from_config(version='v1', apiconfig=self.api_config)
Expand Down Expand Up @@ -268,6 +345,36 @@ def detect(cls):
return True
return False

def download_file(self, file, dest_folder):
    '''
    Download a file to a local directory.

    This platform does not provide an implementation yet; calling the
    method always raises.

    :param file: File to download e.g. keep:asdf-asdf-asdf/some/file.txt
    :param dest_folder: Destination folder to download file to
    :return: Name of local file downloaded or None (once implemented)
    :raises NotImplementedError: always
    '''
    raise NotImplementedError("Method not implemented")

def export_file(self, file, bucket_name, prefix):
    '''
    Use platform specific functionality to copy a file from a platform to an S3 bucket.

    The file is streamed from the collection to
    s3://<bucket_name>/<prefix>/<file_path> in fixed-size chunks.

    :param file: File to export, keep:<collection_uuid>/<file_path>
    :param bucket_name: S3 bucket name
    :param prefix: Destination S3 folder to export file to, path/to/folder
    :return: s3 file path, or None if the file is not found in the collection
    '''
    chunk_size = 128 * 1024
    collection_uuid, key = self._find_collection_key(file)
    c = arvados.collection.CollectionReader(manifest_locator_or_text=collection_uuid, api_client=self.api)
    # Nothing to export when the path does not exist in the collection.
    if key not in c:
        return None

    s3_path = f"s3://{bucket_name}/{prefix}/{key}"
    # smart_open.open is the documented replacement for the deprecated
    # smart_open.smart_open wrapper.
    with c.open(key, "rb") as reader, smart_open.open(s3_path, "wb") as writer:
        content = reader.read(chunk_size)
        while content:
            writer.write(content)
            content = reader.read(chunk_size)
    # The docstring always promised the destination path; it was never returned.
    return s3_path

def get_current_task(self) -> ArvadosTask:
'''
Get the current task
Expand Down Expand Up @@ -318,6 +425,38 @@ def get_file_id(self, project, file_path):
# for the file. Lets see if this comes up before implementing it.
return f"keep:{collection['portable_data_hash']}/{'/'.join(folder_tree[1:])}"

# @override
def get_files(self, project, filter=None):
    '''
    Retrieve files in a project matching the filter criteria.

    Every collection owned by the project (optionally restricted to the
    collection named by filter['folder']) is listed, and the files found in
    each one are concatenated into a single list.

    :param project: Project to search for files; its "uuid" entry is used
    :param filter: Dictionary containing filter criteria
        {
            'name': 'file_name',
            'prefix': 'file_prefix',
            'suffix': 'file_suffix',
            'folder': 'folder_name',  # Here, a root folder is the name of the collection.
            'recursive': True/False
        }
    :return: List of file objects matching filter criteria
    '''
    # Iterate over all collections and find files matching filter criteria.
    collection_filter = [
        ["owner_uuid", "=", project["uuid"]]
    ]
    if filter and 'folder' in filter:
        # collection_filter is a list of [attribute, operator, value] triples, so
        # the folder constraint must be appended; indexing the list by 'name'
        # raised TypeError.
        collection_filter.append(["name", "=", filter['folder']])
    self.logger.debug("Fetching list of collections matching filter, %s, in project %s", collection_filter, project["uuid"])
    collections = self._get_collection(collection_filter)

    files = []
    total = len(collections)
    for num, collection in enumerate(collections, start=1):
        self.logger.debug("[%d/%d] Fetching list of files in collection %s", num, total, collection["uuid"])
        files += self._get_files_list_in_collection(collection['uuid'], filter=filter)
    self.logger.debug("Return list of %d files", len(files))
    return files

def get_folder_id(self, project, folder_path):
'''
Get the folder id in a project
Expand Down Expand Up @@ -471,12 +610,28 @@ def get_user(self, user):
:param user: BMS user id or email address
:return: User object or None
"""

# Implementation 1: Get the full list of users and scan for users with matching username/email
# Get the full list of users
#user = user.lower()
#search_result = self.api.users().list(offset=0, limit=1000).execute()
#users = search_result['items']
#while len(users) < search_result['items_available']:
# search_result = self.api.users().list(offset=len(users), limit=1000).execute()
# users += search_result['items']

#for platform_user in users:
# if platform_user['email'].lower() == user or platform_user['username'].lower() == user:
# return platform_user

# Implementation 2: Search for user with matching username/email
if '@' in user:
user_resp = self.api.users().list(filters=[["email","=",user]]).execute()
user_resp = self.api.users().list(filters=[["email","ilike",user]]).execute()
else:
user_resp = self.api.users().list(filters=[["username","=",user]]).execute()
user_resp = self.api.users().list(filters=[["username","ilike",user]]).execute()
if len(user_resp['items']) > 0:
return user_resp["items"][0]

return None

def rename_file(self, fileid, new_filename):
Expand Down
63 changes: 63 additions & 0 deletions src/cwl_platform/base_platform.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,61 @@ def __init__(self, name):
self.connected = False

# File methods
# TODO: These file objects should be abstracted to a common File object interface
@abstractmethod
def copy_folder(self, source_project, source_folder, destination_project):
    '''
    Copy source folder to destination project

    :param source_project: Project that contains the folder to copy
    :param source_folder: Folder to copy
    :param destination_project: Project to copy the folder to
    '''

@abstractmethod
def download_file(self, file, dest_folder):
    '''
    Download a file to a local directory

    :param file: File to download
    :param dest_folder: Local destination folder to download the file to
    :return: Name of local file downloaded or None
    '''

@abstractmethod
def export_file(self, file, bucket_name, prefix):
    '''
    Copy a file from the platform to an S3 bucket using platform specific functionality.

    :param file: File to export
    :param bucket_name: S3 bucket name
    :param prefix: Destination S3 folder to export file to, path/to/folder
    :return: s3 file path or None
    '''

#@abstractmethod
#def get_file(self, file_id):
# ''' Get a file by its id '''

#@abstractmethod
#def get_file_name(self, file):
# ''' Get a file name by its object or id '''

@abstractmethod
def get_files(self, project, filter=None):
    '''
    Retrieve the files in a project that match the filter criteria

    :param project: Project to search for files
    :param filter: Optional dictionary containing filter criteria
        {
            'name': 'file_name',
            'prefix': 'file_prefix',
            'suffix': 'file_suffix',
            'folder': 'folder_name',
            'recursive': True/False
        }
    :return: List of file objects matching filter criteria
    '''

@abstractmethod
def get_folder_id(self, project, folder_path):
    '''
    Get a folder id by its full path name

    :param project: Project that contains the folder
    :param folder_path: Full path name of the folder
    '''

@abstractmethod
def get_file_id(self, project, file_path):
    '''
    Get a file id by its full path name

    :param project: Project that contains the file
    :param file_path: Full path name of the file
    '''
Expand Down Expand Up @@ -63,6 +114,18 @@ def upload_file_to_project(self, filename, project, dest_folder, destination_fil
'''

# Project methods
@abstractmethod
def add_user_to_project(self, user, project, permission):
    '''
    Grant a user access to a project

    :param user: User to add to project
    :param project: Project to add user to
    :param permission: Permission level to grant user, one of
        {'admin', 'write', 'read', 'manage'}
    :return: None
    '''

@abstractmethod
def create_project(self, project_name, project_description, **kwargs):
'''
Expand Down
Loading
Loading