From 87961921b3e56990183c80820691f170f3e221c1 Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Sun, 10 Mar 2024 16:02:52 +0100 Subject: [PATCH 01/23] Initial draft --- src/py/flwr/client/app.py | 140 +++++++++++++++++++++----------------- 1 file changed, 76 insertions(+), 64 deletions(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index c2544e10c9c8..ba4858712cf1 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -435,71 +435,83 @@ def _load_client_app() -> ClientApp: node_state = NodeState() while True: - sleep_duration: int = 0 - with connection( - address, - insecure, - retry_invoker, - grpc_max_message_length, - root_certificates, - ) as conn: - receive, send, create_node, delete_node = conn - - # Register node - if create_node is not None: - create_node() # pylint: disable=not-callable - - while True: - # Receive - message = receive() - if message is None: - time.sleep(3) # Wait for 3s before asking again - continue - - log(INFO, "Received message") - - # Handle control message - out_message, sleep_duration = handle_control_message(message) - if out_message: - send(out_message) - break - - # Register context for this run - node_state.register_context(run_id=message.metadata.run_id) - - # Retrieve context for this run - context = node_state.retrieve_context(run_id=message.metadata.run_id) - - # Load ClientApp instance - client_app: ClientApp = load_client_app_fn() - - # Handle task message - out_message = client_app(message=message, context=context) - - # Update node state - node_state.update_context( - run_id=message.metadata.run_id, - context=context, - ) - - # Send - send(out_message) - log(INFO, "Sent reply") - - # Unregister node - if delete_node is not None: - delete_node() # pylint: disable=not-callable - - if sleep_duration == 0: - log(INFO, "Disconnect and shut down") + try: + sleep_duration: int = 0 + with connection( + address, + insecure, + retry_invoker, + grpc_max_message_length, + root_certificates, + ) as conn: + receive, send, create_node, delete_node = conn + + try: + # Register node + if create_node is not None: + create_node() # pylint: disable=not-callable + + while True: + # Receive + message = receive() + if message is None: + time.sleep(3) # Wait for 3s before asking again + continue + + log(INFO, "Received message") + + # Handle control message + out_message, sleep_duration = handle_control_message(message) + if out_message: + send(out_message) + break + + # Register context for this run + node_state.register_context(run_id=message.metadata.run_id) + + # Retrieve context for this run + context = node_state.retrieve_context( + run_id=message.metadata.run_id + ) + + # Load ClientApp instance + client_app: ClientApp = load_client_app_fn() + + # Handle task message + out_message = client_app(message=message, context=context) + + # Update node state + node_state.update_context( + run_id=message.metadata.run_id, + context=context, + ) + + # Send + send(out_message) + log(INFO, "Sent reply") + + # Unregister node + if delete_node is not None: + delete_node() # pylint: disable=not-callable + + except KeyboardInterrupt as err: + if delete_node is not None: + delete_node() # pylint: disable=not-callable + raise err from None + + if sleep_duration == 0: + log(INFO, "Disconnect and shut down") + break + # Sleep and reconnect afterwards + log( + INFO, + "Disconnect, then re-establish connection after %s second(s)", + sleep_duration, + ) + time.sleep(sleep_duration) + + except KeyboardInterrupt: break - # Sleep and reconnect afterwards - log( - INFO, - "Disconnect, then re-establish connection after %s second(s)", - sleep_duration, - ) - time.sleep(sleep_duration) def start_numpy_client( From 0d2a092d81c217c59f57053dd6db077352738280 Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Sun, 10 Mar 2024 17:16:28 +0100 Subject: [PATCH 02/23] Working draft --- src/py/flwr/client/app.py | 70 +++++++++++++++++++++++++-------------- 1 file changed, 46 insertions(+), 24 deletions(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index ba4858712cf1..9416a867363b 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -16,6 +16,7 @@ import argparse +import signal import sys import time from logging import DEBUG, INFO, WARN @@ -396,6 +397,19 @@ def _load_client_app() -> ClientApp: transport, server_address ) + exit_handler = ExitHandler() + + def on_backoff(retry_state): + exit_handler.connection = False + if retry_state.tries == 1: + log(WARN, "Connection attempt failed, retrying...") + else: + log( + DEBUG, + "Connection attempt failed, retrying in %.2f seconds", + retry_state.actual_wait, + ) + retry_invoker = RetryInvoker( wait_factory=exponential, recoverable_exceptions=connection_error_type, @@ -421,20 +435,12 @@ def _load_client_app() -> ClientApp: if retry_state.tries > 1 else None ), - on_backoff=lambda retry_state: ( - log(WARN, "Connection attempt failed, retrying...") - if retry_state.tries == 1 - else log( - DEBUG, - "Connection attempt failed, retrying in %.2f seconds", - retry_state.actual_wait, - ) - ), + on_backoff=on_backoff, ) node_state = NodeState() - while True: + while not exit_handler.should_exit: try: sleep_duration: int = 0 with connection( @@ -446,12 +452,22 @@ def _load_client_app() -> ClientApp: ) as conn: receive, send, create_node, delete_node = conn - try: - # Register node - if create_node is not None: - create_node() # pylint: disable=not-callable + # def signal_handler(sig, frame): + # if delete_node is not None: + # delete_node() + # exit_handler.should_exit = True + # raise KeyboardInterrupt + # # sys.exit(0) + + # signal.signal(signal.SIGINT, signal_handler) + # signal.signal(signal.SIGTERM, signal_handler) - while True: + # Register node + if create_node is not None: + create_node() # pylint: disable=not-callable + + while not exit_handler.should_exit: + try: # Receive message = receive() if message is None: @@ -489,15 +505,16 @@ def _load_client_app() -> ClientApp: # Send send(out_message) log(INFO, "Sent reply") + except KeyboardInterrupt: + if delete_node is not None and exit_handler.connection: + delete_node() + exit_handler.should_exit = True + sleep_duration = 0 + raise KeyboardInterrupt - # Unregister node - if delete_node is not None: - delete_node() # pylint: disable=not-callable - - except KeyboardInterrupt as err: - if delete_node is not None: - delete_node() # pylint: disable=not-callable - raise err from None + # Unregister node + if delete_node is not None: + delete_node() # pylint: disable=not-callable if sleep_duration == 0: log(INFO, "Disconnect and shut down") @@ -509,7 +526,6 @@ def _load_client_app() -> ClientApp: sleep_duration, ) time.sleep(sleep_duration) - except KeyboardInterrupt: break @@ -665,3 +681,9 @@ def _init_connection(transport: Optional[str], server_address: str) -> Tuple[ ) return connection, address, error_type + + +class ExitHandler: + def __init__(self) -> None: + self.should_exit = False + self.connection = True From 2959f1fe627f397c4be2d1b163b637fce239d659 Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Sun, 10 Mar 2024 17:33:01 +0100 Subject: [PATCH 03/23] Cleaner implementation --- src/py/flwr/client/app.py | 34 +++++++++++----------------------- 1 file changed, 11 insertions(+), 23 deletions(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index 9416a867363b..10b423811fe9 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -16,9 +16,9 @@ import argparse -import signal import sys import time +from dataclasses import dataclass from logging import DEBUG, INFO, WARN from pathlib import Path from typing import Callable, ContextManager, Optional, Tuple, Type, Union @@ -397,10 +397,10 @@ def _load_client_app() -> ClientApp: transport, server_address ) - exit_handler = ExitHandler() + connection_tracker = _ConnectionTracker() def on_backoff(retry_state): - exit_handler.connection = False + connection_tracker.connection = False if retry_state.tries == 1: log(WARN, "Connection attempt failed, retrying...") else: @@ -440,7 +440,7 @@ def on_backoff(retry_state): node_state = NodeState() - while not exit_handler.should_exit: + while True: try: sleep_duration: int = 0 with connection( @@ -452,21 +452,11 @@ def on_backoff(retry_state): ) as conn: receive, send, create_node, delete_node = conn - # def signal_handler(sig, frame): - # if delete_node is not None: - # delete_node() - # exit_handler.should_exit = True - # raise KeyboardInterrupt - # # sys.exit(0) - - # signal.signal(signal.SIGINT, signal_handler) - # signal.signal(signal.SIGTERM, signal_handler) - # Register node if create_node is not None: create_node() # pylint: disable=not-callable - while not exit_handler.should_exit: + while True: try: # Receive message = receive() @@ -505,12 +495,11 @@ def on_backoff(retry_state): # Send send(out_message) log(INFO, "Sent reply") - except KeyboardInterrupt: - if delete_node is not None and exit_handler.connection: + except KeyboardInterrupt as err: + if delete_node is not None and connection_tracker.connection: delete_node() - exit_handler.should_exit = True sleep_duration = 0 - raise KeyboardInterrupt + raise err from None # Unregister node if delete_node is not None: @@ -683,7 +672,6 @@ def _init_connection(transport: Optional[str], server_address: str) -> Tuple[ return connection, address, error_type -class ExitHandler: - def __init__(self) -> None: - self.should_exit = False - self.connection = True +@dataclass +class _ConnectionTracker: + connection: bool = True From dfef6f8231c4b88e7e6fc369857b5fa5a00f1558 Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Sun, 10 Mar 2024 17:37:33 +0100 Subject: [PATCH 04/23] Add type hints --- src/py/flwr/client/app.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index 10b423811fe9..870b197d6923 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -40,7 +40,7 @@ from flwr.common.exit_handlers import register_exit_handlers from flwr.common.logger import log, warn_deprecated_feature, warn_experimental_feature from flwr.common.object_ref import load_app, validate -from flwr.common.retry_invoker import RetryInvoker, exponential +from flwr.common.retry_invoker import RetryInvoker, RetryState, exponential from .grpc_client.connection import grpc_connection from .grpc_rere_client.connection import grpc_request_response @@ -399,7 +399,7 @@ def _load_client_app() -> ClientApp: connection_tracker = _ConnectionTracker() - def on_backoff(retry_state): + def _on_backoff(retry_state: RetryState) -> None: connection_tracker.connection = False if retry_state.tries == 1: log(WARN, "Connection attempt failed, retrying...") @@ -435,7 +435,7 @@ def on_backoff(retry_state): if retry_state.tries > 1 else None ), - on_backoff=on_backoff, + on_backoff=_on_backoff, ) node_state = NodeState() From 90ed4f0ac0af1d8583c682115275414d0749922a Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Mon, 11 Mar 2024 13:06:17 +0100 Subject: [PATCH 05/23] Remove nested try-except --- src/py/flwr/client/app.py | 176 ++++++++++++++++++++------------------ 1 file changed, 92 insertions(+), 84 deletions(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index 870b197d6923..b11bd44bc3e1 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -399,6 +399,16 @@ def _load_client_app() -> ClientApp: connection_tracker = _ConnectionTracker() + def _on_sucess(retry_state: RetryState) -> None: + connection_tracker.connection = True + if retry_state.tries > 1: + log( + INFO, + "Connection successful after %.2f seconds and %s tries.", + retry_state.elapsed_time, + retry_state.tries, + ) + def _on_backoff(retry_state: RetryState) -> None: connection_tracker.connection = False if retry_state.tries == 1: @@ -425,98 +435,96 @@ def _on_backoff(retry_state: RetryState) -> None: if retry_state.tries > 1 else None ), - on_success=lambda retry_state: ( - log( - INFO, - "Connection successful after %.2f seconds and %s tries.", - retry_state.elapsed_time, - retry_state.tries, - ) - if retry_state.tries > 1 - else None - ), + on_success=_on_sucess, on_backoff=_on_backoff, ) node_state = NodeState() while True: - try: - sleep_duration: int = 0 - with connection( - address, - insecure, - retry_invoker, - grpc_max_message_length, - root_certificates, - ) as conn: - receive, send, create_node, delete_node = conn - - # Register node - if create_node is not None: - create_node() # pylint: disable=not-callable - - while True: - try: - # Receive - message = receive() - if message is None: - time.sleep(3) # Wait for 3s before asking again - continue - - log(INFO, "Received message") - - # Handle control message - out_message, sleep_duration = handle_control_message(message) - if out_message: - send(out_message) - break - - # Register context for this run - node_state.register_context(run_id=message.metadata.run_id) - - # Retrieve context for this run - context = node_state.retrieve_context( - run_id=message.metadata.run_id - ) - - # Load ClientApp instance - client_app: ClientApp = load_client_app_fn() - - # Handle task message - out_message = client_app(message=message, context=context) - - # Update node state - node_state.update_context( - run_id=message.metadata.run_id, - context=context, - ) - - # Send + sleep_duration: int = 0 + with connection( + address, + insecure, + retry_invoker, + grpc_max_message_length, + root_certificates, + ) as conn: + receive, send, create_node, delete_node = conn + + # Register node + if create_node is not None: + create_node() # pylint: disable=not-callable + + while True: + try: + # Receive + message = receive() + if message is None: + time.sleep(3) # Wait for 3s before asking again + continue + + log( + INFO, + "Received group %s message of type %s for run %s", + message.metadata.group_id, + message.metadata.message_type, + message.metadata.run_id, + ) + + # Handle control message + out_message, sleep_duration = handle_control_message(message) + if out_message: send(out_message) - log(INFO, "Sent reply") - except KeyboardInterrupt as err: - if delete_node is not None and connection_tracker.connection: - delete_node() - sleep_duration = 0 - raise err from None - - # Unregister node - if delete_node is not None: - delete_node() # pylint: disable=not-callable - - if sleep_duration == 0: - log(INFO, "Disconnect and shut down") - break - # Sleep and reconnect afterwards - log( - INFO, - "Disconnect, then re-establish connection after %s second(s)", - sleep_duration, - ) - time.sleep(sleep_duration) - except KeyboardInterrupt: + break + + # Register context for this run + node_state.register_context(run_id=message.metadata.run_id) + + # Retrieve context for this run + context = node_state.retrieve_context( + run_id=message.metadata.run_id + ) + + # Load ClientApp instance + client_app: ClientApp = load_client_app_fn() + + # Handle task message + out_message = client_app(message=message, context=context) + + # Update node state + node_state.update_context( + run_id=message.metadata.run_id, + context=context, + ) + + # Send + send(out_message) + log( + INFO, + "Sent group %s reply of type %s for run %s", + out_message.metadata.group_id, + out_message.metadata.message_type, + out_message.metadata.run_id, + ) + except KeyboardInterrupt: + sleep_duration = 0 + break + + # Unregister node + if delete_node is not None and connection_tracker.connection: + delete_node() # pylint: disable=not-callable + + if sleep_duration == 0: + log(INFO, "Disconnect and shut down") break + # Sleep and reconnect afterwards + log( + INFO, + "Disconnect, then re-establish connection after %s second(s)", + sleep_duration, + ) + time.sleep(sleep_duration) def start_numpy_client( From 240c31fb3354a7c6edc7a937b5193af2a1815a2c Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Mon, 11 Mar 2024 14:26:34 +0100 Subject: [PATCH 06/23] Revert log changes --- src/py/flwr/client/app.py | 16 ++-------------- 1 file changed, 2 insertions(+), 14 deletions(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index b11bd44bc3e1..8b3d4a9a39a3 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -464,13 +464,7 @@ def _on_backoff(retry_state: RetryState) -> None: time.sleep(3) # Wait for 3s before asking again continue - log( - INFO, - "Received group %s message of type %s for run %s", - message.metadata.group_id, - message.metadata.message_type, - message.metadata.run_id, - ) + log(INFO, "Received message") # Handle control message out_message, sleep_duration = handle_control_message(message) @@ -500,13 +494,7 @@ def _on_backoff(retry_state: RetryState) -> None: # Send send(out_message) - log( - INFO, - "Sent group %s reply of type %s for run %s", - out_message.metadata.group_id, - out_message.metadata.message_type, - out_message.metadata.run_id, - ) + log(INFO, "Sent reply") except KeyboardInterrupt: sleep_duration = 0 break From 48910a2f6b228173b760440c76d8b2e4984fdc2e Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Wed, 13 Mar 2024 17:24:29 +0000 Subject: [PATCH 07/23] Use signals --- src/py/flwr/client/app.py | 26 ++++++++++++++----- .../fleet/message_handler/message_handler.py | 2 +- 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index 69288f26a110..8746dddcf3ae 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -16,6 +16,7 @@ import argparse +import signal import sys import time from dataclasses import dataclass @@ -397,10 +398,10 @@ def _load_client_app() -> ClientApp: transport, server_address ) - connection_tracker = _ConnectionTracker() + run_tracker = _RunTracker() def _on_sucess(retry_state: RetryState) -> None: - connection_tracker.connection = True + run_tracker.connection = True if retry_state.tries > 1: log( INFO, @@ -410,7 +411,7 @@ def _on_sucess(retry_state: RetryState) -> None: ) def _on_backoff(retry_state: RetryState) -> None: - connection_tracker.connection = False + run_tracker.connection = False if retry_state.tries == 1: log(WARN, "Connection attempt failed, retrying...") else: @@ -441,7 +442,9 @@ def _on_backoff(retry_state: RetryState) -> None: node_state = NodeState() - while True: + run_tracker.register_signal_handler() + + while not run_tracker.should_exit: sleep_duration: int = 0 with connection( address, @@ -518,12 +521,12 @@ def _on_backoff(retry_state: RetryState) -> None: out_message.metadata.message_type, message.metadata.message_id, ) - except KeyboardInterrupt: + except StopIteration: sleep_duration = 0 break # Unregister node - if delete_node is not None and connection_tracker.connection: + if delete_node is not None and run_tracker.connection: delete_node() # pylint: disable=not-callable if sleep_duration == 0: @@ -692,5 +695,14 @@ def _init_connection(transport: Optional[str], server_address: str) -> Tuple[ @dataclass -class _ConnectionTracker: +class _RunTracker: connection: bool = True + should_exit: bool = False + + def register_signal_handler(self): + def signal_handler(sig, frame): + self.should_exit = True + raise StopIteration + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) diff --git a/src/py/flwr/server/superlink/fleet/message_handler/message_handler.py b/src/py/flwr/server/superlink/fleet/message_handler/message_handler.py index 5fe815180823..fa455321dede 100644 --- a/src/py/flwr/server/superlink/fleet/message_handler/message_handler.py +++ b/src/py/flwr/server/superlink/fleet/message_handler/message_handler.py @@ -47,7 +47,7 @@ def create_node( def delete_node(request: DeleteNodeRequest, state: State) -> DeleteNodeResponse: """.""" # Validate node_id - if request.node.anonymous or request.node.node_id <= 0: + if request.node.anonymous: return DeleteNodeResponse() # Update state From 5b17f2ebfb4f6e11255a539f0703549f063d4419 Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Wed, 13 Mar 2024 21:07:16 +0000 Subject: [PATCH 08/23] Better exception --- src/py/flwr/client/app.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index 8746dddcf3ae..5c43f244649a 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -702,7 +702,7 @@ class _RunTracker: def register_signal_handler(self): def signal_handler(sig, frame): self.should_exit = True - raise StopIteration + raise StopIteration from None signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) From 53a6d6f90aaf13daed9d92a7b611eb826bcb66e3 Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Wed, 13 Mar 2024 21:15:49 +0000 Subject: [PATCH 09/23] Add type hints --- src/py/flwr/client/app.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index 5c43f244649a..629da7b98a20 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -22,6 +22,7 @@ from dataclasses import dataclass from logging import DEBUG, INFO, WARN from pathlib import Path +from types import FrameType from typing import Callable, ContextManager, Optional, Tuple, Type, Union from grpc import RpcError @@ -699,8 +700,8 @@ class _RunTracker: connection: bool = True should_exit: bool = False - def register_signal_handler(self): - def signal_handler(sig, frame): + def register_signal_handler(self) -> None: + def signal_handler(sig: int, frame: Optional[FrameType]) -> None: self.should_exit = True raise StopIteration from None From 38aed6a7391b255dbf68ef0f1b4c73db9f648812 Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Wed, 13 Mar 2024 21:27:22 +0000 Subject: [PATCH 10/23] Add disables --- src/py/flwr/client/app.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index 629da7b98a20..354902849654 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -701,7 +701,8 @@ class _RunTracker: should_exit: bool = False def register_signal_handler(self) -> None: - def signal_handler(sig: int, frame: Optional[FrameType]) -> None: + def signal_handler(sig, frame): # type: ignore + # pylint: disable=unused-argument self.should_exit = True raise StopIteration from None From 73d68e2ac40452b6d7a04f28fec49c92e41dc06b Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Wed, 13 Mar 2024 21:45:05 +0000 Subject: [PATCH 11/23] Simplify implementation --- src/py/flwr/client/app.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index 354902849654..60529434e1f5 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -22,7 +22,6 @@ from dataclasses import dataclass from logging import DEBUG, INFO, WARN from pathlib import Path -from types import FrameType from typing import Callable, ContextManager, Optional, Tuple, Type, Union from grpc import RpcError @@ -400,6 +399,7 @@ def _load_client_app() -> ClientApp: ) run_tracker = _RunTracker() + run_tracker.register_signal_handler() def _on_sucess(retry_state: RetryState) -> None: run_tracker.connection = True @@ -443,9 +443,7 @@ def _on_backoff(retry_state: RetryState) -> None: node_state = NodeState() - run_tracker.register_signal_handler() - - while not run_tracker.should_exit: + while True: sleep_duration: int = 0 with connection( address, @@ -533,6 +531,7 @@ def _on_backoff(retry_state: RetryState) -> None: if sleep_duration == 0: log(INFO, "Disconnect and shut down") break + # Sleep and reconnect afterwards log( INFO, @@ -698,12 +697,10 @@ def _init_connection(transport: Optional[str], server_address: str) -> Tuple[ @dataclass class _RunTracker: connection: bool = True - should_exit: bool = False def register_signal_handler(self) -> None: def signal_handler(sig, frame): # type: ignore # pylint: disable=unused-argument - self.should_exit = True raise StopIteration from None signal.signal(signal.SIGINT, signal_handler) From aa764707e7346f3e0b6ad828be39a2d2f226bf3a Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Wed, 13 Mar 2024 21:54:42 +0000 Subject: [PATCH 12/23] Add docstring --- src/py/flwr/client/app.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index 60529434e1f5..3d3605914fb7 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -699,6 +699,8 @@ class _RunTracker: connection: bool = True def register_signal_handler(self) -> None: + """Register handlers for exit signals.""" + def signal_handler(sig, frame): # type: ignore # pylint: disable=unused-argument raise StopIteration from None From 27ad48c5d5b35f455805862a7f337d2a60ce7428 Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Wed, 13 Mar 2024 22:20:45 +0000 Subject: [PATCH 13/23] Move signal handler registration before try/except --- src/py/flwr/client/app.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index 3d3605914fb7..6a07fee33cb3 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -399,7 +399,6 @@ def _load_client_app() -> ClientApp: ) run_tracker = _RunTracker() - run_tracker.register_signal_handler() def _on_sucess(retry_state: RetryState) -> None: run_tracker.connection = True @@ -458,6 +457,7 @@ def _on_backoff(retry_state: RetryState) -> None: if create_node is not None: create_node() # pylint: disable=not-callable + run_tracker.register_signal_handler() while True: try: # Receive From 446c46d02a094c0ce63d6dc4c680517aca192ea2 Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Wed, 10 Apr 2024 08:56:00 +0200 Subject: [PATCH 14/23] Format file --- src/py/flwr/client/app.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index e2946da396c5..2fcf29c136b2 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -491,7 +491,9 @@ def _on_backoff(retry_state: RetryState) -> None: node_state.register_context(run_id=message.metadata.run_id) # Retrieve context for this run - context = node_state.retrieve_context(run_id=message.metadata.run_id) + context = node_state.retrieve_context( + run_id=message.metadata.run_id + ) # Create an error reply message that will never be used to prevent # the used-before-assignment linting error From cb7955278b37c1001e9f5a3ec9d48c616cc130ce Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Wed, 10 Apr 2024 08:59:10 +0200 Subject: [PATCH 15/23] Split comment on 2 lines --- src/py/flwr/client/app.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index 2fcf29c136b2..5d3ec1ecd95d 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -519,7 +519,8 @@ def _on_backoff(retry_state: RetryState) -> None: # Don't update/change NodeState e_code = ErrorCode.CLIENT_APP_RAISED_EXCEPTION - # Reason example: ":<'division by zero'>" + # Reason example: + # ":<'division by zero'>" reason = str(type(ex)) + ":<'" + str(ex) + "'>" exc_entity = "ClientApp" if isinstance(ex, LoadClientAppError): From e0e7167cafadccaa69fe6213c44ab876c1a0d1ca Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Wed, 10 Apr 2024 14:27:39 +0200 Subject: [PATCH 16/23] CreateNode on reconnect --- src/py/flwr/client/app.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index 5d3ec1ecd95d..d1a1d365bf99 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -408,6 +408,8 @@ def _on_sucess(retry_state: RetryState) -> None: retry_state.elapsed_time, retry_state.tries, ) + if run_tracker.create_node: + run_tracker.create_node() def _on_backoff(retry_state: RetryState) -> None: run_tracker.connection = False @@ -453,6 +455,7 @@ def _on_backoff(retry_state: RetryState) -> None: receive, send, create_node, delete_node = conn # Register node + run_tracker.create_node = create_node if create_node is not None: create_node() # pylint: disable=not-callable @@ -725,6 +728,7 @@ def _init_connection(transport: Optional[str], server_address: str) -> Tuple[ @dataclass class _RunTracker: connection: bool = True + create_node: Optional[Callable[[], None]] = None def register_signal_handler(self) -> None: """Register handlers for exit signals.""" From c78892b2dc0a3b0b57d729b1832088512a5f565e Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Thu, 9 May 2024 09:16:47 +0200 Subject: [PATCH 17/23] Delete run_tracker on exit --- src/py/flwr/client/app.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index cd0b119f2bdb..9b73f63e27af 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -431,6 +431,7 @@ def _on_backoff(retry_state: RetryState) -> None: if sleep_duration == 0: log(INFO, "Disconnect and shut down") + del run_tracker break # Sleep and reconnect afterwards From adfb32bdba09f4621d2900713ec05dc442047eaf Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Tue, 28 May 2024 16:14:07 +0200 Subject: [PATCH 18/23] Clientapp exit bool test (#3522) --- src/py/flwr/client/app.py | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index 9b73f63e27af..e3487d352240 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -268,7 +268,6 @@ def _load_client_app() -> ClientApp: run_tracker = _RunTracker() def _on_sucess(retry_state: RetryState) -> None: - run_tracker.connection = True if retry_state.tries > 1: log( INFO, @@ -280,7 +279,6 @@ def _on_sucess(retry_state: RetryState) -> None: run_tracker.create_node() def _on_backoff(retry_state: RetryState) -> None: - run_tracker.connection = False if retry_state.tries == 1: log(WARN, "Connection attempt failed, retrying...") else: @@ -311,7 +309,7 @@ def _on_backoff(retry_state: RetryState) -> None: node_state = NodeState() - while True: + while not run_tracker.interrupt: sleep_duration: int = 0 with connection( address, @@ -330,7 +328,7 @@ def _on_backoff(retry_state: RetryState) -> None: create_node() # pylint: disable=not-callable run_tracker.register_signal_handler() - while True: + while not run_tracker.interrupt: try: # Receive message = receive() @@ -404,7 +402,10 @@ def _on_backoff(retry_state: RetryState) -> None: e_code = ErrorCode.LOAD_CLIENT_APP_EXCEPTION exc_entity = "SuperNode" - log(ERROR, "%s raised an exception", exc_entity, exc_info=ex) + if not run_tracker.interrupt: + log( + ERROR, "%s raised an exception", exc_entity, exc_info=ex + ) # Create error message reply_message = message.create_error_reply( @@ -425,10 +426,6 @@ def _on_backoff(retry_state: RetryState) -> None: sleep_duration = 0 break - # Unregister node - if delete_node is not None and run_tracker.connection: - delete_node() # pylint: disable=not-callable - if sleep_duration == 0: log(INFO, "Disconnect and shut down") del run_tracker @@ -606,14 +603,15 @@ def _init_connection(transport: Optional[str], server_address: str) -> Tuple[ @dataclass class _RunTracker: - connection: bool = True create_node: Optional[Callable[[], None]] = None + interrupt: bool = False def register_signal_handler(self) -> None: """Register handlers for exit signals.""" def signal_handler(sig, frame): # type: ignore # pylint: disable=unused-argument + self.interrupt = True raise StopIteration from None signal.signal(signal.SIGINT, signal_handler) From c0f2379f1e4ed6797d22411810a20bee916feebe Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Wed, 29 May 2024 22:10:41 +0200 Subject: [PATCH 19/23] feat(framework:skip) Add delete node on shutdown (#3524) --- src/py/flwr/client/app.py | 7 +++++++ src/py/flwr/client/grpc_rere_client/connection.py | 5 ----- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index e3487d352240..c9f45d14539c 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -268,6 +268,7 @@ def _load_client_app() -> ClientApp: run_tracker = _RunTracker() def _on_sucess(retry_state: RetryState) -> None: + run_tracker.is_connected = True if retry_state.tries > 1: log( INFO, @@ -279,6 +280,7 @@ def _on_sucess(retry_state: RetryState) -> None: run_tracker.create_node() def _on_backoff(retry_state: RetryState) -> None: + run_tracker.is_connected = False if retry_state.tries == 1: log(WARN, "Connection attempt failed, retrying...") else: @@ -426,6 +428,10 @@ def _on_backoff(retry_state: RetryState) -> None: sleep_duration = 0 break + # Unregister node + if delete_node is not None and run_tracker.is_connected: + delete_node() # pylint: disable=not-callable + if sleep_duration == 0: log(INFO, "Disconnect and shut down") del run_tracker @@ -605,6 +611,7 @@ def _init_connection(transport: Optional[str], server_address: str) -> Tuple[ class _RunTracker: create_node: Optional[Callable[[], None]] = None interrupt: bool = False + is_connected: bool = False def register_signal_handler(self) -> None: """Register handlers for exit signals.""" diff --git a/src/py/flwr/client/grpc_rere_client/connection.py b/src/py/flwr/client/grpc_rere_client/connection.py index 3778fd4061f9..d1769958863d 100644 --- a/src/py/flwr/client/grpc_rere_client/connection.py +++ b/src/py/flwr/client/grpc_rere_client/connection.py @@ -188,11 +188,6 @@ def delete_node() -> None: log(ERROR, "Node instance missing") return - # Stop the ping-loop thread - ping_stop_event.set() - if ping_thread is not None: - ping_thread.join() - # Call FleetAPI delete_node_request = DeleteNodeRequest(node=node) retry_invoker.invoke(stub.DeleteNode, request=delete_node_request) From ffdd1080607bb722259075fd588435c7d4af7824 Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Thu, 30 May 2024 09:26:04 +0200 Subject: [PATCH 20/23] Revert delete_node change --- src/py/flwr/client/app.py | 7 ------- src/py/flwr/client/grpc_rere_client/connection.py | 5 +++++ 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index c9f45d14539c..e3487d352240 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -268,7 +268,6 @@ def _load_client_app() -> ClientApp: run_tracker = _RunTracker() def _on_sucess(retry_state: RetryState) -> None: - run_tracker.is_connected = True if retry_state.tries > 1: log( INFO, @@ -280,7 +279,6 @@ def _on_sucess(retry_state: RetryState) -> None: run_tracker.create_node() def _on_backoff(retry_state: RetryState) -> None: - run_tracker.is_connected = False if retry_state.tries == 1: log(WARN, "Connection attempt failed, retrying...") else: @@ -428,10 +426,6 @@ def _on_backoff(retry_state: RetryState) -> None: sleep_duration = 0 break - # Unregister node - if delete_node is not None and run_tracker.is_connected: - delete_node() # pylint: disable=not-callable - if sleep_duration == 0: log(INFO, "Disconnect and shut down") del run_tracker @@ -611,7 +605,6 @@ def _init_connection(transport: Optional[str], server_address: str) -> Tuple[ class _RunTracker: create_node: Optional[Callable[[], None]] = None interrupt: bool = False - is_connected: bool = False def register_signal_handler(self) -> None: """Register handlers for exit signals.""" diff --git a/src/py/flwr/client/grpc_rere_client/connection.py b/src/py/flwr/client/grpc_rere_client/connection.py index d1769958863d..3778fd4061f9 100644 --- a/src/py/flwr/client/grpc_rere_client/connection.py +++ b/src/py/flwr/client/grpc_rere_client/connection.py @@ -188,6 +188,11 @@ def delete_node() -> None: log(ERROR, "Node instance missing") return + # Stop the ping-loop thread + ping_stop_event.set() + if ping_thread is not None: + ping_thread.join() + # Call FleetAPI delete_node_request = DeleteNodeRequest(node=node) retry_invoker.invoke(stub.DeleteNode, request=delete_node_request) From 5d415a40461d4aeea72b6a5327e7655446fa8b4b Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Sun, 9 Jun 2024 18:35:37 +0200 Subject: [PATCH 21/23] Rename and remove create_node --- src/py/flwr/client/app.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index 0c212a5511e7..0c0b06f1a8e1 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -265,8 +265,7 @@ def _load_client_app() -> ClientApp: transport, server_address ) - run_tracker = _RunTracker() - _ = run_tracker + app_state_tracker = _AppStateTracker() def _on_sucess(retry_state: RetryState) -> None: if retry_state.tries > 1: @@ -308,7 +307,7 @@ def _on_backoff(retry_state: RetryState) -> None: node_state = NodeState() - while not run_tracker.interrupt: + while not app_state_tracker.interrupt: sleep_duration: int = 0 with connection( address, @@ -322,12 +321,11 @@ def _on_backoff(retry_state: RetryState) -> None: receive, send, create_node, delete_node, get_run = conn # Register node - run_tracker.create_node = create_node if create_node is not None: create_node() # pylint: disable=not-callable - run_tracker.register_signal_handler() - while not run_tracker.interrupt: + app_state_tracker.register_signal_handler() + while not app_state_tracker.interrupt: try: # Receive message = receive() @@ -399,7 +397,7 @@ def _on_backoff(retry_state: RetryState) -> None: e_code = ErrorCode.LOAD_CLIENT_APP_EXCEPTION exc_entity = "SuperNode" - if not run_tracker.interrupt: + if not app_state_tracker.interrupt: log( ERROR, "%s raised an exception", exc_entity, exc_info=ex ) @@ -425,7 +423,7 @@ def _on_backoff(retry_state: RetryState) -> None: if sleep_duration == 0: log(INFO, "Disconnect and shut down") - del run_tracker + del app_state_tracker break # Sleep and reconnect afterwards @@ -599,7 +597,7 @@ def _init_connection(transport: Optional[str], server_address: str) -> Tuple[ @dataclass -class _RunTracker: +class _AppStateTracker: interrupt: bool = False def register_signal_handler(self) -> None: From 100ba859b6be24f36a58f9598df2f512acc3fbd4 Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Sun, 9 Jun 2024 19:07:19 +0200 Subject: [PATCH 22/23] feat(framework:skip) Call delete_node on shutdown (#3531) --- src/py/flwr/client/app.py | 7 +++++++ src/py/flwr/client/grpc_rere_client/connection.py | 5 ----- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index 0c0b06f1a8e1..7294600288aa 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -268,6 +268,7 @@ def _load_client_app() -> ClientApp: app_state_tracker = _AppStateTracker() def _on_sucess(retry_state: RetryState) -> None: + app_state_tracker.is_connected = True if retry_state.tries > 1: log( INFO, @@ -277,6 +278,7 @@ def _on_sucess(retry_state: RetryState) -> None: ) def _on_backoff(retry_state: RetryState) -> None: + app_state_tracker.is_connected = False if retry_state.tries == 1: log(WARN, "Connection attempt failed, retrying...") else: @@ -421,6 +423,10 @@ def _on_backoff(retry_state: RetryState) -> None: sleep_duration = 0 break + # Unregister node + if delete_node is not None and app_state_tracker.is_connected: + delete_node() # pylint: disable=not-callable + if sleep_duration == 0: log(INFO, "Disconnect and shut down") del app_state_tracker @@ -599,6 +605,7 @@ def _init_connection(transport: Optional[str], server_address: str) -> Tuple[ @dataclass class _AppStateTracker: interrupt: bool = False + is_connected: bool = False def register_signal_handler(self) -> None: """Register handlers for exit signals.""" diff --git a/src/py/flwr/client/grpc_rere_client/connection.py b/src/py/flwr/client/grpc_rere_client/connection.py index 8ef8e7ebf62a..44e1f2e32e06 100644 --- a/src/py/flwr/client/grpc_rere_client/connection.py +++ b/src/py/flwr/client/grpc_rere_client/connection.py @@ -191,11 +191,6 @@ def delete_node() -> None: log(ERROR, "Node instance missing") return - # Stop the ping-loop thread - ping_stop_event.set() - if ping_thread is not None: - ping_thread.join() - # Call FleetAPI delete_node_request = DeleteNodeRequest(node=node) retry_invoker.invoke(stub.DeleteNode, request=delete_node_request) From 83528a36acc2cb5750f376dbd7838299f983ab13 Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Sun, 9 Jun 2024 19:39:47 +0200 Subject: [PATCH 23/23] Keep ping stop event call --- src/py/flwr/client/grpc_rere_client/connection.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/py/flwr/client/grpc_rere_client/connection.py b/src/py/flwr/client/grpc_rere_client/connection.py index 44e1f2e32e06..9579d5830165 100644 --- a/src/py/flwr/client/grpc_rere_client/connection.py +++ b/src/py/flwr/client/grpc_rere_client/connection.py @@ -191,6 +191,9 @@ def delete_node() -> None: log(ERROR, "Node instance missing") return + # Stop the ping-loop thread + ping_stop_event.set() + # Call FleetAPI delete_node_request = DeleteNodeRequest(node=node) retry_invoker.invoke(stub.DeleteNode, request=delete_node_request)