Skip to content

Commit

Permalink
refactor(framework) Stop passing working directory to simulation back…
Browse files Browse the repository at this point in the history
…ends (#3954)
  • Loading branch information
jafermarq authored and chongshenng committed Aug 13, 2024
1 parent e40afe1 commit 0a750be
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 94 deletions.
2 changes: 1 addition & 1 deletion src/py/flwr/server/superlink/fleet/vce/backend/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
class Backend(ABC):
"""Abstract base class for a Simulation Engine Backend."""

def __init__(self, backend_config: BackendConfig, work_dir: str) -> None:
def __init__(self, backend_config: BackendConfig) -> None:
"""Construct a backend."""

@abstractmethod
Expand Down
39 changes: 4 additions & 35 deletions src/py/flwr/server/superlink/fleet/vce/backend/raybackend.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,8 @@
# ==============================================================================
"""Ray backend for the Fleet API using the Simulation Engine."""

import pathlib
from logging import DEBUG, ERROR
from typing import Callable, Dict, List, Tuple, Union
from typing import Callable, Dict, Tuple, Union

import ray

Expand All @@ -33,7 +32,6 @@

ClientResourcesDict = Dict[str, Union[int, float]]
ActorArgsDict = Dict[str, Union[int, float, Callable[[], None]]]
RunTimeEnvDict = Dict[str, Union[str, List[str]]]


class RayBackend(Backend):
Expand All @@ -42,18 +40,14 @@ class RayBackend(Backend):
def __init__(
self,
backend_config: BackendConfig,
work_dir: str,
) -> None:
"""Prepare RayBackend by initialising Ray and creating the ActorPool."""
log(DEBUG, "Initialising: %s", self.__class__.__name__)
log(DEBUG, "Backend config: %s", backend_config)

if not pathlib.Path(work_dir).exists():
raise ValueError(f"Specified work_dir {work_dir} does not exist.")

# Initialise ray
self.init_args_key = "init_args"
self.init_ray(backend_config, work_dir)
self.init_ray(backend_config)

# Validate client resources
self.client_resources_key = "client_resources"
Expand All @@ -68,23 +62,6 @@ def __init__(
actor_kwargs=actor_kwargs,
)

def _configure_runtime_env(self, work_dir: str) -> RunTimeEnvDict:
"""Return list of files/subdirectories to exclude relative to work_dir.
Without this, Ray will push everything to the Ray Cluster.
"""
runtime_env: RunTimeEnvDict = {"working_dir": work_dir}

excludes = []
path = pathlib.Path(work_dir)
for p in path.rglob("*"):
# Exclude files need to be relative to the working_dir
if p.is_file() and not str(p).endswith(".py"):
excludes.append(str(p.relative_to(path)))
runtime_env["excludes"] = excludes

return runtime_env

def _validate_client_resources(self, config: BackendConfig) -> ClientResourcesDict:
client_resources_config = config.get(self.client_resources_key)
client_resources: ClientResourcesDict = {}
Expand Down Expand Up @@ -123,26 +100,18 @@ def _validate_actor_arguments(self, config: BackendConfig) -> ActorArgsDict:
actor_args["on_actor_init_fn"] = enable_tf_gpu_growth
return actor_args

def init_ray(self, backend_config: BackendConfig, work_dir: str) -> None:
def init_ray(self, backend_config: BackendConfig) -> None:
"""Intialises Ray if not already initialised."""
if not ray.is_initialized():
# Init ray and append working dir if needed
runtime_env = (
self._configure_runtime_env(work_dir=work_dir) if work_dir else None
)

ray_init_args: Dict[
str,
Union[ConfigsRecordValues, RunTimeEnvDict],
ConfigsRecordValues,
] = {}

if backend_config.get(self.init_args_key):
for k, v in backend_config[self.init_args_key].items():
ray_init_args[k] = v

if runtime_env is not None:
ray_init_args["runtime_env"] = runtime_env

ray.init(**ray_init_args)

@property
Expand Down
60 changes: 4 additions & 56 deletions src/py/flwr/server/superlink/fleet/vce/backend/raybackend_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,13 @@
"""Test for Ray backend for the Fleet API using the Simulation Engine."""

from math import pi
from pathlib import Path
from typing import Callable, Dict, Optional, Tuple, Union
from unittest import TestCase

import ray

from flwr.client import Client, NumPyClient
from flwr.client.client_app import ClientApp, LoadClientAppError
from flwr.client.client_app import ClientApp
from flwr.client.node_state import NodeState
from flwr.common import (
DEFAULT_TTL,
Expand All @@ -36,7 +35,6 @@
Scalar,
)
from flwr.common.constant import PARTITION_ID_KEY
from flwr.common.object_ref import load_app
from flwr.common.recordset_compat import getpropertiesins_to_recordset
from flwr.server.superlink.fleet.vce.backend.backend import BackendConfig
from flwr.server.superlink.fleet.vce.backend.raybackend import RayBackend
Expand All @@ -63,25 +61,6 @@ def _load_app() -> ClientApp:
return ClientApp(client_fn=get_dummy_client)


client_app = ClientApp(
client_fn=get_dummy_client,
)


def _load_from_module(client_app_module_name: str) -> Callable[[], ClientApp]:
def _load_app() -> ClientApp:
app = load_app(client_app_module_name, LoadClientAppError)

if not isinstance(app, ClientApp):
raise LoadClientAppError(
f"Attribute {client_app_module_name} is not of type {ClientApp}",
) from None

return app

return _load_app


def backend_build_process_and_termination(
backend: RayBackend,
process_args: Optional[Tuple[Callable[[], ClientApp], Message, Context]] = None,
Expand Down Expand Up @@ -140,16 +119,15 @@ def doCleanups(self) -> None:

def test_backend_creation_and_termination(self) -> None:
"""Test creation of RayBackend and its termination."""
backend = RayBackend(backend_config={}, work_dir="")
backend = RayBackend(backend_config={})
backend_build_process_and_termination(backend=backend, process_args=None)

def test_backend_creation_submit_and_termination(
self,
client_app_loader: Callable[[], ClientApp] = _load_app,
workdir: str = "",
) -> None:
"""Test submitting a message to a given ClientApp."""
backend = RayBackend(backend_config={}, work_dir=workdir)
backend = RayBackend(backend_config={})

# Define ClientApp
client_app_callable = client_app_loader
Expand Down Expand Up @@ -178,42 +156,14 @@ def test_backend_creation_submit_and_termination(
]
assert obtained_result_in_context == expected_output

def test_backend_creation_submit_and_termination_non_existing_client_app(
self,
) -> None:
"""Testing with ClientApp module that does not exist."""
with self.assertRaises(LoadClientAppError):
self.test_backend_creation_submit_and_termination(
client_app_loader=_load_from_module("a_non_existing_module:app")
)

def test_backend_creation_submit_and_termination_existing_client_app(
self,
) -> None:
"""Testing with ClientApp module that exist."""
# Resolve what should be the workdir to pass upon Backend initialisation
file_path = Path(__file__)
working_dir = Path.cwd()
rel_workdir = file_path.relative_to(working_dir)

# Susbtract last element
rel_workdir_str = str(rel_workdir.parent)

self.test_backend_creation_submit_and_termination(
client_app_loader=_load_from_module("raybackend_test:client_app"),
workdir=rel_workdir_str,
client_app_loader=_load_app,
)

def test_backend_creation_submit_and_termination_existing_client_app_unsetworkdir(
self,
) -> None:
"""Testing with ClientApp module that exist but the passed workdir does not."""
with self.assertRaises(ValueError):
self.test_backend_creation_submit_and_termination(
client_app_loader=_load_from_module("raybackend_test:client_app"),
workdir="/?&%$^#%@$!",
)

def test_backend_creation_with_init_arguments(self) -> None:
"""Testing whether init args are properly parsed to Ray."""
backend_config_4: BackendConfig = {
Expand All @@ -228,7 +178,6 @@ def test_backend_creation_with_init_arguments(self) -> None:

RayBackend(
backend_config=backend_config_4,
work_dir="",
)
nodes = ray.nodes()

Expand All @@ -238,7 +187,6 @@ def test_backend_creation_with_init_arguments(self) -> None:

RayBackend(
backend_config=backend_config_2,
work_dir="",
)
nodes = ray.nodes()

Expand Down
2 changes: 1 addition & 1 deletion src/py/flwr/server/superlink/fleet/vce/vce_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ def start_vce(

def backend_fn() -> Backend:
"""Instantiate a Backend."""
return backend_type(backend_config, work_dir=app_dir)
return backend_type(backend_config)

# Load ClientApp if needed
def _load() -> ClientApp:
Expand Down
30 changes: 29 additions & 1 deletion src/py/flwr/server/superlink/fleet/vce/vce_api_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,18 @@
from unittest import TestCase
from uuid import UUID

from flwr.client import Client, ClientApp, NumPyClient
from flwr.client.client_app import LoadClientAppError
from flwr.common import (
DEFAULT_TTL,
Config,
ConfigsRecord,
Context,
GetPropertiesIns,
Message,
MessageTypeLegacy,
Metadata,
Scalar,
)
from flwr.common.recordset_compat import getpropertiesins_to_recordset
from flwr.common.serde import message_from_taskres, message_to_taskins
Expand All @@ -45,6 +50,28 @@
from flwr.server.superlink.state import InMemoryState, StateFactory


class DummyClient(NumPyClient):
"""A dummy NumPyClient for tests."""

def get_properties(self, config: Config) -> Dict[str, Scalar]:
"""Return properties by doing a simple calculation."""
result = float(config["factor"]) * pi

# store something in context
self.context.state.configs_records["result"] = ConfigsRecord({"result": result})
return {"result": result}


def get_dummy_client(context: Context) -> Client: # pylint: disable=unused-argument
"""Return a DummyClient converted to Client type."""
return DummyClient().to_client()


dummy_client_app = ClientApp(
client_fn=get_dummy_client,
)


def terminate_simulation(f_stop: threading.Event, sleep_duration: int) -> None:
"""Set event to terminate Simulation Engine after `sleep_duration` seconds."""
sleep(sleep_duration)
Expand Down Expand Up @@ -137,7 +164,7 @@ def _autoresolve_app_dir(rel_client_app_dir: str = "backend") -> str:
# pylint: disable=too-many-arguments
def start_and_shutdown(
backend: str = "ray",
client_app_attr: str = "raybackend_test:client_app",
client_app_attr: Optional[str] = None,
app_dir: str = "",
num_supernodes: Optional[int] = None,
state_factory: Optional[StateFactory] = None,
Expand Down Expand Up @@ -169,6 +196,7 @@ def start_and_shutdown(

start_vce(
num_supernodes=num_supernodes,
client_app=None if client_app_attr else dummy_client_app,
client_app_attr=client_app_attr,
backend_name=backend,
backend_config_json_stream=backend_config,
Expand Down

0 comments on commit 0a750be

Please sign in to comment.