diff --git a/python/fedml/api/modules/storage.py b/python/fedml/api/modules/storage.py index 33e781be0..5bf64a441 100644 --- a/python/fedml/api/modules/storage.py +++ b/python/fedml/api/modules/storage.py @@ -23,65 +23,40 @@ def __init__(self, data: dict): self.tags = data.get("description", None) self.createdAt = data.get("createTime", None) self.updatedAt = data.get("updateTime", None) - self.size = _get_size(data.get("fileSize",None)) + self.size = _get_size(data.get("fileSize", None)) self.tag_list = data.get("tags", None) self.download_url = data.get("fileUrl", None) + class DataType(Enum): FILE = "file" DIRECTORY = "directory" INVALID = "invalid" + # Todo (alaydshah): Store service name in metadata # Todo (alaydshah): If data already exists, don't upload again. Instead suggest to use update command # Todo (bhargav) : Discuss and remove the service variable. Maybe needed sometime later. def upload(data_path, api_key, name, description, tag_list, service, show_progress, out_progress_to_err, progress_desc, - metadata) -> FedMLResponse: - api_key = authenticate(api_key) - + metadata, encrypted_api_key_flag=False, byte_data=None) -> FedMLResponse: + api_key = authenticate(api_key, encrypted_api_key_flag) user_id, message = _get_user_id_from_api_key(api_key) if user_id is None: return FedMLResponse(code=ResponseCode.FAILURE, message=message) - data_type = _get_data_type(data_path) - - if(data_type == DataType.INVALID): - return FedMLResponse(code=ResponseCode.FAILURE,message="Invalid data path") + if byte_data: + file_uploaded_url, message, file_size = _upload_bytes(api_key, user_id, name, show_progress, + out_progress_to_err, progress_desc, metadata, + byte_data, encrypted_api_key_flag=encrypted_api_key_flag) - if(data_type == DataType.DIRECTORY): - to_upload_path, message = _archive_data(data_path) - name = os.path.splitext(os.path.basename(to_upload_path))[0] if name is None else name - file_name = name + ".zip" else: - to_upload_path = data_path - base_name = os.path.basename(to_upload_path) - file_extension = os.path.splitext(base_name)[1] - given_extension = None - if name is not None: - given_extension = os.path.splitext(name)[1] - if given_extension is None or given_extension == "": - name = name + file_extension - else: - name = base_name - - file_name = name + file_uploaded_url, message, file_size, name = _upload_file(api_key, user_id, name, data_path, show_progress, + out_progress_to_err, progress_desc, metadata, + encrypted_api_key_flag=encrypted_api_key_flag) - if not to_upload_path: - return FedMLResponse(code=ResponseCode.FAILURE, message=message) - - #TODO(bhargav191098) - Better done on the backend. Remove and pass file_name once completed on backend. - dest_path = os.path.join(user_id, file_name) - file_size = os.path.getsize(to_upload_path) - - file_uploaded_url, message = _upload_multipart(api_key, dest_path, to_upload_path, show_progress, - out_progress_to_err, - progress_desc, metadata) - - if(data_type == "dir"): - os.remove(to_upload_path) if not file_uploaded_url: - return FedMLResponse(code=ResponseCode.FAILURE, message=f"Failed to upload file: {to_upload_path}") + return FedMLResponse(code=ResponseCode.FAILURE, message=f"Failed to upload file: {data_path}") json_data = { "datasetName": name, @@ -92,7 +67,8 @@ def upload(data_path, api_key, name, description, tag_list, service, show_progre } try: - response = _create_dataset(api_key=api_key, json_data=json_data) + response = _create_dataset(api_key=api_key, json_data=json_data, encrypted_api_key_flag=encrypted_api_key_flag) + print("create dataset ", response) code, message, data = _get_data_from_response(message="Failed to upload data", response=response) except Exception as e: return FedMLResponse(code=ResponseCode.FAILURE, message=f"Failed to create dataset: {e}") @@ -103,14 +79,14 @@ def upload(data_path, api_key, name, description, tag_list, service, show_progre # Todo(alaydshah): Query service from object metadata -def download(data_name, api_key, service, dest_path, show_progress=True) -> FedMLResponse: - api_key = authenticate(api_key) +def download(data_name, api_key, service, dest_path, show_progress=True, encrypted_api_key_flag=False) -> FedMLResponse: + api_key = authenticate(api_key, encrypted_api_key_flag=encrypted_api_key_flag) user_id, message = _get_user_id_from_api_key(api_key) if user_id is None: return FedMLResponse(code=ResponseCode.FAILURE, message=message) - metadata_response = get_metadata(data_name, api_key) + metadata_response = get_metadata(data_name, api_key, encrypted_api_key_flag=encrypted_api_key_flag) if metadata_response.code == ResponseCode.SUCCESS: metadata = metadata_response.data if not metadata or not isinstance(metadata, StorageMetadata): @@ -120,7 +96,7 @@ def download(data_name, api_key, service, dest_path, show_progress=True) -> FedM download_url = metadata.download_url given_extension = os.path.splitext(data_name)[1] is_file = True - if(given_extension is None or given_extension ==""): + if (given_extension is None or given_extension == ""): is_file = False if not is_file: @@ -137,7 +113,7 @@ def download(data_name, api_key, service, dest_path, show_progress=True) -> FedM else: if not os.path.exists(dest_path): os.makedirs(dest_path) - shutil.move(path_local,dest_path) + shutil.move(path_local, dest_path) abs_dest_path = os.path.abspath(dest_path) return FedMLResponse(code=ResponseCode.SUCCESS, message=f"Successfully downloaded and unzipped data at " f"{abs_dest_path}", data=abs_dest_path) @@ -156,8 +132,8 @@ def download(data_name, api_key, service, dest_path, show_progress=True) -> FedM return FedMLResponse(code=ResponseCode.FAILURE, message=error_message) -def get_user_metadata(data_name, api_key=None) -> FedMLResponse: - api_key = authenticate(api_key) +def get_user_metadata(data_name, api_key=None, encrypted_api_key_flag=False) -> FedMLResponse: + api_key = authenticate(api_key, encrypted_api_key_flag=encrypted_api_key_flag) user_id, message = _get_user_id_from_api_key(api_key) if user_id is None: @@ -176,11 +152,12 @@ def get_user_metadata(data_name, api_key=None) -> FedMLResponse: return FedMLResponse(code=ResponseCode.SUCCESS, message=message, data=data) -def get_metadata(data_name, api_key=None) -> FedMLResponse: - api_key = authenticate(api_key) +def get_metadata(data_name, api_key=None, encrypted_api_key_flag=False) -> FedMLResponse: + api_key = authenticate(api_key, encrypted_api_key_flag=encrypted_api_key_flag) try: - response = _get_dataset_metadata(api_key=api_key, data_name=data_name) + response = _get_dataset_metadata(api_key=api_key, data_name=data_name, + encrypted_api_key_flag=encrypted_api_key_flag) code, message, data = _get_data_from_response(message="Failed to upload data", response=response) except Exception as e: return FedMLResponse(code=ResponseCode.FAILURE, message=f"Failed to get metadata of '{data_name}' with " @@ -197,10 +174,10 @@ def get_metadata(data_name, api_key=None) -> FedMLResponse: return FedMLResponse(code=code, message=message) -def list_objects(api_key=None) -> FedMLResponse: - api_key = authenticate(api_key) +def list_objects(api_key=None, encrypted_api_key_flag=False) -> FedMLResponse: + api_key = authenticate(api_key, encrypted_api_key_flag=encrypted_api_key_flag) try: - response = _list_dataset(api_key=api_key) + response = _list_dataset(api_key=api_key, encrypted_api_key_flag=encrypted_api_key_flag) except Exception as e: message = f"Failed to list stored objects for account linked with api_key {api_key} with exception {e}" logging.error(message) @@ -218,8 +195,8 @@ def list_objects(api_key=None) -> FedMLResponse: # Todo(alaydshah): Query service from object metadata. Make the transaction atomic or rollback if partially failed -def delete(data_name, service, api_key=None) -> FedMLResponse: - api_key = authenticate(api_key) +def delete(data_name, service, api_key=None, encrypted_api_key_flag=False) -> FedMLResponse: + api_key = authenticate(api_key, encrypted_api_key_flag=encrypted_api_key_flag) user_id, message = _get_user_id_from_api_key(api_key) if user_id is None: @@ -232,7 +209,8 @@ def delete(data_name, service, api_key=None) -> FedMLResponse: if result: logging.info(f"Successfully deleted object from storage service.") try: - response = _delete_dataset(api_key=api_key, data_name=data_name) + response = _delete_dataset(api_key=api_key, data_name=data_name, + encrypted_api_key_flag=encrypted_api_key_flag) code, message, data = _get_data_from_response(message="Failed to delete data", response=response) except Exception as e: message = (f"Deleted object from storage service but failed to delete object metadata from Nexus Backend " @@ -249,6 +227,7 @@ def delete(data_name, service, api_key=None) -> FedMLResponse: logging.error(message, data_name, service) return FedMLResponse(code=ResponseCode.FAILURE, message=message, data=False) + def _get_num_chunks(file_size, max_chunk_size): num_chunks = math.ceil(file_size / max_chunk_size) return num_chunks @@ -263,10 +242,19 @@ def get_chunks(file_path, chunk_size): yield chunk -def _get_presigned_url(api_key, request_url, file_name, part_number=None): +def get_chunks_from_byte_data(byte_data, chunk_size): + while True: + chunk = byte_data.read(chunk_size) + if not chunk: + break + yield chunk + + +def _get_presigned_url(api_key, request_url, file_name, part_number=None, encrypted_api_key_flag=False): cert_path = MLOpsConfigs.get_cert_path_with_version() headers = ServerConstants.API_HEADERS headers["Authorization"] = f"Bearer {api_key}" + headers["Encrypted"] = str(encrypted_api_key_flag) params_dict = {'fileKey': file_name} if part_number is not None: params_dict['partNumber'] = part_number @@ -282,26 +270,27 @@ def _get_presigned_url(api_key, request_url, file_name, part_number=None): return response -def _upload_part(url,part_data,session): - response = session.put(url,data=part_data,verify=True) +def _upload_part(url, part_data, session): + response = session.put(url, data=part_data, verify=True) return response -def _upload_chunk(presigned_url, chunk, part, pbar=None, max_retries=20,session=None): +def _upload_chunk(presigned_url, chunk, part, pbar=None, max_retries=20, session=None): for retry_attempt in range(max_retries): try: - response = _upload_part(presigned_url,chunk,session) + response = _upload_part(presigned_url, chunk, session) except requests.exceptions.RequestException as e: if retry_attempt < max_retries: continue else: raise requests.exceptions.RequestException - if(pbar is not None): - pbar.update(chunk.__sizeof__()) + if pbar is not None: + pbar.update(len(chunk)) return {'etag': response.headers['ETag'], 'partNumber': part} raise requests.exceptions.RequestException + def _process_post_response(response): if response.status_code != 200: message = (f"Failed to complete multipart upload with status code = {response.status_code}, " @@ -322,13 +311,14 @@ def _process_post_response(response): return data_url, "Successfully uploaded the data! " -def _complete_multipart_upload(api_key, file_key, part_info, upload_id): +def _complete_multipart_upload(api_key, file_key, part_info, upload_id, encrypted_api_key_flag=False): complete_multipart_url = ServerConstants.get_complete_multipart_upload_url() body_dict = {"fileKey": file_key, 'partETags': part_info, 'uploadId': upload_id} cert_path = MLOpsConfigs.get_cert_path_with_version() headers = ServerConstants.API_HEADERS headers["Authorization"] = f"Bearer {api_key}" + headers["Encrypted"] = str(encrypted_api_key_flag) if cert_path is None: try: requests.session().verify = cert_path @@ -345,20 +335,18 @@ def _complete_multipart_upload(api_key, file_key, part_info, upload_id): return _process_post_response(complete_multipart_response) -def _upload_multipart(api_key: str, file_key, archive_path, show_progress, out_progress_to_err, - progress_desc_text, metadata): +def _upload_multipart(api_key: str, file_key, file_size, max_chunk_size, show_progress, + out_progress_to_err, + progress_desc_text, chunks, encrypted_api_key_flag=False): request_url = ServerConstants.get_presigned_multi_part_url() - file_size = os.path.getsize(archive_path) - - max_chunk_size = 20 * 1024 * 1024 - num_chunks = _get_num_chunks(file_size, max_chunk_size) upload_id = "" presigned_urls = [] - presigned_url_response = _get_presigned_url(api_key, request_url, file_key, num_chunks) + presigned_url_response = _get_presigned_url(api_key, request_url, file_key, num_chunks, + encrypted_api_key_flag=encrypted_api_key_flag) if presigned_url_response.status_code != 200: message = (f"Failed to get presigned URL with status code = {presigned_url_response.status_code}, " @@ -379,8 +367,6 @@ def _upload_multipart(api_key: str, file_key, archive_path, show_progress, out_p upload_id = data['uploadId'] presigned_urls = data['urls'] - parts = [] - chunks = get_chunks(archive_path, max_chunk_size) part_info = [] chunk_count = 0 successful_chunks = 0 @@ -396,7 +382,7 @@ def _upload_multipart(api_key: str, file_key, archive_path, show_progress, out_p if show_progress: try: part_data = _upload_chunk(presigned_url=presigned_url, chunk=chunk, part=part, - pbar=pbar,session=atomic_session) + pbar=pbar, session=atomic_session) part_info.append(part_data) successful_chunks += 1 except Exception as e: @@ -405,18 +391,87 @@ def _upload_multipart(api_key: str, file_key, archive_path, show_progress, out_p else: try: part_data = _upload_chunk(presigned_url=presigned_url, chunk=chunk, part=part, - pbar=pbar,session=atomic_session) + pbar=pbar, session=atomic_session) part_info.append(part_data) successful_chunks += 1 except Exception as e: return None, "unsuccessful" if successful_chunks == chunk_count: - return _complete_multipart_upload(api_key, file_key, part_info, upload_id) + return _complete_multipart_upload(api_key, file_key, part_info, upload_id, + encrypted_api_key_flag=encrypted_api_key_flag) else: return None, "Unsuccessful!" +def _upload_bytes(api_key, user_id, file_name, + show_progress, out_progress_to_err, progress_desc, metadata, byte_data, encrypted_api_key_flag=False): + if file_name is None: + return None, "name cannot be None" + + dest_path = os.path.join(user_id, file_name) + max_chunk_size = 20 * 1024 * 1024 + + file_size = sum(len(chunk) for chunk in get_chunks_from_byte_data(byte_data, max_chunk_size)) + + byte_data.seek(0) + chunks = get_chunks_from_byte_data(byte_data, max_chunk_size) + + file_uploaded_url, message = _upload_multipart(api_key, dest_path, file_size, max_chunk_size, + show_progress, + out_progress_to_err, + progress_desc, chunks, encrypted_api_key_flag=encrypted_api_key_flag) + + return file_uploaded_url, message, file_size + + +def _upload_file(api_key, user_id, name, data_path, show_progress, out_progress_to_err, progress_desc, + metadata, encrypted_api_key_flag=False): + data_type = _get_data_type(data_path) + message = "" + + if data_type == DataType.INVALID: + return FedMLResponse(code=ResponseCode.FAILURE, message="Invalid data path") + + if data_type == DataType.DIRECTORY: + to_upload_path, message = _archive_data(data_path) + name = os.path.splitext(os.path.basename(to_upload_path))[0] if name is None else name + file_name = name + ".zip" + else: + to_upload_path = data_path + base_name = os.path.basename(to_upload_path) + file_extension = os.path.splitext(base_name)[1] + if name is not None: + given_extension = os.path.splitext(name)[1] + if given_extension is None or given_extension == "": + name = name + file_extension + else: + name = base_name + + file_name = name + + if not to_upload_path: + return FedMLResponse(code=ResponseCode.FAILURE, message=message) + + # TODO(bhargav191098) - Better done on the backend. Remove and pass file_name once completed on backend. + dest_path = os.path.join(user_id, file_name) + max_chunk_size = 20 * 1024 * 1024 + + file_size = os.path.getsize(to_upload_path) + + chunks = get_chunks(to_upload_path, max_chunk_size) + + file_uploaded_url, message = _upload_multipart(api_key, dest_path, file_size, max_chunk_size, + show_progress, + out_progress_to_err, + progress_desc, chunks, encrypted_api_key_flag=encrypted_api_key_flag) + + if data_type == DataType.DIRECTORY: + os.remove(to_upload_path) + + return file_uploaded_url, message, file_size, name + + def _download_using_presigned_url(url, fname, chunk_size=1024 * 1024, show_progress=True): download_response = requests.get(url, verify=True, stream=True) if download_response.status_code == 200: @@ -439,6 +494,7 @@ def _download_using_presigned_url(url, fname, chunk_size=1024 * 1024, show_progr return True return False + def _get_user_id_from_api_key(api_key: str) -> (str, str): user_url = ServerConstants.get_user_url() json_data = { @@ -474,6 +530,7 @@ def _get_storage_service(service): else: raise NotImplementedError(f"Service {service} not implemented") + def _get_data_type(data_path): if os.path.isdir(data_path): return DataType.DIRECTORY @@ -497,11 +554,12 @@ def _archive_data(data_path: str) -> (str, str): return None, f"Error archiving data: {e}" -def _create_dataset(api_key: str, json_data: dict) -> requests.Response: +def _create_dataset(api_key: str, json_data: dict, encrypted_api_key_flag=False) -> requests.Response: dataset_url = ServerConstants.get_dataset_url() cert_path = MLOpsConfigs.get_cert_path_with_version() headers = ServerConstants.API_HEADERS headers["Authorization"] = f"Bearer {api_key}" + headers["Encrypted"] = str(encrypted_api_key_flag) if cert_path is not None: try: requests.session().verify = cert_path @@ -520,11 +578,12 @@ def _create_dataset(api_key: str, json_data: dict) -> requests.Response: return response -def _list_dataset(api_key: str) -> requests.Response: +def _list_dataset(api_key: str, encrypted_api_key_flag=False) -> requests.Response: list_dataset_url = ServerConstants.list_dataset_url() cert_path = MLOpsConfigs.get_cert_path_with_version() headers = ServerConstants.API_HEADERS headers["Authorization"] = f"Bearer {api_key}" + headers["Encrypted"] = str(encrypted_api_key_flag) if cert_path is None: try: requests.session().verify = cert_path @@ -537,11 +596,12 @@ def _list_dataset(api_key: str) -> requests.Response: return response -def _get_dataset_metadata(api_key: str, data_name: str) -> requests.Response: +def _get_dataset_metadata(api_key: str, data_name: str, encrypted_api_key_flag=False) -> requests.Response: dataset_metadata_url = ServerConstants.get_dataset_metadata_url() cert_path = MLOpsConfigs.get_cert_path_with_version() headers = ServerConstants.API_HEADERS headers["Authorization"] = f"Bearer {api_key}" + headers["Encrypted"] = str(encrypted_api_key_flag) if cert_path is not None: try: requests.session().verify = cert_path @@ -563,11 +623,12 @@ def _get_dataset_metadata(api_key: str, data_name: str) -> requests.Response: return response -def _delete_dataset(api_key: str, data_name: str) -> requests.Response: +def _delete_dataset(api_key: str, data_name: str, encrypted_api_key_flag=False) -> requests.Response: dataset_url = ServerConstants.get_dataset_url() cert_path = MLOpsConfigs.get_cert_path_with_version() headers = ServerConstants.API_HEADERS headers["Authorization"] = f"Bearer {api_key}" + headers["Encrypted"] = str(encrypted_api_key_flag) if cert_path is not None: try: requests.session().verify = cert_path @@ -604,19 +665,20 @@ def _get_data_from_response(message: str, response: requests.Response) -> (Respo return ResponseCode.SUCCESS, "Successfully parsed data from response", data -def _get_size(size_in_bytes:str)->str: + +def _get_size(size_in_bytes: str) -> str: size_str = "" - if(size_in_bytes): + if (size_in_bytes): size = int(size_in_bytes) size_in_gb = size / (1024 * 1024 * 1024) size_in_mb = size / (1024 * 1024) size_in_kb = size / 1024 - if(size_in_gb >= 1): + if (size_in_gb >= 1): size_str = f"{size_in_gb:.2f} GB" - elif(size_in_mb >= 1): + elif (size_in_mb >= 1): size_str = f"{size_in_mb:.2f} MB" - elif(size_in_kb >= 1): + elif (size_in_kb >= 1): size_str = f"{size_in_kb:.2f} KB" else: size_str = f"{size} B" - return size_str \ No newline at end of file + return size_str diff --git a/python/fedml/api/modules/utils.py b/python/fedml/api/modules/utils.py index 76801ffe8..d1d555d0f 100644 --- a/python/fedml/api/modules/utils.py +++ b/python/fedml/api/modules/utils.py @@ -9,15 +9,15 @@ FEDML_MLOPS_BUILD_PRE_IGNORE_LIST = 'dist-packages,client-package.zip,server-package.zip,__pycache__,*.pyc,*.git' -def fedml_login(api_key): - api_key_is_valid, api_key = _check_api_key(api_key=api_key) +def fedml_login(api_key, encrypted_api_key_flag=False): + api_key_is_valid, api_key = _check_api_key(api_key=api_key, encrypted_api_key_flag=encrypted_api_key_flag) if api_key_is_valid: return 0, api_key return -1, api_key -def _check_api_key(api_key=None): +def _check_api_key(api_key=None, encrypted_api_key_flag=False): if api_key is None or api_key == "": saved_api_key = get_api_key() if saved_api_key is None or saved_api_key == "": @@ -25,7 +25,7 @@ def _check_api_key(api_key=None): else: api_key = saved_api_key - is_valid_heartbeat = FedMLResourceManager.get_instance().check_heartbeat(api_key) + is_valid_heartbeat = FedMLResourceManager.get_instance().check_heartbeat(api_key, encrypted_api_key_flag) if not is_valid_heartbeat: return False, api_key else: @@ -33,9 +33,9 @@ def _check_api_key(api_key=None): return True, api_key -def authenticate(api_key): +def authenticate(api_key, encrypted_api_key_flag=False): - error_code, api_key = fedml_login(api_key) + error_code, api_key = fedml_login(api_key, encrypted_api_key_flag) # Exit if not able to authenticate successfully if error_code: diff --git a/python/fedml/computing/scheduler/master/server_constants.py b/python/fedml/computing/scheduler/master/server_constants.py index ebd8b2aef..2c59fbedf 100644 --- a/python/fedml/computing/scheduler/master/server_constants.py +++ b/python/fedml/computing/scheduler/master/server_constants.py @@ -251,7 +251,7 @@ def get_user_url(): @staticmethod def get_dataset_url(): - create_dataset_url = "{}/fedmlOpsServer/api/v1/cli/dataset".format( + create_dataset_url = "{}/system/api/v1/cli/storage".format( ServerConstants.get_mlops_url()) return create_dataset_url @@ -271,13 +271,13 @@ def get_complete_multipart_upload_url(): @staticmethod def list_dataset_url(): - list_dataset_url = "{}/fedmlOpsServer/api/v1/cli/dataset/list".format( + list_dataset_url = "{}/system/api/v1/cli/storage/list".format( ServerConstants.get_mlops_url()) return list_dataset_url @staticmethod def get_dataset_metadata_url(): - get_dataset_metadata_url = "{}/fedmlOpsServer/api/v1/cli/dataset/meta".format( + get_dataset_metadata_url = "{}/system/api/v1/cli/storage/meta".format( ServerConstants.get_mlops_url()) return get_dataset_metadata_url diff --git a/python/fedml/computing/scheduler/scheduler_entry/resource_manager.py b/python/fedml/computing/scheduler/scheduler_entry/resource_manager.py index 615b77041..3d910da3a 100644 --- a/python/fedml/computing/scheduler/scheduler_entry/resource_manager.py +++ b/python/fedml/computing/scheduler/scheduler_entry/resource_manager.py @@ -15,9 +15,10 @@ def __init__(self): def get_instance(): return FedMLResourceManager() - def check_heartbeat(self, api_key): + def check_heartbeat(self, api_key, encrypted_api_key_flag=False): heartbeat_url = ServerConstants.get_heartbeat_url() - heartbeat_api_headers = {'Content-Type': 'application/json', 'Connection': 'close'} + heartbeat_api_headers = {'Content-Type': 'application/json', 'Connection': 'close', + 'Encrypted': str(encrypted_api_key_flag)} heartbeat_json = { "apiKey": api_key }