From 5d2da7794d8a84e3be0b2744ad96e5a69f9d5b60 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yuan-Ting=20Hsieh=20=28=E8=AC=9D=E6=B2=85=E5=BB=B7=29?= Date: Wed, 27 Mar 2024 16:42:55 -0700 Subject: [PATCH 1/5] Update github actions (#2450) --- .github/workflows/blossom-ci.yml | 2 +- .github/workflows/codeql.yml | 2 +- .github/workflows/markdown-links-check.yml | 2 +- .github/workflows/premerge.yml | 8 ++++---- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/.github/workflows/blossom-ci.yml b/.github/workflows/blossom-ci.yml index 844cdf1c93..ce13d01fbb 100644 --- a/.github/workflows/blossom-ci.yml +++ b/.github/workflows/blossom-ci.yml @@ -74,7 +74,7 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout code - uses: actions/checkout@v2 + uses: actions/checkout@v4 with: repository: ${{ fromJson(needs.Authorization.outputs.args).repo }} ref: ${{ fromJson(needs.Authorization.outputs.args).ref }} diff --git a/.github/workflows/codeql.yml b/.github/workflows/codeql.yml index 193f7b48e5..0425542192 100644 --- a/.github/workflows/codeql.yml +++ b/.github/workflows/codeql.yml @@ -36,7 +36,7 @@ jobs: steps: - name: Checkout repository - uses: actions/checkout@v3 + uses: actions/checkout@v4 # Initializes the CodeQL tools for scanning. - name: Initialize CodeQL diff --git a/.github/workflows/markdown-links-check.yml b/.github/workflows/markdown-links-check.yml index 56fe4e7982..1a8686ea30 100644 --- a/.github/workflows/markdown-links-check.yml +++ b/.github/workflows/markdown-links-check.yml @@ -23,7 +23,7 @@ jobs: markdown-link-check: runs-on: ubuntu-latest steps: - - uses: actions/checkout@master + - uses: actions/checkout@v4 - uses: gaurav-nelson/github-action-markdown-link-check@1.0.15 with: max-depth: -1 diff --git a/.github/workflows/premerge.yml b/.github/workflows/premerge.yml index 932275df7e..bea72de229 100644 --- a/.github/workflows/premerge.yml +++ b/.github/workflows/premerge.yml @@ -29,9 +29,9 @@ jobs: os: [ ubuntu-22.04, ubuntu-20.04 ] python-version: [ "3.8", "3.9", "3.10" ] steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v4 + uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} - name: Install dependencies @@ -49,9 +49,9 @@ jobs: os: [ ubuntu-22.04, ubuntu-20.04 ] python-version: [ "3.8", "3.9", "3.10" ] steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v4 + uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} - name: Install dependencies From 716ff9b05cb3c42d5f2a8c03ce867b99721e0fa8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yuan-Ting=20Hsieh=20=28=E8=AC=9D=E6=B2=85=E5=BB=B7=29?= Date: Thu, 4 Apr 2024 10:13:39 -0700 Subject: [PATCH 2/5] Fix premerge (#2467) --- .github/workflows/premerge.yml | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/.github/workflows/premerge.yml b/.github/workflows/premerge.yml index bea72de229..d3de85e167 100644 --- a/.github/workflows/premerge.yml +++ b/.github/workflows/premerge.yml @@ -36,8 +36,8 @@ jobs: python-version: ${{ matrix.python-version }} - name: Install dependencies run: | - python -m pip install --upgrade pip - pip install -e .[dev] + python3 -m pip install --upgrade pip + python3 -m pip install --no-cache-dir -e .[dev] - name: Run unit test run: ./runtest.sh @@ -56,8 +56,8 @@ jobs: python-version: ${{ matrix.python-version }} - name: Install dependencies run: 
| - python -m pip install --upgrade pip - pip install -e .[dev] - pip install build twine torch torchvision + python3 -m pip install --upgrade pip + python3 -m pip install --no-cache-dir -e .[dev] + python3 -m pip install --no-cache-dir build twine torch torchvision - name: Run wheel build run: python3 -m build --wheel From 72813970dc16ac853b53829b349962c974bb22de Mon Sep 17 00:00:00 2001 From: Isaac Yang Date: Fri, 12 Apr 2024 11:39:00 -0700 Subject: [PATCH 3/5] Fix issues on hello-world TF2 notebook --- .../jobs/hello-tf2/app/custom/trainer.py | 4 ++ examples/hello-world/hello_world.ipynb | 59 ++++--------------- 2 files changed, 14 insertions(+), 49 deletions(-) diff --git a/examples/hello-world/hello-tf2/jobs/hello-tf2/app/custom/trainer.py b/examples/hello-world/hello-tf2/jobs/hello-tf2/app/custom/trainer.py index 720f795198..bbb8c17b9b 100644 --- a/examples/hello-world/hello-tf2/jobs/hello-tf2/app/custom/trainer.py +++ b/examples/hello-world/hello-tf2/jobs/hello-tf2/app/custom/trainer.py @@ -12,6 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +import os + +os.enviro["TF_FORCE_GPU_ALLOW_GROWTH"] = "true" + import numpy as np import tensorflow as tf from tf2_net import Net diff --git a/examples/hello-world/hello_world.ipynb b/examples/hello-world/hello_world.ipynb index a1853e1440..cd8e9b8f23 100644 --- a/examples/hello-world/hello_world.ipynb +++ b/examples/hello-world/hello_world.ipynb @@ -738,8 +738,9 @@ ] }, { + "attachments": {}, "cell_type": "markdown", - "id": "425c7f1d-7cb6-4602-bb88-4b87ff517529", + "id": "696a384e-f6da-4044-a7b9-db4c464f52ac", "metadata": {}, "source": [ "#### Running Tensorflow on local host with GPU \n", @@ -748,53 +749,13 @@ "We are running with 1 server, 2 sites in a local machine, which means three process involved for this federated training. \n", "If the local host has GPU, you might enter OOM error, due to the way Tensorflow consumes GPU memory. By default, TensorFlow maps nearly all of the GPU memory of all GPUs (subject to CUDA_VISIBLE_DEVICES) visible to the process. If one has multiple process, some of the process will be OOM. To avoid multiple processes grabbing all GPU memory in TF, use the options described in [Limiting GPU memory growth]( https://www.tensorflow.org/guide/gpu#limiting_gpu_memory_growth). \n", "\n", - "In our cases, we prefer that the process only allocates a subset of the available memory, or to only grow the memory usage as is needed by the process. TensorFlow provides two methods to control this. \n", + "In our cases, we prefer that the process only allocates a subset of the available memory, or to only grow the memory usage as is needed by the process. TensorFlow provides two methods to control this, as described in the above link.\n", + "\n", + "In this example, we explictly set the environment varialble `TF_FORCE_GPU_ALLOW_GROWTH` to `true` at the very beginning of the trainer.py file, which runs in the clients and will allocate GPU memory for training. With the env var been set, TF will not grab the entire GPU memory and will not cause GPU OOM error when running POC on local host.\n", + "\n", + "Note that setting the env var `TF_FORCE_GPU_ALLOW_GROWTH` inside this notebook takes no effect because the clients of POC have already started and their env vars are set at the starting time.\n", + "\n", "\n", - "The First method is set the environmental variable TF_FORCE_GPU_ALLOW_GROWTH to true. This configuration is platform specific. 
\n", - "The 2nd method is using the piece of code below" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "026ce33a-90b1-4ef7-8c7c-f25722f2d2ae", - "metadata": { - "tags": [] - }, - "outputs": [], - "source": [ - "%env TF_FORCE_GPU_ALLOW_GROWTH=true" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "91cf84ec-57f1-439f-b72b-b33fea1a7f7c", - "metadata": { - "tags": [] - }, - "outputs": [], - "source": [ - "import tensorflow as tf\n", - "gpus = tf.config.list_physical_devices('GPU')\n", - "if gpus:\n", - " # Restrict TensorFlow to only allocate 1GB of memory on the first GPU\n", - " try:\n", - " tf.config.set_logical_device_configuration(\n", - " gpus[0],\n", - " [tf.config.LogicalDeviceConfiguration(memory_limit=1024)])\n", - " logical_gpus = tf.config.list_logical_devices('GPU')\n", - " print(len(gpus), \"Physical GPUs,\", len(logical_gpus), \"Logical GPUs\")\n", - " except RuntimeError as e:\n", - " # Virtual devices must be set before GPUs have been initialized\n", - " print(e)" - ] - }, - { - "cell_type": "markdown", - "id": "0c3f1149-d54d-4f3b-b497-c06deba22fef", - "metadata": {}, - "source": [ "### 1. Submit job using FLARE API\n", "\n", "Starting a FLARE API session and submit the hello-tf2 job\n", @@ -833,7 +794,7 @@ }, "outputs": [], "source": [ - "! tail -100 /tmp/nvflare/poc/server/log.txt" + "! tail -100 /tmp/nvflare/poc/example_project/prod_00/server/log.txt" ] }, { @@ -977,7 +938,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.8.17" + "version": "3.8.15" }, "vscode": { "interpreter": { From 90fd4df9feab793361b6a9aecb3d131246d91cff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yuan-Ting=20Hsieh=20=28=E8=AC=9D=E6=B2=85=E5=BB=B7=29?= Date: Sun, 14 Apr 2024 22:13:04 -0700 Subject: [PATCH 4/5] Fix tf integration test (#2504) --- .../hello-tf2/jobs/hello-tf2/app/custom/trainer.py | 2 +- tests/integration_test/src/utils.py | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/examples/hello-world/hello-tf2/jobs/hello-tf2/app/custom/trainer.py b/examples/hello-world/hello-tf2/jobs/hello-tf2/app/custom/trainer.py index bbb8c17b9b..25725687d4 100644 --- a/examples/hello-world/hello-tf2/jobs/hello-tf2/app/custom/trainer.py +++ b/examples/hello-world/hello-tf2/jobs/hello-tf2/app/custom/trainer.py @@ -14,7 +14,7 @@ import os -os.enviro["TF_FORCE_GPU_ALLOW_GROWTH"] = "true" +os.environ["TF_FORCE_GPU_ALLOW_GROWTH"] = "true" import numpy as np import tensorflow as tf diff --git a/tests/integration_test/src/utils.py b/tests/integration_test/src/utils.py index 0a945607c9..d80f34b13b 100644 --- a/tests/integration_test/src/utils.py +++ b/tests/integration_test/src/utils.py @@ -195,6 +195,10 @@ def check_client_status_ready(response: dict) -> bool: if "client_statuses" not in response["details"]: return False + for row in response["details"]["client_statuses"][1:]: + if row[3] == "No Reply": + return False + return True From 60609b721283b604986e6892d7d698fe30d4d2a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yuan-Ting=20Hsieh=20=28=E8=AC=9D=E6=B2=85=E5=BB=B7=29?= Date: Tue, 16 Apr 2024 12:51:35 -0700 Subject: [PATCH 5/5] Add client api integration tests --- .../app_client/config/config_fed_client.conf | 43 ++++ .../np_loop/app_client/custom/train_diff.py | 60 ++++++ .../np_loop/app_client/custom/train_full.py | 59 ++++++ .../np_loop/app_client/custom/train_loop.py | 68 +++++++ .../app_client/custom/train_metrics.py | 86 ++++++++ .../app_server/config/config_fed_server.conf | 47 +++++ 
.../data/jobs/np_loop/meta.conf | 15 ++ .../app/config/config_fed_client.conf | 47 +++++ .../app/config/config_fed_server.conf | 47 +++++ .../app/custom/train_diff.py | 60 ++++++ .../app/custom/train_full.py | 59 ++++++ .../app/custom/train_loop.py | 68 +++++++ .../app/custom/train_metrics.py | 86 ++++++++ .../data/jobs/np_loop_cell_pipe/meta.conf | 11 + .../app/config/config_fed_client.conf | 77 +++++++ .../app/config/config_fed_server.conf | 56 ++++++ .../jobs/np_metrics/app/custom/train_diff.py | 60 ++++++ .../jobs/np_metrics/app/custom/train_full.py | 59 ++++++ .../jobs/np_metrics/app/custom/train_loop.py | 68 +++++++ .../np_metrics/app/custom/train_metrics.py | 86 ++++++++ .../data/jobs/np_metrics/meta.conf | 11 + .../app/config/config_fed_client.conf | 77 +++++++ .../app/config/config_fed_server.conf | 62 ++++++ .../pt_client_api/app/custom/cifar10_fl.py | 137 +++++++++++++ .../data/jobs/pt_client_api/app/custom/net.py | 37 ++++ .../data/jobs/pt_client_api/meta.conf | 11 + .../app/config/config_fed_client.conf | 43 ++++ .../app/config/config_fed_server.conf | 38 ++++ .../app/custom/cifar10_fl.py | 132 ++++++++++++ .../pt_client_api_cyclic/app/custom/net.py | 37 ++++ .../data/jobs/pt_client_api_cyclic/meta.conf | 11 + .../app/config/config_fed_client.conf | 77 +++++++ .../app/config/config_fed_server.conf | 62 ++++++ .../app/custom/cifar10_fl.py | 137 +++++++++++++ .../app/custom/net.py | 37 ++++ .../jobs/pt_client_api_launch_once/meta.conf | 11 + .../app/config/config_fed_client.conf | 120 +++++++++++ .../app/config/config_fed_server.conf | 106 ++++++++++ .../jobs/qa_job_4558419/app/custom/net.py | 25 +++ .../jobs/qa_job_4558419/app/custom/train.py | 68 +++++++ .../data/jobs/qa_job_4558419/meta.conf | 11 + .../app/config/config_fed_client.conf | 77 +++++++ .../app/config/config_fed_server.conf | 62 ++++++ .../jobs/qa_job_4561583/app/custom/net.py | 25 +++ .../jobs/qa_job_4561583/app/custom/pl_net.py | 66 ++++++ .../qa_job_4561583/app/custom/poc_executor.py | 41 ++++ .../data/jobs/qa_job_4561583/meta.conf | 11 + .../app/config/config_fed_client.conf | 144 ++++++++++++++ .../app/config/config_fed_server.conf | 33 +++ .../jobs/qa_job_4561872/app/custom/net.py | 37 ++++ .../jobs/qa_job_4561872/app/custom/train.py | 188 ++++++++++++++++++ .../data/jobs/qa_job_4561872/meta.conf | 11 + .../app/config/config_fed_client.conf | 77 +++++++ .../app/config/config_fed_server.conf | 62 ++++++ .../jobs/qa_job_4592780/app/custom/net.py | 25 +++ .../qa_job_4592780/app/custom/poc_executor.py | 78 ++++++++ .../data/jobs/qa_job_4592780/meta.conf | 11 + .../standalone_job/client_api.yml | 148 ++++++++++++++ .../standalone_job/client_api_qa.yml | 102 ++++++++++ .../integration_test/run_integration_tests.sh | 2 +- tests/integration_test/test_configs.yml | 4 + 61 files changed, 3615 insertions(+), 1 deletion(-) create mode 100644 tests/integration_test/data/jobs/np_loop/app_client/config/config_fed_client.conf create mode 100755 tests/integration_test/data/jobs/np_loop/app_client/custom/train_diff.py create mode 100755 tests/integration_test/data/jobs/np_loop/app_client/custom/train_full.py create mode 100755 tests/integration_test/data/jobs/np_loop/app_client/custom/train_loop.py create mode 100755 tests/integration_test/data/jobs/np_loop/app_client/custom/train_metrics.py create mode 100644 tests/integration_test/data/jobs/np_loop/app_server/config/config_fed_server.conf create mode 100644 tests/integration_test/data/jobs/np_loop/meta.conf create mode 100644 
tests/integration_test/data/jobs/np_loop_cell_pipe/app/config/config_fed_client.conf create mode 100644 tests/integration_test/data/jobs/np_loop_cell_pipe/app/config/config_fed_server.conf create mode 100755 tests/integration_test/data/jobs/np_loop_cell_pipe/app/custom/train_diff.py create mode 100755 tests/integration_test/data/jobs/np_loop_cell_pipe/app/custom/train_full.py create mode 100755 tests/integration_test/data/jobs/np_loop_cell_pipe/app/custom/train_loop.py create mode 100755 tests/integration_test/data/jobs/np_loop_cell_pipe/app/custom/train_metrics.py create mode 100644 tests/integration_test/data/jobs/np_loop_cell_pipe/meta.conf create mode 100644 tests/integration_test/data/jobs/np_metrics/app/config/config_fed_client.conf create mode 100644 tests/integration_test/data/jobs/np_metrics/app/config/config_fed_server.conf create mode 100755 tests/integration_test/data/jobs/np_metrics/app/custom/train_diff.py create mode 100755 tests/integration_test/data/jobs/np_metrics/app/custom/train_full.py create mode 100755 tests/integration_test/data/jobs/np_metrics/app/custom/train_loop.py create mode 100755 tests/integration_test/data/jobs/np_metrics/app/custom/train_metrics.py create mode 100644 tests/integration_test/data/jobs/np_metrics/meta.conf create mode 100644 tests/integration_test/data/jobs/pt_client_api/app/config/config_fed_client.conf create mode 100644 tests/integration_test/data/jobs/pt_client_api/app/config/config_fed_server.conf create mode 100644 tests/integration_test/data/jobs/pt_client_api/app/custom/cifar10_fl.py create mode 100644 tests/integration_test/data/jobs/pt_client_api/app/custom/net.py create mode 100644 tests/integration_test/data/jobs/pt_client_api/meta.conf create mode 100644 tests/integration_test/data/jobs/pt_client_api_cyclic/app/config/config_fed_client.conf create mode 100644 tests/integration_test/data/jobs/pt_client_api_cyclic/app/config/config_fed_server.conf create mode 100644 tests/integration_test/data/jobs/pt_client_api_cyclic/app/custom/cifar10_fl.py create mode 100644 tests/integration_test/data/jobs/pt_client_api_cyclic/app/custom/net.py create mode 100644 tests/integration_test/data/jobs/pt_client_api_cyclic/meta.conf create mode 100644 tests/integration_test/data/jobs/pt_client_api_launch_once/app/config/config_fed_client.conf create mode 100644 tests/integration_test/data/jobs/pt_client_api_launch_once/app/config/config_fed_server.conf create mode 100644 tests/integration_test/data/jobs/pt_client_api_launch_once/app/custom/cifar10_fl.py create mode 100644 tests/integration_test/data/jobs/pt_client_api_launch_once/app/custom/net.py create mode 100644 tests/integration_test/data/jobs/pt_client_api_launch_once/meta.conf create mode 100644 tests/integration_test/data/jobs/qa_job_4558419/app/config/config_fed_client.conf create mode 100644 tests/integration_test/data/jobs/qa_job_4558419/app/config/config_fed_server.conf create mode 100644 tests/integration_test/data/jobs/qa_job_4558419/app/custom/net.py create mode 100644 tests/integration_test/data/jobs/qa_job_4558419/app/custom/train.py create mode 100644 tests/integration_test/data/jobs/qa_job_4558419/meta.conf create mode 100644 tests/integration_test/data/jobs/qa_job_4561583/app/config/config_fed_client.conf create mode 100644 tests/integration_test/data/jobs/qa_job_4561583/app/config/config_fed_server.conf create mode 100644 tests/integration_test/data/jobs/qa_job_4561583/app/custom/net.py create mode 100644 tests/integration_test/data/jobs/qa_job_4561583/app/custom/pl_net.py create 
mode 100644 tests/integration_test/data/jobs/qa_job_4561583/app/custom/poc_executor.py create mode 100644 tests/integration_test/data/jobs/qa_job_4561583/meta.conf create mode 100644 tests/integration_test/data/jobs/qa_job_4561872/app/config/config_fed_client.conf create mode 100644 tests/integration_test/data/jobs/qa_job_4561872/app/config/config_fed_server.conf create mode 100644 tests/integration_test/data/jobs/qa_job_4561872/app/custom/net.py create mode 100644 tests/integration_test/data/jobs/qa_job_4561872/app/custom/train.py create mode 100644 tests/integration_test/data/jobs/qa_job_4561872/meta.conf create mode 100644 tests/integration_test/data/jobs/qa_job_4592780/app/config/config_fed_client.conf create mode 100644 tests/integration_test/data/jobs/qa_job_4592780/app/config/config_fed_server.conf create mode 100644 tests/integration_test/data/jobs/qa_job_4592780/app/custom/net.py create mode 100644 tests/integration_test/data/jobs/qa_job_4592780/app/custom/poc_executor.py create mode 100644 tests/integration_test/data/jobs/qa_job_4592780/meta.conf create mode 100644 tests/integration_test/data/test_configs/standalone_job/client_api.yml create mode 100644 tests/integration_test/data/test_configs/standalone_job/client_api_qa.yml diff --git a/tests/integration_test/data/jobs/np_loop/app_client/config/config_fed_client.conf b/tests/integration_test/data/jobs/np_loop/app_client/config/config_fed_client.conf new file mode 100644 index 0000000000..5248dae97a --- /dev/null +++ b/tests/integration_test/data/jobs/np_loop/app_client/config/config_fed_client.conf @@ -0,0 +1,43 @@ +{ + format_version = 2 + app_script = "train_loop.py" + app_config = "" + executors = [ + { + tasks = [ + "train" + ] + executor { + path = "nvflare.app_common.executors.client_api_launcher_executor.ClientAPILauncherExecutor" + args { + launcher_id = "launcher" + pipe_id = "pipe" + heartbeat_timeout = 60 + params_exchange_format = "numpy" + params_transfer_type = "FULL" + train_with_evaluation = true + } + } + } + ] + task_data_filters = [] + task_result_filters = [] + components = [ + { + id = "launcher" + path = "nvflare.app_common.launchers.subprocess_launcher.SubprocessLauncher" + args { + script = "python3 custom/{app_script} {app_config} " + launch_once = true + } + } + { + id = "pipe" + path = "nvflare.fuel.utils.pipe.file_pipe.FilePipe" + args { + mode = "PASSIVE" + root_path = "{WORKSPACE}/{JOB_ID}/{SITE_NAME}" + } + } + ] +} diff --git a/tests/integration_test/data/jobs/np_loop/app_client/custom/train_diff.py b/tests/integration_test/data/jobs/np_loop/app_client/custom/train_diff.py new file mode 100755 index 0000000000..08f3c8e7c0 --- /dev/null +++ b/tests/integration_test/data/jobs/np_loop/app_client/custom/train_diff.py @@ -0,0 +1,60 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import copy + +import nvflare.client as flare + + +def train(input_arr): + output_arr = copy.deepcopy(input_arr) + # mock training with plus 1 + return output_arr + 1 + + +def evaluate(input_arr): + # mock evaluation metrics + return 100 + + +def main(): + # initializes NVFlare interface + flare.init() + + # get model from NVFlare + input_model = flare.receive() + print(f"received weights is: {input_model.params}") + + # get system information + sys_info = flare.system_info() + print(f"system info is: {sys_info}") + + input_numpy_array = input_model.params["numpy_key"] + + # training + output_numpy_array = train(input_numpy_array) + + # evaluation + metrics = evaluate(input_numpy_array) + + # calculate difference here + diff = output_numpy_array - input_numpy_array + + # send back the model difference + print(f"send back: {diff}") + flare.send(flare.FLModel(params={"numpy_key": diff}, params_type="DIFF", metrics={"accuracy": metrics})) + + +if __name__ == "__main__": + main() diff --git a/tests/integration_test/data/jobs/np_loop/app_client/custom/train_full.py b/tests/integration_test/data/jobs/np_loop/app_client/custom/train_full.py new file mode 100755 index 0000000000..9c05536b85 --- /dev/null +++ b/tests/integration_test/data/jobs/np_loop/app_client/custom/train_full.py @@ -0,0 +1,59 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import copy + +import nvflare.client as flare + + +def train(input_arr): + output_arr = copy.deepcopy(input_arr) + # mock training with plus 1 + return output_arr + 1 + + +def evaluate(input_arr): + # mock evaluation metrics + return 100 + + +def main(): + # initializes NVFlare interface + flare.init() + + # get model from NVFlare + input_model = flare.receive() + print(f"received weights is: {input_model.params}") + + # get system information + sys_info = flare.system_info() + print(f"system info is: {sys_info}") + + input_numpy_array = input_model.params["numpy_key"] + + # training + output_numpy_array = train(input_numpy_array) + + # evaluation + metrics = evaluate(input_numpy_array) + + # send back the model + print(f"send back: {output_numpy_array}") + flare.send( + flare.FLModel(params={"numpy_key": output_numpy_array}, params_type="FULL", metrics={"accuracy": metrics}) + ) + + +if __name__ == "__main__": + main() diff --git a/tests/integration_test/data/jobs/np_loop/app_client/custom/train_loop.py b/tests/integration_test/data/jobs/np_loop/app_client/custom/train_loop.py new file mode 100755 index 0000000000..ea9ec25149 --- /dev/null +++ b/tests/integration_test/data/jobs/np_loop/app_client/custom/train_loop.py @@ -0,0 +1,68 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import copy + +import nvflare.client as flare + + +def train(input_arr): + output_arr = copy.deepcopy(input_arr) + # mock training with plus 1 + return output_arr + 1 + + +def evaluate(input_arr): + # mock evaluation metrics + return 100 + + +def main(): + # initializes NVFlare interface + flare.init() + + # get system information + sys_info = flare.system_info() + print(f"system info is: {sys_info}", flush=True) + + while flare.is_running(): + input_model = flare.receive() + print(f"received weights is: {input_model.params}", flush=True) + + input_numpy_array = input_model.params["numpy_key"] + + # training + output_numpy_array = train(input_numpy_array) + + # evaluation + metrics = evaluate(input_numpy_array) + + sys_info = flare.system_info() + print(f"system info is: {sys_info}", flush=True) + print(f"finish round: {input_model.current_round}", flush=True) + + # send back the model + print(f"send back: {output_numpy_array}", flush=True) + flare.send( + flare.FLModel( + params={"numpy_key": output_numpy_array}, + params_type="FULL", + metrics={"accuracy": metrics}, + current_round=input_model.current_round, + ) + ) + + +if __name__ == "__main__": + main() diff --git a/tests/integration_test/data/jobs/np_loop/app_client/custom/train_metrics.py b/tests/integration_test/data/jobs/np_loop/app_client/custom/train_metrics.py new file mode 100755 index 0000000000..e508b74f30 --- /dev/null +++ b/tests/integration_test/data/jobs/np_loop/app_client/custom/train_metrics.py @@ -0,0 +1,86 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import copy +import time + +import nvflare.client as flare +from nvflare.client.tracking import MLflowWriter + + +def train(input_arr, current_round, epochs=3): + writer = MLflowWriter() + output_arr = copy.deepcopy(input_arr) + num_of_data = 2000 + batch_size = 16 + num_of_batches = num_of_data // batch_size + for i in range(epochs): + for j in range(num_of_batches): + global_step = current_round * num_of_batches * epochs + i * num_of_batches + j + print(f"logging record: {global_step}") + writer.log_metric( + key="global_step", + value=global_step, + step=global_step, + ) + # mock training with plus 1 + output_arr += 1 + # assume each epoch takes 1 seconds + time.sleep(1.0) + return output_arr + + +def evaluate(input_arr): + # mock evaluation metrics + return 100 + + +def main(): + # initializes NVFlare interface + flare.init() + + # get system information + sys_info = flare.system_info() + print(f"system info is: {sys_info}") + + while flare.is_running(): + input_model = flare.receive() + print(f"received weights is: {input_model.params}") + + input_numpy_array = input_model.params["numpy_key"] + + # training + output_numpy_array = train(input_numpy_array, current_round=input_model.current_round, epochs=3) + + # evaluation + metrics = evaluate(input_numpy_array) + + sys_info = flare.system_info() + print(f"system info is: {sys_info}") + print(f"finish round: {input_model.current_round}") + + # send back the model + print(f"send back: {output_numpy_array}") + flare.send( + flare.FLModel( + params={"numpy_key": output_numpy_array}, + params_type="FULL", + metrics={"accuracy": metrics}, + current_round=input_model.current_round, + ) + ) + + +if __name__ == "__main__": + main() diff --git a/tests/integration_test/data/jobs/np_loop/app_server/config/config_fed_server.conf b/tests/integration_test/data/jobs/np_loop/app_server/config/config_fed_server.conf new file mode 100644 index 0000000000..36dcd2c344 --- /dev/null +++ b/tests/integration_test/data/jobs/np_loop/app_server/config/config_fed_server.conf @@ -0,0 +1,47 @@ +{ + format_version = 2 + task_data_filters = [] + task_result_filters = [] + workflows = [ + { + id = "scatter_and_gather" + path = "nvflare.app_common.workflows.scatter_and_gather.ScatterAndGather" + args { + min_clients = 2 + num_rounds = 5 + start_round = 0 + wait_time_after_min_received = 0 + aggregator_id = "aggregator" + persistor_id = "persistor" + shareable_generator_id = "shareable_generator" + train_task_name = "train" + train_timeout = 0 + } + } + ] + components = [ + { + id = "persistor" + path = "nvflare.app_common.np.np_model_persistor.NPModelPersistor" + } + { + id = "shareable_generator" + path = "nvflare.app_common.shareablegenerators.full_model_shareable_generator.FullModelShareableGenerator" + args {} + } + { + id = "aggregator" + path = "nvflare.app_common.aggregators.intime_accumulate_model_aggregator.InTimeAccumulateWeightedAggregator" + args { + expected_data_kind = "WEIGHTS" + } + } + { + id = "model_selector" + path = "nvflare.app_common.widgets.intime_model_selector.IntimeModelSelector" + args { + key_metric = "accuracy" + } + } + ] +} diff --git a/tests/integration_test/data/jobs/np_loop/meta.conf b/tests/integration_test/data/jobs/np_loop/meta.conf new file mode 100644 index 0000000000..5e421f1319 --- /dev/null +++ b/tests/integration_test/data/jobs/np_loop/meta.conf @@ -0,0 +1,15 @@ +{ + name = "np_loop" + resource_spec {} + deploy_map { + app_server = [ + "server" + ], + app_client = [ + "site-1", + "site-2" + ] + } + min_clients = 2 + 
mandatory_clients = [] +} diff --git a/tests/integration_test/data/jobs/np_loop_cell_pipe/app/config/config_fed_client.conf b/tests/integration_test/data/jobs/np_loop_cell_pipe/app/config/config_fed_client.conf new file mode 100644 index 0000000000..458f50f568 --- /dev/null +++ b/tests/integration_test/data/jobs/np_loop_cell_pipe/app/config/config_fed_client.conf @@ -0,0 +1,47 @@ +{ + format_version = 2 + app_script = "train_loop.py" + app_config = "" + executors = [ + { + tasks = [ + "train" + ] + executor { + path = "nvflare.app_common.executors.client_api_launcher_executor.ClientAPILauncherExecutor" + args { + launcher_id = "launcher" + pipe_id = "pipe" + heartbeat_timeout = 60 + params_exchange_format = "numpy" + params_transfer_type = "FULL" + train_with_evaluation = true + } + } + } + ] + task_data_filters = [] + task_result_filters = [] + components = [ + { + id = "launcher" + path = "nvflare.app_common.launchers.subprocess_launcher.SubprocessLauncher" + args { + script = "python3 custom/{app_script} {app_config} " + launch_once = true + } + } + { + id = "pipe" + path = "nvflare.fuel.utils.pipe.cell_pipe.CellPipe" + args { + mode = "PASSIVE" + site_name = "{SITE_NAME}" + token = "{JOB_ID}" + root_url = "{ROOT_URL}" + secure_mode = "{SECURE_MODE}" + workspace_dir = "{WORKSPACE}" + } + } + ] +} diff --git a/tests/integration_test/data/jobs/np_loop_cell_pipe/app/config/config_fed_server.conf b/tests/integration_test/data/jobs/np_loop_cell_pipe/app/config/config_fed_server.conf new file mode 100644 index 0000000000..36dcd2c344 --- /dev/null +++ b/tests/integration_test/data/jobs/np_loop_cell_pipe/app/config/config_fed_server.conf @@ -0,0 +1,47 @@ +{ + format_version = 2 + task_data_filters = [] + task_result_filters = [] + workflows = [ + { + id = "scatter_and_gather" + path = "nvflare.app_common.workflows.scatter_and_gather.ScatterAndGather" + args { + min_clients = 2 + num_rounds = 5 + start_round = 0 + wait_time_after_min_received = 0 + aggregator_id = "aggregator" + persistor_id = "persistor" + shareable_generator_id = "shareable_generator" + train_task_name = "train" + train_timeout = 0 + } + } + ] + components = [ + { + id = "persistor" + path = "nvflare.app_common.np.np_model_persistor.NPModelPersistor" + } + { + id = "shareable_generator" + path = "nvflare.app_common.shareablegenerators.full_model_shareable_generator.FullModelShareableGenerator" + args {} + } + { + id = "aggregator" + path = "nvflare.app_common.aggregators.intime_accumulate_model_aggregator.InTimeAccumulateWeightedAggregator" + args { + expected_data_kind = "WEIGHTS" + } + } + { + id = "model_selector" + path = "nvflare.app_common.widgets.intime_model_selector.IntimeModelSelector" + args { + key_metric = "accuracy" + } + } + ] +} diff --git a/tests/integration_test/data/jobs/np_loop_cell_pipe/app/custom/train_diff.py b/tests/integration_test/data/jobs/np_loop_cell_pipe/app/custom/train_diff.py new file mode 100755 index 0000000000..08f3c8e7c0 --- /dev/null +++ b/tests/integration_test/data/jobs/np_loop_cell_pipe/app/custom/train_diff.py @@ -0,0 +1,60 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import copy + +import nvflare.client as flare + + +def train(input_arr): + output_arr = copy.deepcopy(input_arr) + # mock training with plus 1 + return output_arr + 1 + + +def evaluate(input_arr): + # mock evaluation metrics + return 100 + + +def main(): + # initializes NVFlare interface + flare.init() + + # get model from NVFlare + input_model = flare.receive() + print(f"received weights is: {input_model.params}") + + # get system information + sys_info = flare.system_info() + print(f"system info is: {sys_info}") + + input_numpy_array = input_model.params["numpy_key"] + + # training + output_numpy_array = train(input_numpy_array) + + # evaluation + metrics = evaluate(input_numpy_array) + + # calculate difference here + diff = output_numpy_array - input_numpy_array + + # send back the model difference + print(f"send back: {diff}") + flare.send(flare.FLModel(params={"numpy_key": diff}, params_type="DIFF", metrics={"accuracy": metrics})) + + +if __name__ == "__main__": + main() diff --git a/tests/integration_test/data/jobs/np_loop_cell_pipe/app/custom/train_full.py b/tests/integration_test/data/jobs/np_loop_cell_pipe/app/custom/train_full.py new file mode 100755 index 0000000000..9c05536b85 --- /dev/null +++ b/tests/integration_test/data/jobs/np_loop_cell_pipe/app/custom/train_full.py @@ -0,0 +1,59 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import copy + +import nvflare.client as flare + + +def train(input_arr): + output_arr = copy.deepcopy(input_arr) + # mock training with plus 1 + return output_arr + 1 + + +def evaluate(input_arr): + # mock evaluation metrics + return 100 + + +def main(): + # initializes NVFlare interface + flare.init() + + # get model from NVFlare + input_model = flare.receive() + print(f"received weights is: {input_model.params}") + + # get system information + sys_info = flare.system_info() + print(f"system info is: {sys_info}") + + input_numpy_array = input_model.params["numpy_key"] + + # training + output_numpy_array = train(input_numpy_array) + + # evaluation + metrics = evaluate(input_numpy_array) + + # send back the model + print(f"send back: {output_numpy_array}") + flare.send( + flare.FLModel(params={"numpy_key": output_numpy_array}, params_type="FULL", metrics={"accuracy": metrics}) + ) + + +if __name__ == "__main__": + main() diff --git a/tests/integration_test/data/jobs/np_loop_cell_pipe/app/custom/train_loop.py b/tests/integration_test/data/jobs/np_loop_cell_pipe/app/custom/train_loop.py new file mode 100755 index 0000000000..ea9ec25149 --- /dev/null +++ b/tests/integration_test/data/jobs/np_loop_cell_pipe/app/custom/train_loop.py @@ -0,0 +1,68 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import copy + +import nvflare.client as flare + + +def train(input_arr): + output_arr = copy.deepcopy(input_arr) + # mock training with plus 1 + return output_arr + 1 + + +def evaluate(input_arr): + # mock evaluation metrics + return 100 + + +def main(): + # initializes NVFlare interface + flare.init() + + # get system information + sys_info = flare.system_info() + print(f"system info is: {sys_info}", flush=True) + + while flare.is_running(): + input_model = flare.receive() + print(f"received weights is: {input_model.params}", flush=True) + + input_numpy_array = input_model.params["numpy_key"] + + # training + output_numpy_array = train(input_numpy_array) + + # evaluation + metrics = evaluate(input_numpy_array) + + sys_info = flare.system_info() + print(f"system info is: {sys_info}", flush=True) + print(f"finish round: {input_model.current_round}", flush=True) + + # send back the model + print(f"send back: {output_numpy_array}", flush=True) + flare.send( + flare.FLModel( + params={"numpy_key": output_numpy_array}, + params_type="FULL", + metrics={"accuracy": metrics}, + current_round=input_model.current_round, + ) + ) + + +if __name__ == "__main__": + main() diff --git a/tests/integration_test/data/jobs/np_loop_cell_pipe/app/custom/train_metrics.py b/tests/integration_test/data/jobs/np_loop_cell_pipe/app/custom/train_metrics.py new file mode 100755 index 0000000000..e508b74f30 --- /dev/null +++ b/tests/integration_test/data/jobs/np_loop_cell_pipe/app/custom/train_metrics.py @@ -0,0 +1,86 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import copy +import time + +import nvflare.client as flare +from nvflare.client.tracking import MLflowWriter + + +def train(input_arr, current_round, epochs=3): + writer = MLflowWriter() + output_arr = copy.deepcopy(input_arr) + num_of_data = 2000 + batch_size = 16 + num_of_batches = num_of_data // batch_size + for i in range(epochs): + for j in range(num_of_batches): + global_step = current_round * num_of_batches * epochs + i * num_of_batches + j + print(f"logging record: {global_step}") + writer.log_metric( + key="global_step", + value=global_step, + step=global_step, + ) + # mock training with plus 1 + output_arr += 1 + # assume each epoch takes 1 seconds + time.sleep(1.0) + return output_arr + + +def evaluate(input_arr): + # mock evaluation metrics + return 100 + + +def main(): + # initializes NVFlare interface + flare.init() + + # get system information + sys_info = flare.system_info() + print(f"system info is: {sys_info}") + + while flare.is_running(): + input_model = flare.receive() + print(f"received weights is: {input_model.params}") + + input_numpy_array = input_model.params["numpy_key"] + + # training + output_numpy_array = train(input_numpy_array, current_round=input_model.current_round, epochs=3) + + # evaluation + metrics = evaluate(input_numpy_array) + + sys_info = flare.system_info() + print(f"system info is: {sys_info}") + print(f"finish round: {input_model.current_round}") + + # send back the model + print(f"send back: {output_numpy_array}") + flare.send( + flare.FLModel( + params={"numpy_key": output_numpy_array}, + params_type="FULL", + metrics={"accuracy": metrics}, + current_round=input_model.current_round, + ) + ) + + +if __name__ == "__main__": + main() diff --git a/tests/integration_test/data/jobs/np_loop_cell_pipe/meta.conf b/tests/integration_test/data/jobs/np_loop_cell_pipe/meta.conf new file mode 100644 index 0000000000..1b27fd54a7 --- /dev/null +++ b/tests/integration_test/data/jobs/np_loop_cell_pipe/meta.conf @@ -0,0 +1,11 @@ +{ + name = "np_loop_cell_pipe" + resource_spec {} + deploy_map { + app = [ + "@ALL" + ] + } + min_clients = 2 + mandatory_clients = [] +} diff --git a/tests/integration_test/data/jobs/np_metrics/app/config/config_fed_client.conf b/tests/integration_test/data/jobs/np_metrics/app/config/config_fed_client.conf new file mode 100644 index 0000000000..455684cf80 --- /dev/null +++ b/tests/integration_test/data/jobs/np_metrics/app/config/config_fed_client.conf @@ -0,0 +1,77 @@ +{ + format_version = 2 + app_script = "train_metrics.py" + app_config = "" + executors = [ + { + tasks = [ + "train" + ] + executor { + path = "nvflare.app_common.executors.client_api_launcher_executor.ClientAPILauncherExecutor" + args { + launcher_id = "launcher" + pipe_id = "pipe" + heartbeat_timeout = 60 + params_exchange_format = "numpy" + params_transfer_type = "DIFF" + train_with_evaluation = true + } + } + } + ] + task_data_filters = [] + task_result_filters = [] + components = [ + { + id = "launcher" + path = 
"nvflare.app_common.launchers.subprocess_launcher.SubprocessLauncher" + args { + script = "python3 custom/{app_script} {app_config} " + launch_once = true + } + } + { + id = "pipe" + path = "nvflare.fuel.utils.pipe.cell_pipe.CellPipe" + args { + mode = "PASSIVE" + site_name = "{SITE_NAME}" + token = "{JOB_ID}" + root_url = "{ROOT_URL}" + secure_mode = "{SECURE_MODE}" + workspace_dir = "{WORKSPACE}" + } + } + { + id = "metrics_pipe" + path = "nvflare.fuel.utils.pipe.cell_pipe.CellPipe" + args { + mode = "PASSIVE" + site_name = "{SITE_NAME}" + token = "{JOB_ID}" + root_url = "{ROOT_URL}" + secure_mode = "{SECURE_MODE}" + workspace_dir = "{WORKSPACE}" + } + } + { + id = "metric_relay" + path = "nvflare.app_common.widgets.metric_relay.MetricRelay" + args { + pipe_id = "metrics_pipe" + event_type = "fed.analytix_log_stats" + read_interval = 0.1 + } + } + { + id = "client_api_config_preparer" + path = "nvflare.app_common.widgets.external_configurator.ExternalConfigurator" + args { + component_ids = [ + "metric_relay" + ] + } + } + ] +} diff --git a/tests/integration_test/data/jobs/np_metrics/app/config/config_fed_server.conf b/tests/integration_test/data/jobs/np_metrics/app/config/config_fed_server.conf new file mode 100644 index 0000000000..a5f1f0856c --- /dev/null +++ b/tests/integration_test/data/jobs/np_metrics/app/config/config_fed_server.conf @@ -0,0 +1,56 @@ +{ + format_version = 2 + task_data_filters = [] + task_result_filters = [] + workflows = [ + { + id = "scatter_and_gather" + path = "nvflare.app_common.workflows.scatter_and_gather.ScatterAndGather" + args { + min_clients = 2 + num_rounds = 5 + start_round = 0 + wait_time_after_min_received = 0 + aggregator_id = "aggregator" + persistor_id = "persistor" + shareable_generator_id = "shareable_generator" + train_task_name = "train" + train_timeout = 0 + } + } + ] + components = [ + { + id = "persistor" + path = "nvflare.app_common.np.np_model_persistor.NPModelPersistor" + } + { + id = "shareable_generator" + path = "nvflare.app_common.shareablegenerators.full_model_shareable_generator.FullModelShareableGenerator" + args {} + } + { + id = "aggregator" + path = "nvflare.app_common.aggregators.intime_accumulate_model_aggregator.InTimeAccumulateWeightedAggregator" + args { + expected_data_kind = "WEIGHT_DIFF" + } + } + { + id = "model_selector" + path = "nvflare.app_common.widgets.intime_model_selector.IntimeModelSelector" + args { + key_metric = "accuracy" + } + } + { + id = "tb_analytics_receiver" + path = "nvflare.app_opt.tracking.tb.tb_receiver.TBAnalyticsReceiver" + args { + events = [ + "fed.analytix_log_stats" + ] + } + } + ] +} diff --git a/tests/integration_test/data/jobs/np_metrics/app/custom/train_diff.py b/tests/integration_test/data/jobs/np_metrics/app/custom/train_diff.py new file mode 100755 index 0000000000..08f3c8e7c0 --- /dev/null +++ b/tests/integration_test/data/jobs/np_metrics/app/custom/train_diff.py @@ -0,0 +1,60 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +import copy + +import nvflare.client as flare + + +def train(input_arr): + output_arr = copy.deepcopy(input_arr) + # mock training with plus 1 + return output_arr + 1 + + +def evaluate(input_arr): + # mock evaluation metrics + return 100 + + +def main(): + # initializes NVFlare interface + flare.init() + + # get model from NVFlare + input_model = flare.receive() + print(f"received weights is: {input_model.params}") + + # get system information + sys_info = flare.system_info() + print(f"system info is: {sys_info}") + + input_numpy_array = input_model.params["numpy_key"] + + # training + output_numpy_array = train(input_numpy_array) + + # evaluation + metrics = evaluate(input_numpy_array) + + # calculate difference here + diff = output_numpy_array - input_numpy_array + + # send back the model difference + print(f"send back: {diff}") + flare.send(flare.FLModel(params={"numpy_key": diff}, params_type="DIFF", metrics={"accuracy": metrics})) + + +if __name__ == "__main__": + main() diff --git a/tests/integration_test/data/jobs/np_metrics/app/custom/train_full.py b/tests/integration_test/data/jobs/np_metrics/app/custom/train_full.py new file mode 100755 index 0000000000..9c05536b85 --- /dev/null +++ b/tests/integration_test/data/jobs/np_metrics/app/custom/train_full.py @@ -0,0 +1,59 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import copy + +import nvflare.client as flare + + +def train(input_arr): + output_arr = copy.deepcopy(input_arr) + # mock training with plus 1 + return output_arr + 1 + + +def evaluate(input_arr): + # mock evaluation metrics + return 100 + + +def main(): + # initializes NVFlare interface + flare.init() + + # get model from NVFlare + input_model = flare.receive() + print(f"received weights is: {input_model.params}") + + # get system information + sys_info = flare.system_info() + print(f"system info is: {sys_info}") + + input_numpy_array = input_model.params["numpy_key"] + + # training + output_numpy_array = train(input_numpy_array) + + # evaluation + metrics = evaluate(input_numpy_array) + + # send back the model + print(f"send back: {output_numpy_array}") + flare.send( + flare.FLModel(params={"numpy_key": output_numpy_array}, params_type="FULL", metrics={"accuracy": metrics}) + ) + + +if __name__ == "__main__": + main() diff --git a/tests/integration_test/data/jobs/np_metrics/app/custom/train_loop.py b/tests/integration_test/data/jobs/np_metrics/app/custom/train_loop.py new file mode 100755 index 0000000000..c7916efeec --- /dev/null +++ b/tests/integration_test/data/jobs/np_metrics/app/custom/train_loop.py @@ -0,0 +1,68 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import copy + +import nvflare.client as flare + + +def train(input_arr): + output_arr = copy.deepcopy(input_arr) + # mock training with plus 1 + return output_arr + 1 + + +def evaluate(input_arr): + # mock evaluation metrics + return 100 + + +def main(): + # initializes NVFlare interface + flare.init() + + # get system information + sys_info = flare.system_info() + print(f"system info is: {sys_info}") + + while flare.is_running(): + input_model = flare.receive() + print(f"received weights is: {input_model.params}") + + input_numpy_array = input_model.params["numpy_key"] + + # training + output_numpy_array = train(input_numpy_array) + + # evaluation + metrics = evaluate(input_numpy_array) + + sys_info = flare.system_info() + print(f"system info is: {sys_info}") + print(f"finish round: {input_model.current_round}") + + # send back the model + print(f"send back: {output_numpy_array}") + flare.send( + flare.FLModel( + params={"numpy_key": output_numpy_array}, + params_type="FULL", + metrics={"accuracy": metrics}, + current_round=input_model.current_round, + ) + ) + + +if __name__ == "__main__": + main() diff --git a/tests/integration_test/data/jobs/np_metrics/app/custom/train_metrics.py b/tests/integration_test/data/jobs/np_metrics/app/custom/train_metrics.py new file mode 100755 index 0000000000..e5e7cd4257 --- /dev/null +++ b/tests/integration_test/data/jobs/np_metrics/app/custom/train_metrics.py @@ -0,0 +1,86 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import copy +import time + +import nvflare.client as flare +from nvflare.client.tracking import MLflowWriter + + +def train(input_arr, current_round, epochs=3): + writer = MLflowWriter() + output_arr = copy.deepcopy(input_arr) + num_of_data = 2000 + batch_size = 16 + num_of_batches = num_of_data // batch_size + for i in range(epochs): + for j in range(num_of_batches): + global_step = current_round * num_of_batches * epochs + i * num_of_batches + j + print(f"logging record: {global_step}", flush=True) + writer.log_metric( + key="global_step", + value=global_step, + step=global_step, + ) + # mock training with plus 1 + output_arr += 1 + # assume each epoch takes 1 seconds + time.sleep(1.0) + return output_arr + + +def evaluate(input_arr): + # mock evaluation metrics + return 100 + + +def main(): + # initializes NVFlare interface + flare.init() + + # get system information + sys_info = flare.system_info() + print(f"system info is: {sys_info}") + + while flare.is_running(): + input_model = flare.receive() + print(f"received weights is: {input_model.params}") + + input_numpy_array = input_model.params["numpy_key"] + + # training + output_numpy_array = train(input_numpy_array, current_round=input_model.current_round, epochs=3) + + # evaluation + metrics = evaluate(input_numpy_array) + + sys_info = flare.system_info() + print(f"system info is: {sys_info}") + print(f"finish round: {input_model.current_round}") + + # send back the model + print(f"send back: {output_numpy_array}") + flare.send( + flare.FLModel( + params={"numpy_key": output_numpy_array}, + params_type="FULL", + metrics={"accuracy": metrics}, + current_round=input_model.current_round, + ) + ) + + +if __name__ == "__main__": + main() diff --git a/tests/integration_test/data/jobs/np_metrics/meta.conf b/tests/integration_test/data/jobs/np_metrics/meta.conf new file mode 100644 index 0000000000..13374f8a0e --- /dev/null +++ b/tests/integration_test/data/jobs/np_metrics/meta.conf @@ -0,0 +1,11 @@ +{ + name = "np_metrics" + resource_spec {} + deploy_map { + app = [ + "@ALL" + ] + } + min_clients = 2 + mandatory_clients = [] +} diff --git a/tests/integration_test/data/jobs/pt_client_api/app/config/config_fed_client.conf b/tests/integration_test/data/jobs/pt_client_api/app/config/config_fed_client.conf new file mode 100644 index 0000000000..289f0e81a0 --- /dev/null +++ b/tests/integration_test/data/jobs/pt_client_api/app/config/config_fed_client.conf @@ -0,0 +1,77 @@ +{ + format_version = 2 + app_script = "cifar10_fl.py" + app_config = "" + executors = [ + { + tasks = [ + "train" + ] + executor { + path = "nvflare.app_opt.pt.client_api_launcher_executor.PTClientAPILauncherExecutor" + args { + launcher_id = "launcher" + pipe_id = "pipe" + heartbeat_timeout = 60 + params_exchange_format = "pytorch" + params_transfer_type = "DIFF" + train_with_evaluation = true + } + } + } + ] + task_data_filters = [] + task_result_filters = [] + components = [ + { + id = "launcher" + path = "nvflare.app_common.launchers.subprocess_launcher.SubprocessLauncher" + args { + script = "python3 -u custom/{app_script} {app_config} " + launch_once = false + } + } + { + id = "pipe" + path = "nvflare.fuel.utils.pipe.cell_pipe.CellPipe" + args { + mode = "PASSIVE" + site_name = "{SITE_NAME}" + token = "{JOB_ID}" + root_url = "{ROOT_URL}" + secure_mode = "{SECURE_MODE}" + workspace_dir = "{WORKSPACE}" + } + } + { + id = "metrics_pipe" + path = "nvflare.fuel.utils.pipe.cell_pipe.CellPipe" + args { + mode = "PASSIVE" + site_name = "{SITE_NAME}" + token = "{JOB_ID}" + 
root_url = "{ROOT_URL}" + secure_mode = "{SECURE_MODE}" + workspace_dir = "{WORKSPACE}" + } + } + { + id = "metric_relay" + path = "nvflare.app_common.widgets.metric_relay.MetricRelay" + args { + pipe_id = "metrics_pipe" + event_type = "fed.analytix_log_stats" + read_interval = 0.1 + } + } + { + id = "config_preparer" + path = "nvflare.app_common.widgets.external_configurator.ExternalConfigurator" + args { + component_ids = [ + "metric_relay" + ] + } + } + ] +} diff --git a/tests/integration_test/data/jobs/pt_client_api/app/config/config_fed_server.conf b/tests/integration_test/data/jobs/pt_client_api/app/config/config_fed_server.conf new file mode 100644 index 0000000000..8245a2d527 --- /dev/null +++ b/tests/integration_test/data/jobs/pt_client_api/app/config/config_fed_server.conf @@ -0,0 +1,62 @@ +{ + format_version = 2 + task_data_filters = [] + task_result_filters = [] + model_class_path = "net.Net" + workflows = [ + { + id = "scatter_and_gather" + path = "nvflare.app_common.workflows.scatter_and_gather.ScatterAndGather" + args { + min_clients = 2 + num_rounds = 2 + start_round = 0 + wait_time_after_min_received = 0 + aggregator_id = "aggregator" + persistor_id = "persistor" + shareable_generator_id = "shareable_generator" + train_task_name = "train" + train_timeout = 0 + } + } + ] + components = [ + { + id = "persistor" + path = "nvflare.app_opt.pt.file_model_persistor.PTFileModelPersistor" + args { + model { + path = "{model_class_path}" + } + } + } + { + id = "shareable_generator" + path = "nvflare.app_common.shareablegenerators.full_model_shareable_generator.FullModelShareableGenerator" + args {} + } + { + id = "aggregator" + path = "nvflare.app_common.aggregators.intime_accumulate_model_aggregator.InTimeAccumulateWeightedAggregator" + args { + expected_data_kind = "WEIGHT_DIFF" + } + } + { + id = "model_selector" + path = "nvflare.app_common.widgets.intime_model_selector.IntimeModelSelector" + args { + key_metric = "accuracy" + } + } + { + id = "receiver" + path = "nvflare.app_opt.tracking.tb.tb_receiver.TBAnalyticsReceiver" + args { + events = [ + "fed.analytix_log_stats" + ] + } + } + ] +} diff --git a/tests/integration_test/data/jobs/pt_client_api/app/custom/cifar10_fl.py b/tests/integration_test/data/jobs/pt_client_api/app/custom/cifar10_fl.py new file mode 100644 index 0000000000..25fc291e98 --- /dev/null +++ b/tests/integration_test/data/jobs/pt_client_api/app/custom/cifar10_fl.py @@ -0,0 +1,137 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import torch +import torch.nn as nn +import torch.optim as optim +import torchvision +import torchvision.transforms as transforms +from net import Net + +# (1) import nvflare client API +import nvflare.client as flare + +# (optional) metrics +from nvflare.client.tracking import SummaryWriter + +# (optional) set a fix place so we don't need to download everytime +DATASET_PATH = "/tmp/nvflare/data" +# (optional) We change to use GPU to speed things up. 
+# if you want to use CPU, change DEVICE="cpu" +DEVICE = "cuda:0" + + +def main(): + transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]) + + batch_size = 4 + epochs = 2 + + trainset = torchvision.datasets.CIFAR10(root=DATASET_PATH, train=True, download=True, transform=transform) + trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, shuffle=True, num_workers=2) + + testset = torchvision.datasets.CIFAR10(root=DATASET_PATH, train=False, download=True, transform=transform) + testloader = torch.utils.data.DataLoader(testset, batch_size=batch_size, shuffle=False, num_workers=2) + + net = Net() + + # (2) initializes NVFlare client API + flare.init() + + summary_writer = SummaryWriter() + while flare.is_running(): + # (3) receives FLModel from NVFlare + input_model = flare.receive() + print(f"current_round={input_model.current_round}") + + # (4) loads model from NVFlare + net.load_state_dict(input_model.params) + + criterion = nn.CrossEntropyLoss() + optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9) + + # (optional) use GPU to speed things up + net.to(DEVICE) + # (optional) calculate total steps + steps = epochs * len(trainloader) + for epoch in range(epochs): # loop over the dataset multiple times + + running_loss = 0.0 + for i, data in enumerate(trainloader, 0): + # get the inputs; data is a list of [inputs, labels] + # (optional) use GPU to speed things up + inputs, labels = data[0].to(DEVICE), data[1].to(DEVICE) + + # zero the parameter gradients + optimizer.zero_grad() + + # forward + backward + optimize + outputs = net(inputs) + loss = criterion(outputs, labels) + loss.backward() + optimizer.step() + + # print statistics + running_loss += loss.item() + if i % 2000 == 1999: # print every 2000 mini-batches + print(f"[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 2000:.3f}") + global_step = input_model.current_round * steps + epoch * len(trainloader) + i + + summary_writer.add_scalar(tag="loss_for_each_batch", scalar=running_loss, global_step=global_step) + running_loss = 0.0 + + print("Finished Training") + + PATH = "./cifar_net.pth" + torch.save(net.state_dict(), PATH) + + # (5) wraps evaluation logic into a method to re-use for + # evaluation on both trained and received model + def evaluate(input_weights): + net = Net() + net.load_state_dict(input_weights) + # (optional) use GPU to speed things up + net.to(DEVICE) + + correct = 0 + total = 0 + # since we're not training, we don't need to calculate the gradients for our outputs + with torch.no_grad(): + for data in testloader: + # (optional) use GPU to speed things up + images, labels = data[0].to(DEVICE), data[1].to(DEVICE) + # calculate outputs by running images through the network + outputs = net(images) + # the class with the highest energy is what we choose as prediction + _, predicted = torch.max(outputs.data, 1) + total += labels.size(0) + correct += (predicted == labels).sum().item() + + print(f"Accuracy of the network on the 10000 test images: {100 * correct // total} %") + return 100 * correct // total + + # (6) evaluate on received model for model selection + accuracy = evaluate(input_model.params) + # (7) construct trained FL model + output_model = flare.FLModel( + params=net.cpu().state_dict(), + metrics={"accuracy": accuracy}, + meta={"NUM_STEPS_CURRENT_ROUND": steps}, + ) + # (8) send model back to NVFlare + flare.send(output_model) + + +if __name__ == "__main__": + main() diff --git 
a/tests/integration_test/data/jobs/pt_client_api/app/custom/net.py b/tests/integration_test/data/jobs/pt_client_api/app/custom/net.py new file mode 100644 index 0000000000..47ac7e9589 --- /dev/null +++ b/tests/integration_test/data/jobs/pt_client_api/app/custom/net.py @@ -0,0 +1,37 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import torch +import torch.nn as nn +import torch.nn.functional as F + + +class Net(nn.Module): + def __init__(self): + super().__init__() + self.conv1 = nn.Conv2d(3, 6, 5) + self.pool = nn.MaxPool2d(2, 2) + self.conv2 = nn.Conv2d(6, 16, 5) + self.fc1 = nn.Linear(16 * 5 * 5, 120) + self.fc2 = nn.Linear(120, 84) + self.fc3 = nn.Linear(84, 10) + + def forward(self, x): + x = self.pool(F.relu(self.conv1(x))) + x = self.pool(F.relu(self.conv2(x))) + x = torch.flatten(x, 1) # flatten all dimensions except batch + x = F.relu(self.fc1(x)) + x = F.relu(self.fc2(x)) + x = self.fc3(x) + return x diff --git a/tests/integration_test/data/jobs/pt_client_api/meta.conf b/tests/integration_test/data/jobs/pt_client_api/meta.conf new file mode 100644 index 0000000000..479de325a2 --- /dev/null +++ b/tests/integration_test/data/jobs/pt_client_api/meta.conf @@ -0,0 +1,11 @@ +{ + name = "client_api" + resource_spec {} + deploy_map { + app = [ + "@ALL" + ] + } + min_clients = 2 + mandatory_clients = [] +} diff --git a/tests/integration_test/data/jobs/pt_client_api_cyclic/app/config/config_fed_client.conf b/tests/integration_test/data/jobs/pt_client_api_cyclic/app/config/config_fed_client.conf new file mode 100644 index 0000000000..1809d849c4 --- /dev/null +++ b/tests/integration_test/data/jobs/pt_client_api_cyclic/app/config/config_fed_client.conf @@ -0,0 +1,43 @@ +{ + format_version = 2 + app_script = "cifar10_fl.py" + app_config = "" + executors = [ + { + tasks = [ + "train" + ] + executor { + path = "nvflare.app_opt.pt.client_api_launcher_executor.PTClientAPILauncherExecutor" + args { + launcher_id = "launcher" + pipe_id = "pipe" + heartbeat_timeout = 60 + params_exchange_format = "pytorch" + params_transfer_type = "DIFF" + train_with_evaluation = true + } + } + } + ] + task_data_filters = [] + task_result_filters = [] + components = [ + { + id = "launcher" + path = "nvflare.app_common.launchers.subprocess_launcher.SubprocessLauncher" + args { + script = "python3 -u custom/{app_script} {app_config} " + launch_once = true + } + } + { + id = "pipe" + path = "nvflare.fuel.utils.pipe.file_pipe.FilePipe" + args { + mode = "PASSIVE" + root_path = "{WORKSPACE}/{JOB_ID}/{SITE_NAME}" + } + } + ] +} diff --git a/tests/integration_test/data/jobs/pt_client_api_cyclic/app/config/config_fed_server.conf b/tests/integration_test/data/jobs/pt_client_api_cyclic/app/config/config_fed_server.conf new file mode 100644 index 0000000000..a194c1ed9e --- /dev/null +++ b/tests/integration_test/data/jobs/pt_client_api_cyclic/app/config/config_fed_server.conf @@ -0,0 +1,38 @@ +{ + format_version = 2 + server { + heart_beat_timeout = 600 + } + 
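+  # The server block above sets heart_beat_timeout (in seconds): how long the server
+  # tolerates a missing client heartbeat before treating that client as disconnected
+  # (assumed semantics; see the NVFlare server documentation).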
task_data_filters = [] + task_result_filters = [] + model_class_path = "net.Net" + components = [ + { + id = "persistor" + path = "nvflare.app_opt.pt.file_model_persistor.PTFileModelPersistor" + args { + model { + path = "{model_class_path}" + } + } + } + { + id = "shareable_generator" + path = "nvflare.app_common.shareablegenerators.full_model_shareable_generator.FullModelShareableGenerator" + args {} + } + ] + workflows = [ + { + id = "cyclic_ctl" + path = "nvflare.app_common.workflows.cyclic_ctl.CyclicController" + args { + num_rounds = 3 + task_assignment_timeout = 8 + persistor_id = "persistor" + shareable_generator_id = "shareable_generator" + task_name = "train" + } + } + ] +} diff --git a/tests/integration_test/data/jobs/pt_client_api_cyclic/app/custom/cifar10_fl.py b/tests/integration_test/data/jobs/pt_client_api_cyclic/app/custom/cifar10_fl.py new file mode 100644 index 0000000000..80df2f4086 --- /dev/null +++ b/tests/integration_test/data/jobs/pt_client_api_cyclic/app/custom/cifar10_fl.py @@ -0,0 +1,132 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import torch +import torch.nn as nn +import torch.optim as optim +import torchvision +import torchvision.transforms as transforms +from net import Net + +# (1) import nvflare client API +import nvflare.client as flare + +# (optional) set a fix place so we don't need to download everytime +DATASET_PATH = "/tmp/nvflare/data" +# (optional) We change to use GPU to speed things up. 
+# if you want to use CPU, change DEVICE="cpu" +DEVICE = "cuda:0" + + +def main(): + transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]) + + batch_size = 4 + epochs = 2 + + trainset = torchvision.datasets.CIFAR10(root=DATASET_PATH, train=True, download=True, transform=transform) + trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, shuffle=True, num_workers=2) + + testset = torchvision.datasets.CIFAR10(root=DATASET_PATH, train=False, download=True, transform=transform) + testloader = torch.utils.data.DataLoader(testset, batch_size=batch_size, shuffle=False, num_workers=2) + + net = Net() + + # (2) initializes NVFlare client API + flare.init() + + while flare.is_running(): + # (3) receives FLModel from NVFlare + input_model = flare.receive() + print(f"current_round={input_model.current_round}") + + # (4) loads model from NVFlare + net.load_state_dict(input_model.params) + + criterion = nn.CrossEntropyLoss() + optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9) + + # (optional) use GPU to speed things up + net.to(DEVICE) + # (optional) calculate total steps + steps = epochs * len(trainloader) + for epoch in range(epochs): # loop over the dataset multiple times + + running_loss = 0.0 + for i, data in enumerate(trainloader, 0): + # get the inputs; data is a list of [inputs, labels] + # (optional) use GPU to speed things up + inputs, labels = data[0].to(DEVICE), data[1].to(DEVICE) + + # zero the parameter gradients + optimizer.zero_grad() + + # forward + backward + optimize + outputs = net(inputs) + loss = criterion(outputs, labels) + loss.backward() + optimizer.step() + + # print statistics + running_loss += loss.item() + if i % 2000 == 1999: # print every 2000 mini-batches + print(f"[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 2000:.3f}") + global_step = input_model.current_round * steps + epoch * len(trainloader) + i + + running_loss = 0.0 + + print("Finished Training") + + PATH = "./cifar_net.pth" + torch.save(net.state_dict(), PATH) + + # (5) wraps evaluation logic into a method to re-use for + # evaluation on both trained and received model + def evaluate(input_weights): + net = Net() + net.load_state_dict(input_weights) + # (optional) use GPU to speed things up + net.to(DEVICE) + + correct = 0 + total = 0 + # since we're not training, we don't need to calculate the gradients for our outputs + with torch.no_grad(): + for data in testloader: + # (optional) use GPU to speed things up + images, labels = data[0].to(DEVICE), data[1].to(DEVICE) + # calculate outputs by running images through the network + outputs = net(images) + # the class with the highest energy is what we choose as prediction + _, predicted = torch.max(outputs.data, 1) + total += labels.size(0) + correct += (predicted == labels).sum().item() + + print(f"Accuracy of the network on the 10000 test images: {100 * correct // total} %") + return 100 * correct // total + + # (6) evaluate on received model for model selection + accuracy = evaluate(input_model.params) + # (7) construct trained FL model + output_model = flare.FLModel( + params=net.cpu().state_dict(), + metrics={"accuracy": accuracy}, + meta={"NUM_STEPS_CURRENT_ROUND": steps}, + ) + # (8) send model back to NVFlare + flare.send(output_model) + + +if __name__ == "__main__": + main() diff --git a/tests/integration_test/data/jobs/pt_client_api_cyclic/app/custom/net.py b/tests/integration_test/data/jobs/pt_client_api_cyclic/app/custom/net.py new file mode 100644 index 
0000000000..47ac7e9589 --- /dev/null +++ b/tests/integration_test/data/jobs/pt_client_api_cyclic/app/custom/net.py @@ -0,0 +1,37 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import torch +import torch.nn as nn +import torch.nn.functional as F + + +class Net(nn.Module): + def __init__(self): + super().__init__() + self.conv1 = nn.Conv2d(3, 6, 5) + self.pool = nn.MaxPool2d(2, 2) + self.conv2 = nn.Conv2d(6, 16, 5) + self.fc1 = nn.Linear(16 * 5 * 5, 120) + self.fc2 = nn.Linear(120, 84) + self.fc3 = nn.Linear(84, 10) + + def forward(self, x): + x = self.pool(F.relu(self.conv1(x))) + x = self.pool(F.relu(self.conv2(x))) + x = torch.flatten(x, 1) # flatten all dimensions except batch + x = F.relu(self.fc1(x)) + x = F.relu(self.fc2(x)) + x = self.fc3(x) + return x diff --git a/tests/integration_test/data/jobs/pt_client_api_cyclic/meta.conf b/tests/integration_test/data/jobs/pt_client_api_cyclic/meta.conf new file mode 100644 index 0000000000..f6ce5c1810 --- /dev/null +++ b/tests/integration_test/data/jobs/pt_client_api_cyclic/meta.conf @@ -0,0 +1,11 @@ +{ + name = "client_api" + resource_spec {} + deploy_map { + app = [ + "@ALL" + ] + } + min_clients = 1 + mandatory_clients = [] +} diff --git a/tests/integration_test/data/jobs/pt_client_api_launch_once/app/config/config_fed_client.conf b/tests/integration_test/data/jobs/pt_client_api_launch_once/app/config/config_fed_client.conf new file mode 100644 index 0000000000..2d3d180531 --- /dev/null +++ b/tests/integration_test/data/jobs/pt_client_api_launch_once/app/config/config_fed_client.conf @@ -0,0 +1,77 @@ +{ + format_version = 2 + app_script = "cifar10_fl.py" + app_config = "" + executors = [ + { + tasks = [ + "train" + ] + executor { + path = "nvflare.app_opt.pt.client_api_launcher_executor.PTClientAPILauncherExecutor" + args { + launcher_id = "launcher" + pipe_id = "pipe" + heartbeat_timeout = 60 + params_exchange_format = "pytorch" + params_transfer_type = "DIFF" + train_with_evaluation = true + } + } + } + ] + task_data_filters = [] + task_result_filters = [] + components = [ + { + id = "launcher" + path = "nvflare.app_common.launchers.subprocess_launcher.SubprocessLauncher" + args { + script = "python3 -u custom/{app_script} {app_config} " + launch_once = true + } + } + { + id = "pipe" + path = "nvflare.fuel.utils.pipe.cell_pipe.CellPipe" + args { + mode = "PASSIVE" + site_name = "{SITE_NAME}" + token = "{JOB_ID}" + root_url = "{ROOT_URL}" + secure_mode = "{SECURE_MODE}" + workspace_dir = "{WORKSPACE}" + } + } + { + id = "metrics_pipe" + path = "nvflare.fuel.utils.pipe.cell_pipe.CellPipe" + args { + mode = "PASSIVE" + site_name = "{SITE_NAME}" + token = "{JOB_ID}" + root_url = "{ROOT_URL}" + secure_mode = "{SECURE_MODE}" + workspace_dir = "{WORKSPACE}" + } + } + { + id = "metric_relay" + path = "nvflare.app_common.widgets.metric_relay.MetricRelay" + args { + pipe_id = "metrics_pipe" + event_type = "fed.analytix_log_stats" + read_interval = 0.1 + } + } + { + id = 
"config_preparer" + path = "nvflare.app_common.widgets.external_configurator.ExternalConfigurator" + args { + component_ids = [ + "metric_relay" + ] + } + } + ] +} diff --git a/tests/integration_test/data/jobs/pt_client_api_launch_once/app/config/config_fed_server.conf b/tests/integration_test/data/jobs/pt_client_api_launch_once/app/config/config_fed_server.conf new file mode 100644 index 0000000000..8245a2d527 --- /dev/null +++ b/tests/integration_test/data/jobs/pt_client_api_launch_once/app/config/config_fed_server.conf @@ -0,0 +1,62 @@ +{ + format_version = 2 + task_data_filters = [] + task_result_filters = [] + model_class_path = "net.Net" + workflows = [ + { + id = "scatter_and_gather" + path = "nvflare.app_common.workflows.scatter_and_gather.ScatterAndGather" + args { + min_clients = 2 + num_rounds = 2 + start_round = 0 + wait_time_after_min_received = 0 + aggregator_id = "aggregator" + persistor_id = "persistor" + shareable_generator_id = "shareable_generator" + train_task_name = "train" + train_timeout = 0 + } + } + ] + components = [ + { + id = "persistor" + path = "nvflare.app_opt.pt.file_model_persistor.PTFileModelPersistor" + args { + model { + path = "{model_class_path}" + } + } + } + { + id = "shareable_generator" + path = "nvflare.app_common.shareablegenerators.full_model_shareable_generator.FullModelShareableGenerator" + args {} + } + { + id = "aggregator" + path = "nvflare.app_common.aggregators.intime_accumulate_model_aggregator.InTimeAccumulateWeightedAggregator" + args { + expected_data_kind = "WEIGHT_DIFF" + } + } + { + id = "model_selector" + path = "nvflare.app_common.widgets.intime_model_selector.IntimeModelSelector" + args { + key_metric = "accuracy" + } + } + { + id = "receiver" + path = "nvflare.app_opt.tracking.tb.tb_receiver.TBAnalyticsReceiver" + args { + events = [ + "fed.analytix_log_stats" + ] + } + } + ] +} diff --git a/tests/integration_test/data/jobs/pt_client_api_launch_once/app/custom/cifar10_fl.py b/tests/integration_test/data/jobs/pt_client_api_launch_once/app/custom/cifar10_fl.py new file mode 100644 index 0000000000..25fc291e98 --- /dev/null +++ b/tests/integration_test/data/jobs/pt_client_api_launch_once/app/custom/cifar10_fl.py @@ -0,0 +1,137 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import torch +import torch.nn as nn +import torch.optim as optim +import torchvision +import torchvision.transforms as transforms +from net import Net + +# (1) import nvflare client API +import nvflare.client as flare + +# (optional) metrics +from nvflare.client.tracking import SummaryWriter + +# (optional) set a fix place so we don't need to download everytime +DATASET_PATH = "/tmp/nvflare/data" +# (optional) We change to use GPU to speed things up. 
+# if you want to use CPU, change DEVICE="cpu" +DEVICE = "cuda:0" + + +def main(): + transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]) + + batch_size = 4 + epochs = 2 + + trainset = torchvision.datasets.CIFAR10(root=DATASET_PATH, train=True, download=True, transform=transform) + trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, shuffle=True, num_workers=2) + + testset = torchvision.datasets.CIFAR10(root=DATASET_PATH, train=False, download=True, transform=transform) + testloader = torch.utils.data.DataLoader(testset, batch_size=batch_size, shuffle=False, num_workers=2) + + net = Net() + + # (2) initializes NVFlare client API + flare.init() + + summary_writer = SummaryWriter() + while flare.is_running(): + # (3) receives FLModel from NVFlare + input_model = flare.receive() + print(f"current_round={input_model.current_round}") + + # (4) loads model from NVFlare + net.load_state_dict(input_model.params) + + criterion = nn.CrossEntropyLoss() + optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9) + + # (optional) use GPU to speed things up + net.to(DEVICE) + # (optional) calculate total steps + steps = epochs * len(trainloader) + for epoch in range(epochs): # loop over the dataset multiple times + + running_loss = 0.0 + for i, data in enumerate(trainloader, 0): + # get the inputs; data is a list of [inputs, labels] + # (optional) use GPU to speed things up + inputs, labels = data[0].to(DEVICE), data[1].to(DEVICE) + + # zero the parameter gradients + optimizer.zero_grad() + + # forward + backward + optimize + outputs = net(inputs) + loss = criterion(outputs, labels) + loss.backward() + optimizer.step() + + # print statistics + running_loss += loss.item() + if i % 2000 == 1999: # print every 2000 mini-batches + print(f"[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 2000:.3f}") + global_step = input_model.current_round * steps + epoch * len(trainloader) + i + + summary_writer.add_scalar(tag="loss_for_each_batch", scalar=running_loss, global_step=global_step) + running_loss = 0.0 + + print("Finished Training") + + PATH = "./cifar_net.pth" + torch.save(net.state_dict(), PATH) + + # (5) wraps evaluation logic into a method to re-use for + # evaluation on both trained and received model + def evaluate(input_weights): + net = Net() + net.load_state_dict(input_weights) + # (optional) use GPU to speed things up + net.to(DEVICE) + + correct = 0 + total = 0 + # since we're not training, we don't need to calculate the gradients for our outputs + with torch.no_grad(): + for data in testloader: + # (optional) use GPU to speed things up + images, labels = data[0].to(DEVICE), data[1].to(DEVICE) + # calculate outputs by running images through the network + outputs = net(images) + # the class with the highest energy is what we choose as prediction + _, predicted = torch.max(outputs.data, 1) + total += labels.size(0) + correct += (predicted == labels).sum().item() + + print(f"Accuracy of the network on the 10000 test images: {100 * correct // total} %") + return 100 * correct // total + + # (6) evaluate on received model for model selection + accuracy = evaluate(input_model.params) + # (7) construct trained FL model + output_model = flare.FLModel( + params=net.cpu().state_dict(), + metrics={"accuracy": accuracy}, + meta={"NUM_STEPS_CURRENT_ROUND": steps}, + ) + # (8) send model back to NVFlare + flare.send(output_model) + + +if __name__ == "__main__": + main() diff --git 
a/tests/integration_test/data/jobs/pt_client_api_launch_once/app/custom/net.py b/tests/integration_test/data/jobs/pt_client_api_launch_once/app/custom/net.py new file mode 100644 index 0000000000..47ac7e9589 --- /dev/null +++ b/tests/integration_test/data/jobs/pt_client_api_launch_once/app/custom/net.py @@ -0,0 +1,37 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import torch +import torch.nn as nn +import torch.nn.functional as F + + +class Net(nn.Module): + def __init__(self): + super().__init__() + self.conv1 = nn.Conv2d(3, 6, 5) + self.pool = nn.MaxPool2d(2, 2) + self.conv2 = nn.Conv2d(6, 16, 5) + self.fc1 = nn.Linear(16 * 5 * 5, 120) + self.fc2 = nn.Linear(120, 84) + self.fc3 = nn.Linear(84, 10) + + def forward(self, x): + x = self.pool(F.relu(self.conv1(x))) + x = self.pool(F.relu(self.conv2(x))) + x = torch.flatten(x, 1) # flatten all dimensions except batch + x = F.relu(self.fc1(x)) + x = F.relu(self.fc2(x)) + x = self.fc3(x) + return x diff --git a/tests/integration_test/data/jobs/pt_client_api_launch_once/meta.conf b/tests/integration_test/data/jobs/pt_client_api_launch_once/meta.conf new file mode 100644 index 0000000000..479de325a2 --- /dev/null +++ b/tests/integration_test/data/jobs/pt_client_api_launch_once/meta.conf @@ -0,0 +1,11 @@ +{ + name = "client_api" + resource_spec {} + deploy_map { + app = [ + "@ALL" + ] + } + min_clients = 2 + mandatory_clients = [] +} diff --git a/tests/integration_test/data/jobs/qa_job_4558419/app/config/config_fed_client.conf b/tests/integration_test/data/jobs/qa_job_4558419/app/config/config_fed_client.conf new file mode 100644 index 0000000000..5b41e42a04 --- /dev/null +++ b/tests/integration_test/data/jobs/qa_job_4558419/app/config/config_fed_client.conf @@ -0,0 +1,120 @@ +{ + # version of the configuration + format_version = 2 + + # This is the application script which will be invoked. Client can replace this script with user's own training script. + app_script = "train.py" + + # Additional arguments needed by the training code. For example, in lightning, these can be --trainer.batch_size=xxx. + app_config = "" + + # Client Computing Executors. + executors = [ + { + # tasks the executors are defined to handle + tasks = ["train"] + + # This particular executor + executor { + + # This is an executor for Client API. The underline data exchange is using Pipe. + path = "nvflare.app_opt.pt.client_api_launcher_executor.PTClientAPILauncherExecutor" + + args { + # launcher_id is used to locate the Launcher object in "components" + launcher_id = "launcher" + + # pipe_id is used to locate the Pipe object in "components" + pipe_id = "pipe" + + # Timeout in seconds for waiting for a heartbeat from the training script. Defaults to 30 seconds. 
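+        # (note) this job overrides that default and shortens heartbeat_timeout to 10 seconds below.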
+ # Please refer to the class docstring for all available arguments + heartbeat_timeout = 10 + + peer_read_timeout = 120 + + external_pre_init_timeout = 120 + + # format of the exchange parameters + params_exchange_format = "pytorch" + + # if the transfer_type is FULL, then it will be sent directly + # if the transfer_type is DIFF, then we will calculate the + # difference VS received parameters and send the difference + params_transfer_type = "DIFF" + + # if train_with_evaluation is true, the executor will expect + # the custom code need to send back both the trained parameters and the evaluation metric + # otherwise only trained parameters are expected + train_with_evaluation = true + } + } + } + ], + + # this defined an array of task data filters. If provided, it will control the data from server controller to client executor + task_data_filters = [] + + # this defined an array of task result filters. If provided, it will control the result from client executor to server controller + task_result_filters = [] + + components = [ + { + # component id is "launcher" + id = "launcher" + + # the class path of this component + path = "nvflare.app_common.launchers.subprocess_launcher.SubprocessLauncher" + + args { + # the launcher will invoke the script + script = "python3 -u custom/{app_script} {app_config} " + # if launch_once is true, the SubprocessLauncher will launch once for the whole job + # if launch_once is false, the SubprocessLauncher will launch a process for each task it receives from server + launch_once = false + } + } + { + id = "pipe" + path = "nvflare.fuel.utils.pipe.cell_pipe.CellPipe" + args { + mode = "PASSIVE" + site_name = "{SITE_NAME}" + token = "{JOB_ID}" + root_url = "{ROOT_URL}" + secure_mode = "{SECURE_MODE}" + workspace_dir = "{WORKSPACE}" + } + } + { + id = "metrics_pipe" + path = "nvflare.fuel.utils.pipe.cell_pipe.CellPipe" + args { + mode = "PASSIVE" + site_name = "{SITE_NAME}" + token = "{JOB_ID}" + root_url = "{ROOT_URL}" + secure_mode = "{SECURE_MODE}" + workspace_dir = "{WORKSPACE}" + } + }, + { + id = "metric_relay" + path = "nvflare.app_common.widgets.metric_relay.MetricRelay" + args { + pipe_id = "metrics_pipe" + event_type = "fed.analytix_log_stats" + # how fast should it read from the peer + read_interval = 0.1 + } + }, + { + # we use this component so the client api `flare.init()` can get required information + id = "config_preparer" + path = "nvflare.app_common.widgets.external_configurator.ExternalConfigurator" + args { + component_ids = ["metric_relay"] + } + } + ] +} diff --git a/tests/integration_test/data/jobs/qa_job_4558419/app/config/config_fed_server.conf b/tests/integration_test/data/jobs/qa_job_4558419/app/config/config_fed_server.conf new file mode 100644 index 0000000000..da5b075533 --- /dev/null +++ b/tests/integration_test/data/jobs/qa_job_4558419/app/config/config_fed_server.conf @@ -0,0 +1,106 @@ +{ + # version of the configuration + format_version = 2 + + # task data filter: if filters are provided, the filter will filter the data flow out of server to client. + task_data_filters =[] + + # task result filter: if filters are provided, the filter will filter the result flow out of client to server. + task_result_filters = [] + + # This assumes that there will be a "net.py" file with class name "Net". + # If your model code is not in "net.py" and class name is not "Net", please modify here + model_class_path = "net.Net" + + # workflows: Array of workflows the control the Federated Learning workflow lifecycle. 
+ # One can specify multiple workflows. The NVFLARE will run them in the order specified. + workflows = [ + { + # 1st workflow" + id = "scatter_and_gather" + + # name = ScatterAndGather, path is the class path of the ScatterAndGather controller. + path = "nvflare.app_common.workflows.scatter_and_gather.ScatterAndGather" + args { + # argument of the ScatterAndGather class. + # min number of clients required for ScatterAndGather controller to move to the next round + # during the workflow cycle. The controller will wait until the min_clients returned from clients + # before move to the next step. + min_clients = 2 + + # number of global round of the training. + num_rounds = 2 + + # starting round is 0-based + start_round = 0 + + # after received min number of clients' result, + # how much time should we wait further before move to the next step + wait_time_after_min_received = 0 + + # For ScatterAndGather, the server will aggregate the weights based on the client's result. + # the aggregator component id is named here. One can use the this ID to find the corresponding + # aggregator component listed below + aggregator_id = "aggregator" + + # The Scatter and Gather controller use an persistor to load the model and save the model. + # The persistent component can be identified by component ID specified here. + persistor_id = "persistor" + + # Shareable to a communication message, i.e. shared between clients and server. + # Shareable generator is a component that responsible to take the model convert to/from this communication message: Shareable. + # The component can be identified via "shareable_generator_id" + shareable_generator_id = "shareable_generator" + + # train task name: client side needs to have an executor that handles this task + train_task_name = "train" + + # train timeout in second. If zero, meaning no timeout. + train_timeout = 0 + } + } + ] + + # List of components used in the server side workflow. + components = [ + { + # This is the persistence component used in above workflow. + # PTFileModelPersistor is a Pytorch persistor which save/read the model to/from file. + + id = "persistor" + path = "nvflare.app_opt.pt.file_model_persistor.PTFileModelPersistor" + + # the persitor class take model class as argument + # This imply that the model is initialized from the server-side. + # The initialized model will be broadcast to all the clients to start the training. + args.model.path = "{model_class_path}" + }, + { + # This is the generator that convert the model to shareable communication message structure used in workflow + id = "shareable_generator" + path = "nvflare.app_common.shareablegenerators.full_model_shareable_generator.FullModelShareableGenerator" + args = {} + }, + { + # This is the aggregator that perform the weighted average aggregation. + # the aggregation is "in-time", so it doesn't wait for client results, but aggregates as soon as it received the data. + id = "aggregator" + path = "nvflare.app_common.aggregators.intime_accumulate_model_aggregator.InTimeAccumulateWeightedAggregator" + args.expected_data_kind = "WEIGHT_DIFF" + }, + { + # This component is not directly used in Workflow. + # it select the best model based on the incoming global validation metrics. 
+ id = "model_selector" + path = "nvflare.app_common.widgets.intime_model_selector.IntimeModelSelector" + # need to make sure this "key_metric" match what server side received + args.key_metric = "accuracy" + }, + { + id = "receiver" + path = "nvflare.app_opt.tracking.tb.tb_receiver.TBAnalyticsReceiver" + args.events = ["fed.analytix_log_stats"] + } + ] + +} diff --git a/tests/integration_test/data/jobs/qa_job_4558419/app/custom/net.py b/tests/integration_test/data/jobs/qa_job_4558419/app/custom/net.py new file mode 100644 index 0000000000..6d20a6a783 --- /dev/null +++ b/tests/integration_test/data/jobs/qa_job_4558419/app/custom/net.py @@ -0,0 +1,25 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import torch.nn as nn + + +class Net(nn.Module): + def __init__(self): + super().__init__() + self.fc1 = nn.Linear(10**3, 10**5) + + def forward(self, x): + x = self.fc1(x) + return x diff --git a/tests/integration_test/data/jobs/qa_job_4558419/app/custom/train.py b/tests/integration_test/data/jobs/qa_job_4558419/app/custom/train.py new file mode 100644 index 0000000000..64c990e787 --- /dev/null +++ b/tests/integration_test/data/jobs/qa_job_4558419/app/custom/train.py @@ -0,0 +1,68 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. 
+import logging +import random +import re +from datetime import datetime + +import torch + +import nvflare.client as flare + + +def evaluate(data): + + t = time.time() / 1e10 + print(f"fake evaluate data: {data}") + print( + f"fake evaluate result: {t}, generated at {datetime.utcfromtimestamp(t * 1e10).strftime('%Y-%m-%d %H:%M:%S')}" + ) + return t + + +def main(): + flare.init() + input_model = flare.receive() + + print("@@@ input_model: ", input_model) + round_num = input_model.current_round + print("@@@ Round number in this round: ", round_num) + + site_name = input_model.meta.get("site_name") + multiplier = re.search(r"\d+", site_name).group() + print("@@@ site_name: ", site_name) + + start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + + if input_model.current_round == 0: + weight = torch.zeros([10**5, 10**3], dtype=torch.float32) + bias = torch.zeros([10**5], dtype=torch.float32) + else: + weight = input_model.params.get("fc1.weight") + bias = input_model.params.get("fc1.bias") + + zzz = random.uniform(1.0, 3.0) + print("@@@ Sleep " + str(zzz)) + time.sleep(zzz) + + weight = torch.add(weight, 1) * int(multiplier) + bias = torch.add(bias, 1) * int(multiplier) + + end_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + logging.info("Finished Training") + + params = {"fc1.weight": weight, "fc1.bias": bias} + + accuracy = evaluate(params) + + output_model = flare.FLModel( + params=params, + metrics={"accuracy": accuracy}, + meta={"NUM_STEPS_CURRENT_ROUND": 2, "start": start_time, "end": end_time}, + ) + + print("@@@ output_model: ", output_model) + flare.send(output_model) + + +if __name__ == "__main__": + main() diff --git a/tests/integration_test/data/jobs/qa_job_4558419/meta.conf b/tests/integration_test/data/jobs/qa_job_4558419/meta.conf new file mode 100644 index 0000000000..23c6c60e78 --- /dev/null +++ b/tests/integration_test/data/jobs/qa_job_4558419/meta.conf @@ -0,0 +1,11 @@ +{ + name = "qa_job_4558419" + resource_spec {} + deploy_map { + app = [ + "@ALL" + ] + } + min_clients = 2 + mandatory_clients = [] +} diff --git a/tests/integration_test/data/jobs/qa_job_4561583/app/config/config_fed_client.conf b/tests/integration_test/data/jobs/qa_job_4561583/app/config/config_fed_client.conf new file mode 100644 index 0000000000..9dd88b904c --- /dev/null +++ b/tests/integration_test/data/jobs/qa_job_4561583/app/config/config_fed_client.conf @@ -0,0 +1,77 @@ +{ + format_version = 2 + app_script = "poc_executor.py" + app_config = "" + executors = [ + { + tasks = [ + "train" + ] + executor { + path = "nvflare.app_opt.pt.client_api_launcher_executor.PTClientAPILauncherExecutor" + args { + launcher_id = "launcher" + pipe_id = "pipe" + heartbeat_timeout = 60 + params_exchange_format = "pytorch" + params_transfer_type = "DIFF" + train_with_evaluation = true + } + } + } + ] + task_data_filters = [] + task_result_filters = [] + components = [ + { + id = "launcher" + path = "nvflare.app_common.launchers.subprocess_launcher.SubprocessLauncher" + args { + script = "python3 -u custom/{app_script} {app_config} " + launch_once = false + } + } + { + id = "pipe" + path = "nvflare.fuel.utils.pipe.cell_pipe.CellPipe" + args { + mode = "PASSIVE" + site_name = "{SITE_NAME}" + token = "{JOB_ID}" + root_url = "{ROOT_URL}" + secure_mode = "{SECURE_MODE}" + workspace_dir = "{WORKSPACE}" + } + } + { + id = "metrics_pipe" + path = "nvflare.fuel.utils.pipe.cell_pipe.CellPipe" + args { + mode = "PASSIVE" + site_name = "{SITE_NAME}" + token = "{JOB_ID}" + root_url = "{ROOT_URL}" + secure_mode = 
"{SECURE_MODE}" + workspace_dir = "{WORKSPACE}" + } + } + { + id = "metric_relay" + path = "nvflare.app_common.widgets.metric_relay.MetricRelay" + args { + pipe_id = "metrics_pipe" + event_type = "fed.analytix_log_stats" + read_interval = 0.1 + } + } + { + id = "config_preparer" + path = "nvflare.app_common.widgets.external_configurator.ExternalConfigurator" + args { + component_ids = [ + "metric_relay" + ] + } + } + ] +} diff --git a/tests/integration_test/data/jobs/qa_job_4561583/app/config/config_fed_server.conf b/tests/integration_test/data/jobs/qa_job_4561583/app/config/config_fed_server.conf new file mode 100644 index 0000000000..98ca4a26ac --- /dev/null +++ b/tests/integration_test/data/jobs/qa_job_4561583/app/config/config_fed_server.conf @@ -0,0 +1,62 @@ +{ + format_version = 2 + task_data_filters = [] + task_result_filters = [] + model_class_path = "pl_net.PlNet" + workflows = [ + { + id = "scatter_and_gather" + path = "nvflare.app_common.workflows.scatter_and_gather.ScatterAndGather" + args { + min_clients = 2 + num_rounds = 2 + start_round = 0 + wait_time_after_min_received = 0 + aggregator_id = "aggregator" + persistor_id = "persistor" + shareable_generator_id = "shareable_generator" + train_task_name = "train" + train_timeout = 0 + } + } + ] + components = [ + { + id = "persistor" + path = "nvflare.app_opt.pt.file_model_persistor.PTFileModelPersistor" + args { + model { + path = "{model_class_path}" + } + } + } + { + id = "shareable_generator" + path = "nvflare.app_common.shareablegenerators.full_model_shareable_generator.FullModelShareableGenerator" + args {} + } + { + id = "aggregator" + path = "nvflare.app_common.aggregators.intime_accumulate_model_aggregator.InTimeAccumulateWeightedAggregator" + args { + expected_data_kind = "WEIGHT_DIFF" + } + } + { + id = "model_selector" + path = "nvflare.app_common.widgets.intime_model_selector.IntimeModelSelector" + args { + key_metric = "val_acc_epoch" + } + } + { + id = "receiver" + path = "nvflare.app_opt.tracking.tb.tb_receiver.TBAnalyticsReceiver" + args { + events = [ + "fed.analytix_log_stats" + ] + } + } + ] +} diff --git a/tests/integration_test/data/jobs/qa_job_4561583/app/custom/net.py b/tests/integration_test/data/jobs/qa_job_4561583/app/custom/net.py new file mode 100644 index 0000000000..1d7d0bd123 --- /dev/null +++ b/tests/integration_test/data/jobs/qa_job_4561583/app/custom/net.py @@ -0,0 +1,25 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import torch.nn as nn + + +class Net(nn.Module): + def __init__(self): + super().__init__() + self.fc1 = nn.Linear(10, 5) + + def forward(self, x): + x = self.fc1(x) + return x diff --git a/tests/integration_test/data/jobs/qa_job_4561583/app/custom/pl_net.py b/tests/integration_test/data/jobs/qa_job_4561583/app/custom/pl_net.py new file mode 100644 index 0000000000..43eb716532 --- /dev/null +++ b/tests/integration_test/data/jobs/qa_job_4561583/app/custom/pl_net.py @@ -0,0 +1,66 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +import random +import re +import time + +import lightning as L +import net +import torch +from torch import nn, optim + + +class PlNet(L.LightningModule): + def __init__(self): + super().__init__() + self.model = net.Net() + + self.site_name = "site-0" + self.current_round = 0 + + def training_step(self, batch, batch_idx): + print(f"@@@ {self.site_name}: batch: {batch}, batch idx: {batch_idx}") + + # Set fixed model's weight and bias by site num & round num + if self.current_round == 0: + weight = torch.zeros([5, 10], dtype=torch.float32) + bias = torch.zeros([5], dtype=torch.float32) + else: + print(f"@@@ {self.site_name} self.model.state_dict(): {self.model.state_dict()}") + weight = self.model.state_dict().get("fc1.weight") + bias = self.model.state_dict().get("fc1.bias") + + multiplier = re.search(r"\d+", self.site_name).group() + print(f"@@@ {self.site_name}: multiplier: {multiplier}") + + weight = torch.add(weight, 1) * int(multiplier) + bias = torch.add(bias, 1) * int(multiplier) + self.model.load_state_dict( + { + "fc1.weight": weight, + "fc1.bias": bias, + } + ) + + # Fake training steps, to adapt pytorch lightning framework + x = batch + y = torch.tensor([1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]) + + x = x.view(x.size(0), -1) + y = y.view(1, -1) + + # sleep to simulate training time elapsed + zzz = random.uniform(1.0, 3.0) + print(f"@@@ {self.site_name}: Sleep {zzz}") + time.sleep(zzz) + + loss = nn.functional.mse_loss(x, y) + loss.requires_grad_(True) + return loss + + def validation_step(self, batch, batch_idx): + t = time.time() / 1e10 + return torch.tensor([t]) + + def configure_optimizers(self): + optimizer = optim.Adam(self.parameters(), lr=1e-3) + return optimizer diff --git a/tests/integration_test/data/jobs/qa_job_4561583/app/custom/poc_executor.py b/tests/integration_test/data/jobs/qa_job_4561583/app/custom/poc_executor.py new file mode 100644 index 0000000000..2191ee3490 --- /dev/null +++ b/tests/integration_test/data/jobs/qa_job_4561583/app/custom/poc_executor.py @@ -0,0 +1,41 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. 
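+# PoC executor for the NVFlare Lightning client API: it patches a Lightning Trainer
+# via flare.patch(trainer) and then runs trainer.validate()/trainer.fit() for each
+# federated round received through flare.receive().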
+ +import lightning as L +import pl_net +import torch +from torch import utils + +import nvflare.client.lightning as flare + + +def main(): + + plnet = pl_net.PlNet() + dataset = torch.tensor( + [[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0], [4.0, 3.0, 2.0, 1.0, 2.0, 5.0, 6.0, 2.0, 1.0, 32.0]] + ) + train_loader = utils.data.DataLoader(dataset) + trainer = L.Trainer(limit_train_batches=1, max_epochs=1, accelerator="cpu") + + for i in range(2): + flare.patch(trainer) + print(f"@@@ length of cb: {len(trainer.callbacks)}") + + site_name = flare.get_site_name() + print(f"@@@ site_name: {site_name}") + + while flare.is_running(): + # flare.receive() called for getting current_round information + input_model = flare.receive() + if input_model: + print(f"@@@ {site_name}: current_round={input_model.current_round}") + plnet.current_round = input_model.current_round + plnet.site_name = site_name + # Test the patch for validate and fit + trainer.validate(plnet, train_loader) + trainer.fit(plnet, train_loader) + print(f"@@@ {site_name} param: {plnet.state_dict()}") + + +if __name__ == "__main__": + main() diff --git a/tests/integration_test/data/jobs/qa_job_4561583/meta.conf b/tests/integration_test/data/jobs/qa_job_4561583/meta.conf new file mode 100644 index 0000000000..556f3e31c2 --- /dev/null +++ b/tests/integration_test/data/jobs/qa_job_4561583/meta.conf @@ -0,0 +1,11 @@ +{ + name = "qa_job_4561583" + resource_spec {} + deploy_map { + app = [ + "@ALL" + ] + } + min_clients = 2 + mandatory_clients = [] +} diff --git a/tests/integration_test/data/jobs/qa_job_4561872/app/config/config_fed_client.conf b/tests/integration_test/data/jobs/qa_job_4561872/app/config/config_fed_client.conf new file mode 100644 index 0000000000..1380e17ae1 --- /dev/null +++ b/tests/integration_test/data/jobs/qa_job_4561872/app/config/config_fed_client.conf @@ -0,0 +1,144 @@ +format_version = 2 +# This is the application script which will be invoked. Client can replace this script with user's own training script. +app_script = "train.py" +# Additional arguments needed by the training code. +app_config = "" +# Client Computing Executors. +executors = [ + { + # tasks the executors are defined to handle + tasks = [ + "train", + "validate", + "submit_model" + ] + executor { + id = "Executor" + # Executor name : PTClientAPILauncherExecutor + # This is an executor for pytorch + Client API. The underline data exchange is using Pipe. + path = "nvflare.app_opt.pt.client_api_launcher_executor.PTClientAPILauncherExecutor" + args { + # launcher_id is used to locate the Launcher object in "components" + launcher_id = "launcher" + # pipe_id is used to locate the Pipe object in "components" + pipe_id = "pipe" + # Timeout in seconds for waiting for a heartbeat from the training script. Defaults to 30 seconds. 
+ # Please refer to the class docstring for all available arguments + heartbeat_timeout = 60 + # if the transfer_type is FULL, then it will be sent directly + # if the transfer_type is DIFF, then we will calculate the + # difference VS received parameters and send the difference + params_transfer_type = "FULL" + # if train_with_evaluation is true, the executor will expect + # the custom code need to send back both the trained parameters and the evaluation metric + # otherwise only trained parameters are expected + train_with_evaluation = true + # tasks for different modes + train_task_name = "train" + evaluate_task_name = "validate" + submit_model_task_name = "submit_model" + } + } + } + { + # All tasks prefixed with swarm_ are routed to SwarmClientController + tasks = ["swarm_*"] + executor { + # client-side controller for training and logic and aggregation management + path = "nvflare.app_common.ccwf.SwarmClientController" + args { + # train task must be implemented by Executor + learn_task_name = "train" + # how long to wait for current learn task before timing out the gathering + learn_task_timeout = 600 + # ids must map to corresponding components + persistor_id = "persistor" + aggregator_id = "aggregator" + shareable_generator_id = "shareable_generator" + min_responses_required = 2 + wait_time_after_min_resps_received = 30 + } + } + } + { + # All tasks prefixed with cse_ are routed to CrossSiteEvalClientController + tasks = ["cse_*"] + executor { + # client-side controller for cse + path = "nvflare.app_common.ccwf.CrossSiteEvalClientController" + args { + # submit_model and validate tasks must be implemented by Executor + submit_model_task_name = "submit_model" + validation_task_name = "validate" + # persistor id must map to corresponding persistor component + persistor_id = "persistor" + get_model_timeout = 60 + } + } + } +] +task_result_filters = [] +task_data_filters = [] +components = [ + { + # component id is "launcher" + id = "launcher" + + # the class path of this component + path = "nvflare.app_common.launchers.subprocess_launcher.SubprocessLauncher" + + args { + # the launcher will invoke the script + script = "python3 -u custom/{app_script} {app_config} " + # if launch_once is true, the SubprocessLauncher will launch once for the whole job + # if launch_once is false, the SubprocessLauncher will launch a process for each task it receives from server + launch_once = true + } + } + { + id = "pipe" + + path = "nvflare.fuel.utils.pipe.file_pipe.FilePipe" + + args { + mode = "PASSIVE" + # root_path: is the directory location of the parameters exchange. + # You can also set it to an absolute path in your system. 
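+      # e.g. root_path = "/tmp/nvflare/pipe"   (hypothetical example of an absolute path)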
+ root_path = "{WORKSPACE}/{JOB_ID}/{SITE_NAME}" + } + } + { + id = "persistor" + path = "nvflare.app_opt.pt.file_model_persistor.PTFileModelPersistor" + args { + model { + path = "net.Net" + } + } + } + { + id = "shareable_generator" + path = "nvflare.app_common.ccwf.comps.simple_model_shareable_generator.SimpleModelShareableGenerator" + args {} + } + { + id = "aggregator" + path = "nvflare.app_common.aggregators.intime_accumulate_model_aggregator.InTimeAccumulateWeightedAggregator" + args { + expected_data_kind = "WEIGHTS" + } + } + { + id = "model_selector" + path = "nvflare.app_common.widgets.intime_model_selector.IntimeModelSelector" + args { + key_metric = "accuracy" + } + } + # prints best results once executor is finalized + { + id = "result_printer" + path = "nvflare.app_common.ccwf.comps.cwe_result_printer.CWEResultPrinter" + args {} + } +] diff --git a/tests/integration_test/data/jobs/qa_job_4561872/app/config/config_fed_server.conf b/tests/integration_test/data/jobs/qa_job_4561872/app/config/config_fed_server.conf new file mode 100644 index 0000000000..548e7d6655 --- /dev/null +++ b/tests/integration_test/data/jobs/qa_job_4561872/app/config/config_fed_server.conf @@ -0,0 +1,33 @@ +format_version = 2 +task_data_filters = [] +task_result_filters = [] +components = [ + { + # write validation results to json file + id = "json_generator" + path = "nvflare.app_common.widgets.validation_json_generator.ValidationJsonGenerator" + args {} + } +] +workflows = [ + { + # server-side controller to manage job life cycle + id = "swarm_controller" + path = "nvflare.app_common.ccwf.SwarmServerController" + args { + # can also set aggregation clients and train clients, see class for all available args + num_rounds = 3, + aggr_clients = ["site-1"], + train_clients = ["site-2", "site-3"] + } + } + { + # server-side controller to manage configuration and evaluation workflow + id = "cross_site_eval" + path = "nvflare.app_common.ccwf.CrossSiteEvalServerController" + args { + # can also set evaluators and evaluatees, see class for all available args + eval_task_timeout = 300 + } + } +] diff --git a/tests/integration_test/data/jobs/qa_job_4561872/app/custom/net.py b/tests/integration_test/data/jobs/qa_job_4561872/app/custom/net.py new file mode 100644 index 0000000000..47ac7e9589 --- /dev/null +++ b/tests/integration_test/data/jobs/qa_job_4561872/app/custom/net.py @@ -0,0 +1,37 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
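+# This is the small CIFAR-10 tutorial CNN: two convolution/pooling stages followed by
+# three fully connected layers, producing 10 class logits.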
+ +import torch +import torch.nn as nn +import torch.nn.functional as F + + +class Net(nn.Module): + def __init__(self): + super().__init__() + self.conv1 = nn.Conv2d(3, 6, 5) + self.pool = nn.MaxPool2d(2, 2) + self.conv2 = nn.Conv2d(6, 16, 5) + self.fc1 = nn.Linear(16 * 5 * 5, 120) + self.fc2 = nn.Linear(120, 84) + self.fc3 = nn.Linear(84, 10) + + def forward(self, x): + x = self.pool(F.relu(self.conv1(x))) + x = self.pool(F.relu(self.conv2(x))) + x = torch.flatten(x, 1) # flatten all dimensions except batch + x = F.relu(self.fc1(x)) + x = F.relu(self.fc2(x)) + x = self.fc3(x) + return x diff --git a/tests/integration_test/data/jobs/qa_job_4561872/app/custom/train.py b/tests/integration_test/data/jobs/qa_job_4561872/app/custom/train.py new file mode 100644 index 0000000000..566471d0cb --- /dev/null +++ b/tests/integration_test/data/jobs/qa_job_4561872/app/custom/train.py @@ -0,0 +1,188 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse + +import torch +import torch.nn as nn +import torch.optim as optim +import torchvision +import torchvision.transforms as transforms +from net import Net + +# (1) import nvflare client API +import nvflare.client as flare +from nvflare.app_common.app_constant import ModelName + +# (optional) set a fix place so we don't need to download everytime +CIFAR10_ROOT = "/tmp/nvflare/data" +# (optional) We change to use GPU to speed things up. 
+# if you want to use CPU, change DEVICE="cpu" +DEVICE = "cuda:0" + + +def define_parser(): + parser = argparse.ArgumentParser() + parser.add_argument("--dataset_path", type=str, default=CIFAR10_ROOT, nargs="?") + parser.add_argument("--batch_size", type=int, default=4, nargs="?") + parser.add_argument("--num_workers", type=int, default=1, nargs="?") + parser.add_argument("--local_epochs", type=int, default=2, nargs="?") + parser.add_argument("--model_path", type=str, default=f"{CIFAR10_ROOT}/cifar_net.pth", nargs="?") + return parser.parse_args() + + +def main(): + # define local parameters + args = define_parser() + + dataset_path = args.dataset_path + batch_size = args.batch_size + num_workers = args.num_workers + local_epochs = args.local_epochs + model_path = args.model_path + + transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]) + trainset = torchvision.datasets.CIFAR10(root=dataset_path, train=True, download=True, transform=transform) + trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, shuffle=True, num_workers=num_workers) + testset = torchvision.datasets.CIFAR10(root=dataset_path, train=False, download=True, transform=transform) + testloader = torch.utils.data.DataLoader(testset, batch_size=batch_size, shuffle=False, num_workers=num_workers) + + net = Net() + best_accuracy = 0.0 + + # wraps evaluation logic into a method to re-use for + # evaluation on both trained and received model + def evaluate(input_weights): + net = Net() + net.load_state_dict(input_weights) + # (optional) use GPU to speed things up + net.to(DEVICE) + + correct = 0 + total = 0 + # since we're not training, we don't need to calculate the gradients for our outputs + with torch.no_grad(): + for data in testloader: + # (optional) use GPU to speed things up + images, labels = data[0].to(DEVICE), data[1].to(DEVICE) + # calculate outputs by running images through the network + outputs = net(images) + # the class with the highest energy is what we choose as prediction + _, predicted = torch.max(outputs.data, 1) + total += labels.size(0) + correct += (predicted == labels).sum().item() + + return 100 * correct // total + + # (2) initialize NVFlare client API + flare.init() + + # (3) run continously when launch_once=true + while flare.is_running(): + + # (4) receive FLModel from NVFlare + input_model = flare.receive() + client_id = flare.get_site_name() + + # Based on different "task" we will do different things + # for "train" task (flare.is_train()) we use the received model to do training and/or evaluation + # and send back updated model and/or evaluation metrics, if the "train_with_evaluation" is specified as True + # in the config_fed_client we will need to do evaluation and include the evaluation metrics + # for "evaluate" task (flare.is_evaluate()) we use the received model to do evaluation + # and send back the evaluation metrics + # for "submit_model" task (flare.is_submit_model()) we just need to send back the local model + # (5) performing train task on received model + if flare.is_train(): + print(f"({client_id}) current_round={input_model.current_round}, total_rounds={input_model.total_rounds}") + + # (5.1) loads model from NVFlare + net.load_state_dict(input_model.params) + + criterion = nn.CrossEntropyLoss() + optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9) + + # (optional) use GPU to speed things up + net.to(DEVICE) + # (optional) calculate total steps + steps = local_epochs * len(trainloader) + for 
epoch in range(local_epochs): # loop over the dataset multiple times + + running_loss = 0.0 + for i, data in enumerate(trainloader, 0): + # get the inputs; data is a list of [inputs, labels] + # (optional) use GPU to speed things up + inputs, labels = data[0].to(DEVICE), data[1].to(DEVICE) + + # zero the parameter gradients + optimizer.zero_grad() + + # forward + backward + optimize + outputs = net(inputs) + loss = criterion(outputs, labels) + loss.backward() + optimizer.step() + + # print statistics + running_loss += loss.item() + if i % 2000 == 1999: # print every 2000 mini-batches + print(f"({client_id}) [{epoch + 1}, {i + 1:5d}] loss: {running_loss / 2000:.3f}") + running_loss = 0.0 + + print(f"({client_id}) Finished Training") + + # (5.2) evaluation on local trained model to save best model + local_accuracy = evaluate(net.state_dict()) + print(f"({client_id}) Evaluating local trained model. Accuracy on the 10000 test images: {local_accuracy}") + if local_accuracy > best_accuracy: + best_accuracy = local_accuracy + torch.save(net.state_dict(), model_path) + + # (5.3) evaluate on received model for model selection + accuracy = evaluate(input_model.params) + print( + f"({client_id}) Evaluating received model for model selection. Accuracy on the 10000 test images: {accuracy}" + ) + + # (5.4) construct trained FL model + output_model = flare.FLModel( + params=net.cpu().state_dict(), + metrics={"accuracy": accuracy}, + meta={"NUM_STEPS_CURRENT_ROUND": steps}, + ) + + # (5.5) send model back to NVFlare + flare.send(output_model) + + # (6) performing evaluate task on received model + elif flare.is_evaluate(): + accuracy = evaluate(input_model.params) + flare.send(flare.FLModel(metrics={"accuracy": accuracy})) + + # (7) performing submit_model task to obtain best local model + elif flare.is_submit_model(): + model_name = input_model.meta["submit_model_name"] + if model_name == ModelName.BEST_MODEL: + try: + weights = torch.load(model_path) + net = Net() + net.load_state_dict(weights) + flare.send(flare.FLModel(params=net.cpu().state_dict())) + except Exception as e: + raise ValueError("Unable to load best model") from e + else: + raise ValueError(f"Unknown model_type: {model_name}") + + +if __name__ == "__main__": + main() diff --git a/tests/integration_test/data/jobs/qa_job_4561872/meta.conf b/tests/integration_test/data/jobs/qa_job_4561872/meta.conf new file mode 100644 index 0000000000..9af81513bc --- /dev/null +++ b/tests/integration_test/data/jobs/qa_job_4561872/meta.conf @@ -0,0 +1,11 @@ +{ + name = "swarm_cse_pt" + resource_spec {} + deploy_map { + app = [ + "@ALL" + ] + } + min_clients = 3 + mandatory_clients = [] +} diff --git a/tests/integration_test/data/jobs/qa_job_4592780/app/config/config_fed_client.conf b/tests/integration_test/data/jobs/qa_job_4592780/app/config/config_fed_client.conf new file mode 100644 index 0000000000..9dd88b904c --- /dev/null +++ b/tests/integration_test/data/jobs/qa_job_4592780/app/config/config_fed_client.conf @@ -0,0 +1,77 @@ +{ + format_version = 2 + app_script = "poc_executor.py" + app_config = "" + executors = [ + { + tasks = [ + "train" + ] + executor { + path = "nvflare.app_opt.pt.client_api_launcher_executor.PTClientAPILauncherExecutor" + args { + launcher_id = "launcher" + pipe_id = "pipe" + heartbeat_timeout = 60 + params_exchange_format = "pytorch" + params_transfer_type = "DIFF" + train_with_evaluation = true + } + } + } + ] + task_data_filters = [] + task_result_filters = [] + components = [ + { + id = "launcher" + path = 
"nvflare.app_common.launchers.subprocess_launcher.SubprocessLauncher" + args { + script = "python3 -u custom/{app_script} {app_config} " + launch_once = false + } + } + { + id = "pipe" + path = "nvflare.fuel.utils.pipe.cell_pipe.CellPipe" + args { + mode = "PASSIVE" + site_name = "{SITE_NAME}" + token = "{JOB_ID}" + root_url = "{ROOT_URL}" + secure_mode = "{SECURE_MODE}" + workspace_dir = "{WORKSPACE}" + } + } + { + id = "metrics_pipe" + path = "nvflare.fuel.utils.pipe.cell_pipe.CellPipe" + args { + mode = "PASSIVE" + site_name = "{SITE_NAME}" + token = "{JOB_ID}" + root_url = "{ROOT_URL}" + secure_mode = "{SECURE_MODE}" + workspace_dir = "{WORKSPACE}" + } + } + { + id = "metric_relay" + path = "nvflare.app_common.widgets.metric_relay.MetricRelay" + args { + pipe_id = "metrics_pipe" + event_type = "fed.analytix_log_stats" + read_interval = 0.1 + } + } + { + id = "config_preparer" + path = "nvflare.app_common.widgets.external_configurator.ExternalConfigurator" + args { + component_ids = [ + "metric_relay" + ] + } + } + ] +} diff --git a/tests/integration_test/data/jobs/qa_job_4592780/app/config/config_fed_server.conf b/tests/integration_test/data/jobs/qa_job_4592780/app/config/config_fed_server.conf new file mode 100644 index 0000000000..8245a2d527 --- /dev/null +++ b/tests/integration_test/data/jobs/qa_job_4592780/app/config/config_fed_server.conf @@ -0,0 +1,62 @@ +{ + format_version = 2 + task_data_filters = [] + task_result_filters = [] + model_class_path = "net.Net" + workflows = [ + { + id = "scatter_and_gather" + path = "nvflare.app_common.workflows.scatter_and_gather.ScatterAndGather" + args { + min_clients = 2 + num_rounds = 2 + start_round = 0 + wait_time_after_min_received = 0 + aggregator_id = "aggregator" + persistor_id = "persistor" + shareable_generator_id = "shareable_generator" + train_task_name = "train" + train_timeout = 0 + } + } + ] + components = [ + { + id = "persistor" + path = "nvflare.app_opt.pt.file_model_persistor.PTFileModelPersistor" + args { + model { + path = "{model_class_path}" + } + } + } + { + id = "shareable_generator" + path = "nvflare.app_common.shareablegenerators.full_model_shareable_generator.FullModelShareableGenerator" + args {} + } + { + id = "aggregator" + path = "nvflare.app_common.aggregators.intime_accumulate_model_aggregator.InTimeAccumulateWeightedAggregator" + args { + expected_data_kind = "WEIGHT_DIFF" + } + } + { + id = "model_selector" + path = "nvflare.app_common.widgets.intime_model_selector.IntimeModelSelector" + args { + key_metric = "accuracy" + } + } + { + id = "receiver" + path = "nvflare.app_opt.tracking.tb.tb_receiver.TBAnalyticsReceiver" + args { + events = [ + "fed.analytix_log_stats" + ] + } + } + ] +} diff --git a/tests/integration_test/data/jobs/qa_job_4592780/app/custom/net.py b/tests/integration_test/data/jobs/qa_job_4592780/app/custom/net.py new file mode 100644 index 0000000000..1d7d0bd123 --- /dev/null +++ b/tests/integration_test/data/jobs/qa_job_4592780/app/custom/net.py @@ -0,0 +1,25 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import torch.nn as nn
+
+
+class Net(nn.Module):
+    def __init__(self):
+        super().__init__()
+        self.fc1 = nn.Linear(10, 5)
+
+    def forward(self, x):
+        x = self.fc1(x)
+        return x
diff --git a/tests/integration_test/data/jobs/qa_job_4592780/app/custom/poc_executor.py b/tests/integration_test/data/jobs/qa_job_4592780/app/custom/poc_executor.py
new file mode 100644
index 0000000000..6852b72e38
--- /dev/null
+++ b/tests/integration_test/data/jobs/qa_job_4592780/app/custom/poc_executor.py
@@ -0,0 +1,79 @@
+# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
+import logging
+import random
+import re
+import time
+from datetime import datetime
+
+import torch
+
+# Client API
+import nvflare.client as flare
+
+logger = logging.getLogger("POCExecutor")
+
+
+def main():
+
+    flare.init()
+    input_model = flare.receive()
+
+    print("@@@ input_model: ", input_model)
+    round_num = input_model.current_round
+    print("@@@ Round number in this round: ", round_num)
+
+    # 'site-2'
+    site_name = input_model.meta.get("site_name")
+    multiplier = re.search(r"\d+", site_name).group()
+    print("@@@ site_name: ", site_name)
+
+    start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+
+    # data shape
+    if input_model.current_round == 0:
+        # Not involving init random weights
+        weight = torch.zeros([5, 10], dtype=torch.float32)
+        bias = torch.zeros([5], dtype=torch.float32)
+    else:
+        weight = input_model.params.get("fc1.weight")
+        bias = input_model.params.get("fc1.bias")
+
+    # do the job
+    zzz = random.uniform(1.0, 3.0)
+    print("@@@ Sleep " + str(zzz))
+    time.sleep(zzz)
+
+    weight = torch.add(weight, 1) * int(multiplier)
+    bias = torch.add(bias, 1) * int(multiplier)
+
+    end_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+    logger.info("Finished Training")
+
+    params = {
+        "fc1.weight": weight,
+        "fc1.bias": bias,
+    }
+
+    def evaluate(data):
+        # between 0-1, increase with time
+        t = time.time() / 1e10
+        print(f"fake evaluate data: {data}")
+        print(
+            f"fake evaluate result: {t}, generated at {datetime.utcfromtimestamp(t * 1e10).strftime('%Y-%m-%d %H:%M:%S')}"
+        )
+        return t
+
+    accuracy = evaluate(params)
+
+    output_model = flare.FLModel(
+        params=params,
+        metrics={"accuracy": accuracy},
+        meta={"NUM_STEPS_CURRENT_ROUND": 2, "start": start_time, "end": end_time},
+    )
+
+    print("@@@ output_model: ", output_model)
+    flare.send(output_model)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/tests/integration_test/data/jobs/qa_job_4592780/meta.conf b/tests/integration_test/data/jobs/qa_job_4592780/meta.conf
new file mode 100644
index 0000000000..55a75fcabb
--- /dev/null
+++ b/tests/integration_test/data/jobs/qa_job_4592780/meta.conf
@@ -0,0 +1,11 @@
+{
+  name = "qa_job_4582780"
+  resource_spec {}
+  deploy_map {
+    app = [
+      "@ALL"
+    ]
+  }
+  min_clients = 2
+  mandatory_clients = []
+}
diff --git a/tests/integration_test/data/test_configs/standalone_job/client_api.yml b/tests/integration_test/data/test_configs/standalone_job/client_api.yml
new file mode 100644
index 0000000000..c8c030a79b
--- /dev/null
+++ b/tests/integration_test/data/test_configs/standalone_job/client_api.yml
@@ -0,0 +1,148 @@
+n_servers: 1
+n_clients: 2
+jobs_root_dir: ./data/jobs
+cleanup: True
+
+
+tests:
+  - test_name: "run np-loop"
+    event_sequence:
+      - "trigger":
+          "type": "server_log"
+          "data": "Server started"
+        "actions": [ "submit_job np_loop" ]
+        "result":
+          "type": "job_submit_success"
+      - "trigger":
+          "type": "run_state"
+          "data": { 
"run_finished": True } + "actions": [ "ensure_current_job_done" ] + "result": + "type": "run_state" + "data": { "run_finished": True } + - test_name: "run np-loop-cell-pipe" + event_sequence: + - "trigger": + "type": "server_log" + "data": "Server started" + "actions": [ "submit_job np_loop_cell_pipe" ] + "result": + "type": "job_submit_success" + - "trigger": + "type": "run_state" + "data": { "run_finished": True } + "actions": [ "ensure_current_job_done" ] + "result": + "type": "run_state" + "data": { "run_finished": True } + - test_name: "run np-metrics" + event_sequence: + - "trigger": + "type": "server_log" + "data": "Server started" + "actions": [ "submit_job np_metrics" ] + "result": + "type": "job_submit_success" + - "trigger": + "type": "run_state" + "data": { "run_finished": True } + "actions": [ "ensure_current_job_done" ] + "result": + "type": "run_state" + "data": { "run_finished": True } + - test_name: "run pt-client-api" + event_sequence: + - "trigger": + "type": "server_log" + "data": "Server started" + "actions": [ "submit_job pt_client_api" ] + "result": + "type": "job_submit_success" + - "trigger": + "type": "run_state" + "data": { "run_finished": True } + "actions": [ "ensure_current_job_done" ] + "result": + "type": "run_state" + "data": { "run_finished": True } + setup: + - python -c "from torchvision.datasets import CIFAR10; CIFAR10(root='/tmp/nvflare/data', train=True, download=True)" + teardown: + - rm -rf /tmp/nvflare/data + - test_name: "run pt-client-api-launch-once" + event_sequence: + - "trigger": + "type": "server_log" + "data": "Server started" + "actions": [ "submit_job pt_client_api_launch_once" ] + "result": + "type": "job_submit_success" + - "trigger": + "type": "run_state" + "data": { "run_finished": True } + "actions": [ "ensure_current_job_done" ] + "result": + "type": "run_state" + "data": { "run_finished": True } + setup: + - python -c "from torchvision.datasets import CIFAR10; CIFAR10(root='/tmp/nvflare/data', train=True, download=True)" + teardown: + - rm -rf /tmp/nvflare/data + - test_name: "run pt-client-api-cyclic" + event_sequence: + - "trigger": + "type": "server_log" + "data": "Server started" + "actions": [ "submit_job pt_client_api_cyclic" ] + "result": + "type": "job_submit_success" + - "trigger": + "type": "run_state" + "data": { "run_finished": True } + "actions": [ "ensure_current_job_done" ] + "result": + "type": "run_state" + "data": { "run_finished": True } + setup: + - python -c "from torchvision.datasets import CIFAR10; CIFAR10(root='/tmp/nvflare/data', train=True, download=True)" + teardown: + - rm -rf /tmp/nvflare/data + - test_name: "run pt-decorator" + event_sequence: + - "trigger": + "type": "server_log" + "data": "Server started" + "actions": [ "submit_job decorator" ] + "result": + "type": "job_submit_success" + - "trigger": + "type": "run_state" + "data": { "run_finished": True } + "actions": [ "ensure_current_job_done" ] + "result": + "type": "run_state" + "data": { "run_finished": True } + setup: + - python -c "from torchvision.datasets import CIFAR10; CIFAR10(root='/tmp/nvflare/data', train=True, download=True)" + teardown: + - rm -rf /tmp/nvflare/data + - test_name: "run lightning" + event_sequence: + - "trigger": + "type": "server_log" + "data": "Server started" + "actions": [ "submit_job lightning" ] + "result": + "type": "job_submit_success" + - "trigger": + "type": "run_state" + "data": { "run_finished": True } + "actions": [ "ensure_current_job_done" ] + "result": + "type": "run_state" + "data": { "run_finished": 
True } + setup: + - python -m pip install pytorch_lightning + - python -c "from torchvision.datasets import CIFAR10; CIFAR10(root='/tmp/nvflare/data', train=True, download=True)" + teardown: + - rm -rf /tmp/nvflare/data diff --git a/tests/integration_test/data/test_configs/standalone_job/client_api_qa.yml b/tests/integration_test/data/test_configs/standalone_job/client_api_qa.yml new file mode 100644 index 0000000000..166f41b211 --- /dev/null +++ b/tests/integration_test/data/test_configs/standalone_job/client_api_qa.yml @@ -0,0 +1,102 @@ +n_servers: 1 +n_clients: 3 +jobs_root_dir: ./data/jobs +cleanup: True + + +tests: + - test_name: "run qa_job_4561872" + event_sequence: + - "trigger": + "type": "server_log" + "data": "Server started" + "actions": [ "submit_job qa_job_4561872" ] + "result": + "type": "job_submit_success" + - "trigger": + "type": "run_state" + "data": { "run_finished": True } + "actions": [ "ensure_current_job_done" ] + "result": + "type": "run_state" + "data": { "run_finished": True } + setup: + - python -c "from torchvision.datasets import CIFAR10; CIFAR10(root='/tmp/nvflare/data', train=True, download=True)" + teardown: + - rm -rf /tmp/nvflare/data + - test_name: "run qa_job_4558419" + event_sequence: + - "trigger": + "type": "server_log" + "data": "Server started" + "actions": [ "submit_job qa_job_4558419" ] + "result": + "type": "job_submit_success" + - "trigger": + "type": "run_state" + "data": { "run_finished": True } + "actions": [ "ensure_current_job_done" ] + "result": + "type": "run_state" + "data": { "run_finished": True } + setup: + - python -c "from torchvision.datasets import CIFAR10; CIFAR10(root='/tmp/nvflare/data', train=True, download=True)" + teardown: + - rm -rf /tmp/nvflare/data + - test_name: "run qa_job_4561583" + event_sequence: + - "trigger": + "type": "server_log" + "data": "Server started" + "actions": [ "submit_job qa_job_4561583" ] + "result": + "type": "job_submit_success" + - "trigger": + "type": "run_state" + "data": { "run_finished": True } + "actions": [ "ensure_current_job_done" ] + "result": + "type": "run_state" + "data": { "run_finished": True } + setup: + - python -c "from torchvision.datasets import CIFAR10; CIFAR10(root='/tmp/nvflare/data', train=True, download=True)" + teardown: + - rm -rf /tmp/nvflare/data + - test_name: "run qa_job_4561872" + event_sequence: + - "trigger": + "type": "server_log" + "data": "Server started" + "actions": [ "submit_job qa_job_4561872" ] + "result": + "type": "job_submit_success" + - "trigger": + "type": "run_state" + "data": { "run_finished": True } + "actions": [ "ensure_current_job_done" ] + "result": + "type": "run_state" + "data": { "run_finished": True } + setup: + - python -c "from torchvision.datasets import CIFAR10; CIFAR10(root='/tmp/nvflare/data', train=True, download=True)" + teardown: + - rm -rf /tmp/nvflare/data + - test_name: "run qa_job_4592780" + event_sequence: + - "trigger": + "type": "server_log" + "data": "Server started" + "actions": [ "submit_job qa_job_4592780" ] + "result": + "type": "job_submit_success" + - "trigger": + "type": "run_state" + "data": { "run_finished": True } + "actions": [ "ensure_current_job_done" ] + "result": + "type": "run_state" + "data": { "run_finished": True } + setup: + - python -c "from torchvision.datasets import CIFAR10; CIFAR10(root='/tmp/nvflare/data', train=True, download=True)" + teardown: + - rm -rf /tmp/nvflare/data diff --git a/tests/integration_test/run_integration_tests.sh b/tests/integration_test/run_integration_tests.sh index 
6f8e68a981..dd38618796 100755 --- a/tests/integration_test/run_integration_tests.sh +++ b/tests/integration_test/run_integration_tests.sh @@ -3,7 +3,7 @@ set -e PYTHONPATH="${PWD}/../.." -backends=(numpy tensorflow pytorch overseer ha auth preflight cifar auto stats) +backends=(numpy tensorflow pytorch overseer ha auth preflight cifar auto stats xgboost client_api client_api_qa) usage() { diff --git a/tests/integration_test/test_configs.yml b/tests/integration_test/test_configs.yml index d75f59a784..55e710620c 100644 --- a/tests/integration_test/test_configs.yml +++ b/tests/integration_test/test_configs.yml @@ -32,3 +32,7 @@ test_configs: xgboost: - ./data/test_configs/standalone_job/xgb_histogram_examples.yml - ./data/test_configs/standalone_job/xgb_tree_examples.yml + client_api: + - ./data/test_configs/standalone_job/client_api.yml + client_api_qa: + - ./data/test_configs/standalone_job/client_api_qa.yml
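A quick way to exercise the new test groups locally, sketched under the assumption that run_integration_tests.sh selects one of the backends listed above through its existing -m option (the option name is an assumption here; check the script's usage() output for the authoritative flags):

    cd tests/integration_test
    # run the client API standalone-job tests (1 server, 2 clients, per client_api.yml)
    ./run_integration_tests.sh -m client_api
    # run the client API QA jobs (1 server, 3 clients, per client_api_qa.yml)
    ./run_integration_tests.sh -m client_api_qa

Both groups download CIFAR-10 into /tmp/nvflare/data in their setup steps and remove it again in teardown, as declared in the YAML configs above.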