Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make flower-simulation accept ClientApp and ServerApp objects #3024

Merged
merged 168 commits into from
Mar 5, 2024
Merged
Changes from 1 commit
Commits
Show all changes
168 commits
Select commit Hold shift + click to select a range
4c83a5e
init
jafermarq Feb 21, 2024
a85db40
base backend
jafermarq Feb 21, 2024
b770312
update
jafermarq Feb 21, 2024
b9c6455
update docstrings
jafermarq Feb 21, 2024
cd48539
minor fixes
jafermarq Feb 21, 2024
2791163
updates
jafermarq Feb 22, 2024
4ca33ec
backend-config should contain value types
jafermarq Feb 22, 2024
1b6564a
fix
jafermarq Feb 22, 2024
bad8727
w/ previous
jafermarq Feb 22, 2024
a68172f
fix
jafermarq Feb 22, 2024
e2b072e
Merge branch 'vce-flee-api' into vce-fleet-api-backends
jafermarq Feb 22, 2024
6881866
fix for json.loads
jafermarq Feb 22, 2024
935e333
keep backend-config as json string
jafermarq Feb 22, 2024
704d667
merged
jafermarq Feb 22, 2024
adfe198
added `RayBackend` and `SimpleActorPool`
jafermarq Feb 22, 2024
d0bab9a
complete VCE loop; works with `simulation-pytorch` example
jafermarq Feb 22, 2024
5e0ee74
fix exclude generation logic
jafermarq Feb 22, 2024
82836ad
Merge branch 'vce-fleet-api-backends-ray' into vce-fleet-api-loop
jafermarq Feb 22, 2024
4853813
simulation-tf w/ Flower-next; updates pytorch example too
jafermarq Feb 22, 2024
31787cf
format
jafermarq Feb 22, 2024
8522022
passing actor init kwargs
jafermarq Feb 22, 2024
788e63f
Merge branch 'vce-fleet-api-backends-ray' into vce-fleet-api-loop
jafermarq Feb 22, 2024
9200513
updated examples
jafermarq Feb 22, 2024
d8935b3
auto enable GPU growth if 'tensorflow' passed
jafermarq Feb 22, 2024
b108be2
return to default 1xCPU for virtual client
jafermarq Feb 22, 2024
17e3089
Merge branch 'main' into vce-flee-api
danieljanes Feb 22, 2024
0e02b05
moved import
jafermarq Feb 22, 2024
b77e4e8
Merge branch 'vce-flee-api' into vce-fleet-api-backends
jafermarq Feb 22, 2024
fd67f22
Apply suggestions from code review
jafermarq Feb 22, 2024
cf004d8
renamed vars; exporting
jafermarq Feb 22, 2024
f10a6ca
Merge branch 'vce-flee-api' into vce-fleet-api-backends
jafermarq Feb 22, 2024
d097bcd
Merge branch 'main' into vce-fleet-api-backends
jafermarq Feb 22, 2024
e669e2a
merge
jafermarq Feb 22, 2024
a521b40
moved
jafermarq Feb 22, 2024
890e329
Merge branch 'move-server-functions' into vce-fleet-api-backends-ray
jafermarq Feb 22, 2024
2ccf612
Merge branch 'main' into vce-fleet-api-backends
jafermarq Feb 22, 2024
0f7a071
Merge branch 'vce-fleet-api-backends' into vce-fleet-api-backends-ray
jafermarq Feb 22, 2024
443551f
revisited imports readiness for chosen backend
jafermarq Feb 23, 2024
bdfcccb
merge w/ main
jafermarq Feb 23, 2024
79f363e
Apply suggestions from code review
jafermarq Feb 23, 2024
12fa44c
remove suprefluous if
jafermarq Feb 23, 2024
c309046
fixes
jafermarq Feb 23, 2024
d217677
merge and more
jafermarq Feb 23, 2024
ee20d50
merge w/ main
jafermarq Feb 23, 2024
b16d0b8
init; need resolve circular imports
jafermarq Feb 24, 2024
93918db
gracefully shutdown
jafermarq Feb 25, 2024
0e4ab14
terminate method for backend; asyncio event to trigger stop
jafermarq Feb 25, 2024
e173312
Merge branch 'vce-fleet-terminate-and-rename' into vce-fleet-api-loop
jafermarq Feb 25, 2024
261f516
.
jafermarq Feb 25, 2024
21e9932
propagate terminate asyncio logic
jafermarq Feb 25, 2024
89de845
Merge branch 'vce-fleet-api-loop' into vce-fleet-run-simulation
jafermarq Feb 25, 2024
2faef27
merge
jafermarq Feb 26, 2024
f8b57c5
added build/process/terminate tests
jafermarq Feb 26, 2024
accc67c
Merge branch 'vce-fleet-raybackend-tests' into vce-fleet-api-loop
jafermarq Feb 26, 2024
39e3234
format
jafermarq Feb 26, 2024
8ea4b08
fix for py3.8
jafermarq Feb 26, 2024
fb9bfc2
Merge branch 'vce-fleet-raybackend-tests' into vce-fleet-api-loop
jafermarq Feb 26, 2024
35c55d4
fix py3.11
jafermarq Feb 26, 2024
49bc661
fix import
jafermarq Feb 26, 2024
4506a17
wrapped asyncio test under `IsolatedAsyncioTestCase` class
jafermarq Feb 26, 2024
7d6f821
Merge branch 'vce-fleet-raybackend-tests' into vce-fleet-api-loop
jafermarq Feb 26, 2024
ed5b181
start/shutdown tests
jafermarq Feb 26, 2024
2c05cdd
full loop tests; tweaks
jafermarq Feb 26, 2024
c589f7d
erge w/ main
jafermarq Feb 26, 2024
98fb4b4
.
jafermarq Feb 26, 2024
b15cebd
Merge branch 'vce-fleet-api-loop' into vce-fleet-run-simulation
jafermarq Feb 26, 2024
65c8b79
undoing changes to simulation examples
jafermarq Feb 26, 2024
2dfc83f
Merge branch 'vce-fleet-api-loop' into vce-fleet-run-simulation
jafermarq Feb 26, 2024
87d7a4c
adding back examples
jafermarq Feb 26, 2024
35ab1f3
Apply suggestions from code review
jafermarq Feb 26, 2024
5b3365a
Apply suggestions from code review
jafermarq Feb 26, 2024
785ac91
introduced `partition_id`.
jafermarq Feb 26, 2024
eba053a
fix for ray proxies and tests
jafermarq Feb 26, 2024
3d15041
Merge branch 'main' into metadata-with-partition-id
jafermarq Feb 26, 2024
ee84f85
Merge branch 'main' into metadata-with-partition-id
danieljanes Feb 26, 2024
27d2bb1
re written
jafermarq Feb 26, 2024
b7d5521
Merge branch 'metadata-with-partition-id' into vce-fleet-api-loop
jafermarq Feb 26, 2024
1969aac
using `metadata.partition_id`
jafermarq Feb 26, 2024
17d4b23
Merge branch 'vce-fleet-api-loop' into vce-fleet-run-simulation
jafermarq Feb 26, 2024
8f1ca09
more efficient
jafermarq Feb 27, 2024
f44b595
undo
jafermarq Feb 27, 2024
33e7be3
Merge branch 'main' into vce-fleet-api-loop
jafermarq Feb 27, 2024
666c65f
Merge branch 'main' into vce-fleet-api-loop
danieljanes Feb 27, 2024
ab55b0c
more tests
jafermarq Feb 27, 2024
28dda2d
more
jafermarq Feb 27, 2024
5cd047e
minor update
jafermarq Feb 27, 2024
4be09c2
handle loading of non-existing ClientApp
jafermarq Feb 27, 2024
17d3d34
merge /w branch ahead; test vce with non existing clientapp
jafermarq Feb 27, 2024
3c616e9
better tests; reorg
jafermarq Feb 27, 2024
ffed29b
Merge branch 'more-raybackend-tests-yes' into vce-fleet-api-loop
jafermarq Feb 27, 2024
aed4420
update
jafermarq Feb 27, 2024
96519dc
w/ previous
jafermarq Feb 27, 2024
e491b7b
Merge branch 'more-raybackend-tests-yes' into vce-fleet-api-loop
jafermarq Feb 27, 2024
3b7dc4c
Merge branch 'vce-fleet-api-loop' into vce-fleet-run-simulation
jafermarq Feb 27, 2024
1aa3b36
post merge update
jafermarq Feb 27, 2024
21f03a9
fix
jafermarq Feb 27, 2024
ab63974
Merge branch 'minor-fix-cli-test' into more-raybackend-tests-yes
jafermarq Feb 27, 2024
1d137da
Merge branch 'main' into more-raybackend-tests-yes
jafermarq Feb 27, 2024
a4590af
Merge branch 'more-raybackend-tests-yes' into vce-fleet-api-loop
jafermarq Feb 27, 2024
141be7b
Merge branch 'vce-fleet-api-loop' into vce-fleet-run-simulation
jafermarq Feb 27, 2024
4d8ee73
minor tweak
jafermarq Feb 27, 2024
1ac8e2b
runs passing server-app; client-app modules
jafermarq Feb 27, 2024
c366604
w/ previous
jafermarq Feb 27, 2024
a62a0d1
wip
jafermarq Feb 27, 2024
8f6de1e
Merge branch 'main' into more-raybackend-tests-yes
danieljanes Feb 27, 2024
c45c4af
no need for separate test/ dir
jafermarq Feb 27, 2024
e2ac2b0
Merge branch 'more-raybackend-tests-yes' into vce-fleet-api-loop
jafermarq Feb 27, 2024
c9492f0
update
jafermarq Feb 27, 2024
98b31e0
Merge branch 'vce-fleet-api-loop' into vce-fleet-run-simulation
jafermarq Feb 27, 2024
91e4413
Merge branch 'vce-fleet-run-simulation' into vce-fleet-notebook-ready
jafermarq Feb 27, 2024
b1e0460
merge w/ main
jafermarq Feb 27, 2024
fdb7ace
Merge branch 'vce-fleet-api-loop' into vce-fleet-run-simulation
jafermarq Feb 27, 2024
82878f6
updates
jafermarq Feb 27, 2024
12ed070
merge w/ `vce-fleet-run-simulation`
jafermarq Feb 27, 2024
b3d397b
better handling of exceptions in vce's ; adjust test for
jafermarq Feb 27, 2024
bd7b1aa
completed tests.
jafermarq Feb 27, 2024
4e32264
Merge branch 'vce-fleet-api-loop' into vce-fleet-run-simulation
jafermarq Feb 27, 2024
0dce992
update import
jafermarq Feb 27, 2024
1936631
wip
jafermarq Feb 28, 2024
d679ce6
merge and update
jafermarq Feb 28, 2024
6e3271b
minior formatting
jafermarq Feb 28, 2024
67777c5
Apply suggestions from code review
jafermarq Feb 28, 2024
35f4566
Merge branch 'main' into vce-fleet-api-loop
jafermarq Feb 28, 2024
46eac84
fixes post review
jafermarq Feb 28, 2024
f2ee2cd
Merge branch 'main' into vce-fleet-api-loop
jafermarq Feb 28, 2024
2bc3869
Merge branch 'vce-fleet-api-loop' into vce-fleet-run-simulation
jafermarq Feb 28, 2024
cc6a145
instantiating backend in asyncio event loop
jafermarq Feb 28, 2024
99a2522
Merge branch 'vce-fleet-api-loop' into vce-fleet-run-simulation
jafermarq Feb 28, 2024
662579e
minor
jafermarq Feb 28, 2024
91c3eb2
Merge branch 'vce-fleet-run-simulation' into vce-fleet-notebook-ready
jafermarq Feb 28, 2024
6dd034b
updated TF notebook
jafermarq Feb 28, 2024
ed375a4
Merge branch 'main' into vce-fleet-run-simulation
jafermarq Feb 28, 2024
2aba895
moved `run_driver_api_grpc()`
jafermarq Feb 28, 2024
744084c
Merge branch 'move-run-driver-api-grpc' into vce-fleet-run-simulation
jafermarq Feb 28, 2024
f3d2c63
update and format
jafermarq Feb 28, 2024
c3fcdf6
Merge branch 'main' into vce-fleet-run-simulation
jafermarq Feb 28, 2024
c2a4dc8
better exception handling; updated simp.py examples
jafermarq Feb 29, 2024
eafc2ed
Merge branch 'main' into vce-fleet-run-simulation
jafermarq Feb 29, 2024
1b3bc49
merge; fixed conflicts; better docstrings
jafermarq Feb 29, 2024
94c2649
updates
jafermarq Feb 29, 2024
bebf3c6
merge w/ main
jafermarq Feb 29, 2024
2783865
Merge branch 'main' into vce-fleet-notebook-ready
jafermarq Mar 1, 2024
b00e840
better
jafermarq Mar 1, 2024
afef4e5
Merge branch 'main' into vce-run-simulation-th-handling
danieljanes Mar 1, 2024
6f9bd9e
Update src/py/flwr/simulation/run_simulation.py
danieljanes Mar 1, 2024
32d8b33
Update src/py/flwr/simulation/run_simulation.py
danieljanes Mar 1, 2024
a29b5b6
merge
jafermarq Mar 1, 2024
16d9c57
merge w/ main
jafermarq Mar 1, 2024
57a84e5
pyling, mypy fixes
jafermarq Mar 1, 2024
5ef65ee
handling asyncio event loop running by default in colab/jupyter
jafermarq Mar 1, 2024
5f16bee
join thread, else bad things happen
jafermarq Mar 1, 2024
232a82b
Update src/py/flwr/server/run_serverapp.py
danieljanes Mar 2, 2024
7a56928
Update src/py/flwr/simulation/run_simulation.py
danieljanes Mar 2, 2024
2859e3e
Update src/py/flwr/simulation/run_simulation.py
danieljanes Mar 2, 2024
8e0f95c
exposing relevant args to entry point through python env / notebook; …
jafermarq Mar 2, 2024
39f443b
Merge branch 'main' into vce-fleet-notebook-ready
jafermarq Mar 4, 2024
86f3761
moved input args; fix enable GPU growth in ServerApp thread; other minor
jafermarq Mar 4, 2024
f3ed0c9
simplifications; option `--verbose`
jafermarq Mar 4, 2024
43bd2e1
discarded changes to notebooks
jafermarq Mar 5, 2024
91683fe
Update src/py/flwr/simulation/run_simulation.py
danieljanes Mar 5, 2024
8531ab6
Update src/py/flwr/simulation/run_simulation.py
danieljanes Mar 5, 2024
0de86ad
Update src/py/flwr/simulation/run_simulation.py
danieljanes Mar 5, 2024
a7e875a
Update src/py/flwr/simulation/run_simulation.py
danieljanes Mar 5, 2024
6a18a1e
Update src/py/flwr/simulation/run_simulation.py
danieljanes Mar 5, 2024
bceabb0
Update src/py/flwr/simulation/run_simulation.py
danieljanes Mar 5, 2024
037eda4
Update src/py/flwr/simulation/run_simulation.py
danieljanes Mar 5, 2024
af97609
Update src/py/flwr/simulation/run_simulation.py
danieljanes Mar 5, 2024
fb506c8
Merge branch 'main' into vce-fleet-notebook-ready
danieljanes Mar 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
full loop tests; tweaks
jafermarq committed Feb 26, 2024
commit 2c05cdd066a1dd7209e197eb4e2f3c5210b71205
4 changes: 4 additions & 0 deletions src/py/flwr/server/superlink/fleet/vce/backend/raybackend.py
Original file line number Diff line number Diff line change
@@ -18,6 +18,8 @@
from logging import INFO
from typing import Callable, Dict, List, Tuple, Union

import ray

from flwr.client.clientapp import ClientApp
from flwr.common.context import Context
from flwr.common.logger import log
@@ -157,3 +159,5 @@ async def process_message(
async def terminate(self) -> None:
"""Terminate all actors in actor pool."""
await self.pool.terminate_all_actors()
ray.shutdown()
log(INFO, "Terminated %s", self.__class__.__name__)
Original file line number Diff line number Diff line change
@@ -19,8 +19,6 @@
from typing import Callable, Dict, Optional, Tuple, Union
from unittest import IsolatedAsyncioTestCase

import ray

from flwr.client import Client, NumPyClient
from flwr.client.clientapp import ClientApp
from flwr.common import (
@@ -73,8 +71,6 @@ async def backend_build_process_and_termination(

await backend.terminate()

ray.shutdown()

return to_return


27 changes: 16 additions & 11 deletions src/py/flwr/server/superlink/fleet/vce/vce_api.py
Original file line number Diff line number Diff line change
@@ -19,7 +19,7 @@
import json
import traceback
from logging import DEBUG, ERROR, INFO
from typing import Callable, Dict
from typing import Callable, Dict, Optional

from flwr.client.clientapp import ClientApp, load_client_app
from flwr.client.node_state import NodeState
@@ -30,7 +30,6 @@

from .backend import Backend, error_messages_backends, supported_backends

TaskInsQueue = asyncio.Queue[TaskIns]
NodeToPartitionMapping = Dict[int, int]


@@ -50,7 +49,7 @@ def _register_nodes(
# pylint: disable=too-many-arguments,too-many-locals
async def worker(
app: Callable[[], ClientApp],
queue: TaskInsQueue,
queue: "asyncio.Queue[TaskIns]",
node_states: Dict[int, NodeState],
state_factory: StateFactory,
nodes_mapping: NodeToPartitionMapping,
@@ -60,7 +59,7 @@ async def worker(
state = state_factory.state()
while True:
try:
task_ins = await queue.get()
task_ins: TaskIns = await queue.get()
node_id = task_ins.task.consumer.node_id

# Register and retrive runstate
@@ -104,7 +103,7 @@ async def worker(


async def generate_pull_requests(
queue: TaskInsQueue,
queue: "asyncio.Queue[TaskIns]",
state_factory: StateFactory,
nodes_mapping: NodeToPartitionMapping,
f_stop: asyncio.Event,
@@ -132,7 +131,7 @@ async def run(
) -> None:
"""Run the VCE async."""
# pylint: disable=fixme
queue: TaskInsQueue = asyncio.Queue(128)
queue: "asyncio.Queue[TaskIns]" = asyncio.Queue(128)

# Build backend
await backend.build()
@@ -150,7 +149,7 @@ async def run(

# Produced task terminated, now cancel worker tasks
for w_t in worker_tasks:
_ = w_t.cancel("Terminate on Simulation Engine shutdown.")
_ = w_t.cancel()

# print('requested cancel')
while not all(w_t.done() for w_t in worker_tasks):
@@ -172,12 +171,18 @@ def start_vce(
state_factory: StateFactory,
working_dir: str,
f_stop: asyncio.Event,
existing_nodes_mapping: Optional[NodeToPartitionMapping] = None,
) -> None:
"""Start Fleet API with the VirtualClientEngine (VCE)."""
# Register SuperNodes
nodes_mapping = _register_nodes(
num_nodes=num_supernodes, state_factory=state_factory
)
if existing_nodes_mapping:
# Use mapping constructed externally. This also means nodes
# have previously being registered.
nodes_mapping = existing_nodes_mapping
else:
# Register SuperNodes
nodes_mapping = _register_nodes(
num_nodes=num_supernodes, state_factory=state_factory
)

# Construct mapping of NodeStates
node_states: Dict[int, NodeState] = {}
188 changes: 163 additions & 25 deletions src/py/flwr/server/superlink/fleet/vce/vce_api_test.py
Original file line number Diff line number Diff line change
@@ -13,47 +13,185 @@
# limitations under the License.
# ==============================================================================
"""Test Fleet Simulation Engine API."""

import asyncio
import threading
from itertools import cycle
from math import pi
from time import sleep
from typing import Dict, Optional, Set
from unittest import IsolatedAsyncioTestCase
from uuid import UUID

from flwr.client import Client, NumPyClient
from flwr.client.clientapp import ClientApp
from flwr.common import (
Config,
ConfigsRecord,
GetPropertiesIns,
Message,
Metadata,
Scalar,
)
from flwr.common.constant import MESSAGE_TYPE_GET_PROPERTIES
from flwr.common.recordset_compat import getpropertiesins_to_recordset
from flwr.common.serde import message_from_taskres, message_to_taskins
from flwr.server.superlink.fleet.vce.vce_api import (
NodeToPartitionMapping,
_register_nodes,
start_vce,
)
from flwr.server.superlink.state import InMemoryState, StateFactory


class DummyClient(NumPyClient):
"""A dummy NumPyClient for tests."""

def get_properties(self, config: Config) -> Dict[str, Scalar]:
"""Return properties by doing a simple calculation."""
result = float(config["factor"]) * pi

# store something in context
self.context.state.configs_records["result"] = ConfigsRecord({"result": result})
return {"result": result}


def get_dummy_client(cid: str) -> Client: # pylint: disable=unused-argument
"""Return a DummyClient converted to Client type."""
return DummyClient().to_client()


client_app = ClientApp(
client_fn=get_dummy_client,
)


from flwr.server.superlink.state import StateFactory
def terminate_simulation(f_stop: asyncio.Event, sleep_duration: int) -> None:
"""Set event to terminate Simulation Engine after `sleep_duration` seconds."""
sleep(sleep_duration)
f_stop.set()

from . import start_vce

def start_and_shutdown(
existing_state_factory: Optional[StateFactory] = None,
nodes_mapping: Optional[NodeToPartitionMapping] = None,
duration: int = 10,
) -> None:
"""Start Simulation Engine and terminate after specified number of seconds."""
f_stop = asyncio.Event()

# Initialize StateFactory
if nodes_mapping:
if existing_state_factory is None:
raise ValueError(
"If you specify a node mapping, you must pass a StateFactory."
)
state_factory = existing_state_factory
else:
state_factory = StateFactory(":flwr-in-memory-state:")

# Setup thread that will set the f_stop event, triggering the termination of all
# asyncio logic in the Simulation Engine. It will also terminate the Backend.
termination_th = threading.Thread(
target=terminate_simulation, args=(f_stop, duration)
)
termination_th.start()

start_vce(
num_supernodes=50,
client_app_module_name="vce_api_test:client_app",
backend_name="ray",
backend_config_json_stream="{}", # an empty json stream (an empty config)
state_factory=state_factory,
working_dir="",
f_stop=f_stop,
existing_nodes_mapping=nodes_mapping,
)

# Trigger stop event
f_stop.set()

termination_th.join()

class AsyncTestFleetSimulationEngine(IsolatedAsyncioTestCase):
"""A basic class to test Fleet Simulation Enginge funcionality."""

class AsyncTestFleetSimulationEngineRayBackend(IsolatedAsyncioTestCase):
"""A basic class that enables testing asyncio functionalities."""

def test_start_and_shutdown(self) -> None:
"""Start Simulation Engine Fleet and terminate it."""
f_stop = asyncio.Event()
start_and_shutdown()

# pylint: disable=too-many-locals
def test_start_and_shutdown_with_tasks_in_state(self) -> None:
"""Run Simulation Engine with some TasksIns in State.

This test creates a few nodes and submits a few messages that need to be
executed by the Backend. In order for that to happen the asyncio
producer/consumer logic must function.
"""
num_messages = 113
num_nodes = 59

# Initialize StateFactory
# Register a state and a run_id in it
run_id = 1234
state_factory = StateFactory(":flwr-in-memory-state:")
state: InMemoryState = state_factory.state() # type: ignore
state.run_ids.add(run_id)

superlink_th = threading.Thread(
target=start_vce,
args=(
50,
"",
"ray",
"{}", # an empty json stream (represents an empty config)
state_factory,
"",
f_stop,
),
daemon=False,
# Register a few nodes
nodes_mapping = _register_nodes(
num_nodes=num_nodes, state_factory=state_factory
)

superlink_th.start()
# Artificially add TaskIns to state so they can be processed
# by the Simulation Engine logic
nodes_cycle = cycle(
nodes_mapping.keys()
) # we have more messages than supernodes
task_ids: Set[UUID] = set() # so we can retrieve them later
expected_results = {}
for i in range(num_messages):
dst_node_id = next(nodes_cycle)
# Construct a Message
mult_factor = 2024 + i
getproperties_ins = GetPropertiesIns(config={"factor": mult_factor})
recordset = getpropertiesins_to_recordset(getproperties_ins)
message = Message(
content=recordset,
metadata=Metadata(
run_id=run_id,
message_id="",
group_id="",
src_node_id=0,
dst_node_id=dst_node_id, # indicate destination node
reply_to_message="",
ttl="",
message_type=MESSAGE_TYPE_GET_PROPERTIES,
),
)
# Convert Message to TaskIns
taskins = message_to_taskins(message)
# Instert in state
task_id = state.store_task_ins(taskins)
if task_id:
# Add to UUID set
task_ids.add(task_id)
# Store expected output for check later on
expected_results[task_id] = mult_factor * pi

# Run
start_and_shutdown(state_factory, nodes_mapping)

# Get all TaskRes
task_res_list = state.get_task_res(task_ids=task_ids, limit=len(task_ids))

# Sleep for some time
sleep(10)
# Check results by first converting to Message
for task_res in task_res_list:

# Trigger stop event
f_stop.set()
message = message_from_taskres(task_res)

superlink_th.join()
# Verify message content is as expected
content = message.content
assert (
content.configs_records["getpropertiesres.properties"]["result"]
== expected_results[UUID(task_res.task.ancestry[0])]
)