From 1c33ce5c299bddb8a81088e88499b223916bc0ac Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Tue, 17 Sep 2024 19:47:51 +0530 Subject: [PATCH 01/30] initial implementation of checkpoint state --- notebooks/notebook_helpers/checkpoint.py | 137 ++++++++++++++++++ ...tart-and-configure-server-and-admins.ipynb | 77 +++++++++- .../001-scale-delete-worker-pools.ipynb | 67 ++++++--- 3 files changed, 257 insertions(+), 24 deletions(-) create mode 100644 notebooks/notebook_helpers/checkpoint.py diff --git a/notebooks/notebook_helpers/checkpoint.py b/notebooks/notebook_helpers/checkpoint.py new file mode 100644 index 00000000000..28f8ae76384 --- /dev/null +++ b/notebooks/notebook_helpers/checkpoint.py @@ -0,0 +1,137 @@ +# stdlib +import datetime +import json +from pathlib import Path + +# third party +import ipykernel + +# syft absolute +from syft import SyftError +from syft import SyftException +from syft.client.client import SyftClient +from syft.service.user.user_roles import ServiceRole +from syft.util.util import get_root_data_path + +NB_STATE_DIRECTORY = "nb_checkpoints" + + +def current_nbname() -> Path: + """Get the current notebook name""" + curr_kernel_file = Path(ipykernel.get_connection_file()) + kernel_file = json.loads(curr_kernel_file.read_text()) + nb_name = kernel_file["jupyter_session"] + return Path(nb_name) + + +def get_or_create_state_dir(filename: str) -> Path: + """Get or create the state directory for the given filename.""" + + # Generate the current timestamp + timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + checkpoint_dir = f"checkpoint_{timestamp}" + + nb_path = get_root_data_path() / NB_STATE_DIRECTORY + filepath = nb_path / filename / checkpoint_dir + if not filepath.exists(): + filepath.mkdir(parents=True, exist_ok=True) + return filepath + + +def state_dir_for_nb(server_uid: str) -> Path: + """State directory for the current notebook""" + nb_name = current_nbname().stem # Get the filename without extension + return get_or_create_state_dir(filename=f"{nb_name}/{server_uid}") + + +def is_admin(client: SyftClient) -> bool: + """Is User an admin.""" + return client._SyftClient__user_role == ServiceRole.ADMIN + + +def checkpoint_db( + client: SyftClient, + root_email: str = "info@openmined.org", + root_pwd: str = "changethis", +) -> None: + """Save checkpoint for database.""" + + # Get root client (login if not already admin) + root_client = ( + client + if is_admin(client) + else client.login(email=root_email, password=root_pwd) + ) + + # Get migration data from the database + migration_data = root_client.get_migration_data(include_blobs=True) + if isinstance(migration_data, SyftError): + raise SyftException(message=migration_data.message) + + # Define the state directory for the current notebook and server + state_dir = state_dir_for_nb(server_uid=client.id.to_string()) + + # Save migration data in blob and yaml format + migration_data.save( + path=state_dir / "migration.blob", yaml_path=state_dir / "migration.yaml" + ) + + print(f"Checkpoint saved at: \n {state_dir}") + return state_dir + + +def last_db_checkpoint_dir(filename: str, server_id: str) -> Path | None: + """Return the directory of the latest checkpoint for the given filename.""" + + filename = filename.split(".json")[0] + checkpoint_parent_dir = get_or_create_state_dir(f"{filename}/{server_id}").parent + + # Get all checkpoint directory matching the pattern + checkpoint_dirs = [ + d for d in checkpoint_parent_dir.glob("checkpoint_*") if d.is_dir() + ] + + checkpoints_dirs_with_blob_entry = 
[ + d for d in checkpoint_dirs if any(d.glob("*.blob")) + ] + + if checkpoints_dirs_with_blob_entry: + # Return the latest directory based on modification time + return max(checkpoints_dirs_with_blob_entry, key=lambda d: d.stat().st_mtime) + + return None + + +def load_from_checkpoint( + prev_nb_filename: str, + client: SyftClient, + root_email: str = "info@openmined.org", + root_pwd: str = "changethis", +) -> None: + """Load the last saved checkpoint for the given notebook state.""" + + # Get root client (login if not already admin) + root_client = ( + client + if is_admin(client) + else client.login(email=root_email, password=root_pwd) + ) + + lastest_checkpoint_dir = last_db_checkpoint_dir( + prev_nb_filename, client.id.to_string() + ) + + if lastest_checkpoint_dir is None: + print("No previous checkpoint found !") + return + + print(f"Loading from checkpoint: {lastest_checkpoint_dir}") + + result = root_client.load_migration_data( + path=lastest_checkpoint_dir / "migration.blob" + ) + + if isinstance(result, SyftError): + raise SyftException(message=result.message) + + print("Successfully loaded data from checkpoint.") diff --git a/notebooks/scenarios/bigquery/000-start-and-configure-server-and-admins.ipynb b/notebooks/scenarios/bigquery/000-start-and-configure-server-and-admins.ipynb index f89a05e61aa..a8407714dcf 100644 --- a/notebooks/scenarios/bigquery/000-start-and-configure-server-and-admins.ipynb +++ b/notebooks/scenarios/bigquery/000-start-and-configure-server-and-admins.ipynb @@ -200,6 +200,79 @@ "root_client.users.delete(new_user_id2)" ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# import os\n", + "# from syft.client.client import SyftClient\n", + "# from syft.service.user.user_roles import ServiceRole\n", + "# import ipykernel\n", + "# import json\n", + "# from pathlib import Path\n", + "# import uuid\n", + "# from syft.util.util import get_root_data_path\n", + "# from syft import SyftError, SyftException\n", + "\n", + "# NB_STATE_DIRECTORY = \"nb_states\"\n", + "\n", + "# def current_nbname() -> Path:\n", + "# \"\"\"Get the \"\"\"\n", + "# curr_kernel_file = Path(ipykernel.get_connection_file())\n", + "# kernel_file = json.loads(curr_kernel_file.read_text())\n", + "# nb_name = kernel_file['jupyter_session']\n", + "# return Path(nb_name)\n", + "\n", + "\n", + "# def get_or_create_state_dir(filename: str) -> Path:\n", + "# nb_path = get_root_data_path() / NB_STATE_DIRECTORY\n", + "# filepath = nb_path / filename\n", + "# if not filepath.exists():\n", + "# filepath.mkdir(parents=True, exist_ok=True)\n", + "# return filepath\n", + "\n", + "\n", + "# def state_dir_for_nb() -> Path:\n", + "# nb_name = current_nbname()\n", + "# filename = nb_name.name.split(\".json\")[0]\n", + "# return get_or_create_state_dir(filename=nb_name.name)\n", + "\n", + "\n", + "# def is_admin(client: SyftClient) -> bool:\n", + "# return client._SyftClient__user_role == ServiceRole.ADMIN\n", + "\n", + "# def checkpoint_db(client: SyftClient, root_email: str | None = None, root_pwd: str | None = None) -> None:\n", + "# migration_data = client.get_migration_data(include_blobs=True)\n", + "# if isinstance(migration_data, SyftError):\n", + "# raise SyftException(message=migration_data.message)\n", + "\n", + "# state_dir = state_dir_for_nb()\n", + "# state_dir_server = state_dir / client.id.to_string()\n", + "# migration_data.save(path=state_dir_server/\"migration.blob\", yaml_path=state_dir_server/\"migration.yaml\")\n", + "# return 
state_dir_server" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# third party\n", + "from checkpoint import checkpoint_db" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "checkpoint_db(root_client)" + ] + }, { "cell_type": "markdown", "metadata": {}, @@ -235,7 +308,7 @@ ], "metadata": { "kernelspec": { - "display_name": ".venv", + "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, @@ -249,7 +322,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.12.4" + "version": "3.11.5" } }, "nbformat": 4, diff --git a/notebooks/scenarios/bigquery/001-scale-delete-worker-pools.ipynb b/notebooks/scenarios/bigquery/001-scale-delete-worker-pools.ipynb index 6c5f07a1c19..f6a66b148e4 100644 --- a/notebooks/scenarios/bigquery/001-scale-delete-worker-pools.ipynb +++ b/notebooks/scenarios/bigquery/001-scale-delete-worker-pools.ipynb @@ -112,7 +112,8 @@ "metadata": {}, "outputs": [], "source": [ - "high_client.worker_pools" + "# third party\n", + "from checkpoint import load_from_checkpoint" ] }, { @@ -121,6 +122,28 @@ "id": "9", "metadata": {}, "outputs": [], + "source": [ + "load_from_checkpoint(\n", + " prev_nb_filename=\"000-start-and-configure-server-and-admins\", client=high_client\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "10", + "metadata": {}, + "outputs": [], + "source": [ + "high_client.worker_pools" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "11", + "metadata": {}, + "outputs": [], "source": [ "default_worker_pool = high_client.worker_pools.get_by_name(\"default-pool\")\n", "default_worker_pool" @@ -128,7 +151,7 @@ }, { "cell_type": "markdown", - "id": "10", + "id": "12", "metadata": {}, "source": [ "### Scale Worker pool" @@ -136,7 +159,7 @@ }, { "cell_type": "markdown", - "id": "11", + "id": "13", "metadata": {}, "source": [ "##### Scale up" @@ -145,7 +168,7 @@ { "cell_type": "code", "execution_count": null, - "id": "12", + "id": "14", "metadata": {}, "outputs": [], "source": [ @@ -159,7 +182,7 @@ { "cell_type": "code", "execution_count": null, - "id": "13", + "id": "15", "metadata": {}, "outputs": [], "source": [ @@ -169,7 +192,7 @@ { "cell_type": "code", "execution_count": null, - "id": "14", + "id": "16", "metadata": {}, "outputs": [], "source": [ @@ -189,7 +212,7 @@ }, { "cell_type": "markdown", - "id": "15", + "id": "17", "metadata": {}, "source": [ "##### Scale down" @@ -198,7 +221,7 @@ { "cell_type": "code", "execution_count": null, - "id": "16", + "id": "18", "metadata": {}, "outputs": [], "source": [ @@ -213,7 +236,7 @@ { "cell_type": "code", "execution_count": null, - "id": "17", + "id": "19", "metadata": {}, "outputs": [], "source": [ @@ -232,7 +255,7 @@ { "cell_type": "code", "execution_count": null, - "id": "18", + "id": "20", "metadata": {}, "outputs": [], "source": [ @@ -245,7 +268,7 @@ }, { "cell_type": "markdown", - "id": "19", + "id": "21", "metadata": {}, "source": [ "#### Delete Worker Pool" @@ -254,7 +277,7 @@ { "cell_type": "code", "execution_count": null, - "id": "20", + "id": "22", "metadata": {}, "outputs": [], "source": [ @@ -267,7 +290,7 @@ { "cell_type": "code", "execution_count": null, - "id": "21", + "id": "23", "metadata": {}, "outputs": [], "source": [ @@ -277,7 +300,7 @@ }, { "cell_type": "markdown", - "id": "22", + "id": "24", "metadata": {}, "source": [ "#### Re-launch the default 
worker pool" @@ -286,7 +309,7 @@ { "cell_type": "code", "execution_count": null, - "id": "23", + "id": "25", "metadata": {}, "outputs": [], "source": [ @@ -296,7 +319,7 @@ { "cell_type": "code", "execution_count": null, - "id": "24", + "id": "26", "metadata": {}, "outputs": [], "source": [ @@ -310,7 +333,7 @@ { "cell_type": "code", "execution_count": null, - "id": "25", + "id": "27", "metadata": {}, "outputs": [], "source": [ @@ -324,7 +347,7 @@ { "cell_type": "code", "execution_count": null, - "id": "26", + "id": "28", "metadata": {}, "outputs": [], "source": [ @@ -334,7 +357,7 @@ { "cell_type": "code", "execution_count": null, - "id": "27", + "id": "29", "metadata": {}, "outputs": [], "source": [ @@ -344,7 +367,7 @@ { "cell_type": "code", "execution_count": null, - "id": "28", + "id": "30", "metadata": {}, "outputs": [], "source": [] @@ -352,7 +375,7 @@ ], "metadata": { "kernelspec": { - "display_name": "syft", + "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, @@ -366,7 +389,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.12.3" + "version": "3.11.5" } }, "nbformat": 4, From 78f3cdbdc97dff72bb309b16e9b49d2d318d58c2 Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Wed, 18 Sep 2024 18:18:55 +0530 Subject: [PATCH 02/30] clean up code --- notebooks/notebook_helpers/checkpoint.py | 137 ----------------- ...tart-and-configure-server-and-admins.ipynb | 58 +------ .../001-scale-delete-worker-pools.ipynb | 68 ++++----- test_helpers/checkpoint.py | 142 ++++++++++++++++++ 4 files changed, 178 insertions(+), 227 deletions(-) delete mode 100644 notebooks/notebook_helpers/checkpoint.py create mode 100644 test_helpers/checkpoint.py diff --git a/notebooks/notebook_helpers/checkpoint.py b/notebooks/notebook_helpers/checkpoint.py deleted file mode 100644 index 28f8ae76384..00000000000 --- a/notebooks/notebook_helpers/checkpoint.py +++ /dev/null @@ -1,137 +0,0 @@ -# stdlib -import datetime -import json -from pathlib import Path - -# third party -import ipykernel - -# syft absolute -from syft import SyftError -from syft import SyftException -from syft.client.client import SyftClient -from syft.service.user.user_roles import ServiceRole -from syft.util.util import get_root_data_path - -NB_STATE_DIRECTORY = "nb_checkpoints" - - -def current_nbname() -> Path: - """Get the current notebook name""" - curr_kernel_file = Path(ipykernel.get_connection_file()) - kernel_file = json.loads(curr_kernel_file.read_text()) - nb_name = kernel_file["jupyter_session"] - return Path(nb_name) - - -def get_or_create_state_dir(filename: str) -> Path: - """Get or create the state directory for the given filename.""" - - # Generate the current timestamp - timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") - checkpoint_dir = f"checkpoint_{timestamp}" - - nb_path = get_root_data_path() / NB_STATE_DIRECTORY - filepath = nb_path / filename / checkpoint_dir - if not filepath.exists(): - filepath.mkdir(parents=True, exist_ok=True) - return filepath - - -def state_dir_for_nb(server_uid: str) -> Path: - """State directory for the current notebook""" - nb_name = current_nbname().stem # Get the filename without extension - return get_or_create_state_dir(filename=f"{nb_name}/{server_uid}") - - -def is_admin(client: SyftClient) -> bool: - """Is User an admin.""" - return client._SyftClient__user_role == ServiceRole.ADMIN - - -def checkpoint_db( - client: SyftClient, - root_email: str = "info@openmined.org", - root_pwd: str = "changethis", -) -> 
None: - """Save checkpoint for database.""" - - # Get root client (login if not already admin) - root_client = ( - client - if is_admin(client) - else client.login(email=root_email, password=root_pwd) - ) - - # Get migration data from the database - migration_data = root_client.get_migration_data(include_blobs=True) - if isinstance(migration_data, SyftError): - raise SyftException(message=migration_data.message) - - # Define the state directory for the current notebook and server - state_dir = state_dir_for_nb(server_uid=client.id.to_string()) - - # Save migration data in blob and yaml format - migration_data.save( - path=state_dir / "migration.blob", yaml_path=state_dir / "migration.yaml" - ) - - print(f"Checkpoint saved at: \n {state_dir}") - return state_dir - - -def last_db_checkpoint_dir(filename: str, server_id: str) -> Path | None: - """Return the directory of the latest checkpoint for the given filename.""" - - filename = filename.split(".json")[0] - checkpoint_parent_dir = get_or_create_state_dir(f"{filename}/{server_id}").parent - - # Get all checkpoint directory matching the pattern - checkpoint_dirs = [ - d for d in checkpoint_parent_dir.glob("checkpoint_*") if d.is_dir() - ] - - checkpoints_dirs_with_blob_entry = [ - d for d in checkpoint_dirs if any(d.glob("*.blob")) - ] - - if checkpoints_dirs_with_blob_entry: - # Return the latest directory based on modification time - return max(checkpoints_dirs_with_blob_entry, key=lambda d: d.stat().st_mtime) - - return None - - -def load_from_checkpoint( - prev_nb_filename: str, - client: SyftClient, - root_email: str = "info@openmined.org", - root_pwd: str = "changethis", -) -> None: - """Load the last saved checkpoint for the given notebook state.""" - - # Get root client (login if not already admin) - root_client = ( - client - if is_admin(client) - else client.login(email=root_email, password=root_pwd) - ) - - lastest_checkpoint_dir = last_db_checkpoint_dir( - prev_nb_filename, client.id.to_string() - ) - - if lastest_checkpoint_dir is None: - print("No previous checkpoint found !") - return - - print(f"Loading from checkpoint: {lastest_checkpoint_dir}") - - result = root_client.load_migration_data( - path=lastest_checkpoint_dir / "migration.blob" - ) - - if isinstance(result, SyftError): - raise SyftException(message=result.message) - - print("Successfully loaded data from checkpoint.") diff --git a/notebooks/scenarios/bigquery/000-start-and-configure-server-and-admins.ipynb b/notebooks/scenarios/bigquery/000-start-and-configure-server-and-admins.ipynb index b0be4a13119..b1c04d1e16f 100644 --- a/notebooks/scenarios/bigquery/000-start-and-configure-server-and-admins.ipynb +++ b/notebooks/scenarios/bigquery/000-start-and-configure-server-and-admins.ipynb @@ -200,60 +200,6 @@ "root_client.users.delete(new_user_id2)" ] }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# import os\n", - "# from syft.client.client import SyftClient\n", - "# from syft.service.user.user_roles import ServiceRole\n", - "# import ipykernel\n", - "# import json\n", - "# from pathlib import Path\n", - "# import uuid\n", - "# from syft.util.util import get_root_data_path\n", - "# from syft import SyftError, SyftException\n", - "\n", - "# NB_STATE_DIRECTORY = \"nb_states\"\n", - "\n", - "# def current_nbname() -> Path:\n", - "# \"\"\"Get the \"\"\"\n", - "# curr_kernel_file = Path(ipykernel.get_connection_file())\n", - "# kernel_file = json.loads(curr_kernel_file.read_text())\n", - "# nb_name = 
kernel_file['jupyter_session']\n", - "# return Path(nb_name)\n", - "\n", - "\n", - "# def get_or_create_state_dir(filename: str) -> Path:\n", - "# nb_path = get_root_data_path() / NB_STATE_DIRECTORY\n", - "# filepath = nb_path / filename\n", - "# if not filepath.exists():\n", - "# filepath.mkdir(parents=True, exist_ok=True)\n", - "# return filepath\n", - "\n", - "\n", - "# def state_dir_for_nb() -> Path:\n", - "# nb_name = current_nbname()\n", - "# filename = nb_name.name.split(\".json\")[0]\n", - "# return get_or_create_state_dir(filename=nb_name.name)\n", - "\n", - "\n", - "# def is_admin(client: SyftClient) -> bool:\n", - "# return client._SyftClient__user_role == ServiceRole.ADMIN\n", - "\n", - "# def checkpoint_db(client: SyftClient, root_email: str | None = None, root_pwd: str | None = None) -> None:\n", - "# migration_data = client.get_migration_data(include_blobs=True)\n", - "# if isinstance(migration_data, SyftError):\n", - "# raise SyftException(message=migration_data.message)\n", - "\n", - "# state_dir = state_dir_for_nb()\n", - "# state_dir_server = state_dir / client.id.to_string()\n", - "# migration_data.save(path=state_dir_server/\"migration.blob\", yaml_path=state_dir_server/\"migration.yaml\")\n", - "# return state_dir_server" - ] - }, { "cell_type": "code", "execution_count": null, @@ -261,7 +207,7 @@ "outputs": [], "source": [ "# third party\n", - "from checkpoint import checkpoint_db" + "from checkpoint import create_checkpoint" ] }, { @@ -270,7 +216,7 @@ "metadata": {}, "outputs": [], "source": [ - "checkpoint_db(root_client)" + "create_checkpoint(root_client)" ] }, { diff --git a/notebooks/scenarios/bigquery/001-scale-delete-worker-pools.ipynb b/notebooks/scenarios/bigquery/001-scale-delete-worker-pools.ipynb index b8b4290b389..dd49fc1f198 100644 --- a/notebooks/scenarios/bigquery/001-scale-delete-worker-pools.ipynb +++ b/notebooks/scenarios/bigquery/001-scale-delete-worker-pools.ipynb @@ -40,14 +40,6 @@ "id": "2", "metadata": {}, "outputs": [], - "source": [] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "3", - "metadata": {}, - "outputs": [], "source": [ "environment = os.environ.get(\"ORCHESTRA_DEPLOYMENT_TYPE\", \"python\")\n", "\n", @@ -60,7 +52,7 @@ }, { "cell_type": "markdown", - "id": "4", + "id": "3", "metadata": {}, "source": [ "### Launch server & login" @@ -69,7 +61,7 @@ { "cell_type": "code", "execution_count": null, - "id": "5", + "id": "4", "metadata": {}, "outputs": [], "source": [ @@ -86,7 +78,7 @@ { "cell_type": "code", "execution_count": null, - "id": "6", + "id": "5", "metadata": {}, "outputs": [], "source": [ @@ -96,7 +88,7 @@ { "cell_type": "code", "execution_count": null, - "id": "7", + "id": "6", "metadata": {}, "outputs": [], "source": [ @@ -108,7 +100,7 @@ { "cell_type": "code", "execution_count": null, - "id": "8", + "id": "7", "metadata": {}, "outputs": [], "source": [ @@ -119,7 +111,7 @@ { "cell_type": "code", "execution_count": null, - "id": "9", + "id": "8", "metadata": {}, "outputs": [], "source": [ @@ -131,7 +123,7 @@ { "cell_type": "code", "execution_count": null, - "id": "10", + "id": "9", "metadata": {}, "outputs": [], "source": [ @@ -141,7 +133,7 @@ { "cell_type": "code", "execution_count": null, - "id": "11", + "id": "10", "metadata": {}, "outputs": [], "source": [ @@ -151,7 +143,7 @@ }, { "cell_type": "markdown", - "id": "12", + "id": "11", "metadata": {}, "source": [ "### Scale Worker pool" @@ -159,7 +151,7 @@ }, { "cell_type": "markdown", - "id": "13", + "id": "12", "metadata": {}, "source": [ "##### Scale 
up" @@ -168,7 +160,7 @@ { "cell_type": "code", "execution_count": null, - "id": "14", + "id": "13", "metadata": {}, "outputs": [], "source": [ @@ -182,7 +174,7 @@ { "cell_type": "code", "execution_count": null, - "id": "15", + "id": "14", "metadata": {}, "outputs": [], "source": [ @@ -192,7 +184,7 @@ { "cell_type": "code", "execution_count": null, - "id": "16", + "id": "15", "metadata": {}, "outputs": [], "source": [ @@ -212,7 +204,7 @@ }, { "cell_type": "markdown", - "id": "17", + "id": "16", "metadata": {}, "source": [ "##### Scale down" @@ -221,7 +213,7 @@ { "cell_type": "code", "execution_count": null, - "id": "18", + "id": "17", "metadata": {}, "outputs": [], "source": [ @@ -236,7 +228,7 @@ { "cell_type": "code", "execution_count": null, - "id": "19", + "id": "18", "metadata": {}, "outputs": [], "source": [ @@ -255,7 +247,7 @@ { "cell_type": "code", "execution_count": null, - "id": "20", + "id": "19", "metadata": {}, "outputs": [], "source": [ @@ -268,7 +260,7 @@ }, { "cell_type": "markdown", - "id": "21", + "id": "20", "metadata": {}, "source": [ "#### Delete Worker Pool" @@ -277,7 +269,7 @@ { "cell_type": "code", "execution_count": null, - "id": "22", + "id": "21", "metadata": {}, "outputs": [], "source": [ @@ -290,7 +282,7 @@ { "cell_type": "code", "execution_count": null, - "id": "23", + "id": "22", "metadata": {}, "outputs": [], "source": [ @@ -300,7 +292,7 @@ }, { "cell_type": "markdown", - "id": "24", + "id": "23", "metadata": {}, "source": [ "#### Re-launch the default worker pool" @@ -309,7 +301,7 @@ { "cell_type": "code", "execution_count": null, - "id": "25", + "id": "24", "metadata": {}, "outputs": [], "source": [ @@ -319,7 +311,7 @@ { "cell_type": "code", "execution_count": null, - "id": "26", + "id": "25", "metadata": {}, "outputs": [], "source": [ @@ -333,7 +325,7 @@ { "cell_type": "code", "execution_count": null, - "id": "27", + "id": "26", "metadata": {}, "outputs": [], "source": [ @@ -347,7 +339,7 @@ { "cell_type": "code", "execution_count": null, - "id": "28", + "id": "27", "metadata": {}, "outputs": [], "source": [ @@ -357,13 +349,21 @@ { "cell_type": "code", "execution_count": null, - "id": "29", + "id": "28", "metadata": {}, "outputs": [], "source": [ "server.land()" ] }, + { + "cell_type": "code", + "execution_count": null, + "id": "29", + "metadata": {}, + "outputs": [], + "source": [] + }, { "cell_type": "code", "execution_count": null, diff --git a/test_helpers/checkpoint.py b/test_helpers/checkpoint.py new file mode 100644 index 00000000000..d7733943f5a --- /dev/null +++ b/test_helpers/checkpoint.py @@ -0,0 +1,142 @@ +# stdlib +import datetime +import json +from pathlib import Path + +# third party +import ipykernel + +# syft absolute +from syft import SyftError +from syft import SyftException +from syft.client.client import SyftClient +from syft.service.user.user_roles import ServiceRole +from syft.util.util import get_root_data_path +from syft.util.util import is_interpreter_jupyter + +CHECKPOINT_ROOT = "checkpoints" +CHECKPOINT_DIR_PREFIX = "chkpt" + + +def current_nbname() -> Path: + """Retrieve the current Jupyter notebook name.""" + curr_kernel_file = Path(ipykernel.get_connection_file()) + kernel_file = json.loads(curr_kernel_file.read_text()) + nb_name = kernel_file["jupyter_session"] + return Path(nb_name) + + +def root_checkpoint_path() -> Path: + return get_root_data_path() / CHECKPOINT_ROOT + + +def checkpoint_parent_dir(server_uid: str, nb_name: str | None = None) -> Path: + """Return the checkpoint directory for the current notebook and 
server."""
+    if is_interpreter_jupyter():
+        nb_name = nb_name if nb_name else current_nbname().stem
+        return Path(f"{nb_name}/{server_uid}")
+    return Path(server_uid)
+
+
+def get_checkpoints_dir(server_uid: str, nb_name: str) -> Path:
+    return root_checkpoint_path() / checkpoint_parent_dir(server_uid, nb_name)
+
+
+def get_checkpoint_dir(
+    server_uid: str, checkpoint_dir: str, nb_name: str | None = None
+) -> Path:
+    return get_checkpoints_dir(server_uid, nb_name) / checkpoint_dir
+
+
+def create_checkpoint_dir(server_uid: str) -> Path:
+    """Create a checkpoint directory for the current notebook and server."""
+    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
+    checkpoint_dir = f"{CHECKPOINT_DIR_PREFIX}_{timestamp}"
+    checkpoint_full_path = get_checkpoint_dir(server_uid, checkpoint_dir=checkpoint_dir)
+
+    checkpoint_full_path.mkdir(parents=True, exist_ok=True)
+    return checkpoint_full_path
+
+
+def is_admin(client: SyftClient) -> bool:
+    return client._SyftClient__user_role == ServiceRole.ADMIN
+
+
+def create_checkpoint(
+    client: SyftClient,
+    root_email: str = "info@openmined.org",
+    root_pwd: str = "changethis",
+) -> None:
+    """Save a checkpoint for the database."""
+    root_client = (
+        client
+        if is_admin(client)
+        else client.login(email=root_email, password=root_pwd)
+    )
+    migration_data = root_client.get_migration_data(include_blobs=True)
+
+    if isinstance(migration_data, SyftError):
+        raise SyftException(message=migration_data.message)
+
+    if not is_interpreter_jupyter():
+        raise SyftException(
+            message="Checkpoint can only be created in Jupyter Notebook."
+        )
+
+    checkpoint_dir = create_checkpoint_dir(server_uid=client.id.to_string())
+    migration_data.save(
+        path=checkpoint_dir / "migration.blob",
+        yaml_path=checkpoint_dir / "migration.yaml",
+    )
+    print(f"Checkpoint saved at: \n {checkpoint_dir}")
+
+
+def last_checkpoint_path_for_nb(server_uid: str, nb_name: str = None) -> Path | None:
+    """Return the directory of the latest checkpoint for the given notebook."""
+    nb_name = nb_name if nb_name else current_nbname().stem
+    filename = nb_name.split(".ipynb")[0]
+    checkpoint_parent_dir = get_checkpoints_dir(server_uid, filename)
+    checkpoint_dirs = [
+        d
+        for d in checkpoint_parent_dir.glob(f"{CHECKPOINT_DIR_PREFIX}_*")
+        if d.is_dir()
+    ]
+    checkpoints_dirs_with_blob_entry = [
+        d for d in checkpoint_dirs if any(d.glob("*.blob"))
+    ]
+
+    if checkpoints_dirs_with_blob_entry:
+        return max(checkpoints_dirs_with_blob_entry, key=lambda d: d.stat().st_mtime)
+
+    return None
+
+
+def load_from_checkpoint(
+    prev_nb_filename: str,
+    client: SyftClient,
+    root_email: str = "info@openmined.org",
+    root_pwd: str = "changethis",
+) -> None:
+    """Load the last saved checkpoint for the given notebook state."""
+    root_client = (
+        client
+        if is_admin(client)
+        else client.login(email=root_email, password=root_pwd)
+    )
+    latest_checkpoint_dir = last_checkpoint_path_for_nb(
+        client.id.to_string(), prev_nb_filename
+    )
+
+    if latest_checkpoint_dir is None:
+        print("No previous checkpoint found!")
+        return
+
+    print(f"Loading from checkpoint: {latest_checkpoint_dir}")
+    result = root_client.load_migration_data(
+        path=latest_checkpoint_dir / "migration.blob"
+    )
+
+    if isinstance(result, SyftError):
+        raise SyftException(message=result.message)
+
+    print("Successfully loaded data from checkpoint.")
From 2db9c38e9405e921e3475edd8c6ad66f501ac1dc Mon Sep 17 00:00:00 2001
From: Shubham Gupta
Date: Fri, 20 Sep 2024 12:24:55 +0530
Subject: [PATCH 03/30] move checkpoint to syft util
test_helpers fix notebooks --- ...tart-and-configure-server-and-admins.ipynb | 11 +--- .../001-scale-delete-worker-pools.ipynb | 54 ++++++++----------- .../src/syft/util/test_helpers}/checkpoint.py | 0 3 files changed, 23 insertions(+), 42 deletions(-) rename {test_helpers => packages/syft/src/syft/util/test_helpers}/checkpoint.py (100%) diff --git a/notebooks/scenarios/bigquery/000-start-and-configure-server-and-admins.ipynb b/notebooks/scenarios/bigquery/000-start-and-configure-server-and-admins.ipynb index d7e167f78af..80542a00296 100644 --- a/notebooks/scenarios/bigquery/000-start-and-configure-server-and-admins.ipynb +++ b/notebooks/scenarios/bigquery/000-start-and-configure-server-and-admins.ipynb @@ -25,6 +25,7 @@ "\n", "# syft absolute\n", "import syft as sy\n", + "from syft.util.test_helpers.checkpoint import create_checkpoint\n", "from syft.util.test_helpers.email_helpers import get_email_server" ] }, @@ -188,16 +189,6 @@ "root_client.users.delete(new_user_id2)" ] }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# third party\n", - "from checkpoint import create_checkpoint" - ] - }, { "cell_type": "code", "execution_count": null, diff --git a/notebooks/scenarios/bigquery/001-scale-delete-worker-pools.ipynb b/notebooks/scenarios/bigquery/001-scale-delete-worker-pools.ipynb index 3d8301db9da..831ee782bec 100644 --- a/notebooks/scenarios/bigquery/001-scale-delete-worker-pools.ipynb +++ b/notebooks/scenarios/bigquery/001-scale-delete-worker-pools.ipynb @@ -25,6 +25,7 @@ "\n", "# syft absolute\n", "import syft as sy\n", + "from syft.util.test_helpers.checkpoint import load_from_checkpoint\n", "from syft.util.test_helpers.email_helpers import Timeout\n", "from syft.util.test_helpers.email_helpers import get_email_server" ] @@ -98,17 +99,6 @@ "id": "7", "metadata": {}, "outputs": [], - "source": [ - "# third party\n", - "from checkpoint import load_from_checkpoint" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "8", - "metadata": {}, - "outputs": [], "source": [ "load_from_checkpoint(\n", " prev_nb_filename=\"000-start-and-configure-server-and-admins\", client=high_client\n", @@ -118,7 +108,7 @@ { "cell_type": "code", "execution_count": null, - "id": "9", + "id": "8", "metadata": {}, "outputs": [], "source": [ @@ -128,7 +118,7 @@ { "cell_type": "code", "execution_count": null, - "id": "10", + "id": "9", "metadata": {}, "outputs": [], "source": [ @@ -138,7 +128,7 @@ }, { "cell_type": "markdown", - "id": "11", + "id": "10", "metadata": {}, "source": [ "### Scale Worker pool" @@ -146,7 +136,7 @@ }, { "cell_type": "markdown", - "id": "12", + "id": "11", "metadata": {}, "source": [ "##### Scale up" @@ -155,7 +145,7 @@ { "cell_type": "code", "execution_count": null, - "id": "13", + "id": "12", "metadata": {}, "outputs": [], "source": [ @@ -169,7 +159,7 @@ { "cell_type": "code", "execution_count": null, - "id": "14", + "id": "13", "metadata": {}, "outputs": [], "source": [ @@ -179,7 +169,7 @@ { "cell_type": "code", "execution_count": null, - "id": "15", + "id": "14", "metadata": {}, "outputs": [], "source": [ @@ -199,7 +189,7 @@ }, { "cell_type": "markdown", - "id": "16", + "id": "15", "metadata": {}, "source": [ "##### Scale down" @@ -208,7 +198,7 @@ { "cell_type": "code", "execution_count": null, - "id": "17", + "id": "16", "metadata": {}, "outputs": [], "source": [ @@ -223,7 +213,7 @@ { "cell_type": "code", "execution_count": null, - "id": "18", + "id": "17", "metadata": {}, "outputs": [], "source": [ @@ 
-242,7 +232,7 @@ { "cell_type": "code", "execution_count": null, - "id": "19", + "id": "18", "metadata": {}, "outputs": [], "source": [ @@ -255,7 +245,7 @@ }, { "cell_type": "markdown", - "id": "20", + "id": "19", "metadata": {}, "source": [ "#### Delete Worker Pool" @@ -264,7 +254,7 @@ { "cell_type": "code", "execution_count": null, - "id": "21", + "id": "20", "metadata": {}, "outputs": [], "source": [ @@ -277,7 +267,7 @@ { "cell_type": "code", "execution_count": null, - "id": "22", + "id": "21", "metadata": {}, "outputs": [], "source": [ @@ -287,7 +277,7 @@ }, { "cell_type": "markdown", - "id": "23", + "id": "22", "metadata": {}, "source": [ "#### Re-launch the default worker pool" @@ -296,7 +286,7 @@ { "cell_type": "code", "execution_count": null, - "id": "24", + "id": "23", "metadata": {}, "outputs": [], "source": [ @@ -306,7 +296,7 @@ { "cell_type": "code", "execution_count": null, - "id": "25", + "id": "24", "metadata": {}, "outputs": [], "source": [ @@ -320,7 +310,7 @@ { "cell_type": "code", "execution_count": null, - "id": "26", + "id": "25", "metadata": {}, "outputs": [], "source": [ @@ -334,7 +324,7 @@ { "cell_type": "code", "execution_count": null, - "id": "27", + "id": "26", "metadata": {}, "outputs": [], "source": [ @@ -344,7 +334,7 @@ { "cell_type": "code", "execution_count": null, - "id": "28", + "id": "27", "metadata": {}, "outputs": [], "source": [ @@ -354,7 +344,7 @@ { "cell_type": "code", "execution_count": null, - "id": "29", + "id": "28", "metadata": {}, "outputs": [], "source": [] diff --git a/test_helpers/checkpoint.py b/packages/syft/src/syft/util/test_helpers/checkpoint.py similarity index 100% rename from test_helpers/checkpoint.py rename to packages/syft/src/syft/util/test_helpers/checkpoint.py From 972649a3ea928c6299d75d7110a632da65b50134 Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Mon, 23 Sep 2024 11:31:55 +0530 Subject: [PATCH 04/30] add an API for database reset - move env getters to new file to prevent circular imports - move create admin method to user module --- packages/syft/src/syft/abstract_server.py | 2 + packages/syft/src/syft/server/env.py | 120 ++++++++++++ packages/syft/src/syft/server/server.py | 182 ++---------------- .../syft/service/settings/settings_service.py | 31 +++ packages/syft/src/syft/service/user/utils.py | 63 ++++++ .../src/syft/util/test_helpers/checkpoint.py | 23 ++- 6 files changed, 247 insertions(+), 174 deletions(-) create mode 100644 packages/syft/src/syft/server/env.py create mode 100644 packages/syft/src/syft/service/user/utils.py diff --git a/packages/syft/src/syft/abstract_server.py b/packages/syft/src/syft/abstract_server.py index 3b7885f0a0e..8c945f9abbb 100644 --- a/packages/syft/src/syft/abstract_server.py +++ b/packages/syft/src/syft/abstract_server.py @@ -6,6 +6,7 @@ # relative from .serde.serializable import serializable from .store.db.db import DBConfig +from .store.db.db import DBManager from .types.uid import UID if TYPE_CHECKING: @@ -43,6 +44,7 @@ class AbstractServer: in_memory_workers: bool services: "ServiceRegistry" db_config: DBConfig + db: DBManager[DBConfig] def get_service(self, path_or_func: str | Callable) -> "AbstractService": raise NotImplementedError diff --git a/packages/syft/src/syft/server/env.py b/packages/syft/src/syft/server/env.py new file mode 100644 index 00000000000..28ba186c974 --- /dev/null +++ b/packages/syft/src/syft/server/env.py @@ -0,0 +1,120 @@ +# stdlib +import json +import subprocess +import sys + +# relative +from ..service.worker.utils import DEFAULT_WORKER_POOL_NAME +from 
..types.uid import UID +from ..util.util import get_env +from ..util.util import str_to_bool + +SERVER_PRIVATE_KEY = "SERVER_PRIVATE_KEY" +SERVER_UID = "SERVER_UID" +SERVER_TYPE = "SERVER_TYPE" +SERVER_NAME = "SERVER_NAME" +SERVER_SIDE_TYPE = "SERVER_SIDE_TYPE" + +DEFAULT_ROOT_EMAIL = "DEFAULT_ROOT_EMAIL" +DEFAULT_ROOT_USERNAME = "DEFAULT_ROOT_USERNAME" +DEFAULT_ROOT_PASSWORD = "DEFAULT_ROOT_PASSWORD" # nosec + + +def get_private_key_env() -> str | None: + return get_env(SERVER_PRIVATE_KEY) + + +def get_server_type() -> str | None: + return get_env(SERVER_TYPE, "datasite") + + +def get_server_name() -> str | None: + return get_env(SERVER_NAME, None) + + +def get_server_side_type() -> str | None: + return get_env(SERVER_SIDE_TYPE, "high") + + +def get_server_uid_env() -> str | None: + return get_env(SERVER_UID) + + +def get_default_root_email() -> str | None: + return get_env(DEFAULT_ROOT_EMAIL, "info@openmined.org") + + +def get_default_root_username() -> str | None: + return get_env(DEFAULT_ROOT_USERNAME, "Jane Doe") + + +def get_default_root_password() -> str | None: + return get_env(DEFAULT_ROOT_PASSWORD, "changethis") # nosec + + +def get_enable_warnings() -> bool: + return str_to_bool(get_env("ENABLE_WARNINGS", "False")) + + +def get_container_host() -> str | None: + return get_env("CONTAINER_HOST") + + +def get_default_worker_image() -> str | None: + return get_env("DEFAULT_WORKER_POOL_IMAGE") + + +def get_default_worker_pool_name() -> str | None: + return get_env("DEFAULT_WORKER_POOL_NAME", DEFAULT_WORKER_POOL_NAME) + + +def get_default_bucket_name() -> str: + env = get_env("DEFAULT_BUCKET_NAME") + server_id = get_server_uid_env() or "syft-bucket" + return env or server_id or "syft-bucket" + + +def get_default_worker_pool_pod_annotations() -> dict[str, str] | None: + annotations = get_env("DEFAULT_WORKER_POOL_POD_ANNOTATIONS", "null") + return json.loads(annotations) + + +def get_default_worker_pool_pod_labels() -> dict[str, str] | None: + labels = get_env("DEFAULT_WORKER_POOL_POD_LABELS", "null") + return json.loads(labels) + + +def in_kubernetes() -> bool: + return get_container_host() == "k8s" + + +def get_venv_packages() -> str: + try: + # subprocess call is safe because it uses a fully qualified path and fixed arguments + result = subprocess.run( + [sys.executable, "-m", "pip", "list", "--format=freeze"], # nosec + capture_output=True, + check=True, + text=True, + ) + return result.stdout + except subprocess.CalledProcessError as e: + return f"An error occurred: {e.stderr}" + + +def get_syft_worker() -> bool: + return str_to_bool(get_env("SYFT_WORKER", "false")) + + +def get_k8s_pod_name() -> str | None: + return get_env("K8S_POD_NAME") + + +def get_syft_worker_uid() -> str | None: + is_worker = get_syft_worker() + pod_name = get_k8s_pod_name() + uid = get_env("SYFT_WORKER_UID") + # if uid is empty is a K8S worker, generate a uid from the pod name + if (not uid) and is_worker and pod_name: + uid = str(UID.with_seed(pod_name)) + return uid diff --git a/packages/syft/src/syft/server/server.py b/packages/syft/src/syft/server/server.py index 3f0161057bf..514d8b874b6 100644 --- a/packages/syft/src/syft/server/server.py +++ b/packages/syft/src/syft/server/server.py @@ -8,12 +8,9 @@ from datetime import datetime from functools import partial import hashlib -import json import logging import os from pathlib import Path -import subprocess # nosec -import sys from time import sleep import traceback from typing import Any @@ -72,10 +69,9 @@ from ..service.service import 
UserServiceConfigRegistry from ..service.settings.settings import ServerSettings from ..service.settings.settings import ServerSettingsUpdate -from ..service.user.user import User -from ..service.user.user import UserCreate from ..service.user.user import UserView from ..service.user.user_roles import ServiceRole +from ..service.user.utils import create_root_admin_if_not_exists from ..service.worker.utils import DEFAULT_WORKER_IMAGE_TAG from ..service.worker.utils import DEFAULT_WORKER_POOL_NAME from ..service.worker.utils import create_default_image @@ -116,10 +112,20 @@ from ..util.util import get_env from ..util.util import get_queue_address from ..util.util import random_name -from ..util.util import str_to_bool from ..util.util import thread_ident from .credentials import SyftSigningKey from .credentials import SyftVerifyKey +from .env import get_default_root_email +from .env import get_default_root_password +from .env import get_default_root_username +from .env import get_default_worker_image +from .env import get_default_worker_pool_name +from .env import get_default_worker_pool_pod_annotations +from .env import get_default_worker_pool_pod_labels +from .env import get_private_key_env +from .env import get_server_uid_env +from .env import get_syft_worker_uid +from .env import in_kubernetes from .service_registry import ServiceRegistry from .utils import get_named_server_uid from .utils import get_temp_dir_for_server @@ -135,71 +141,6 @@ CODE_RELOADER: dict[int, Callable] = {} -SERVER_PRIVATE_KEY = "SERVER_PRIVATE_KEY" -SERVER_UID = "SERVER_UID" -SERVER_TYPE = "SERVER_TYPE" -SERVER_NAME = "SERVER_NAME" -SERVER_SIDE_TYPE = "SERVER_SIDE_TYPE" - -DEFAULT_ROOT_EMAIL = "DEFAULT_ROOT_EMAIL" -DEFAULT_ROOT_USERNAME = "DEFAULT_ROOT_USERNAME" -DEFAULT_ROOT_PASSWORD = "DEFAULT_ROOT_PASSWORD" # nosec - - -def get_private_key_env() -> str | None: - return get_env(SERVER_PRIVATE_KEY) - - -def get_server_type() -> str | None: - return get_env(SERVER_TYPE, "datasite") - - -def get_server_name() -> str | None: - return get_env(SERVER_NAME, None) - - -def get_server_side_type() -> str | None: - return get_env(SERVER_SIDE_TYPE, "high") - - -def get_server_uid_env() -> str | None: - return get_env(SERVER_UID) - - -def get_default_root_email() -> str | None: - return get_env(DEFAULT_ROOT_EMAIL, "info@openmined.org") - - -def get_default_root_username() -> str | None: - return get_env(DEFAULT_ROOT_USERNAME, "Jane Doe") - - -def get_default_root_password() -> str | None: - return get_env(DEFAULT_ROOT_PASSWORD, "changethis") # nosec - - -def get_enable_warnings() -> bool: - return str_to_bool(get_env("ENABLE_WARNINGS", "False")) - - -def get_container_host() -> str | None: - return get_env("CONTAINER_HOST") - - -def get_default_worker_image() -> str | None: - return get_env("DEFAULT_WORKER_POOL_IMAGE") - - -def get_default_worker_pool_name() -> str | None: - return get_env("DEFAULT_WORKER_POOL_NAME", DEFAULT_WORKER_POOL_NAME) - - -def get_default_bucket_name() -> str: - env = get_env("DEFAULT_BUCKET_NAME") - server_id = get_server_uid_env() or "syft-bucket" - return env or server_id or "syft-bucket" - - def get_default_worker_pool_count(server: Server) -> int: return int( get_env( @@ -208,52 +149,6 @@ def get_default_worker_pool_count(server: Server) -> int: ) -def get_default_worker_pool_pod_annotations() -> dict[str, str] | None: - annotations = get_env("DEFAULT_WORKER_POOL_POD_ANNOTATIONS", "null") - return json.loads(annotations) - - -def get_default_worker_pool_pod_labels() -> dict[str, str] | None: - 
labels = get_env("DEFAULT_WORKER_POOL_POD_LABELS", "null") - return json.loads(labels) - - -def in_kubernetes() -> bool: - return get_container_host() == "k8s" - - -def get_venv_packages() -> str: - try: - # subprocess call is safe because it uses a fully qualified path and fixed arguments - result = subprocess.run( - [sys.executable, "-m", "pip", "list", "--format=freeze"], # nosec - capture_output=True, - check=True, - text=True, - ) - return result.stdout - except subprocess.CalledProcessError as e: - return f"An error occurred: {e.stderr}" - - -def get_syft_worker() -> bool: - return str_to_bool(get_env("SYFT_WORKER", "false")) - - -def get_k8s_pod_name() -> str | None: - return get_env("K8S_POD_NAME") - - -def get_syft_worker_uid() -> str | None: - is_worker = get_syft_worker() - pod_name = get_k8s_pod_name() - uid = get_env("SYFT_WORKER_UID") - # if uid is empty is a K8S worker, generate a uid from the pod name - if (not uid) and is_worker and pod_name: - uid = str(UID.with_seed(pod_name)) - return uid - - signing_key_env = get_private_key_env() server_uid_env = get_server_uid_env() @@ -1720,59 +1615,6 @@ def create_initial_settings(self, admin_email: str) -> ServerSettings: ).unwrap() -def create_root_admin_if_not_exists( - name: str, - email: str, - password: str, - server: Server, -) -> User | None: - """ - If no root admin exists: - - all exists checks on the user stash will fail, as we cannot get the role for the admin to check if it exists - - result: a new admin is always created - - If a root admin exists with a different email: - - cause: DEFAULT_USER_EMAIL env variable is set to a different email than the root admin in the db - - verify_key_exists will return True - - result: no new admin is created, as the server already has a root admin - """ - user_stash = server.services.user.stash - - email_exists = user_stash.email_exists(email=email).unwrap() - if email_exists: - logger.debug("Admin not created, a user with this email already exists") - return None - - verify_key_exists = user_stash.verify_key_exists(server.verify_key).unwrap() - if verify_key_exists: - logger.debug("Admin not created, this server already has a root admin") - return None - - create_user = UserCreate( - name=name, - email=email, - password=password, - password_verify=password, - role=ServiceRole.ADMIN, - ) - - # New User Initialization - # 🟡 TODO: change later but for now this gives the main user super user automatically - user = create_user.to(User) - user.signing_key = server.signing_key - user.verify_key = server.verify_key - - new_user = user_stash.set( - credentials=server.verify_key, - obj=user, - ignore_duplicates=False, - ).unwrap() - - logger.debug(f"Created admin {new_user.email}") - - return new_user - - class ServerRegistry: __server_registry__: dict[UID, Server] = {} diff --git a/packages/syft/src/syft/service/settings/settings_service.py b/packages/syft/src/syft/service/settings/settings_service.py index 10890350e2d..195abfcfbda 100644 --- a/packages/syft/src/syft/service/settings/settings_service.py +++ b/packages/syft/src/syft/service/settings/settings_service.py @@ -8,6 +8,9 @@ # relative from ...abstract_server import ServerSideType from ...serde.serializable import serializable +from ...server.env import get_default_root_email +from ...server.env import get_default_root_password +from ...server.env import get_default_root_username from ...store.db.db import DBManager from ...store.document_store_errors import NotFoundException from ...store.document_store_errors import 
StashException @@ -26,12 +29,14 @@ from ..context import AuthedServiceContext from ..context import UnauthedServiceContext from ..notifier.notifier_enums import EMAIL_TYPES +from ..response import SyftError from ..response import SyftSuccess from ..service import AbstractService from ..service import service_method from ..user.user_roles import ADMIN_ROLE_LEVEL from ..user.user_roles import GUEST_ROLE_LEVEL from ..user.user_roles import ServiceRole +from ..user.utils import create_root_admin_if_not_exists from ..warnings import HighSideCRUDWarning from .settings import ServerSettings from .settings import ServerSettingsUpdate @@ -397,6 +402,32 @@ def welcome_show( return welcome_msg_class(text=result) raise SyftException(public_message="There's no welcome message") + @service_method( + path="settings.reset_database", + name="reset_database", + roles=ADMIN_ROLE_LEVEL, + unwrap_on_success=False, + ) + def reset_database( + self, + context: AuthedServiceContext, + ) -> SyftSuccess | SyftError: + try: + context.server.db.init_tables(reset=True) + create_root_admin_if_not_exists( + name=get_default_root_username(), + email=get_default_root_email(), + password=get_default_root_password(), + server=context.server, + ) + except Exception as e: + return SyftError.from_exception( + context=context, + exc=e, + include_traceback=True, + ) + return SyftSuccess(message="Database reset successfully.") + @service_method( path="settings.get_server_config", name="get_server_config", diff --git a/packages/syft/src/syft/service/user/utils.py b/packages/syft/src/syft/service/user/utils.py new file mode 100644 index 00000000000..191fc4fe181 --- /dev/null +++ b/packages/syft/src/syft/service/user/utils.py @@ -0,0 +1,63 @@ +# stdlib +import logging + +# relative +from ...abstract_server import AbstractServer +from .user import User +from .user import UserCreate +from .user_roles import ServiceRole + +logger = logging.getLogger(__name__) + + +def create_root_admin_if_not_exists( + name: str, + email: str, + password: str, + server: AbstractServer, +) -> User | None: + """ + If no root admin exists: + - all exists checks on the user stash will fail, as we cannot get the role for the admin to check if it exists + - result: a new admin is always created + + If a root admin exists with a different email: + - cause: DEFAULT_USER_EMAIL env variable is set to a different email than the root admin in the db + - verify_key_exists will return True + - result: no new admin is created, as the server already has a root admin + """ + user_stash = server.services.user.stash + + email_exists = user_stash.email_exists(email=email).unwrap() + if email_exists: + logger.debug("Admin not created, a user with this email already exists") + return None + + verify_key_exists = user_stash.verify_key_exists(server.verify_key).unwrap() + if verify_key_exists: + logger.debug("Admin not created, this server already has a root admin") + return None + + create_user = UserCreate( + name=name, + email=email, + password=password, + password_verify=password, + role=ServiceRole.ADMIN, + ) + + # New User Initialization + # 🟡 TODO: change later but for now this gives the main user super user automatically + user = create_user.to(User) + user.signing_key = server.signing_key + user.verify_key = server.verify_key + + new_user = user_stash.set( + credentials=server.verify_key, + obj=user, + ignore_duplicates=False, + ).unwrap() + + logger.debug(f"Created admin {new_user.email}") + + return new_user diff --git 
a/packages/syft/src/syft/util/test_helpers/checkpoint.py b/packages/syft/src/syft/util/test_helpers/checkpoint.py index d7733943f5a..ff8ae77c7a7 100644 --- a/packages/syft/src/syft/util/test_helpers/checkpoint.py +++ b/packages/syft/src/syft/util/test_helpers/checkpoint.py @@ -112,12 +112,20 @@ def last_checkpoint_path_for_nb(server_uid: str, nb_name: str = None) -> Path | def load_from_checkpoint( - prev_nb_filename: str, client: SyftClient, - root_email: str = "info@openmined.org", - root_pwd: str = "changethis", + prev_nb_filename: str | None = None, + root_email: str | None = None, + root_pwd: str | None = None, + reset_db: bool = False, ) -> None: """Load the last saved checkpoint for the given notebook state.""" + if prev_nb_filename is None: + print("Loading from the last checkpoint of the current notebook.") + prev_nb_filename = current_nbname().stem + + root_email = "info@openmined.org" if root_email is None else root_email + root_pwd = "changethis" if root_pwd is None else root_pwd + root_client = ( client if is_admin(client) @@ -128,9 +136,16 @@ def load_from_checkpoint( ) if latest_checkpoint_dir is None: - print("No previous checkpoint found!") + print(f"No last checkpoint found for notebook: {prev_nb_filename}") return + if reset_db: + print("Resetting the database before loading the checkpoint.") + result = root_client.settings.reset_database() + + if isinstance(result, SyftError): + raise SyftException(message=result.message) + print(f"Loading from checkpoint: {latest_checkpoint_dir}") result = root_client.load_migration_data( path=latest_checkpoint_dir / "migration.blob" From 1b5b0832a76aee21f1f32ae6e6c5565f0dddbd47 Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Tue, 24 Sep 2024 08:49:25 +0530 Subject: [PATCH 05/30] disable reset database API - add a stash method to filter by unique attrs - overwrite objs that are init at startup with the incoming migrated obj --- .../service/migration/migration_service.py | 32 ++++++++++- .../syft/service/settings/settings_service.py | 55 +++++++++---------- packages/syft/src/syft/store/db/stash.py | 48 ++++++++++++++++ 3 files changed, 104 insertions(+), 31 deletions(-) diff --git a/packages/syft/src/syft/service/migration/migration_service.py b/packages/syft/src/syft/service/migration/migration_service.py index b1d461e4e80..bf5870fe8a0 100644 --- a/packages/syft/src/syft/service/migration/migration_service.py +++ b/packages/syft/src/syft/service/migration/migration_service.py @@ -1,5 +1,6 @@ # stdlib from collections import defaultdict +import logging # syft absolute import syft @@ -21,16 +22,23 @@ from ..action.action_permissions import StoragePermission from ..action.action_store import ActionObjectStash from ..context import AuthedServiceContext +from ..notifier.notifier import NotifierSettings from ..response import SyftSuccess from ..service import AbstractService from ..service import service_method +from ..settings.settings import ServerSettings +from ..user.user import User from ..user.user_roles import ADMIN_ROLE_LEVEL from ..worker.utils import DEFAULT_WORKER_POOL_NAME +from ..worker.worker_pool import SyftWorker +from ..worker.worker_pool import WorkerPool from .object_migration_state import MigrationData from .object_migration_state import StoreMetadata from .object_migration_state import SyftMigrationStateStash from .object_migration_state import SyftObjectMigrationState +logger = logging.getLogger(__name__) + @serializable(canonical_name="MigrationService", version=1) class MigrationService(AbstractService): @@ -257,11 
+265,33 @@ def _create_migrated_objects(
         migrated_objects: list[SyftObject],
         ignore_existing: bool = True,
     ) -> SyftSuccess:
+        # These tables are considered startup tables; if a migrated object belongs to one of them
+        # and already exists, the existing object is overwritten with the migrated one.
+        STARTUP_TABLES = [
+            User,
+            ServerSettings,
+            NotifierSettings,
+            WorkerPool,
+            SyftWorker,
+        ]
         for migrated_object in migrated_objects:
             stash = self._search_stash_for_klass(
                 context, type(migrated_object)
             ).unwrap()

+            # If the object is in the startup tables, we need to overwrite the existing object
+            if type(migrated_object) in STARTUP_TABLES:
+                try:
+                    existing_obj = stash.get_by_unique_fields(
+                        credentials=context.credentials,
+                        obj=migrated_object,
+                    ).unwrap()
+                    stash.delete_by_uid(
+                        context.credentials, uid=existing_obj.id
+                    ).unwrap()
+                except NotFoundException:
+                    pass
+
             result = stash.set(
                 context.credentials,
                 obj=migrated_object,
@@ -309,7 +339,7 @@ def _migrate_objects(
             latest_version = SyftObjectRegistry.get_latest_version(canonical_name)

             # Migrate data for objects in document store
-            print(
+            logger.info(
                 f"Migrating data for: {canonical_name} table to version {latest_version}"
             )
             for object in objects:
diff --git a/packages/syft/src/syft/service/settings/settings_service.py b/packages/syft/src/syft/service/settings/settings_service.py
index 195abfcfbda..bc37b555904 100644
--- a/packages/syft/src/syft/service/settings/settings_service.py
+++ b/packages/syft/src/syft/service/settings/settings_service.py
@@ -8,9 +8,6 @@
 # relative
 from ...abstract_server import ServerSideType
 from ...serde.serializable import serializable
-from ...server.env import get_default_root_email
-from ...server.env import get_default_root_password
-from ...server.env import get_default_root_username
 from ...store.db.db import DBManager
 from ...store.document_store_errors import NotFoundException
 from ...store.document_store_errors import StashException
@@ -29,14 +26,12 @@
 from ..context import AuthedServiceContext
 from ..context import UnauthedServiceContext
 from ..notifier.notifier_enums import EMAIL_TYPES
-from ..response import SyftError
 from ..response import SyftSuccess
 from ..service import AbstractService
 from ..service import service_method
 from ..user.user_roles import ADMIN_ROLE_LEVEL
 from ..user.user_roles import GUEST_ROLE_LEVEL
 from ..user.user_roles import ServiceRole
-from ..user.utils import create_root_admin_if_not_exists
 from ..warnings import HighSideCRUDWarning
 from .settings import ServerSettings
 from .settings import ServerSettingsUpdate
@@ -397,31 +397,31 @@ def welcome_show(
             return welcome_msg_class(text=result)
         raise SyftException(public_message="There's no welcome message")

-    @service_method(
-        path="settings.reset_database",
-        name="reset_database",
-        roles=ADMIN_ROLE_LEVEL,
-        unwrap_on_success=False,
-    )
-    def reset_database(
-        self,
-        context: AuthedServiceContext,
-    ) -> SyftSuccess | SyftError:
-        try:
-            context.server.db.init_tables(reset=True)
-            create_root_admin_if_not_exists(
-                name=get_default_root_username(),
-                email=get_default_root_email(),
-                password=get_default_root_password(),
-                server=context.server,
-            )
-        except Exception as e:
-            return SyftError.from_exception(
-                context=context,
-                exc=e,
-                include_traceback=True,
-            )
-        return SyftSuccess(message="Database reset successfully.")
+    # @service_method(
+    #     path="settings.reset_database",
+    #     name="reset_database",
+    #     roles=ADMIN_ROLE_LEVEL,
+    #     unwrap_on_success=False,
+    # )
+    # def reset_database(
+    #     self,
+    #
context: AuthedServiceContext, + # ) -> SyftSuccess | SyftError: + # try: + # context.server.db.init_tables(reset=True) + # create_root_admin_if_not_exists( + # name=get_default_root_username(), + # email=get_default_root_email(), + # password=get_default_root_password(), + # server=context.server, + # ) + # except Exception as e: + # return SyftError.from_exception( + # context=context, + # exc=e, + # include_traceback=True, + # ) + # return SyftSuccess(message="Database reset successfully.") @service_method( path="settings.get_server_config", diff --git a/packages/syft/src/syft/store/db/stash.py b/packages/syft/src/syft/store/db/stash.py index aec2a2ed9c5..9056578173b 100644 --- a/packages/syft/src/syft/store/db/stash.py +++ b/packages/syft/src/syft/store/db/stash.py @@ -176,6 +176,54 @@ def _print_query(self, stmt: sa.sql.select) -> None: def unique_fields(self) -> list[str]: return getattr(self.object_type, "__attr_unique__", []) + @as_result(SyftException, StashException, NotFoundException) + @with_session + def get_by_unique_fields( + self, + credentials: SyftVerifyKey, + obj: StashT, + session: Session = None, + has_permission: bool = False, + ) -> StashT: + query = self.query() + + if not has_permission: + role = self.get_role(credentials, session=session) + query = query.with_permissions(credentials, role) + + unique_fields = self.unique_fields + + filters = [] + for field_name in unique_fields: + field_value = getattr(obj, field_name, None) + if not is_json_primitive(field_value): + raise StashException( + f"Cannot check uniqueness of non-primitive field {field_name}" + ) + if field_value is None: + continue + filters.append((field_name, "eq", field_value)) + + query = self.query() + query = query.filter_or( + *filters, + ) + + results = query.execute(session).all() + + if len(results) == 1: + result = results[0] + elif len(results) > 1: + raise StashException( + f"Multiple objects found for unique fields: {unique_fields}" + ) + else: + raise NotFoundException( + f"No object found for unique fields: {unique_fields}" + ) + + return result + @with_session def is_unique(self, obj: StashT, session: Session = None) -> bool: unique_fields = self.unique_fields From ec251502ed52f3adb8db1928ee055929a8dedabe Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Tue, 24 Sep 2024 09:29:32 +0530 Subject: [PATCH 06/30] update scenario notebooks with checkpoint --- ...tart-and-configure-server-and-admins.ipynb | 23 +++++++++++++------ .../001-scale-delete-worker-pools.ipynb | 15 +++++++----- 2 files changed, 25 insertions(+), 13 deletions(-) diff --git a/notebooks/scenarios/bigquery/000-start-and-configure-server-and-admins.ipynb b/notebooks/scenarios/bigquery/000-start-and-configure-server-and-admins.ipynb index 80542a00296..7320426de06 100644 --- a/notebooks/scenarios/bigquery/000-start-and-configure-server-and-admins.ipynb +++ b/notebooks/scenarios/bigquery/000-start-and-configure-server-and-admins.ipynb @@ -88,6 +88,15 @@ ")" ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "root_client.users" + ] + }, { "cell_type": "markdown", "metadata": {}, @@ -190,19 +199,19 @@ ] }, { - "cell_type": "code", - "execution_count": null, + "cell_type": "markdown", "metadata": {}, - "outputs": [], "source": [ - "create_checkpoint(root_client)" + "# Cleanup" ] }, { - "cell_type": "markdown", + "cell_type": "code", + "execution_count": null, "metadata": {}, + "outputs": [], "source": [ - "# Cleanup" + "create_checkpoint(root_client)" ] }, { @@ -247,7 
+256,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.12.3" + "version": "3.11.5" } }, "nbformat": 4, diff --git a/notebooks/scenarios/bigquery/001-scale-delete-worker-pools.ipynb b/notebooks/scenarios/bigquery/001-scale-delete-worker-pools.ipynb index 831ee782bec..6a1482e3677 100644 --- a/notebooks/scenarios/bigquery/001-scale-delete-worker-pools.ipynb +++ b/notebooks/scenarios/bigquery/001-scale-delete-worker-pools.ipynb @@ -68,6 +68,8 @@ " port=\"8080\",\n", " n_consumers=num_workers, # How many workers to be spawned\n", " create_producer=True, # Can produce more workers\n", + " reset=True,\n", + " log_level=10,\n", ")" ] }, @@ -78,7 +80,10 @@ "metadata": {}, "outputs": [], "source": [ - "email_server, smtp_server = get_email_server(reset=True)" + "load_from_checkpoint(\n", + " prev_nb_filename=\"000-start-and-configure-server-and-admins\",\n", + " client=server.client,\n", + ")" ] }, { @@ -88,9 +93,7 @@ "metadata": {}, "outputs": [], "source": [ - "high_client = sy.login(\n", - " url=\"http://localhost:8080\", email=ROOT_EMAIL, password=ROOT_PASSWORD\n", - ")" + "email_server, smtp_server = get_email_server(reset=True)" ] }, { @@ -100,8 +103,8 @@ "metadata": {}, "outputs": [], "source": [ - "load_from_checkpoint(\n", - " prev_nb_filename=\"000-start-and-configure-server-and-admins\", client=high_client\n", + "high_client = sy.login(\n", + " url=\"http://localhost:8080\", email=ROOT_EMAIL, password=ROOT_PASSWORD\n", ")" ] }, From 66552da64792bab63eb1bd0072ec4215a4ae1a60 Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Wed, 25 Sep 2024 16:15:03 +0530 Subject: [PATCH 07/30] update load checkpoint to prune initial pools and images - restore and spin worker image and pools from checkpoint - fix generation of UID from seed of worker name for inmem workers Co-authored-by: rasswanth-s <43314053+rasswanth-s@users.noreply.github.com> --- .../service/migration/migration_service.py | 30 ++++- .../syft/service/settings/settings_service.py | 26 ---- .../service/worker/image_registry_service.py | 5 +- .../syft/src/syft/service/worker/utils.py | 1 + .../src/syft/util/test_helpers/checkpoint.py | 112 ++++++++++++++++-- .../syft/util/test_helpers/worker_helpers.py | 41 +++++++ 6 files changed, 175 insertions(+), 40 deletions(-) diff --git a/packages/syft/src/syft/service/migration/migration_service.py b/packages/syft/src/syft/service/migration/migration_service.py index bf5870fe8a0..d6435fe9801 100644 --- a/packages/syft/src/syft/service/migration/migration_service.py +++ b/packages/syft/src/syft/service/migration/migration_service.py @@ -23,6 +23,7 @@ from ..action.action_store import ActionObjectStash from ..context import AuthedServiceContext from ..notifier.notifier import NotifierSettings +from ..response import SyftError from ..response import SyftSuccess from ..service import AbstractService from ..service import service_method @@ -30,8 +31,6 @@ from ..user.user import User from ..user.user_roles import ADMIN_ROLE_LEVEL from ..worker.utils import DEFAULT_WORKER_POOL_NAME -from ..worker.worker_pool import SyftWorker -from ..worker.worker_pool import WorkerPool from .object_migration_state import MigrationData from .object_migration_state import StoreMetadata from .object_migration_state import SyftMigrationStateStash @@ -271,8 +270,6 @@ def _create_migrated_objects( User, ServerSettings, NotifierSettings, - WorkerPool, - SyftWorker, ] for migrated_object in migrated_objects: stash = self._search_stash_for_klass( context, type(migrated_object) ).unwrap() @@ -480,3 +477,28 @@ def 
apply_migration_data( # apply metadata self._update_store_metadata(context, migration_data.metadata).unwrap() return SyftSuccess(message="Migration completed successfully") + + # @service_method( + # path="migration.reset_and_restore", + # name="reset_and_restore", + # roles=ADMIN_ROLE_LEVEL, + # unwrap_on_success=False, + # ) + def reset_and_migrate( + self, + context: AuthedServiceContext, + migration_data: MigrationData, + ) -> SyftSuccess | SyftError: + try: + root_verify_key = context.server.verify_key + context.server.db.init_tables(reset=True) + context.credentials = root_verify_key + self.apply_migration_data(context, migration_data) + except Exception as e: + return SyftError.from_exception( + context=context, + exc=e, + include_traceback=True, + ) + + return SyftSuccess(message="Database reset successfully.") diff --git a/packages/syft/src/syft/service/settings/settings_service.py b/packages/syft/src/syft/service/settings/settings_service.py index bc37b555904..10890350e2d 100644 --- a/packages/syft/src/syft/service/settings/settings_service.py +++ b/packages/syft/src/syft/service/settings/settings_service.py @@ -397,32 +397,6 @@ def welcome_show( return welcome_msg_class(text=result) raise SyftException(public_message="There's no welcome message") - # @service_method( - # path="settings.reset_database", - # name="reset_database", - # roles=ADMIN_ROLE_LEVEL, - # unwrap_on_success=False, - # ) - # def reset_database( - # self, - # context: AuthedServiceContext, - # ) -> SyftSuccess | SyftError: - # try: - # context.server.db.init_tables(reset=True) - # create_root_admin_if_not_exists( - # name=get_default_root_username(), - # email=get_default_root_email(), - # password=get_default_root_password(), - # server=context.server, - # ) - # except Exception as e: - # return SyftError.from_exception( - # context=context, - # exc=e, - # include_traceback=True, - # ) - # return SyftSuccess(message="Database reset successfully.") - @service_method( path="settings.get_server_config", name="get_server_config", diff --git a/packages/syft/src/syft/service/worker/image_registry_service.py b/packages/syft/src/syft/service/worker/image_registry_service.py index 83a30bb670b..f87e9818027 100644 --- a/packages/syft/src/syft/service/worker/image_registry_service.py +++ b/packages/syft/src/syft/service/worker/image_registry_service.py @@ -41,9 +41,10 @@ def add( except Exception as e: raise SyftException(public_message=f"Failed to create registry. 
{e}") - self.stash.set(context.credentials, registry).unwrap() + stored_registry = self.stash.set(context.credentials, registry).unwrap() return SyftSuccess( - message=f"Image Registry ID: {registry.id} created successfully" + message=f"Image Registry ID: {registry.id} created successfully", + value=stored_registry, ) @service_method( diff --git a/packages/syft/src/syft/service/worker/utils.py b/packages/syft/src/syft/service/worker/utils.py index 64171c5f90b..d1698614a6f 100644 --- a/packages/syft/src/syft/service/worker/utils.py +++ b/packages/syft/src/syft/service/worker/utils.py @@ -251,6 +251,7 @@ def run_workers_in_threads( error = None worker_name = f"{pool_name}-{worker_count}" worker = SyftWorker( + id=UID.with_seed(worker_name), name=worker_name, status=WorkerStatus.RUNNING, worker_pool_name=pool_name, diff --git a/packages/syft/src/syft/util/test_helpers/checkpoint.py b/packages/syft/src/syft/util/test_helpers/checkpoint.py index ff8ae77c7a7..736480c91a1 100644 --- a/packages/syft/src/syft/util/test_helpers/checkpoint.py +++ b/packages/syft/src/syft/util/test_helpers/checkpoint.py @@ -1,6 +1,7 @@ # stdlib import datetime import json +import os from pathlib import Path # third party @@ -14,6 +15,14 @@ from syft.util.util import get_root_data_path from syft.util.util import is_interpreter_jupyter +# relative +from ...service.migration.object_migration_state import MigrationData +from ...service.response import SyftSuccess +from ...service.worker.worker_image import SyftWorkerImage +from ...service.worker.worker_pool import WorkerPool +from .worker_helpers import build_and_push_image +from .worker_helpers import prune_worker_pool_and_images + CHECKPOINT_ROOT = "checkpoints" CHECKPOINT_DIR_PREFIX = "chkpt" @@ -111,12 +120,19 @@ def last_checkpoint_path_for_nb(server_uid: str, nb_name: str = None) -> Path | return None +def get_registry_credentials() -> tuple[str, str]: + return os.environ.get("REGISTRY_USERNAME", ""), os.environ.get( + "REGISTRY_PASSWORD", "" + ) + + def load_from_checkpoint( client: SyftClient, prev_nb_filename: str | None = None, root_email: str | None = None, root_pwd: str | None = None, - reset_db: bool = False, + registry_username: str | None = None, + registry_password: str | None = None, ) -> None: """Load the last saved checkpoint for the given notebook state.""" if prev_nb_filename is None: @@ -139,13 +155,6 @@ def load_from_checkpoint( print(f"No last checkpoint found for notebook: {prev_nb_filename}") return - if reset_db: - print("Resetting the database before loading the checkpoint.") - result = root_client.settings.reset_database() - - if isinstance(result, SyftError): - raise SyftException(message=result.message) - print(f"Loading from checkpoint: {latest_checkpoint_dir}") result = root_client.load_migration_data( path=latest_checkpoint_dir / "migration.blob" @@ -155,3 +164,90 @@ def load_from_checkpoint( raise SyftException(message=result.message) print("Successfully loaded data from checkpoint.") + + migration_data = MigrationData.from_file(latest_checkpoint_dir / "migration.blob") + + # klass_for_migrate_data = [ + # WorkerPool.__canonical_name__, + # SyftWorkerImage.__canonical_name__, + # ] + + # pool_and_image_data = MigrationData( + # server_uid=migration_data.server_uid, + # signing_key=migration_data.signing_key, + # store_objects={ + # k: v + # for k, v in migration_data.store_objects.items() + # if k.__canonical_name__ in klass_for_migrate_data + # }, + # metadata={ + # k: v + # for k, v in migration_data.metadata.items() + # if 
k.__canonical_name__ not in klass_for_migrate_data + # }, + # action_objects=[], + # blob_storage_objects=[], + # blobs={}, + # ) + + prune_worker_pool_and_images(root_client) + + worker_images: list[SyftWorkerImage] = migration_data.get_items_by_canonical_name( + SyftWorkerImage.__canonical_name__ + ) + + # Overwrite the registry credentials if provided, else use the environment variables + env_registry_username, env_registry_password = get_registry_credentials() + registry_password = ( + registry_password if registry_password else env_registry_password + ) + registry_username = ( + registry_username if registry_username else env_registry_username + ) + + old_image_to_new_image_id_map = {} + for old_image in worker_images: + submit_result = root_client.api.services.worker_image.submit( + worker_config=old_image.config + ) + + assert isinstance(submit_result, SyftSuccess) + + new_image = submit_result.value + + old_image_to_new_image_id_map[old_image.id] = new_image.id + + if not new_image.is_prebuilt: + registry_uid = ( + old_image.image_identifier.registry.id + if old_image.image_identifier.registry + else None + ) + + # TODO: Later add prompt support for registry credentials + + build_and_push_image( + root_client, + new_image, + registry_uid=registry_uid, + tag=old_image.image_identifier.repo_with_tag, + reg_password=registry_username, + reg_username=registry_password, + ) + + worker_pools: list[WorkerPool] = migration_data.get_items_by_canonical_name( + WorkerPool.__canonical_name__ + ) + + for old_pool in worker_pools: + new_image_uid = old_image_to_new_image_id_map[old_image.id] + + root_client.worker_pools.launch( + pool_name=old_pool.name, + image_uid=new_image_uid, + num_workers=old_pool.max_count, + registry_username=registry_username, + registry_password=registry_username, + ) + + print("Successfully loaded worker pool data from checkpoint.") diff --git a/packages/syft/src/syft/util/test_helpers/worker_helpers.py b/packages/syft/src/syft/util/test_helpers/worker_helpers.py index 3c2667fecc8..ea3e027f1ae 100644 --- a/packages/syft/src/syft/util/test_helpers/worker_helpers.py +++ b/packages/syft/src/syft/util/test_helpers/worker_helpers.py @@ -1,6 +1,12 @@ # syft absolute import syft as sy +# relative +from ...client.client import SyftClient +from ...service.response import SyftSuccess +from ...service.worker.worker_image import SyftWorkerImage +from ...types.uid import UID + def build_and_launch_worker_pool_from_docker_str( environment: str, @@ -84,3 +90,38 @@ def launch_worker_pool_from_docker_tag_and_registry( print(result) return launch_result + + +def prune_worker_pool_and_images(client: SyftClient) -> None: + for pool in client.worker_pools.get_all(): + client.worker_pools.delete(pool.id) + + for image in client.images.get_all(): + client.images.remove(image.id) + + +def build_and_push_image( + client: SyftClient, + image: SyftWorkerImage, + tag: str, + registry_uid: UID | None = None, + reg_username: str | None = None, + reg_password: str | None = None, +) -> None: + """Build and push the image to the given registry.""" + if image.is_prebuilt: + return + + build_result = client.api.services.worker_image.build( + image_uid=image.id, registry_uid=registry_uid, tag=tag + ) + print(build_result.message) + + if isinstance(build_result, SyftSuccess): + push_result = client.api.services.worker_image.push( + image.id, + username=reg_username, + password=reg_password, + ) + assert isinstance(push_result, SyftSuccess) + print(push_result.message) From 
859a6024ab8bb98068b1b885c4743ba36ef00b73 Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Wed, 25 Sep 2024 16:43:41 +0530 Subject: [PATCH 08/30] fix argument name in load_checkpoint --- packages/syft/src/syft/util/test_helpers/checkpoint.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/syft/src/syft/util/test_helpers/checkpoint.py b/packages/syft/src/syft/util/test_helpers/checkpoint.py index 736480c91a1..bca17646c77 100644 --- a/packages/syft/src/syft/util/test_helpers/checkpoint.py +++ b/packages/syft/src/syft/util/test_helpers/checkpoint.py @@ -157,7 +157,7 @@ def load_from_checkpoint( print(f"Loading from checkpoint: {latest_checkpoint_dir}") result = root_client.load_migration_data( - path=latest_checkpoint_dir / "migration.blob" + path_or_data=latest_checkpoint_dir / "migration.blob" ) if isinstance(result, SyftError): From c7a24e03321e944ccded21369131f42678f12abc Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Wed, 25 Sep 2024 17:33:08 +0530 Subject: [PATCH 09/30] fix checkpoint path if not running from notebook --- packages/syft/src/syft/util/test_helpers/checkpoint.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/syft/src/syft/util/test_helpers/checkpoint.py b/packages/syft/src/syft/util/test_helpers/checkpoint.py index bca17646c77..2c5ece68476 100644 --- a/packages/syft/src/syft/util/test_helpers/checkpoint.py +++ b/packages/syft/src/syft/util/test_helpers/checkpoint.py @@ -31,7 +31,7 @@ def current_nbname() -> Path: """Retrieve the current Jupyter notebook name.""" curr_kernel_file = Path(ipykernel.get_connection_file()) kernel_file = json.loads(curr_kernel_file.read_text()) - nb_name = kernel_file["jupyter_session"] + nb_name = kernel_file.get("jupyter_session", "") return Path(nb_name) @@ -43,7 +43,7 @@ def checkpoint_parent_dir(server_uid: str, nb_name: str | None = None) -> Path: """Return the checkpoint directory for the current notebook and server.""" if is_interpreter_jupyter: nb_name = nb_name if nb_name else current_nbname().stem - return Path(f"{nb_name}/{server_uid}") + return Path(f"{nb_name}/{server_uid}") if nb_name else Path(server_uid) return Path(server_uid) From ed74096cbc33b9126b4422fb3d3cae97d68b9df2 Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Thu, 26 Sep 2024 08:56:03 +0530 Subject: [PATCH 10/30] extract notebook name from pytest environment if running in pytest - fix imports in 0.9.1_helpers --- .../upgradability/0.9.1_helpers/apis/live/schema.py | 4 +++- .../0.9.1_helpers/apis/live/test_query.py | 4 +++- .../syft/src/syft/util/test_helpers/checkpoint.py | 12 ++++++++++++ 3 files changed, 18 insertions(+), 2 deletions(-) diff --git a/notebooks/scenarios/bigquery/upgradability/0.9.1_helpers/apis/live/schema.py b/notebooks/scenarios/bigquery/upgradability/0.9.1_helpers/apis/live/schema.py index 03012c7d0cf..5b39d9d9066 100644 --- a/notebooks/scenarios/bigquery/upgradability/0.9.1_helpers/apis/live/schema.py +++ b/notebooks/scenarios/bigquery/upgradability/0.9.1_helpers/apis/live/schema.py @@ -4,7 +4,9 @@ # syft absolute import syft as sy from syft import test_settings -from syft.rate_limiter import is_within_rate_limit + +# relative +from ..rate_limiter import is_within_rate_limit def make_schema(settings: dict, worker_pool: str) -> Callable: diff --git a/notebooks/scenarios/bigquery/upgradability/0.9.1_helpers/apis/live/test_query.py b/notebooks/scenarios/bigquery/upgradability/0.9.1_helpers/apis/live/test_query.py index ccd3c75b599..344879dcb62 100644 --- 
a/notebooks/scenarios/bigquery/upgradability/0.9.1_helpers/apis/live/test_query.py +++ b/notebooks/scenarios/bigquery/upgradability/0.9.1_helpers/apis/live/test_query.py @@ -4,7 +4,9 @@ # syft absolute import syft as sy from syft import test_settings -from syft.rate_limiter import is_within_rate_limit + +# relative +from ..rate_limiter import is_within_rate_limit def make_test_query(settings) -> Callable: diff --git a/packages/syft/src/syft/util/test_helpers/checkpoint.py b/packages/syft/src/syft/util/test_helpers/checkpoint.py index 2c5ece68476..1e9cdf13ec1 100644 --- a/packages/syft/src/syft/util/test_helpers/checkpoint.py +++ b/packages/syft/src/syft/util/test_helpers/checkpoint.py @@ -27,11 +27,23 @@ CHECKPOINT_DIR_PREFIX = "chkpt" +def get_notebook_name_from_pytest_env() -> str | None: + """ + Returns the notebook file name from the test environment variable 'PYTEST_CURRENT_TEST'. + If not available, returns None. + """ + pytest_current_test = os.environ.get("PYTEST_CURRENT_TEST", "") + # Split by "::" and return the first part, which is the file path + return os.path.basename(pytest_current_test.split("::")[0]) + + def current_nbname() -> Path: """Retrieve the current Jupyter notebook name.""" curr_kernel_file = Path(ipykernel.get_connection_file()) kernel_file = json.loads(curr_kernel_file.read_text()) nb_name = kernel_file.get("jupyter_session", "") + if not nb_name: + nb_name = get_notebook_name_from_pytest_env() return Path(nb_name) From c7e12ec1bc97b8d26cd23c33ec50a688bdf8f69d Mon Sep 17 00:00:00 2001 From: rasswanth-s <43314053+rasswanth-s@users.noreply.github.com> Date: Thu, 26 Sep 2024 11:23:39 +0530 Subject: [PATCH 11/30] add ability to reset and load migrated data with custom worker pools --- .../syft/src/syft/client/datasite_client.py | 13 ++- .../service/migration/migration_service.py | 14 +-- .../service/worker/worker_image_service.py | 2 + .../service/worker/worker_pool_service.py | 40 ++++++- .../src/syft/util/test_helpers/checkpoint.py | 107 ++++-------------- .../syft/util/test_helpers/worker_helpers.py | 3 +- 6 files changed, 78 insertions(+), 101 deletions(-) diff --git a/packages/syft/src/syft/client/datasite_client.py b/packages/syft/src/syft/client/datasite_client.py index 0129b4a17ac..b305303911e 100644 --- a/packages/syft/src/syft/client/datasite_client.py +++ b/packages/syft/src/syft/client/datasite_client.py @@ -417,7 +417,9 @@ def get_migration_data(self, include_blobs: bool = True) -> MigrationData: return res def load_migration_data( - self, path_or_data: str | Path | MigrationData + self, + path_or_data: str | Path | MigrationData, + include_worker_pools: bool = False, ) -> SyftSuccess: if isinstance(path_or_data, MigrationData): migration_data = path_or_data @@ -437,7 +439,7 @@ def load_migration_data( public_message="Root verify key in migration data does not match this client's verify key" ) - if migration_data.includes_custom_workerpools: + if migration_data.includes_custom_workerpools and not include_worker_pools: prompt_warning_message( "This migration data includes custom workers, " "which need to be migrated separately with `sy.upgrade_custom_workerpools` " @@ -445,9 +447,12 @@ def load_migration_data( ) migration_data.migrate_and_upload_blobs() + migration_data = migration_data.copy_without_blobs() + + if not include_worker_pools: + migration_data = migration_data.copy_without_workerpools() - migration_data = migration_data.copy_without_workerpools().copy_without_blobs() - return self.api.services.migration.apply_migration_data(migration_data) + 
return self.api.services.migration.reset_and_restore(migration_data) def dump_state(self, path: str | Path) -> None: if isinstance(path, str): diff --git a/packages/syft/src/syft/service/migration/migration_service.py b/packages/syft/src/syft/service/migration/migration_service.py index 76d16f42565..96ac85f5496 100644 --- a/packages/syft/src/syft/service/migration/migration_service.py +++ b/packages/syft/src/syft/service/migration/migration_service.py @@ -492,13 +492,13 @@ def apply_migration_data( self._update_store_metadata(context, migration_data.metadata).unwrap() return SyftSuccess(message="Migration completed successfully") - # @service_method( - # path="migration.reset_and_restore", - # name="reset_and_restore", - # roles=ADMIN_ROLE_LEVEL, - # unwrap_on_success=False, - # ) - def reset_and_migrate( + @service_method( + path="migration.reset_and_restore", + name="reset_and_restore", + roles=ADMIN_ROLE_LEVEL, + unwrap_on_success=False, + ) + def reset_and_restore( self, context: AuthedServiceContext, migration_data: MigrationData, diff --git a/packages/syft/src/syft/service/worker/worker_image_service.py b/packages/syft/src/syft/service/worker/worker_image_service.py index a5f05f94dac..277ad4e158e 100644 --- a/packages/syft/src/syft/service/worker/worker_image_service.py +++ b/packages/syft/src/syft/service/worker/worker_image_service.py @@ -89,6 +89,7 @@ def build( tag: str, registry_uid: UID | None = None, pull_image: bool = True, + force_build: bool = False, ) -> SyftSuccess: registry: SyftImageRegistry | None = None @@ -122,6 +123,7 @@ def build( and worker_image.image_identifier and worker_image.image_identifier.full_name_with_tag == image_identifier.full_name_with_tag + and not force_build ): raise SyftException( public_message=f"Image ID: {image_uid} is already built" diff --git a/packages/syft/src/syft/service/worker/worker_pool_service.py b/packages/syft/src/syft/service/worker/worker_pool_service.py index 55b103ba369..3918f277b2e 100644 --- a/packages/syft/src/syft/service/worker/worker_pool_service.py +++ b/packages/syft/src/syft/service/worker/worker_pool_service.py @@ -585,6 +585,32 @@ def delete( uid = worker_pool.id + self.purge_worker_pool(context=context, pool_id=pool_id, pool_name=pool_name) + + self.stash.delete_by_uid(credentials=context.credentials, uid=uid).unwrap( + public_message=f"Failed to delete WorkerPool: {worker_pool.name} from stash" + ) + + return SyftSuccess(message=f"Successfully deleted worker pool with id {uid}") + + @service_method( + path="worker_pool.purge_worker_pool", + name="purge_worker_pool", + roles=DATA_OWNER_ROLE_LEVEL, + unwrap_on_success=False, + ) + def purge_worker_pool( + self, + context: AuthedServiceContext, + pool_id: UID | None = None, + pool_name: str | None = None, + ) -> SyftSuccess: + worker_pool = self._get_worker_pool( + context, pool_id=pool_id, pool_name=pool_name + ).unwrap(public_message=f"Failed to get WorkerPool: {pool_id or pool_name}") + + uid = worker_pool.id + # relative from ..queue.queue_stash import Status @@ -632,11 +658,19 @@ def delete( context=context, uid=id_, force=True ) - self.stash.delete_by_uid(credentials=context.credentials, uid=uid).unwrap( - public_message=f"Failed to delete WorkerPool: {worker_pool.name} from stash" + worker_pool.max_count = 0 + worker_pool.worker_list = [] + self.stash.update( + credentials=context.credentials, + obj=worker_pool, + ).unwrap( + public_message=( + f"Pool {worker_pool.name} was purged, " + f"but failed to update the stash" + ) ) - return 
SyftSuccess(message=f"Successfully deleted worker pool with id {uid}") + return SyftSuccess(message=f"Successfully Purged worker pool with id {uid}") @as_result(StashException, SyftException) def _get_worker_pool( diff --git a/packages/syft/src/syft/util/test_helpers/checkpoint.py b/packages/syft/src/syft/util/test_helpers/checkpoint.py index 1e9cdf13ec1..17aacd37872 100644 --- a/packages/syft/src/syft/util/test_helpers/checkpoint.py +++ b/packages/syft/src/syft/util/test_helpers/checkpoint.py @@ -16,12 +16,7 @@ from syft.util.util import is_interpreter_jupyter # relative -from ...service.migration.object_migration_state import MigrationData -from ...service.response import SyftSuccess -from ...service.worker.worker_image import SyftWorkerImage -from ...service.worker.worker_pool import WorkerPool from .worker_helpers import build_and_push_image -from .worker_helpers import prune_worker_pool_and_images CHECKPOINT_ROOT = "checkpoints" CHECKPOINT_DIR_PREFIX = "chkpt" @@ -169,7 +164,7 @@ def load_from_checkpoint( print(f"Loading from checkpoint: {latest_checkpoint_dir}") result = root_client.load_migration_data( - path_or_data=latest_checkpoint_dir / "migration.blob" + path_or_data=latest_checkpoint_dir / "migration.blob", include_worker_pools=True ) if isinstance(result, SyftError): @@ -177,89 +172,29 @@ def load_from_checkpoint( print("Successfully loaded data from checkpoint.") - migration_data = MigrationData.from_file(latest_checkpoint_dir / "migration.blob") - - # klass_for_migrate_data = [ - # WorkerPool.__canonical_name__, - # SyftWorkerImage.__canonical_name__, - # ] - - # pool_and_image_data = MigrationData( - # server_uid=migration_data.server_uid, - # signing_key=migration_data.signing_key, - # store_objects={ - # k: v - # for k, v in migration_data.store_objects.items() - # if k.__canonical_name__ in klass_for_migrate_data - # }, - # metadata={ - # k: v - # for k, v in migration_data.metadata.items() - # if k.__canonical_name__ not in klass_for_migrate_data - # }, - # action_objects=[], - # blob_storage_objects=[], - # blobs={}, - # ) - - prune_worker_pool_and_images(root_client) - - worker_images: list[SyftWorkerImage] = migration_data.get_items_by_canonical_name( - SyftWorkerImage.__canonical_name__ - ) - - # Overwrite the registry credentials if provided, else use the environment variables - env_registry_username, env_registry_password = get_registry_credentials() - registry_password = ( - registry_password if registry_password else env_registry_password - ) - registry_username = ( - registry_username if registry_username else env_registry_username - ) - - old_image_to_new_image_id_map = {} - for old_image in worker_images: - submit_result = root_client.api.services.worker_image.submit( - worker_config=old_image.config + # Step 1: Build and push the worker images + for worker_image in root_client.worker_images: + build_and_push_image( + root_client, + worker_image, + registry_uid=worker_image.image_identifier.registry.id, + tag=worker_image.image_identifier.repo_with_tag, + reg_password=registry_username, + reg_username=registry_password, + force_build=True, ) - assert isinstance(submit_result, SyftSuccess) - - new_image = submit_result.value - - old_image_to_new_image_id_map[old_image.id] = new_image.id - - if not new_image.is_prebuilt: - registry_uid = ( - old_image.image_identifier.registry.id - if old_image.image_identifier.registry - else None - ) - - # TODO: Later add prompt support for registry credentials - - build_and_push_image( - root_client, - new_image, - 
registry_uid=registry_uid, - tag=old_image.image_identifier.repo_with_tag, - reg_password=registry_username, - reg_username=registry_password, - ) - - worker_pools: list[WorkerPool] = migration_data.get_items_by_canonical_name( - WorkerPool.__canonical_name__ - ) - - for old_pool in worker_pools: - new_image_uid = old_image_to_new_image_id_map[old_image.id] - - root_client.worker_pools.launch( - pool_name=old_pool.name, - image_uid=new_image_uid, - num_workers=old_pool.max_count, + # Step 2: Recreate the worker pools + for worker_pool in root_client.worker_pools: + previous_worker_cnt = worker_pool.max_count + purge_res = root_client.worker_pools.purge_worker_pool(pool_id=worker_pool.id) + print(purge_res) + add_res = root_client.worker_pools.add_workers( + number=previous_worker_cnt, + pool_id=worker_pool.id, registry_username=registry_username, - registry_password=registry_username, + registry_password=registry_password, ) + print(add_res) print("Successfully loaded worker pool data from checkpoint.") diff --git a/packages/syft/src/syft/util/test_helpers/worker_helpers.py b/packages/syft/src/syft/util/test_helpers/worker_helpers.py index ea3e027f1ae..9bb7252dbcc 100644 --- a/packages/syft/src/syft/util/test_helpers/worker_helpers.py +++ b/packages/syft/src/syft/util/test_helpers/worker_helpers.py @@ -107,13 +107,14 @@ def build_and_push_image( registry_uid: UID | None = None, reg_username: str | None = None, reg_password: str | None = None, + force_build: bool = False, ) -> None: """Build and push the image to the given registry.""" if image.is_prebuilt: return build_result = client.api.services.worker_image.build( - image_uid=image.id, registry_uid=registry_uid, tag=tag + image_uid=image.id, registry_uid=registry_uid, tag=tag, force_build=force_build ) print(build_result.message) From 8aa464ad02b9916c3c84bf1125ee776d15426472 Mon Sep 17 00:00:00 2001 From: rasswanth-s <43314053+rasswanth-s@users.noreply.github.com> Date: Thu, 26 Sep 2024 11:36:41 +0530 Subject: [PATCH 12/30] fix worker pool list --- .../syft/src/syft/util/test_helpers/checkpoint.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/packages/syft/src/syft/util/test_helpers/checkpoint.py b/packages/syft/src/syft/util/test_helpers/checkpoint.py index 17aacd37872..5c7e8106e36 100644 --- a/packages/syft/src/syft/util/test_helpers/checkpoint.py +++ b/packages/syft/src/syft/util/test_helpers/checkpoint.py @@ -173,7 +173,10 @@ def load_from_checkpoint( print("Successfully loaded data from checkpoint.") # Step 1: Build and push the worker images - for worker_image in root_client.worker_images: + worker_image_list = ( + [] if root_client.worker_images is None else root_client.worker_images + ) + for worker_image in worker_image_list: build_and_push_image( root_client, worker_image, @@ -184,8 +187,13 @@ def load_from_checkpoint( force_build=True, ) + print("Successfully Built worker image data from checkpoint.") + # Step 2: Recreate the worker pools - for worker_pool in root_client.worker_pools: + worker_pool_list = ( + [] if root_client.worker_pools is None else root_client.worker_pools + ) + for worker_pool in worker_pool_list: previous_worker_cnt = worker_pool.max_count purge_res = root_client.worker_pools.purge_worker_pool(pool_id=worker_pool.id) print(purge_res) From a43ef8a1368c7e4ca650f5bd717fbac9ef023efc Mon Sep 17 00:00:00 2001 From: rasswanth-s <43314053+rasswanth-s@users.noreply.github.com> Date: Thu, 26 Sep 2024 13:06:26 +0530 Subject: [PATCH 13/30] Change env imports paths in grid/server.py 
modified purge_worker_pool to purge_workers Co-authored-by: Shubham Gupta --- packages/grid/backend/grid/core/server.py | 12 ++++++------ .../src/syft/service/worker/worker_pool_service.py | 8 ++++---- .../syft/src/syft/util/test_helpers/checkpoint.py | 2 +- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/packages/grid/backend/grid/core/server.py b/packages/grid/backend/grid/core/server.py index c6e4568afe7..fa674d94dcb 100644 --- a/packages/grid/backend/grid/core/server.py +++ b/packages/grid/backend/grid/core/server.py @@ -6,13 +6,13 @@ from syft.server.datasite import Datasite from syft.server.datasite import Server from syft.server.enclave import Enclave +from syft.server.env import get_default_bucket_name +from syft.server.env import get_enable_warnings +from syft.server.env import get_server_name +from syft.server.env import get_server_side_type +from syft.server.env import get_server_type +from syft.server.env import get_server_uid_env from syft.server.gateway import Gateway -from syft.server.server import get_default_bucket_name -from syft.server.server import get_enable_warnings -from syft.server.server import get_server_name -from syft.server.server import get_server_side_type -from syft.server.server import get_server_type -from syft.server.server import get_server_uid_env from syft.service.queue.zmq_client import ZMQClientConfig from syft.service.queue.zmq_client import ZMQQueueConfig from syft.store.blob_storage.seaweedfs import SeaweedFSClientConfig diff --git a/packages/syft/src/syft/service/worker/worker_pool_service.py b/packages/syft/src/syft/service/worker/worker_pool_service.py index 3918f277b2e..b77baecb056 100644 --- a/packages/syft/src/syft/service/worker/worker_pool_service.py +++ b/packages/syft/src/syft/service/worker/worker_pool_service.py @@ -585,7 +585,7 @@ def delete( uid = worker_pool.id - self.purge_worker_pool(context=context, pool_id=pool_id, pool_name=pool_name) + self.purge_workers(context=context, pool_id=pool_id, pool_name=pool_name) self.stash.delete_by_uid(credentials=context.credentials, uid=uid).unwrap( public_message=f"Failed to delete WorkerPool: {worker_pool.name} from stash" @@ -594,12 +594,12 @@ def delete( return SyftSuccess(message=f"Successfully deleted worker pool with id {uid}") @service_method( - path="worker_pool.purge_worker_pool", - name="purge_worker_pool", + path="worker_pool.purge_workers", + name="purge_workers", roles=DATA_OWNER_ROLE_LEVEL, unwrap_on_success=False, ) - def purge_worker_pool( + def purge_workers( self, context: AuthedServiceContext, pool_id: UID | None = None, diff --git a/packages/syft/src/syft/util/test_helpers/checkpoint.py b/packages/syft/src/syft/util/test_helpers/checkpoint.py index 5c7e8106e36..70b0521eeea 100644 --- a/packages/syft/src/syft/util/test_helpers/checkpoint.py +++ b/packages/syft/src/syft/util/test_helpers/checkpoint.py @@ -195,7 +195,7 @@ def load_from_checkpoint( ) for worker_pool in worker_pool_list: previous_worker_cnt = worker_pool.max_count - purge_res = root_client.worker_pools.purge_worker_pool(pool_id=worker_pool.id) + purge_res = root_client.worker_pools.purge_workers(pool_id=worker_pool.id) print(purge_res) add_res = root_client.worker_pools.add_workers( number=previous_worker_cnt, From 580805f2809c4f084be6422896a5e3c1ede0672a Mon Sep 17 00:00:00 2001 From: rasswanth-s <43314053+rasswanth-s@users.noreply.github.com> Date: Thu, 26 Sep 2024 13:10:12 +0530 Subject: [PATCH 14/30] Fix load_migration_data to separate out reset db Co-authored-by: Shubham Gupta --- 
packages/syft/src/syft/client/datasite_client.py | 6 +++++- packages/syft/src/syft/util/test_helpers/checkpoint.py | 4 +++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/packages/syft/src/syft/client/datasite_client.py b/packages/syft/src/syft/client/datasite_client.py index b305303911e..b88a7081299 100644 --- a/packages/syft/src/syft/client/datasite_client.py +++ b/packages/syft/src/syft/client/datasite_client.py @@ -420,6 +420,7 @@ def load_migration_data( self, path_or_data: str | Path | MigrationData, include_worker_pools: bool = False, + with_reset_db: bool = False, ) -> SyftSuccess: if isinstance(path_or_data, MigrationData): migration_data = path_or_data @@ -452,7 +453,10 @@ def load_migration_data( if not include_worker_pools: migration_data = migration_data.copy_without_workerpools() - return self.api.services.migration.reset_and_restore(migration_data) + if with_reset_db: + return self.api.services.migration.reset_and_restore(migration_data) + else: + return self.api.services.migration.apply_migration_data(migration_data) def dump_state(self, path: str | Path) -> None: if isinstance(path, str): diff --git a/packages/syft/src/syft/util/test_helpers/checkpoint.py b/packages/syft/src/syft/util/test_helpers/checkpoint.py index 70b0521eeea..a2a67d1aaee 100644 --- a/packages/syft/src/syft/util/test_helpers/checkpoint.py +++ b/packages/syft/src/syft/util/test_helpers/checkpoint.py @@ -164,7 +164,9 @@ def load_from_checkpoint( print(f"Loading from checkpoint: {latest_checkpoint_dir}") result = root_client.load_migration_data( - path_or_data=latest_checkpoint_dir / "migration.blob", include_worker_pools=True + path_or_data=latest_checkpoint_dir / "migration.blob", + include_worker_pools=True, + with_reset_db=True, ) if isinstance(result, SyftError): From 0bed0a1e9db8ede7f7a0d775c3a6b07fc0d1a25b Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Thu, 26 Sep 2024 13:33:19 +0530 Subject: [PATCH 15/30] pass root email and pwd to load checkpoint in scenario notebooks --- .../001-scale-delete-worker-pools.ipynb | 67 ++++++++++++------- .../src/syft/util/test_helpers/checkpoint.py | 6 +- 2 files changed, 44 insertions(+), 29 deletions(-) diff --git a/notebooks/scenarios/bigquery/001-scale-delete-worker-pools.ipynb b/notebooks/scenarios/bigquery/001-scale-delete-worker-pools.ipynb index 6a1482e3677..91edb30918e 100644 --- a/notebooks/scenarios/bigquery/001-scale-delete-worker-pools.ipynb +++ b/notebooks/scenarios/bigquery/001-scale-delete-worker-pools.ipynb @@ -22,6 +22,7 @@ "source": [ "# stdlib\n", "import os\n", + "from os import environ as env\n", "\n", "# syft absolute\n", "import syft as sy\n", @@ -47,9 +48,21 @@ ] }, { - "cell_type": "markdown", + "cell_type": "code", + "execution_count": null, "id": "3", "metadata": {}, + "outputs": [], + "source": [ + "# in case we are not in k8s we set them here for orchestra to use\n", + "env[\"DEFAULT_ROOT_EMAIL\"] = ROOT_EMAIL\n", + "env[\"DEFAULT_ROOT_PASSWORD\"] = ROOT_PASSWORD" + ] + }, + { + "cell_type": "markdown", + "id": "4", + "metadata": {}, "source": [ "### Launch server & login" ] @@ -57,7 +70,7 @@ { "cell_type": "code", "execution_count": null, - "id": "4", + "id": "5", "metadata": {}, "outputs": [], "source": [ @@ -76,20 +89,22 @@ { "cell_type": "code", "execution_count": null, - "id": "5", + "id": "6", "metadata": {}, "outputs": [], "source": [ "load_from_checkpoint(\n", " prev_nb_filename=\"000-start-and-configure-server-and-admins\",\n", " client=server.client,\n", + " root_email=ROOT_EMAIL,\n", + " 
root_password=ROOT_PASSWORD,\n", ")" ] }, { "cell_type": "code", "execution_count": null, - "id": "6", + "id": "7", "metadata": {}, "outputs": [], "source": [ @@ -99,7 +114,7 @@ { "cell_type": "code", "execution_count": null, - "id": "7", + "id": "8", "metadata": {}, "outputs": [], "source": [ @@ -111,7 +126,7 @@ { "cell_type": "code", "execution_count": null, - "id": "8", + "id": "9", "metadata": {}, "outputs": [], "source": [ @@ -121,7 +136,7 @@ { "cell_type": "code", "execution_count": null, - "id": "9", + "id": "10", "metadata": {}, "outputs": [], "source": [ @@ -131,7 +146,7 @@ }, { "cell_type": "markdown", - "id": "10", + "id": "11", "metadata": {}, "source": [ "### Scale Worker pool" @@ -139,7 +154,7 @@ }, { "cell_type": "markdown", - "id": "11", + "id": "12", "metadata": {}, "source": [ "##### Scale up" @@ -148,7 +163,7 @@ { "cell_type": "code", "execution_count": null, - "id": "12", + "id": "13", "metadata": {}, "outputs": [], "source": [ @@ -162,7 +177,7 @@ { "cell_type": "code", "execution_count": null, - "id": "13", + "id": "14", "metadata": {}, "outputs": [], "source": [ @@ -172,7 +187,7 @@ { "cell_type": "code", "execution_count": null, - "id": "14", + "id": "15", "metadata": {}, "outputs": [], "source": [ @@ -192,7 +207,7 @@ }, { "cell_type": "markdown", - "id": "15", + "id": "16", "metadata": {}, "source": [ "##### Scale down" @@ -201,7 +216,7 @@ { "cell_type": "code", "execution_count": null, - "id": "16", + "id": "17", "metadata": {}, "outputs": [], "source": [ @@ -216,7 +231,7 @@ { "cell_type": "code", "execution_count": null, - "id": "17", + "id": "18", "metadata": {}, "outputs": [], "source": [ @@ -235,7 +250,7 @@ { "cell_type": "code", "execution_count": null, - "id": "18", + "id": "19", "metadata": {}, "outputs": [], "source": [ @@ -248,7 +263,7 @@ }, { "cell_type": "markdown", - "id": "19", + "id": "20", "metadata": {}, "source": [ "#### Delete Worker Pool" @@ -257,7 +272,7 @@ { "cell_type": "code", "execution_count": null, - "id": "20", + "id": "21", "metadata": {}, "outputs": [], "source": [ @@ -270,7 +285,7 @@ { "cell_type": "code", "execution_count": null, - "id": "21", + "id": "22", "metadata": {}, "outputs": [], "source": [ @@ -280,7 +295,7 @@ }, { "cell_type": "markdown", - "id": "22", + "id": "23", "metadata": {}, "source": [ "#### Re-launch the default worker pool" @@ -289,7 +304,7 @@ { "cell_type": "code", "execution_count": null, - "id": "23", + "id": "24", "metadata": {}, "outputs": [], "source": [ @@ -299,7 +314,7 @@ { "cell_type": "code", "execution_count": null, - "id": "24", + "id": "25", "metadata": {}, "outputs": [], "source": [ @@ -313,7 +328,7 @@ { "cell_type": "code", "execution_count": null, - "id": "25", + "id": "26", "metadata": {}, "outputs": [], "source": [ @@ -327,7 +342,7 @@ { "cell_type": "code", "execution_count": null, - "id": "26", + "id": "27", "metadata": {}, "outputs": [], "source": [ @@ -337,7 +352,7 @@ { "cell_type": "code", "execution_count": null, - "id": "27", + "id": "28", "metadata": {}, "outputs": [], "source": [ @@ -347,7 +362,7 @@ { "cell_type": "code", "execution_count": null, - "id": "28", + "id": "29", "metadata": {}, "outputs": [], "source": [] diff --git a/packages/syft/src/syft/util/test_helpers/checkpoint.py b/packages/syft/src/syft/util/test_helpers/checkpoint.py index a2a67d1aaee..03c843171f0 100644 --- a/packages/syft/src/syft/util/test_helpers/checkpoint.py +++ b/packages/syft/src/syft/util/test_helpers/checkpoint.py @@ -137,7 +137,7 @@ def load_from_checkpoint( client: SyftClient, prev_nb_filename: str | None 
= None, root_email: str | None = None, - root_pwd: str | None = None, + root_password: str | None = None, registry_username: str | None = None, registry_password: str | None = None, ) -> None: @@ -147,12 +147,12 @@ def load_from_checkpoint( prev_nb_filename = current_nbname().stem root_email = "info@openmined.org" if root_email is None else root_email - root_pwd = "changethis" if root_pwd is None else root_pwd + root_password = "changethis" if root_password is None else root_password root_client = ( client if is_admin(client) - else client.login(email=root_email, password=root_pwd) + else client.login(email=root_email, password=root_password) ) latest_checkpoint_dir = last_checkpoint_path_for_nb( client.id.to_string(), prev_nb_filename From b81affba82f6353a51d05719976db5de4926abcf Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Thu, 26 Sep 2024 13:49:46 +0530 Subject: [PATCH 16/30] fix security bugs --- packages/syft/src/syft/server/env.py | 2 +- .../syft/src/syft/util/test_helpers/checkpoint.py | 13 +++++++++++-- .../src/syft/util/test_helpers/worker_helpers.py | 2 +- 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/packages/syft/src/syft/server/env.py b/packages/syft/src/syft/server/env.py index 28ba186c974..c101f05bad1 100644 --- a/packages/syft/src/syft/server/env.py +++ b/packages/syft/src/syft/server/env.py @@ -1,6 +1,6 @@ # stdlib import json -import subprocess +import subprocess # nosec import sys # relative diff --git a/packages/syft/src/syft/util/test_helpers/checkpoint.py b/packages/syft/src/syft/util/test_helpers/checkpoint.py index 03c843171f0..7e975c8a421 100644 --- a/packages/syft/src/syft/util/test_helpers/checkpoint.py +++ b/packages/syft/src/syft/util/test_helpers/checkpoint.py @@ -16,6 +16,8 @@ from syft.util.util import is_interpreter_jupyter # relative +from ...server.env import get_default_root_email +from ...server.env import get_default_root_password from .worker_helpers import build_and_push_image CHECKPOINT_ROOT = "checkpoints" @@ -80,10 +82,17 @@ def is_admin(client: SyftClient) -> bool: def create_checkpoint( client: SyftClient, - root_email: str = "info@openmined.org", - root_pwd: str = "changethis", + root_email: str | None = None, + root_pwd: str | None = None, ) -> None: """Save a checkpoint for the database.""" + + if root_email is None: + root_email = get_default_root_email() + + if root_pwd is None: + root_pwd = get_default_root_password() + root_client = ( client if is_admin(client) diff --git a/packages/syft/src/syft/util/test_helpers/worker_helpers.py b/packages/syft/src/syft/util/test_helpers/worker_helpers.py index 9bb7252dbcc..7a9d2f18842 100644 --- a/packages/syft/src/syft/util/test_helpers/worker_helpers.py +++ b/packages/syft/src/syft/util/test_helpers/worker_helpers.py @@ -124,5 +124,5 @@ def build_and_push_image( username=reg_username, password=reg_password, ) - assert isinstance(push_result, SyftSuccess) + assert isinstance(push_result, SyftSuccess) # nosec: B101 print(push_result.message) From 44751d50f0f188e6f456477c79f9adc13e9434f5 Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Thu, 26 Sep 2024 22:17:33 +0530 Subject: [PATCH 17/30] fix uid mismatch when external uid is passed via values.yml in k8s --- packages/grid/backend/grid/bootstrap.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/grid/backend/grid/bootstrap.py b/packages/grid/backend/grid/bootstrap.py index 914411ec864..3da7eb9b600 100644 --- a/packages/grid/backend/grid/bootstrap.py +++ b/packages/grid/backend/grid/bootstrap.py @@ 
-121,9 +121,9 @@ def get_credential( # supplying a different key means something has gone wrong so raise Exception if ( - file_credential != env_credential - and file_credential is not None + file_credential is not None and env_credential is not None + and validation_func(file_credential) != validation_func(env_credential) ): raise Exception(f"{key} from ENV must match {key} in {CREDENTIALS_PATH}") From 79c22711dc7e45dda2cf7730e9908648125bf853 Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Fri, 27 Sep 2024 13:34:54 +0530 Subject: [PATCH 18/30] update tags to latest in devspace --- packages/grid/devspace.yaml | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/packages/grid/devspace.yaml b/packages/grid/devspace.yaml index bf796f8bc42..5bde585ddb0 100644 --- a/packages/grid/devspace.yaml +++ b/packages/grid/devspace.yaml @@ -42,6 +42,7 @@ images: target: "backend" context: ../ tags: + - dev-latest - dev-${DEVSPACE_TIMESTAMP} frontend: image: "${CONTAINER_REGISTRY}/${DOCKER_IMAGE_FRONTEND}" @@ -51,6 +52,7 @@ images: target: "syft-ui-production" context: ./frontend tags: + - dev-latest - dev-${DEVSPACE_TIMESTAMP} seaweedfs: image: "${CONTAINER_REGISTRY}/${DOCKER_IMAGE_SEAWEEDFS}" @@ -59,6 +61,7 @@ images: dockerfile: ./seaweedfs/seaweedfs.dockerfile context: ./seaweedfs tags: + - dev-latest - dev-${DEVSPACE_TIMESTAMP} # This is a list of `deployments` that DevSpace can create for this project @@ -72,7 +75,7 @@ deployments: values: global: registry: ${CONTAINER_REGISTRY} - version: dev-${DEVSPACE_TIMESTAMP} + version: dev-latest # anything that does not need templating should go in helm/examples/dev/base.yaml # or profile specific values files valuesFiles: @@ -164,6 +167,7 @@ profiles: dockerfile: ./rathole/rathole.dockerfile context: ./rathole tags: + - dev-latest - dev-${DEVSPACE_TIMESTAMP} # use rathole client-specific chart values - op: add @@ -184,6 +188,7 @@ profiles: dockerfile: ./rathole/rathole.dockerfile context: ./rathole tags: + - dev-latest - dev-${DEVSPACE_TIMESTAMP} # enable rathole `devspace dev` config - op: add @@ -256,6 +261,7 @@ profiles: dockerfile: ./enclave/attestation/attestation.dockerfile context: ./enclave/attestation tags: + - dev-latest - dev-${DEVSPACE_TIMESTAMP} - op: add path: dev.backend.containers From 43a70ad77a2706d3457efa63782513266eccaa00 Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Fri, 27 Sep 2024 21:00:43 +0530 Subject: [PATCH 19/30] fix worker image.get_all API not returning all images - add check for pool exists in Kubernetes before scaling it down - fix images.get_all in load checkpoint to rebuilt images --- .../src/syft/service/worker/worker_image_service.py | 13 +------------ .../src/syft/service/worker/worker_pool_service.py | 5 +++-- .../syft/src/syft/util/test_helpers/checkpoint.py | 9 +++++++-- 3 files changed, 11 insertions(+), 16 deletions(-) diff --git a/packages/syft/src/syft/service/worker/worker_image_service.py b/packages/syft/src/syft/service/worker/worker_image_service.py index 277ad4e158e..a2bc64bb204 100644 --- a/packages/syft/src/syft/service/worker/worker_image_service.py +++ b/packages/syft/src/syft/service/worker/worker_image_service.py @@ -194,18 +194,7 @@ def get_all(self, context: AuthedServiceContext) -> DictTuple[str, SyftWorkerIma One image one docker file for now """ images = self.stash.get_all(credentials=context.credentials).unwrap() - - res = {} - # if image is built, index it by full_name_with_tag - for im in images: - if im.is_built and im.image_identifier is not None: - 
res[im.image_identifier.full_name_with_tag] = im - # and then index all images by id - # TODO: jupyter repr needs to be updated to show unique values - # (even if multiple keys point to same value) - res.update({im.id.to_string(): im for im in images if not im.is_built}) - - return DictTuple(res) + return DictTuple({image.id: image for image in images}) @service_method( path="worker_image.remove", diff --git a/packages/syft/src/syft/service/worker/worker_pool_service.py b/packages/syft/src/syft/service/worker/worker_pool_service.py index b77baecb056..3db794a2834 100644 --- a/packages/syft/src/syft/service/worker/worker_pool_service.py +++ b/packages/syft/src/syft/service/worker/worker_pool_service.py @@ -640,9 +640,10 @@ def purge_workers( if IN_KUBERNETES: # Scale the workers to zero - self.scale(context=context, number=0, pool_id=uid) runner = KubernetesRunner() - runner.delete_pool(pool_name=worker_pool.name) + if runner.exists(worker_pool.name): + self.scale(context=context, number=0, pool_id=uid) + runner.delete_pool(pool_name=worker_pool.name) else: workers = ( worker.resolve_with_context(context=context).unwrap() diff --git a/packages/syft/src/syft/util/test_helpers/checkpoint.py b/packages/syft/src/syft/util/test_helpers/checkpoint.py index 7e975c8a421..87dc4165026 100644 --- a/packages/syft/src/syft/util/test_helpers/checkpoint.py +++ b/packages/syft/src/syft/util/test_helpers/checkpoint.py @@ -185,13 +185,18 @@ def load_from_checkpoint( # Step 1: Build and push the worker images worker_image_list = ( - [] if root_client.worker_images is None else root_client.worker_images + [] if root_client.images.get_all() is None else root_client.images.get_all() ) for worker_image in worker_image_list: + if worker_image.is_prebuilt: + continue + + registry = worker_image.image_identifier.registry + build_and_push_image( root_client, worker_image, - registry_uid=worker_image.image_identifier.registry.id, + registry_uid=registry.id if registry else None, tag=worker_image.image_identifier.repo_with_tag, reg_password=registry_username, reg_username=registry_password, From 17327d8a405fcbd4c37d104932a6d976bf6b8591 Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Fri, 27 Sep 2024 21:15:16 +0530 Subject: [PATCH 20/30] add few more prints --- packages/syft/src/syft/util/test_helpers/checkpoint.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/packages/syft/src/syft/util/test_helpers/checkpoint.py b/packages/syft/src/syft/util/test_helpers/checkpoint.py index 87dc4165026..fc463509d3f 100644 --- a/packages/syft/src/syft/util/test_helpers/checkpoint.py +++ b/packages/syft/src/syft/util/test_helpers/checkpoint.py @@ -184,6 +184,8 @@ def load_from_checkpoint( print("Successfully loaded data from checkpoint.") # Step 1: Build and push the worker images + + print("Recreating worker images from checkpoint.") worker_image_list = ( [] if root_client.images.get_all() is None else root_client.images.get_all() ) @@ -206,6 +208,7 @@ def load_from_checkpoint( print("Successfully Built worker image data from checkpoint.") # Step 2: Recreate the worker pools + print("Recreating worker pools from checkpoint.") worker_pool_list = ( [] if root_client.worker_pools is None else root_client.worker_pools ) From 69de9a7f2d098eefbea1b077a74e588b72918036 Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Fri, 27 Sep 2024 21:34:01 +0530 Subject: [PATCH 21/30] fix lint --- notebooks/api/0.8/11-container-images-k8s.ipynb | 10 +++++++--- .../src/syft/service/worker/worker_image_service.py | 2 +- 2 files changed, 8 
insertions(+), 4 deletions(-) diff --git a/notebooks/api/0.8/11-container-images-k8s.ipynb b/notebooks/api/0.8/11-container-images-k8s.ipynb index ef21f427a26..63eee600646 100644 --- a/notebooks/api/0.8/11-container-images-k8s.ipynb +++ b/notebooks/api/0.8/11-container-images-k8s.ipynb @@ -493,8 +493,7 @@ "assert workerimage is not None, str([image.__dict__ for image in image_list])\n", "assert workerimage.is_built is not None, str(workerimage)\n", "assert workerimage.built_at is not None, str(workerimage)\n", - "assert workerimage.image_hash is not None, str(workerimage)\n", - "assert image_list[workerimage.built_image_tag] == workerimage" + "assert workerimage.image_hash is not None, str(workerimage)" ] }, { @@ -1394,6 +1393,11 @@ } ], "metadata": { + "kernelspec": { + "display_name": "syft-3.11", + "language": "python", + "name": "python3" + }, "language_info": { "codemirror_mode": { "name": "ipython", @@ -1404,7 +1408,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.12.5" + "version": "3.11.5" } }, "nbformat": 4, diff --git a/packages/syft/src/syft/service/worker/worker_image_service.py b/packages/syft/src/syft/service/worker/worker_image_service.py index a2bc64bb204..dd0795cf8df 100644 --- a/packages/syft/src/syft/service/worker/worker_image_service.py +++ b/packages/syft/src/syft/service/worker/worker_image_service.py @@ -194,7 +194,7 @@ def get_all(self, context: AuthedServiceContext) -> DictTuple[str, SyftWorkerIma One image one docker file for now """ images = self.stash.get_all(credentials=context.credentials).unwrap() - return DictTuple({image.id: image for image in images}) + return DictTuple({image.id.to_string(): image for image in images}) @service_method( path="worker_image.remove", From 97c5cd120b4ded1cc8ee58f98eaabf8e3ecefae5 Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Fri, 27 Sep 2024 22:15:13 +0530 Subject: [PATCH 22/30] fix pytest --- .../syft/tests/syft/worker_pool/worker_pool_service_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/syft/tests/syft/worker_pool/worker_pool_service_test.py b/packages/syft/tests/syft/worker_pool/worker_pool_service_test.py index f3306ab4fd4..a14cdef3f8b 100644 --- a/packages/syft/tests/syft/worker_pool/worker_pool_service_test.py +++ b/packages/syft/tests/syft/worker_pool/worker_pool_service_test.py @@ -30,7 +30,7 @@ 2, # total number of images. 
# 2 since we pull a pre-built image (1) as the base image to build a custom image (2) ), - (None, PrebuiltWorkerConfig(tag=PREBUILT_IMAGE_TAG), 1), + (None, PrebuiltWorkerConfig(tag=PREBUILT_IMAGE_TAG), 2), ] WORKER_CONFIG_TEST_CASES = [ From 5771b49cff6337cec7bc3f85dba35d3c41350d8b Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Fri, 27 Sep 2024 22:23:59 +0530 Subject: [PATCH 23/30] fix api/notebooks 11-container --- notebooks/api/0.8/11-container-images-k8s.ipynb | 4 ---- 1 file changed, 4 deletions(-) diff --git a/notebooks/api/0.8/11-container-images-k8s.ipynb b/notebooks/api/0.8/11-container-images-k8s.ipynb index 63eee600646..7cca83448f4 100644 --- a/notebooks/api/0.8/11-container-images-k8s.ipynb +++ b/notebooks/api/0.8/11-container-images-k8s.ipynb @@ -1036,10 +1036,6 @@ "assert workerimage_opendp.built_at is not None, str(workerimage_opendp.__dict__)\n", "assert workerimage_opendp.image_hash is not None, str(workerimage_opendp.__dict__)\n", "\n", - "assert _images[workerimage_opendp.built_image_tag] == workerimage_opendp, str(\n", - " workerimage_opendp\n", - ")\n", - "\n", "workerimage_opendp" ] }, From 67d651da06c3a470f2a91077ae784533a3e81c73 Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Sat, 28 Sep 2024 12:21:10 +0530 Subject: [PATCH 24/30] checkpoint can load other checkpoint folder apart from last checkpoint --- packages/syft/src/syft/util/test_helpers/checkpoint.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/packages/syft/src/syft/util/test_helpers/checkpoint.py b/packages/syft/src/syft/util/test_helpers/checkpoint.py index fc463509d3f..17dece9d38c 100644 --- a/packages/syft/src/syft/util/test_helpers/checkpoint.py +++ b/packages/syft/src/syft/util/test_helpers/checkpoint.py @@ -119,8 +119,16 @@ def create_checkpoint( def last_checkpoint_path_for_nb(server_uid: str, nb_name: str = None) -> Path | None: """Return the directory of the latest checkpoint for the given notebook.""" nb_name = nb_name if nb_name else current_nbname().stem + checkpoint_dir = None + if len(nb_name.split("/")) > 1: + nb_name, checkpoint_dir = nb_name.split("/") + filename = nb_name.split(".ipynb")[0] checkpoint_parent_dir = get_checkpoints_dir(server_uid, filename) + + if checkpoint_dir: + return checkpoint_parent_dir / checkpoint_dir + checkpoint_dirs = [ d for d in checkpoint_parent_dir.glob(f"{CHECKPOINT_DIR_PREFIX}_*") @@ -131,6 +139,7 @@ def last_checkpoint_path_for_nb(server_uid: str, nb_name: str = None) -> Path | ] if checkpoints_dirs_with_blob_entry: + print("Loading from the last checkpoint of the current notebook.") return max(checkpoints_dirs_with_blob_entry, key=lambda d: d.stat().st_mtime) return None @@ -149,6 +158,7 @@ def load_from_checkpoint( root_password: str | None = None, registry_username: str | None = None, registry_password: str | None = None, + checkpoint_name: str | None = None, ) -> None: """Load the last saved checkpoint for the given notebook state.""" if prev_nb_filename is None: From eb6cc1ec99c83eb18753389ba2877b34b533d344 Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Mon, 30 Sep 2024 12:19:04 +0530 Subject: [PATCH 25/30] remove startup overwrite logic update bigquery notebook to filter big query image --- .../bigquery/010-setup-bigquery-pool.ipynb | 2 +- .../service/migration/migration_service.py | 23 ------------------- 2 files changed, 1 insertion(+), 24 deletions(-) diff --git a/notebooks/scenarios/bigquery/010-setup-bigquery-pool.ipynb b/notebooks/scenarios/bigquery/010-setup-bigquery-pool.ipynb index 94aacf20397..b478a79f27f 100644 
--- a/notebooks/scenarios/bigquery/010-setup-bigquery-pool.ipynb +++ b/notebooks/scenarios/bigquery/010-setup-bigquery-pool.ipynb @@ -388,7 +388,7 @@ " (\n", " image\n", " for image in dockerfile_list\n", - " if \"worker-bigquery\" in str(image.image_identifier)\n", + " if image.is_prebuilt and \"worker-bigquery\" in str(image.image_identifier)\n", " ),\n", " None,\n", ")\n", diff --git a/packages/syft/src/syft/service/migration/migration_service.py b/packages/syft/src/syft/service/migration/migration_service.py index 96ac85f5496..62788762acf 100644 --- a/packages/syft/src/syft/service/migration/migration_service.py +++ b/packages/syft/src/syft/service/migration/migration_service.py @@ -22,13 +22,10 @@ from ..action.action_permissions import StoragePermission from ..action.action_store import ActionObjectStash from ..context import AuthedServiceContext -from ..notifier.notifier import NotifierSettings from ..response import SyftError from ..response import SyftSuccess from ..service import AbstractService from ..service import service_method -from ..settings.settings import ServerSettings -from ..user.user import User from ..user.user_roles import ADMIN_ROLE_LEVEL from ..worker.utils import DEFAULT_WORKER_POOL_NAME from .object_migration_state import MigrationData @@ -266,13 +263,6 @@ def _create_migrated_objects( ignore_existing: bool = True, skip_check_type: bool = False, ) -> dict[type[SyftObject], list[SyftObject]]: - # These tables are considered as startup tables, if the object is in this table and already exists, - # we need to overwrite the existing object with the migrated object. - STARTUP_TABLES = [ - User, - ServerSettings, - NotifierSettings, - ] created_objects: dict[type[SyftObject], list[SyftObject]] = {} for key, objects in migrated_objects.items(): @@ -282,19 +272,6 @@ def _create_migrated_objects( context, type(migrated_object) ).unwrap() - # If the object is in the startup tables, we need to overwrite the existing object - if type(migrated_object) in STARTUP_TABLES: - try: - existing_obj = stash.get_by_unique_fields( - credentials=context.credentials, - obj=migrated_object, - ).unwrap() - stash.delete_by_uid( - context.credentials, uid=existing_obj.id - ).unwrap() - except NotFoundException: - pass - result = stash.set( context.credentials, obj=migrated_object, From 73b581a23348d2fda89fbedcf296d92d1b2dab32 Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Mon, 30 Sep 2024 12:32:40 +0530 Subject: [PATCH 26/30] remove reset from 001 scale notebook --- notebooks/scenarios/bigquery/001-scale-delete-worker-pools.ipynb | 1 - 1 file changed, 1 deletion(-) diff --git a/notebooks/scenarios/bigquery/001-scale-delete-worker-pools.ipynb b/notebooks/scenarios/bigquery/001-scale-delete-worker-pools.ipynb index 91edb30918e..69b16576fdf 100644 --- a/notebooks/scenarios/bigquery/001-scale-delete-worker-pools.ipynb +++ b/notebooks/scenarios/bigquery/001-scale-delete-worker-pools.ipynb @@ -81,7 +81,6 @@ " port=\"8080\",\n", " n_consumers=num_workers, # How many workers to be spawned\n", " create_producer=True, # Can produce more workers\n", - " reset=True,\n", " log_level=10,\n", ")" ] From 63b568457bbd6d3dfc06e18fe0add02b977e27f1 Mon Sep 17 00:00:00 2001 From: rasswanth-s <43314053+rasswanth-s@users.noreply.github.com> Date: Mon, 30 Sep 2024 14:33:24 +0530 Subject: [PATCH 27/30] remove checkpoint relation with jupyter --- .../src/syft/util/test_helpers/checkpoint.py | 97 +++++-------------- 1 file changed, 26 insertions(+), 71 deletions(-) diff --git 
a/packages/syft/src/syft/util/test_helpers/checkpoint.py b/packages/syft/src/syft/util/test_helpers/checkpoint.py index 17dece9d38c..647c8dc46f9 100644 --- a/packages/syft/src/syft/util/test_helpers/checkpoint.py +++ b/packages/syft/src/syft/util/test_helpers/checkpoint.py @@ -1,19 +1,14 @@ # stdlib import datetime -import json import os from pathlib import Path -# third party -import ipykernel - # syft absolute from syft import SyftError from syft import SyftException from syft.client.client import SyftClient from syft.service.user.user_roles import ServiceRole from syft.util.util import get_root_data_path -from syft.util.util import is_interpreter_jupyter # relative from ...server.env import get_default_root_email @@ -24,53 +19,25 @@ CHECKPOINT_DIR_PREFIX = "chkpt" -def get_notebook_name_from_pytest_env() -> str | None: - """ - Returns the notebook file name from the test environment variable 'PYTEST_CURRENT_TEST'. - If not available, returns None. - """ - pytest_current_test = os.environ.get("PYTEST_CURRENT_TEST", "") - # Split by "::" and return the first part, which is the file path - return os.path.basename(pytest_current_test.split("::")[0]) - - -def current_nbname() -> Path: - """Retrieve the current Jupyter notebook name.""" - curr_kernel_file = Path(ipykernel.get_connection_file()) - kernel_file = json.loads(curr_kernel_file.read_text()) - nb_name = kernel_file.get("jupyter_session", "") - if not nb_name: - nb_name = get_notebook_name_from_pytest_env() - return Path(nb_name) - - def root_checkpoint_path() -> Path: return get_root_data_path() / CHECKPOINT_ROOT -def checkpoint_parent_dir(server_uid: str, nb_name: str | None = None) -> Path: - """Return the checkpoint directory for the current notebook and server.""" - if is_interpreter_jupyter: - nb_name = nb_name if nb_name else current_nbname().stem - return Path(f"{nb_name}/{server_uid}") if nb_name else Path(server_uid) - return Path(server_uid) - - -def get_checkpoints_dir(server_uid: str, nb_name: str) -> Path: - return root_checkpoint_path() / checkpoint_parent_dir(server_uid, nb_name) - +def get_checkpoint_parent_dir(server_uid: str, chkpt_name: str) -> Path: + return root_checkpoint_path() / chkpt_name / server_uid -def get_checkpoint_dir( - server_uid: str, checkpoint_dir: str, nb_name: str | None = None -) -> Path: - return get_checkpoints_dir(server_uid, nb_name) / checkpoint_dir - -def create_checkpoint_dir(server_uid: str) -> Path: - """Create a checkpoint directory for the current notebook and server.""" +def create_checkpoint_dir(server_uid: str, chkpt_name: str) -> Path: + """Create a checkpoint directory by chkpt_name and server_uid.""" timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") checkpoint_dir = f"{CHECKPOINT_DIR_PREFIX}_{timestamp}" - checkpoint_full_path = get_checkpoint_dir(server_uid, checkpoint_dir=checkpoint_dir) + checkpoint_parent_dir = get_checkpoint_parent_dir( + server_uid=server_uid, chkpt_name=chkpt_name + ) + checkpoint_full_path = checkpoint_parent_dir / checkpoint_dir + + # Format of Checkpoint Directory: + # /checkpoints/chkpt_name//chkpt_ checkpoint_full_path.mkdir(parents=True, exist_ok=True) return checkpoint_full_path @@ -81,6 +48,7 @@ def is_admin(client: SyftClient) -> bool: def create_checkpoint( + name: str, # Name of the checkpoint client: SyftClient, root_email: str | None = None, root_pwd: str | None = None, @@ -103,12 +71,9 @@ def create_checkpoint( if isinstance(migration_data, SyftError): raise SyftException(message=migration_data.message) - if not 
is_interpreter_jupyter(): - raise SyftException( - message="Checkpoint can only be created in Jupyter Notebook." - ) - - checkpoint_dir = create_checkpoint_dir(server_uid=client.id.to_string()) + checkpoint_dir = create_checkpoint_dir( + server_uid=client.id.to_string(), chkpt_name=name + ) migration_data.save( path=checkpoint_dir / "migration.blob", yaml_path=checkpoint_dir / "migration.yaml", @@ -116,18 +81,12 @@ def create_checkpoint( print(f"Checkpoint saved at: \n {checkpoint_dir}") -def last_checkpoint_path_for_nb(server_uid: str, nb_name: str = None) -> Path | None: - """Return the directory of the latest checkpoint for the given notebook.""" - nb_name = nb_name if nb_name else current_nbname().stem - checkpoint_dir = None - if len(nb_name.split("/")) > 1: - nb_name, checkpoint_dir = nb_name.split("/") +def last_checkpoint_path_for(server_uid: str, chkpt_name: str) -> Path | None: + """Return the directory of the latest checkpoint for the given name.""" - filename = nb_name.split(".ipynb")[0] - checkpoint_parent_dir = get_checkpoints_dir(server_uid, filename) - - if checkpoint_dir: - return checkpoint_parent_dir / checkpoint_dir + checkpoint_parent_dir = get_checkpoint_parent_dir( + server_uid=server_uid, chkpt_name=chkpt_name + ) checkpoint_dirs = [ d @@ -139,7 +98,7 @@ def last_checkpoint_path_for_nb(server_uid: str, nb_name: str = None) -> Path | ] if checkpoints_dirs_with_blob_entry: - print("Loading from the last checkpoint of the current notebook.") + print(f"Loading from the last checkpoint for. {chkpt_name}") return max(checkpoints_dirs_with_blob_entry, key=lambda d: d.stat().st_mtime) return None @@ -153,17 +112,13 @@ def get_registry_credentials() -> tuple[str, str]: def load_from_checkpoint( client: SyftClient, - prev_nb_filename: str | None = None, + name: str, root_email: str | None = None, root_password: str | None = None, registry_username: str | None = None, registry_password: str | None = None, - checkpoint_name: str | None = None, ) -> None: - """Load the last saved checkpoint for the given notebook state.""" - if prev_nb_filename is None: - print("Loading from the last checkpoint of the current notebook.") - prev_nb_filename = current_nbname().stem + """Load the last saved checkpoint for the given checkpoint state.""" root_email = "info@openmined.org" if root_email is None else root_email root_password = "changethis" if root_password is None else root_password @@ -173,12 +128,12 @@ def load_from_checkpoint( if is_admin(client) else client.login(email=root_email, password=root_password) ) - latest_checkpoint_dir = last_checkpoint_path_for_nb( - client.id.to_string(), prev_nb_filename + latest_checkpoint_dir = last_checkpoint_path_for( + server_uid=client.id.to_string(), chkpt_name=name ) if latest_checkpoint_dir is None: - print(f"No last checkpoint found for notebook: {prev_nb_filename}") + print(f"No last checkpoint found for : {name}") return print(f"Loading from checkpoint: {latest_checkpoint_dir}") From 9e98cf4e43b46f9a9ae72e0d6d508b76db51c93b Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Mon, 30 Sep 2024 15:34:32 +0530 Subject: [PATCH 28/30] use devspace timestamp tag for backend - use dev-latest tag for default worker pool --- packages/grid/devspace.yaml | 11 ++++++----- .../syft/templates/backend/backend-statefulset.yaml | 2 +- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/packages/grid/devspace.yaml b/packages/grid/devspace.yaml index 5bde585ddb0..390fcd2b00c 100644 --- a/packages/grid/devspace.yaml +++ b/packages/grid/devspace.yaml @@ 
-52,8 +52,8 @@ images: target: "syft-ui-production" context: ./frontend tags: - - dev-latest - dev-${DEVSPACE_TIMESTAMP} + - dev-latest seaweedfs: image: "${CONTAINER_REGISTRY}/${DOCKER_IMAGE_SEAWEEDFS}" buildKit: @@ -61,8 +61,8 @@ images: dockerfile: ./seaweedfs/seaweedfs.dockerfile context: ./seaweedfs tags: - - dev-latest - dev-${DEVSPACE_TIMESTAMP} + - dev-latest # This is a list of `deployments` that DevSpace can create for this project deployments: @@ -75,7 +75,8 @@ deployments: values: global: registry: ${CONTAINER_REGISTRY} - version: dev-latest + version: dev-${DEVSPACE_TIMESTAMP} + workerVersion: dev-latest # anything that does not need templating should go in helm/examples/dev/base.yaml # or profile specific values files valuesFiles: @@ -167,8 +168,8 @@ profiles: dockerfile: ./rathole/rathole.dockerfile context: ./rathole tags: - - dev-latest - dev-${DEVSPACE_TIMESTAMP} + - dev-latest # use rathole client-specific chart values - op: add path: deployments.syft.helm.valuesFiles @@ -188,8 +189,8 @@ profiles: dockerfile: ./rathole/rathole.dockerfile context: ./rathole tags: - - dev-latest - dev-${DEVSPACE_TIMESTAMP} + - dev-latest # enable rathole `devspace dev` config - op: add path: dev diff --git a/packages/grid/helm/syft/templates/backend/backend-statefulset.yaml b/packages/grid/helm/syft/templates/backend/backend-statefulset.yaml index bb126d3dcd3..67b0db4d452 100644 --- a/packages/grid/helm/syft/templates/backend/backend-statefulset.yaml +++ b/packages/grid/helm/syft/templates/backend/backend-statefulset.yaml @@ -70,7 +70,7 @@ spec: - name: INMEMORY_WORKERS value: {{ .Values.server.inMemoryWorkers | quote }} - name: DEFAULT_WORKER_POOL_IMAGE - value: "{{ .Values.global.registry }}/openmined/syft-backend:{{ .Values.global.version }}" + value: "{{ .Values.global.registry }}/openmined/syft-backend:{{ .Values.global.workerVersion }}" - name: DEFAULT_WORKER_POOL_COUNT value: {{ .Values.server.defaultWorkerPool.count | quote }} - name: DEFAULT_WORKER_POOL_POD_LABELS From bc23abcb9eb47835459db9729e130a61347b27c9 Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Mon, 30 Sep 2024 15:56:47 +0530 Subject: [PATCH 29/30] remove unused method from db stash --- packages/syft/src/syft/store/db/stash.py | 48 ------------------------ 1 file changed, 48 deletions(-) diff --git a/packages/syft/src/syft/store/db/stash.py b/packages/syft/src/syft/store/db/stash.py index 186074cb955..f2083a1a424 100644 --- a/packages/syft/src/syft/store/db/stash.py +++ b/packages/syft/src/syft/store/db/stash.py @@ -180,54 +180,6 @@ def _print_query(self, stmt: sa.sql.select) -> None: def unique_fields(self) -> list[str]: return getattr(self.object_type, "__attr_unique__", []) - @as_result(SyftException, StashException, NotFoundException) - @with_session - def get_by_unique_fields( - self, - credentials: SyftVerifyKey, - obj: StashT, - session: Session = None, - has_permission: bool = False, - ) -> StashT: - query = self.query() - - if not has_permission: - role = self.get_role(credentials, session=session) - query = query.with_permissions(credentials, role) - - unique_fields = self.unique_fields - - filters = [] - for field_name in unique_fields: - field_value = getattr(obj, field_name, None) - if not is_json_primitive(field_value): - raise StashException( - f"Cannot check uniqueness of non-primitive field {field_name}" - ) - if field_value is None: - continue - filters.append((field_name, "eq", field_value)) - - query = self.query() - query = query.filter_or( - *filters, - ) - - results = 
query.execute(session).all() - - if len(results) == 1: - result = results[0] - elif len(results) > 1: - raise StashException( - f"Multiple objects found for unique fields: {unique_fields}" - ) - else: - raise NotFoundException( - f"No object found for unique fields: {unique_fields}" - ) - - return result - @with_session def is_unique(self, obj: StashT, session: Session = None) -> bool: unique_fields = self.unique_fields From f7a2e20749af5dc7e9a72dee28c54c2de2d11df0 Mon Sep 17 00:00:00 2001 From: rasswanth-s <43314053+rasswanth-s@users.noreply.github.com> Date: Tue, 1 Oct 2024 10:09:26 +0530 Subject: [PATCH 30/30] update notebooks with new checkpoint function names --- .../000-start-and-configure-server-and-admins.ipynb | 11 ++--------- .../bigquery/001-scale-delete-worker-pools.ipynb | 12 ++---------- .../syft/src/syft/util/test_helpers/checkpoint.py | 2 +- 3 files changed, 5 insertions(+), 20 deletions(-) diff --git a/notebooks/scenarios/bigquery/000-start-and-configure-server-and-admins.ipynb b/notebooks/scenarios/bigquery/000-start-and-configure-server-and-admins.ipynb index 7320426de06..3aabe924a4c 100644 --- a/notebooks/scenarios/bigquery/000-start-and-configure-server-and-admins.ipynb +++ b/notebooks/scenarios/bigquery/000-start-and-configure-server-and-admins.ipynb @@ -211,7 +211,7 @@ "metadata": {}, "outputs": [], "source": [ - "create_checkpoint(root_client)" + "create_checkpoint(name=\"000-start-and-config\", client=root_client)" ] }, { @@ -231,13 +231,6 @@ "source": [ "server.land()" ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [] } ], "metadata": { @@ -256,7 +249,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.11.5" + "version": "3.11.10" } }, "nbformat": 4, diff --git a/notebooks/scenarios/bigquery/001-scale-delete-worker-pools.ipynb b/notebooks/scenarios/bigquery/001-scale-delete-worker-pools.ipynb index 69b16576fdf..65ad4ae6dde 100644 --- a/notebooks/scenarios/bigquery/001-scale-delete-worker-pools.ipynb +++ b/notebooks/scenarios/bigquery/001-scale-delete-worker-pools.ipynb @@ -93,7 +93,7 @@ "outputs": [], "source": [ "load_from_checkpoint(\n", - " prev_nb_filename=\"000-start-and-configure-server-and-admins\",\n", + " name=\"000-start-and-config\",\n", " client=server.client,\n", " root_email=ROOT_EMAIL,\n", " root_password=ROOT_PASSWORD,\n", @@ -357,14 +357,6 @@ "source": [ "server.land()" ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "29", - "metadata": {}, - "outputs": [], - "source": [] } ], "metadata": { @@ -383,7 +375,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.11.5" + "version": "3.11.10" } }, "nbformat": 4, diff --git a/packages/syft/src/syft/util/test_helpers/checkpoint.py b/packages/syft/src/syft/util/test_helpers/checkpoint.py index 647c8dc46f9..99b0a3b1e52 100644 --- a/packages/syft/src/syft/util/test_helpers/checkpoint.py +++ b/packages/syft/src/syft/util/test_helpers/checkpoint.py @@ -98,7 +98,7 @@ def last_checkpoint_path_for(server_uid: str, chkpt_name: str) -> Path | None: ] if checkpoints_dirs_with_blob_entry: - print(f"Loading from the last checkpoint for. {chkpt_name}") + print(f"Loading from the last checkpoint for: {chkpt_name}") return max(checkpoints_dirs_with_blob_entry, key=lambda d: d.stat().st_mtime) return None
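
The last two patches in this series converge on a name-based checkpoint API: create_checkpoint now takes an explicit checkpoint name instead of deriving one from the running Jupyter notebook, and load_from_checkpoint restores the newest checkpoint saved under that name. A minimal usage sketch follows, assuming the module path implied by packages/syft/src/syft/util/test_helpers/checkpoint.py and the variables already defined in the scenario notebooks (root_client, server, ROOT_EMAIL, ROOT_PASSWORD); it mirrors the notebook cells changed in PATCH 30/30 and is illustrative only, not an additional patch.

# syft absolute
from syft.util.test_helpers.checkpoint import create_checkpoint
from syft.util.test_helpers.checkpoint import load_from_checkpoint

# 000-start-and-configure-server-and-admins.ipynb:
# snapshot the freshly configured server under an explicit checkpoint name.
create_checkpoint(name="000-start-and-config", client=root_client)

# 001-scale-delete-worker-pools.ipynb:
# restore the latest checkpoint saved under that name instead of
# re-running the setup notebook end to end.
load_from_checkpoint(
    name="000-start-and-config",
    client=server.client,
    root_email=ROOT_EMAIL,
    root_password=ROOT_PASSWORD,
)

Because checkpoints are now keyed only by name and server UID (stored as <root data path>/checkpoints/<name>/<server uid>/chkpt_<timestamp>), the same helpers work from pytest or a plain script as well as from a notebook, which is the point of PATCH 27/30 removing the ipykernel dependency.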