Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(framework) Add get_pending_run_id method to LinkState #4357

Merged
merged 39 commits into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
049e3d4
init
panh99 Sep 18, 2024
52721e0
rename and introduce constants
panh99 Sep 18, 2024
6779654
fix field name
panh99 Sep 18, 2024
0646b5c
fix comments
panh99 Sep 18, 2024
cfbd1ec
Merge branch 'main' into run-mgmt-state
panh99 Sep 19, 2024
8d38b65
format
panh99 Sep 19, 2024
8d19b85
Merge branch 'main' into run-mgmt-state
panh99 Sep 23, 2024
ee578b9
rename enum
panh99 Sep 23, 2024
2e75d26
rename dataclass
panh99 Sep 23, 2024
85579df
update utils
panh99 Sep 23, 2024
68b3154
update states
panh99 Sep 23, 2024
e219662
fix sqlite
panh99 Sep 23, 2024
3f5a116
Merge branch 'main' into run-mgmt-state
danieljanes Sep 23, 2024
3d6c3bb
Merge branch 'main' into run-mgmt-state
panh99 Sep 23, 2024
aeac5b1
init sqlite_state_utils
panh99 Sep 23, 2024
0eb8ad1
format
panh99 Sep 23, 2024
8765100
add timestamps for in memory state
panh99 Sep 23, 2024
4e16416
keep utils in the same file
panh99 Sep 24, 2024
b7e931c
Merge branch 'main' into run-mgmt-state
panh99 Sep 27, 2024
30e4649
rename variables and improve logging
panh99 Sep 27, 2024
521b38f
Merge remote-tracking branch 'origin/main' into run-mgmt-state
panh99 Oct 21, 2024
9c79cb5
format
panh99 Oct 21, 2024
9474f61
Merge branch 'main' into run-mgmt-state
panh99 Oct 22, 2024
3f74076
correct wording
panh99 Oct 22, 2024
71bc27c
restore changes
panh99 Oct 22, 2024
cbab64e
Update src/py/flwr/server/superlink/linkstate/in_memory_linkstate.py
panh99 Oct 23, 2024
a55811b
add pending
panh99 Oct 23, 2024
c3bac06
apply set
panh99 Oct 23, 2024
55f740f
Merge branch 'main' into run-mgmt-state
panh99 Oct 23, 2024
9b49a20
Merge branch 'main' into run-mgmt-state
panh99 Oct 23, 2024
a6aed95
apply suggestion
panh99 Oct 23, 2024
48828ec
init
jafermarq Oct 23, 2024
8012662
fix
jafermarq Oct 23, 2024
639ec38
fix
jafermarq Oct 23, 2024
ae9fd4d
+ Pan`s suggestion
jafermarq Oct 23, 2024
88ae666
w/ tests
jafermarq Oct 23, 2024
9d4c908
tweak
jafermarq Oct 23, 2024
0a26fca
Merge branch 'main' into get-pending-run-id-linkstate
jafermarq Oct 23, 2024
a9c40b2
fix for py3.9
jafermarq Oct 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/py/flwr/server/superlink/linkstate/in_memory_linkstate.py
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,19 @@ def update_run_status(self, run_id: int, new_status: RunStatus) -> bool:
run_record.status = new_status
return True

def get_pending_run_id(self) -> Optional[int]:
"""Get the `run_id` of a run with `Status.PENDING` status, if any."""
pending_run_id = None

# Loop through all registered runs
for run_id, run_rec in self.run_ids.items():
# Break once a pending run is found
if run_rec.status.status == Status.PENDING:
pending_run_id = run_id
break

return pending_run_id

def acknowledge_ping(self, node_id: int, ping_interval: float) -> bool:
"""Acknowledge a ping received from a node, serving as a heartbeat."""
with self.lock:
Expand Down
11 changes: 11 additions & 0 deletions src/py/flwr/server/superlink/linkstate/linkstate.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,17 @@ def update_run_status(self, run_id: int, new_status: RunStatus) -> bool:
True if the status update is successful; False otherwise.
"""

@abc.abstractmethod
def get_pending_run_id(self) -> Optional[int]:
"""Get the `run_id` of a run with `Status.PENDING` status.

Returns
-------
Optional[int]
The `run_id` of a `Run` that is pending to be started; None if
there is no Run pending.
"""

@abc.abstractmethod
def store_server_private_public_key(
self, private_key: bytes, public_key: bytes
Expand Down
20 changes: 20 additions & 0 deletions src/py/flwr/server/superlink/linkstate/linkstate_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,26 @@ def test_create_and_get_run(self) -> None:
assert run.fab_hash == "9f86d08"
assert run.override_config["test_key"] == "test_value"

def test_get_pending_run_id(self) -> None:
"""Test if get_pending_run_id works correctly."""
# Prepare
state = self.state_factory()
_ = state.create_run(None, None, "9f86d08", {"test_key": "test_value"})
run_id2 = state.create_run(None, None, "fffffff", {"mock_key": "mock_value"})
state.update_run_status(run_id2, RunStatus(Status.STARTING, "", ""))

# Execute
pending_run_id = state.get_pending_run_id()
assert pending_run_id is not None
run_status_dict = state.get_run_status({pending_run_id})
assert run_status_dict[pending_run_id].status == Status.PENDING

# Change state
state.update_run_status(pending_run_id, RunStatus(Status.STARTING, "", ""))
# Attempt get pending run
pending_run_id = state.get_pending_run_id()
assert pending_run_id is None

def test_get_and_update_run_status(self) -> None:
"""Test if get_run_status and update_run_status work correctly."""
# Prepare
Expand Down
12 changes: 12 additions & 0 deletions src/py/flwr/server/superlink/linkstate/sqlite_linkstate.py
Original file line number Diff line number Diff line change
Expand Up @@ -943,6 +943,18 @@ def update_run_status(self, run_id: int, new_status: RunStatus) -> bool:
self.query(query % timestamp_fld, data)
return True

def get_pending_run_id(self) -> Optional[int]:
"""Get the `run_id` of a run with `Status.PENDING` status, if any."""
pending_run_id = None

# Fetch all runs with unset `starting_at` (i.e. they are in PENDING status)
query = "SELECT * FROM run WHERE starting_at = '' LIMIT 1;"
rows = self.query(query)
if rows:
pending_run_id = convert_sint64_to_uint64(rows[0]["run_id"])

return pending_run_id

def acknowledge_ping(self, node_id: int, ping_interval: float) -> bool:
"""Acknowledge a ping received from a node, serving as a heartbeat."""
sint64_node_id = convert_uint64_to_sint64(node_id)
Expand Down