# Private helpers of the Arvados platform class (src/cwl_platform/arvados_platform.py).
# The class header is outside this view, so they are shown as plain functions that
# receive the platform instance as ``self``.


def _find_collection_key(self, collection_path):
    """
    Split a Keep path of the form ``[keep:]collection/key`` into its two parts.

    For example ``keep:collection_uuid/path/to/file.txt`` yields
    ``('collection_uuid', 'path/to/file.txt')``.

    :param collection_path: Path to split; a leading ``keep:`` is ignored
    :return: Tuple ``(collection, key)``; ``key`` is "" when the path has no '/'
    """
    # The parameter used to be called ``project_uuid`` while the body read
    # ``collection_path``, so every call failed with UnboundLocalError.
    if collection_path.startswith('keep:'):
        collection_path = collection_path[5:]
    collection_uuid, _, key = collection_path.partition('/')
    return collection_uuid, key


def _get_collection(self, filter):
    '''
    Get a list of collection objects matching filter criteria from Arvados

    Results are fetched in pages of up to 1000 items until the number reported by
    'items_available' has been collected.

    :param filter: List of filters to search for collection, e.g.
        [ ["owner_uuid", "=", project_uuid], ["name", "=", collName] ]
        (the name shadows the builtin ``filter``; it is kept for interface compatibility)
    :return: List of matching collection records; empty when nothing matches
    '''
    collections = []
    while True:
        search_result = self.api.collections().list(
            filters=filter, offset=len(collections), limit=1000).execute()
        page = search_result['items']
        collections.extend(page)
        # Stop once everything is fetched. Also stop on an empty page: if the server
        # keeps reporting a larger 'items_available' the loop would never terminate.
        if not page or len(collections) >= search_result['items_available']:
            return collections


def _get_files_list_in_collection(self, collection_uuid, subdirectory_path=None, filter=None):
    '''
    Get list of files in collection, optionally restricted to a subdirectory and/or
    to files whose name satisfies the filter criteria.

    :param collection_uuid: uuid of the collection
    :param subdirectory_path: keep only files whose stream (directory) basename equals this value
    :param filter: Dictionary containing filter criteria
        {
            'name': 'file_name',
            'prefix': 'file_prefix',
            'suffix': 'file_suffix',
            'recursive': True/False
        }
        A file is kept only if it satisfies every name/prefix/suffix criterion that is
        present and non-empty. Other keys ('folder', 'recursive', ...) are not used here.
    :return: list of files in the collection
    '''
    the_col = arvados.collection.CollectionReader(manifest_locator_or_text=collection_uuid)
    files = the_col.all_files()
    if subdirectory_path:
        files = [fl for fl in files if os.path.basename(fl.stream_name()) == subdirectory_path]
    if not filter:
        return list(files)

    wanted_name = filter.get('name')
    wanted_prefix = filter.get('prefix')
    wanted_suffix = filter.get('suffix')
    matches = []
    for file in files:
        file_name = file.name
        self.logger.debug("Checking %s for filter criteria", file_name)
        # Criteria are combined with AND. A filter that carries only 'folder' or
        # 'recursive' therefore keeps every file instead of silently dropping them all.
        if wanted_name and file_name != wanted_name:
            continue
        if wanted_prefix and not file_name.startswith(wanted_prefix):
            continue
        if wanted_suffix and not file_name.endswith(wanted_suffix):
            continue
        matches.append(file)
    return matches
# Public methods of the Arvados platform class (src/cwl_platform/arvados_platform.py).
# The class header is outside this view, so they are shown as plain functions that
# receive the platform instance as ``self``.


def add_user_to_project(self, user, project, permission):
    '''
    Add a user to a project by creating an Arvados permission link

    :param user: User record to add to project (its "uuid" is the link tail)
    :param project: Project record to add user to (its "uuid" is the link head)
    :param permission: Permission level to grant user
        {'admin', 'write', 'read', 'manage'}
    :return: None
    :raises ValueError: if permission is not one of the levels above
    '''
    link_names = {
        'admin': 'can_manage',
        # NOTE(review): 'manage' is documented but was never handled (it crashed with
        # an unbound local). It is mapped to the like-named Arvados level here; confirm
        # this is the intended privilege.
        'manage': 'can_manage',
        'write': 'can_write',
        'read': 'can_read',
    }
    if permission not in link_names:
        raise ValueError(f"Unsupported permission level: {permission!r}")

    self.api.links().create(body={"link": {
        "link_class": "permission",
        "name": link_names[permission],
        "tail_uuid": user["uuid"],
        "head_uuid": project["uuid"]
    }}).execute()


def download_file(self, file, dest_folder):
    '''
    Download a file to a local directory (not implemented for Arvados yet)

    :param file: File to download e.g. keep:asdf-asdf-asdf/some/file.txt
    :param dest_folder: Destination folder to download file to
    :return: Name of local file downloaded or None
    :raises NotImplementedError: always
    '''
    raise NotImplementedError("Method not implemented")


def export_file(self, file, bucket_name, prefix):
    '''
    Copy a file from a Keep collection to an S3 bucket.

    :param file: File to export, keep:<collection>/<path/to/file>
    :param bucket_name: S3 bucket name
    :param prefix: Destination S3 folder to export file to, path/to/folder
    :return: s3 file path, or None if the file is not found in the collection
    '''
    collection_uuid, key = self._find_collection_key(file)
    c = arvados.collection.CollectionReader(manifest_locator_or_text=collection_uuid, api_client=self.api)
    # NOTE(review): confirm that ``in`` resolves nested paths such as "dir/file.txt";
    # if it only tests top-level entries, ``c.exists(key)`` may be needed instead.
    if key not in c:
        self.logger.warning("File %s not found in collection %s", key, collection_uuid)
        return None

    # Join only non-empty parts so an empty or slash-terminated prefix does not
    # produce "//" inside the object key.
    destination = '/'.join(part for part in ((prefix or '').strip('/'), key) if part)
    s3_path = f"s3://{bucket_name}/{destination}"
    chunk_size = 128 * 1024
    with c.open(key, "rb") as reader:
        # The bytes are copied verbatim; compression="disable" stops smart_open from
        # re-compressing them based on the file extension (e.g. ".gz").
        with smart_open.open(s3_path, "wb", compression="disable") as writer:
            for content in iter(lambda: reader.read(chunk_size), b''):
                writer.write(content)
    return s3_path


# @override
def get_files(self, project, filter=None):
    '''
    Retrieve files in a project matching the filter criteria.

    :param project: Project to search for files
    :param filter: Dictionary containing filter criteria
        {
            'name': 'file_name',
            'prefix': 'file_prefix',
            'suffix': 'file_suffix',
            'folder': 'folder_name', # Here, a root folder is the name of the collection.
            'recursive': True/False
        }
        'recursive' is not consulted by this method.
    :return: List of file objects matching filter criteria
    '''
    # Select the collections to scan: all owned by the project, or only the one
    # named like the requested folder.
    collection_filter = [
        ["owner_uuid", "=", project["uuid"]]
    ]
    if filter and filter.get('folder'):
        # collection_filter is a list of [attribute, operator, operand] triples, so the
        # name constraint has to be appended (item assignment by key raised TypeError).
        collection_filter.append(["name", "=", filter['folder']])
    self.logger.debug("Fetching list of collections matching filter, %s, in project %s",
                      collection_filter, project["uuid"])
    collections = self._get_collection(collection_filter)

    files = []
    for num, collection in enumerate(collections, start=1):
        self.logger.debug("[%d/%d] Fetching list of files in collection %s",
                          num, len(collections), collection["uuid"])
        files.extend(self._get_files_list_in_collection(collection['uuid'], filter=filter))
    self.logger.debug("Return list of %d files", len(files))
    return files


def get_user(self, user):
    """
    Get a user object from its (platform) user id or email address

    The lookup is case-insensitive.

    :param user: BMS user id or email address
    :return: User object or None
    """
    field = "email" if '@' in user else "username"
    user_resp = self.api.users().list(filters=[[field, "ilike", user]]).execute()

    # "ilike" is a pattern match: '_' and '%' in the requested value act as wildcards,
    # so the server may return near-misses. Only accept an exact, case-insensitive hit.
    wanted = user.lower()
    for candidate in user_resp['items']:
        if (candidate.get(field) or '').lower() == wanted:
            return candidate
    return None
# Abstract file-method declarations of the base Platform class
# (src/cwl_platform/base_platform.py). The class header is outside this view.

# File methods
# TODO: These file objects should be abstracted to a common File object interface
@abstractmethod
def copy_folder(self, source_project, source_folder, destination_project):
    ''' Copy source folder to destination project '''


@abstractmethod
def download_file(self, file, dest_folder):
    '''
    Download a file to a local directory

    :param file: File to download
    :param dest_folder: Destination folder to download file to
    :return: Name of local file downloaded or None
    '''


@abstractmethod
def export_file(self, file, bucket_name, prefix):
    '''
    Use platform specific functionality to copy a file from a platform to an S3 bucket.

    :param file: File to export
    :param bucket_name: S3 bucket name
    :param prefix: Destination S3 folder to export file to, path/to/folder
    :return: s3 file path or None
    '''

#@abstractmethod
#def get_file(self, file_id):
#    ''' Get a file by its id '''

#@abstractmethod
#def get_file_name(self, file):
#    ''' Get a file name by its object or id '''


@abstractmethod
def get_files(self, project, filter=None):
    '''
    Retrieve files in a project matching the filter criteria

    :param project: Project to search for files
    :param filter: Dictionary containing filter criteria
        {
            'name': 'file_name',
            'prefix': 'file_prefix',
            'suffix': 'file_suffix',
            'folder': 'folder_name',
            'recursive': True/False
        }
    :return: List of file objects matching filter criteria
    '''


@abstractmethod
def get_folder_id(self, project, folder_path):
    ''' Get a folder id by its full path name '''


@abstractmethod
def get_file_id(self, project, file_path):
    ''' Get a file id by its full path name '''
# Public methods of the SevenBridges platform class
# (src/cwl_platform/sevenbridges_platform.py). The class header is outside this view,
# so they are shown as plain functions that receive the platform instance as ``self``.


def add_user_to_project(self, user, project, permission):
    '''
    Add a user to a project

    :param user: User to add to project
    :param project: Project to add user to
    :param permission: Permission level to grant user
        {'admin', 'write', 'read', 'manage'}
        Any other value is treated as 'read'.
    :return: None
    '''
    # 'write' and 'manage' grant the same rights; 'admin' additionally sets the admin
    # flag; everything else is read-only.
    can_modify = permission in ('write', 'manage', 'admin')
    user_permissions = {
        'write': can_modify,
        'read': True,
        'copy': can_modify,
        'execute': can_modify,
        'admin': permission == 'admin'
    }

    try:
        project.add_member(user=user, permissions=user_permissions)
    except SbgError as err:
        # Best effort: a failure is logged and not propagated to the caller.
        self.logger.error("%s\nUnable to add %s as member", err.message, user)


def connect_placeholder_note():
    ''' Marker only: the unchanged ``connect`` method sits here in the original file. '''


def download_file(self, file, dest_folder):
    '''
    Download a file to a local directory

    :param file: SevenBridges file id (or object) to download
    :param dest_folder: Destination folder to download file to
    :return: Name of local file downloaded or None
    '''
    # A string is taken to be a file id and resolved to a file object first.
    if isinstance(file, str):
        file = self.api.files.get(id=file)
    file_name = os.path.join(dest_folder, file.name)
    file.download(file_name)
    return file_name


def export_file(self, file, bucket_name, prefix):
    '''
    Use platform specific functionality to copy a file from a platform to an S3 bucket.

    :param file: File to export (file id or file object)
    :param bucket_name: S3 bucket name
    :param prefix: Destination S3 folder to export file to, path/to/folder
    :return: Export job of file, or None when no matching volume exists

    For SevenBridges, there are two differences from the expected base implementation:
    1. the bucket name is translated to a volume name, replacing all dashes with underscores.
    2. the return value is the export job object, not the S3 file path.
    '''
    # A string is taken to be a file id and resolved to a file object first.
    if isinstance(file, str):
        file = self.api.files.get(id=file)

    volume_name = bucket_name.replace('-', '_')
    volume = next((v for v in self.api.volumes.query().all() if v.name == volume_name), None)
    if not volume:
        self.logger.error("Volume %s not found", volume_name)
        return None

    return self.api.exports.submit_export(file=file,
                                          volume=volume,
                                          location=f'{prefix}/{file.name}',
                                          copy_only=True)


def get_files(self, project, filter=None):
    '''
    Retrieve files in a project matching the filter criteria

    :param project: Project to search for files
    :param filter: Dictionary containing filter criteria; None or {} selects everything
        {
            'name': 'file_name',
            'prefix': 'file_prefix',
            'suffix': 'file_suffix',
            'folder': 'folder_name',
            'recursive': True/False
        }
    :return: List of file objects matching filter criteria
    '''
    # The parameter defaults to None; fall back to an empty dict so the lookups below
    # do not fail with AttributeError.
    criteria = filter or {}
    matching_files = []

    for file in self.api.files.query(project=project, limit=100).all():
        if criteria.get('name') and file.name != criteria['name']:
            continue
        if criteria.get('prefix') and not file.name.startswith(criteria['prefix']):
            continue
        if criteria.get('suffix') and not file.name.endswith(criteria['suffix']):
            continue
        # NOTE(review): this assumes file.parent is an object exposing .name; confirm it
        # is not a plain parent id string.
        if criteria.get('folder') and not file.parent.name == criteria['folder']:
            continue
        if criteria.get('recursive') and file.type == 'folder':
            matching_files.extend(self._list_all_files(files=[file]))
        else:
            matching_files.append(file)
    return matching_files


def get_user(self, user):
    """
    Get a user object from its (platform) user id or email address

    The comparison is case-insensitive.

    :param user: BMS user id or email address
    :return: User object or None
    """
    # TBD: Is there a better way to query for a user?
    user = user.lower()
    for division in self.api.divisions.query().all():
        for platform_user in self.api.users.query(division=division, limit=500).all():
            # NOTE(review): the username test is a substring match, so a short id can
            # match a longer username that merely contains it; confirm this is intended.
            if user in platform_user.username.lower() or platform_user.email.lower() == user:
                return platform_user
    return None