Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use daemon thread and replace time.sleep with Event.wait #3224

Merged
merged 4 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/py/flwr/client/heartbeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ def start_ping_loop(
asynchronous ping operations. The loop can be terminated through the provided stop
event.
"""
thread = threading.Thread(target=_ping_loop, args=(ping_fn, stop_event))
thread = threading.Thread(
target=_ping_loop, args=(ping_fn, stop_event), daemon=True
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@panh99 can you share your opinion on this from the Python docs

A thread can be flagged as a “daemon thread”. The significance of this flag is that the entire Python program exits when only daemon threads are left. The initial value is inherited from the creating thread. The flag can be set through the daemon property or the daemon constructor argument.

Note Daemon threads are abruptly stopped at shutdown. Their resources (such as open files, database transactions, etc.) may not be released properly. If you want your threads to stop gracefully, make them non-daemonic and use a suitable signalling mechanism such as an [Event](https://docs.python.org/3.10/library/threading.html#threading.Event).

There is a “main thread” object; this corresponds to the initial thread of control in the Python program. It is not a daemon thread.

There is the possibility that “dummy thread objects” are created. These are thread objects corresponding to “alien threads”, which are threads of control started outside the threading module, such as directly from C code. Dummy thread objects have limited functionality; they are always considered alive and daemonic, and cannot be join()ed. They are never deleted, since it is impossible to detect the termination of alien threads.

My understanding is that background thread may potentially not be released and this could actually create issues.

Copy link
Contributor Author

@panh99 panh99 Apr 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tanertopal @danieljanes It's true that bg thread may potentially be cleaned up properly if it's a daemon thread.
But in fact, the two threads can safely be daemon threads because they don't have critical tasks that must be cleaned up properly. In addition, the two threads can exit gracefully if the ClientApp exit gracefully, which will not cause the problem you mentioned.

The purpose of this change is that non-daemon threads now prevent the process from shutting down (i.e., invalidating kill or pkill). In the event that users use Ctrl + C or kill/pkill to terminate the ClientApp process, the main thread will terminate abruptly, without flagging stop events for the two threads. One of them, the heartbeat thread, will always be alive if the stop event is not flagged.

Here's a more detailed explanation from chat gpt.

Daemon Threads

  1. Definition: A daemon thread runs in the background and is killed automatically when the main program exits. This kind of thread does not prevent the program from exiting, and its tasks are abruptly stopped when the program terminates.

  2. Use Cases: Daemon threads are useful for tasks that run in the background and do not need to complete before the program ends. Common examples include background monitoring, periodic checks, or any service that should only run as long as the main program is running.

  3. Behavior: If the Python interpreter exits when only daemon threads are left, these threads are killed automatically. They do not get a chance to clean up, save state, or finish important work.

Non-Daemon Threads

  1. Definition: Non-daemon threads are the default in Python. The program will wait for these threads to complete before it terminates. They must be manually managed and explicitly joined to the main thread if you want the program to wait for these threads to finish.

  2. Use Cases: Non-daemon threads are suitable for tasks that must complete before the program can exit safely, such as data processing, file writing, or other cleanup tasks.

  3. Behavior: The Python program will not terminate until all non-daemon threads have completed their tasks, unless explicitly terminated or killed. This ensures that important tasks are completed and resources are properly released.

Choosing Between Daemon and Non-Daemon Threads

  • Task Criticality: Use non-daemon threads for critical tasks that must complete before your application closes, such as saving output to a file or sending a final network request. Choose daemon threads for non-critical tasks that should not delay the shutdown of the application, like periodic updates or heartbeats that only need to run while the main application is active.

  • Shutdown Behavior: If you need robust cleanup and you want to ensure certain actions are performed even if your program is interrupted, use non-daemon threads and handle their termination gracefully. If abrupt termination is acceptable for background tasks, daemon threads are a suitable choice.

  • Control and Management: Non-daemon threads require more explicit management (using thread joins, managing shutdown signals, etc.), whereas daemon threads are simpler in terms of lifecycle management because they automatically terminate with the main program.

Copy link
Member

@tanertopal tanertopal Apr 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I understand. Overall, I am ok with this change. We should discuss another time how we can make sure passing signals for graceful shutdown globally could be improved.

)
thread.start()

return thread
5 changes: 3 additions & 2 deletions src/py/flwr/server/compat/app_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@


import threading
import time
from typing import Dict, Tuple

from ..client_manager import ClientManager
Expand Down Expand Up @@ -60,6 +59,7 @@ def start_update_client_manager_thread(
client_manager,
f_stop,
),
daemon=True,
)
thread.start()

Expand Down Expand Up @@ -99,4 +99,5 @@ def _update_client_manager(
raise RuntimeError("Could not register node.")

# Sleep for 3 seconds
time.sleep(3)
if not f_stop.is_set():
f_stop.wait(3)
39 changes: 24 additions & 15 deletions src/py/flwr/server/compat/app_utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import time
import unittest
from threading import Event
from typing import Optional
from unittest.mock import Mock, patch

from ..client_manager import SimpleClientManager
Expand All @@ -29,30 +31,37 @@ class TestUtils(unittest.TestCase):
def test_start_update_client_manager_thread(self) -> None:
"""Test start_update_client_manager_thread function."""
# Prepare
sleep = time.sleep
sleep_patch = patch("time.sleep", lambda x: sleep(x / 100))
sleep_patch.start()
expected_node_ids = list(range(100))
updated_expected_node_ids = list(range(80, 120))
driver = Mock()
driver.grpc_driver = Mock()
driver.run_id = 123
driver.get_node_ids.return_value = expected_node_ids
client_manager = SimpleClientManager()
original_wait = Event.wait

def custom_wait(self: Event, timeout: Optional[float] = None) -> None:
if timeout is not None:
timeout /= 100
original_wait(self, timeout)

# Execute
thread, f_stop = start_update_client_manager_thread(driver, client_manager)
# Wait until all nodes are registered via `client_manager.sample()`
client_manager.sample(len(expected_node_ids))
# Retrieve all nodes in `client_manager`
node_ids = {proxy.node_id for proxy in client_manager.all().values()}
# Update the GetNodesResponse and wait until the `client_manager` is updated
driver.get_node_ids.return_value = updated_expected_node_ids
sleep(0.1)
# Retrieve all nodes in `client_manager`
updated_node_ids = {proxy.node_id for proxy in client_manager.all().values()}
# Stop the thread
f_stop.set()
# Patching Event.wait with our custom function
with patch.object(Event, "wait", new=custom_wait):
thread, f_stop = start_update_client_manager_thread(driver, client_manager)
# Wait until all nodes are registered via `client_manager.sample()`
client_manager.sample(len(expected_node_ids))
# Retrieve all nodes in `client_manager`
node_ids = {proxy.node_id for proxy in client_manager.all().values()}
# Update the GetNodesResponse and wait until the `client_manager` is updated
driver.get_node_ids.return_value = updated_expected_node_ids
time.sleep(0.1)
# Retrieve all nodes in `client_manager`
updated_node_ids = {
proxy.node_id for proxy in client_manager.all().values()
}
# Stop the thread
f_stop.set()

# Assert
assert node_ids == set(expected_node_ids)
Expand Down