feat(framework) Add exception handling to SuperNode for graceful exit when stopped #4668

Merged
merged 108 commits into from
Dec 16, 2024
Changes from 106 commits
Commits
3a2ef95
Add protos for flwr stop
chongshenng Dec 4, 2024
3019ebb
Add StopRun to ExecServicer
chongshenng Dec 4, 2024
dc7c8e6
Merge branch 'main' into add-flwr-stop-protos
chongshenng Dec 4, 2024
a15fb4b
Merge main
chongshenng Dec 4, 2024
dde8471
Update
chongshenng Dec 4, 2024
a46c6b4
feat(framework) Add flwr stop CLI
chongshenng Dec 5, 2024
2f3be4f
Add noqa
chongshenng Dec 5, 2024
cd1a6bc
Init
chongshenng Dec 5, 2024
48501f0
Initial add
chongshenng Dec 5, 2024
78084ba
Remove FleetServicer context abort
chongshenng Dec 5, 2024
483c8f1
Merge main
chongshenng Dec 9, 2024
fc84267
Remove FleetServicer check
chongshenng Dec 9, 2024
6aa749d
Update
chongshenng Dec 9, 2024
0e3f577
Update pylint ignores
chongshenng Dec 9, 2024
3217740
Merge main
chongshenng Dec 9, 2024
18d10db
Update protos
chongshenng Dec 9, 2024
2ebd50e
Add GetRunStatus
chongshenng Dec 9, 2024
468ac5e
Add run_id to PushTaskInsRequest and PullTaskResRequest
chongshenng Dec 9, 2024
27e7088
Update
chongshenng Dec 9, 2024
9c0a40f
Reduce diff
chongshenng Dec 9, 2024
73483b8
Fix GetRunStatus
chongshenng Dec 9, 2024
6b5b469
Merge branch 'add-flwr-stop-protos' into add-context-abort
chongshenng Dec 9, 2024
f91ede3
Update
chongshenng Dec 9, 2024
4f33992
Add set
chongshenng Dec 9, 2024
0a474a1
Remove RunStatus check
chongshenng Dec 9, 2024
1ca2035
Merge branch 'add-flwr-stop-protos' into add-context-abort
chongshenng Dec 9, 2024
754293d
Merge branch 'main' into add-flwr-stop-protos
chongshenng Dec 9, 2024
fd13678
Merge branch 'add-flwr-stop-protos' into add-context-abort
chongshenng Dec 9, 2024
3ff5ec8
Merge branch 'main' into add-flwr-stop-protos
chongshenng Dec 9, 2024
e01debf
Merge branch 'add-flwr-stop-protos' into add-context-abort
chongshenng Dec 9, 2024
22139e8
Remove except
chongshenng Dec 9, 2024
d223cb6
Undo pylint
chongshenng Dec 9, 2024
e4db4c3
Init
chongshenng Dec 9, 2024
b548230
Fix type
chongshenng Dec 9, 2024
7d2ea01
Update exception get
chongshenng Dec 9, 2024
25ea081
Add threadpoolexecutor
chongshenng Dec 9, 2024
acef698
Revert
chongshenng Dec 10, 2024
0d5d791
Revert
chongshenng Dec 10, 2024
8815008
Fix
chongshenng Dec 10, 2024
70e17c6
Fix
chongshenng Dec 10, 2024
d8c4de6
Merge branch 'main' into add-flwr-stop-protos
chongshenng Dec 10, 2024
8344853
Merge branch 'add-flwr-stop-protos' into add-context-abort
chongshenng Dec 10, 2024
7888976
Merge branch 'add-context-abort' into add-serverapp-abort
chongshenng Dec 10, 2024
8b8eb44
Rename
chongshenng Dec 10, 2024
e99b6a5
Init
chongshenng Dec 10, 2024
3dd1d6e
Merge branch 'main' into add-flwr-stop-protos
chongshenng Dec 11, 2024
1d18254
Merge branch 'add-flwr-stop-protos' into add-context-abort
chongshenng Dec 11, 2024
bc55bab
Merge branch 'add-context-abort' into add-serverapp-abort
chongshenng Dec 11, 2024
d2db892
Update
chongshenng Dec 11, 2024
ab01f74
Merge main
chongshenng Dec 11, 2024
1b3dbcd
Cleanup
chongshenng Dec 11, 2024
efd7e06
.
chongshenng Dec 11, 2024
9b5c083
Fix
chongshenng Dec 11, 2024
5e2c210
Add docstring to StopRunException
chongshenng Dec 11, 2024
83031ba
Merge branch 'add-serverapp-abort' into add-fleet-abort
chongshenng Dec 11, 2024
45468ef
Add run_id
chongshenng Dec 11, 2024
917c375
Merge branch 'main' into add-context-abort
danieljanes Dec 11, 2024
8a2a317
Better abort
chongshenng Dec 11, 2024
5492ebf
Merge branch 'main' into add-context-abort
chongshenng Dec 11, 2024
c70f360
Merge branch 'add-context-abort' into add-serverapp-abort
chongshenng Dec 11, 2024
43d70ef
Fix ruff
chongshenng Dec 11, 2024
cae81a1
Merge branch 'add-context-abort' into add-serverapp-abort
chongshenng Dec 11, 2024
8bcbf4f
Merge branch 'add-serverapp-abort' into add-fleet-abort
chongshenng Dec 11, 2024
94d7905
Better abort
chongshenng Dec 11, 2024
78e41b3
Add test
chongshenng Dec 11, 2024
45a32b7
Fix formatting
chongshenng Dec 11, 2024
87061be
Init
chongshenng Dec 11, 2024
3064132
Fix
chongshenng Dec 11, 2024
d36d29c
Fix
chongshenng Dec 11, 2024
4b79768
Init
chongshenng Dec 11, 2024
3cd0493
Fix mypy
chongshenng Dec 12, 2024
94b0ff5
Refactor abort_if
chongshenng Dec 12, 2024
d536bda
Rename to utils.py
chongshenng Dec 12, 2024
1b2a6bf
Merge branch 'add-context-abort' into add-serverapp-abort
chongshenng Dec 12, 2024
dc9b4d2
Add linebreaks
chongshenng Dec 12, 2024
1706b0d
Merge branch 'main' into add-context-abort
chongshenng Dec 12, 2024
a395dbe
Update docstring
chongshenng Dec 12, 2024
366ae95
Merge main
chongshenng Dec 12, 2024
72ccbdf
Merge branch 'add-serverapp-abort' into add-fleet-abort
chongshenng Dec 12, 2024
eee64b3
Refactor FleetServicer abort
chongshenng Dec 12, 2024
d301b83
Fix mypy
chongshenng Dec 12, 2024
a4fbe35
Fix pylint
chongshenng Dec 12, 2024
bc973c5
Make run_id not optional
chongshenng Dec 12, 2024
261fdde
Fix
chongshenng Dec 12, 2024
ec3ac72
Remove delete_tasks on callback
chongshenng Dec 12, 2024
8bf13a5
Improve test
chongshenng Dec 12, 2024
55ecab4
Merge branch 'main' into add-linkstate-get-taskins
chongshenng Dec 12, 2024
b4fb037
Merge branch 'add-linkstate-get-taskins' into delete-stopped-run-link…
chongshenng Dec 12, 2024
c3e09dc
Simplify delete_tasks calling
chongshenng Dec 12, 2024
64e4e46
Remove unneeded assert
chongshenng Dec 12, 2024
aa4e07e
Merge branch 'add-linkstate-get-taskins' into delete-stopped-run-link…
chongshenng Dec 12, 2024
5c07773
Address comments
chongshenng Dec 12, 2024
0888f96
Merge branch 'add-linkstate-get-taskins' into delete-stopped-run-link…
chongshenng Dec 12, 2024
28ff942
Merge main
chongshenng Dec 12, 2024
e8805ac
Fix
chongshenng Dec 12, 2024
45919df
Merge add get TaskIns
chongshenng Dec 12, 2024
250c536
Merge branch 'delete-stopped-run-linkstate-msgs' into add-fleet-abort
chongshenng Dec 12, 2024
0a14d3f
Init
chongshenng Dec 12, 2024
91db42f
Remove unused pylint ignore
chongshenng Dec 12, 2024
615fbdd
Fix tests
chongshenng Dec 12, 2024
bfe410d
Update tests
chongshenng Dec 12, 2024
b37752e
Merge main
chongshenng Dec 14, 2024
eeaca75
Clean
chongshenng Dec 14, 2024
51cfd12
Make consistent
chongshenng Dec 14, 2024
6f79744
Merge branch 'main' into add-fleet-abort
danieljanes Dec 15, 2024
2a83370
Address comments
chongshenng Dec 15, 2024
ca53ad7
Address comments
chongshenng Dec 16, 2024
5750c58
Merge branch 'main' into add-fleet-abort
danieljanes Dec 16, 2024
13 changes: 12 additions & 1 deletion src/py/flwr/client/app.py
@@ -56,7 +56,7 @@
 from flwr.common.logger import log, warn_deprecated_feature
 from flwr.common.message import Error
 from flwr.common.retry_invoker import RetryInvoker, RetryState, exponential
-from flwr.common.typing import Fab, Run, UserConfig
+from flwr.common.typing import Fab, Run, RunNotRunningException, UserConfig
 from flwr.proto.clientappio_pb2_grpc import add_ClientAppIoServicer_to_server
 from flwr.server.superlink.fleet.grpc_bidi.grpc_server import generic_create_grpc_server
 from flwr.server.superlink.linkstate.utils import generate_rand_int_from_bytes
@@ -612,6 +612,17 @@ def _on_backoff(retry_state: RetryState) -> None:
     send(reply_message)
     log(INFO, "Sent reply")

+except RunNotRunningException:
+    log(INFO, "")
+    log(
+        INFO,
+        "SuperNode stopped execution. "
+        "Run ID %s is not in `RUNNING` status.",
+        run_id,
+    )
+    log(INFO, "")
+    continue
+
 except StopIteration:
     sleep_duration = 0
     break
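The control flow added in this hunk can be illustrated with a small, dependency-free sketch. It is not the SuperNode loop itself: `poll_loop`, the `(run_id, running)` event tuples, and the local `RunNotRunningException` class are hypothetical stand-ins chosen for illustration. It only shows the pattern of logging and continuing when a run is no longer running, while `StopIteration` still ends the loop.

```python
import logging

log = logging.getLogger("supernode_sketch")


class RunNotRunningException(Exception):
    """Stand-in for the exception the diff imports from flwr.common.typing."""


def poll_loop(events):
    """Handle (run_id, running) pairs; skip stopped runs, exit on StopIteration.

    `events` is a hypothetical input used only to drive the sketch.
    """
    handled, skipped = [], []
    iterator = iter(events)
    while True:
        try:
            run_id, running = next(iterator)  # raises StopIteration when exhausted
            if not running:
                # Assumption: in the real client this is raised by the connection layer
                raise RunNotRunningException
            handled.append(run_id)
        except RunNotRunningException:
            # Mirror the hunk: log the stopped run, then keep polling
            log.info("Run ID %s is not in `RUNNING` status.", run_id)
            skipped.append(run_id)
            continue
        except StopIteration:
            break
    return handled, skipped
```

With `continue`, a stopped run does not terminate the process; the loop simply moves on to the next message.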
1 change: 1 addition & 0 deletions src/py/flwr/client/clientapp/app.py
@@ -139,6 +139,7 @@ def run_clientapp(  # pylint: disable=R0914

     # Execute ClientApp
     reply_message = client_app(message=message, context=context)
+
 except Exception as ex:  # pylint: disable=broad-exception-caught
     # Don't update/change NodeState

14 changes: 10 additions & 4 deletions src/py/flwr/client/grpc_rere_client/connection.py
@@ -42,7 +42,7 @@
 from flwr.common.message import Message, Metadata
 from flwr.common.retry_invoker import RetryInvoker
 from flwr.common.serde import message_from_taskins, message_to_taskres, run_from_proto
-from flwr.common.typing import Fab, Run
+from flwr.common.typing import Fab, Run, RunNotRunningException
 from flwr.proto.fab_pb2 import GetFabRequest, GetFabResponse  # pylint: disable=E0611
 from flwr.proto.fleet_pb2 import (  # pylint: disable=E0611
     CreateNodeRequest,
@@ -155,10 +155,16 @@ def grpc_request_response(  # pylint: disable=R0913,R0914,R0915,R0917
     ping_thread: Optional[threading.Thread] = None
     ping_stop_event = threading.Event()

+    def _should_giveup_fn(e: Exception) -> bool:
+        if e.code() == grpc.StatusCode.PERMISSION_DENIED:  # type: ignore
+            raise RunNotRunningException
+        if e.code() == grpc.StatusCode.UNAVAILABLE:  # type: ignore
+            return False
+        return True
+
     # Restrict retries to cases where the status code is UNAVAILABLE
-    retry_invoker.should_giveup = (
-        lambda e: e.code() != grpc.StatusCode.UNAVAILABLE  # type: ignore
-    )
+    # If the status code is PERMISSION_DENIED, additionally raise RunNotRunningException
+    retry_invoker.should_giveup = _should_giveup_fn

     ###########################################################################
     # ping/create_node/delete_node/receive/send/get_run functions
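The give-up callback above does two jobs: it classifies errors as retryable or not, and it converts `PERMISSION_DENIED` into a domain exception. A minimal sketch of that idea follows, written without gRPC so it runs on its own. `StatusCode`, `FakeRpcError`, and `call_with_retry` are hypothetical stand-ins, not `grpc.StatusCode`, `grpc.RpcError`, or Flower's `RetryInvoker`.

```python
from enum import Enum


class StatusCode(Enum):
    """Hypothetical stand-in for grpc.StatusCode."""

    UNAVAILABLE = "unavailable"
    PERMISSION_DENIED = "permission denied"
    INTERNAL = "internal"


class FakeRpcError(Exception):
    """Hypothetical error exposing .code(), as gRPC call errors do."""

    def __init__(self, status):
        super().__init__(status.value)
        self._status = status

    def code(self):
        return self._status


class RunNotRunningException(Exception):
    """Stand-in for the exception the diff imports from flwr.common.typing."""


def should_giveup(e):
    """Return True to stop retrying; raise if the run is not running."""
    if e.code() == StatusCode.PERMISSION_DENIED:
        raise RunNotRunningException
    # Only UNAVAILABLE is treated as transient and worth retrying
    return e.code() != StatusCode.UNAVAILABLE


def call_with_retry(fn, max_tries=3):
    """Hypothetical minimal retry loop (no backoff), for illustration only."""
    for attempt in range(max_tries):
        try:
            return fn()
        except FakeRpcError as err:
            if should_giveup(err) or attempt == max_tries - 1:
                raise
```

Because the exception is raised from inside the callback, it propagates out of the retry loop to the caller, which is presumably what lets the SuperNode loop in `src/py/flwr/client/app.py` catch it.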
@@ -123,9 +123,7 @@ def push_task_res(request: PushTaskResRequest, state: LinkState) -> PushTaskResR
     return response


-def get_run(
-    request: GetRunRequest, state: LinkState  # pylint: disable=W0613
-) -> GetRunResponse:
+def get_run(request: GetRunRequest, state: LinkState) -> GetRunResponse:
     """Get run information."""
     run = state.get_run(request.run_id)
