
Unify ThreadPoolExecutor usage in all Dashboard(Agent)Head. #47160

Merged
rynewang merged 10 commits into ray-project:master from tpe-for-all on Aug 27, 2024

Conversation

@rynewang (Contributor) commented Aug 15, 2024

Previously, each Dashboard(Agent)HeadModule could have its own ThreadPoolExecutor. This PR creates a single, unified TPE in Dashboard(Agent)Head and uses it everywhere. It also adds an asyncio yield in DataOrganizer.organize() so that the method does not block the event loop for long stretches.
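In rough terms, the pattern looks like the following sketch. The class and method names here are illustrative only, not the actual Ray code.

import asyncio
from concurrent.futures import ThreadPoolExecutor


class DashboardHeadSketch:  # illustrative stand-in for Dashboard(Agent)Head
    def __init__(self) -> None:
        # One executor owned by the head; modules receive this shared pool
        # instead of each constructing their own ThreadPoolExecutor.
        self._thread_pool_executor = ThreadPoolExecutor(
            max_workers=1, thread_name_prefix="dashboard_head_tpe"
        )

    async def run_blocking(self, fn, *args):
        # run_in_executor keeps the blocking call off the event loop thread,
        # and awaiting the returned future is itself a yield point.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(self._thread_pool_executor, fn, *args)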

…ntHeads. Adds to DataOrganizer.

Signed-off-by: Ruiyang Wang <rywang014@gmail.com>
Signed-off-by: Ruiyang Wang <rywang014@gmail.com>
@alexeykudinkin (Contributor) left a comment

@rynewang one more change we'd make is to make the data refresh frequency configurable, so that we can mitigate impact by suggesting that users reduce the update frequency

Signed-off-by: Ruiyang Wang <rywang014@gmail.com>
@rynewang (Contributor, Author)

@alexeykudinkin done. Now one can set RAY_ORGANIZE_DATA_INTERVAL_SECONDS, which defaults to 2.
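Roughly, the env_integer-based constant shown in the diff below amounts to the following (the variable name is taken from this comment; the final name may differ after later review suggestions):

import os

# Read the dashboard data refresh interval from the environment, defaulting
# to 2 seconds, e.g. RAY_ORGANIZE_DATA_INTERVAL_SECONDS=10 to refresh less often.
ORGANIZE_DATA_INTERVAL_SECONDS = int(
    os.environ.get("RAY_ORGANIZE_DATA_INTERVAL_SECONDS", "2")
)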

@rynewang added the go (add ONLY when ready to merge, run all tests) label on Aug 15, 2024
@@ -81,7 +90,7 @@ async def organize(cls):
DataSource.core_worker_stats.reset(core_worker_stats)
Collaborator

Is DataSource thread safe, given that it's now accessed from another thread as well?

Contributor Author

not sure...

Contributor

+1

These custom data-structures aren't thread-safe. If we're planning to run this on TPE we need to cover these with locks
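One way to do that is sketched below, with a plain dict standing in for the real DataSource fields; the class and method names here are illustrative only.

import threading


class LockedStats:
    """Illustrative only: guard shared state with a lock so it can be touched
    both from the event loop thread and from ThreadPoolExecutor threads."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._core_worker_stats = {}

    def reset(self, new_stats: dict) -> None:
        # Replace the whole mapping under the lock.
        with self._lock:
            self._core_worker_stats = dict(new_stats)

    def snapshot(self) -> dict:
        # Return a copy so callers never read a structure being mutated.
        with self._lock:
            return dict(self._core_worker_stats)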

@alexeykudinkin (Contributor)

-    return StateAPIManager(state_api_data_source_client)
+    return StateAPIManager(
+        state_api_data_source_client,
+        thread_pool_executor=ThreadPoolExecutor(
Contributor

Provided that we're planning on offloading CPU-bound tasks that nevertheless still hold the GIL, we should limit the number of threads in the TPE (by default, TPE provisions about # of CPUs + 4 threads)
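For example (the executor name below is illustrative; on CPython 3.8+ the actual default is min(32, os.cpu_count() + 4) workers):

from concurrent.futures import ThreadPoolExecutor

# Cap the pool explicitly rather than taking the default, since the offloaded
# work is CPU-bound and holds the GIL anyway; more threads just add contention.
state_api_executor = ThreadPoolExecutor(
    max_workers=1, thread_name_prefix="state_api_tpe"
)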

Contributor Author

changed to 1

Contributor Author

changed in head.py, I mean

@@ -47,6 +48,9 @@ def __init__(
         # Public attributes are accessible for all agent modules.
         self.ip = node_ip_address
         self.minimal = minimal
+        self.thread_pool_executor = ThreadPoolExecutor(
Contributor

Please check my comment above.

We need to

  • Isolate these TPEs to non critical operations (like refreshing stats/data used in UI)
  • Limit concurrency of these TPEs (to 2-4)

@@ -25,7 +25,7 @@
 RETRY_REDIS_CONNECTION_TIMES = 10
 CONNECT_REDIS_INTERNAL_SECONDS = 2
 PURGE_DATA_INTERVAL_SECONDS = 60 * 10
-ORGANIZE_DATA_INTERVAL_SECONDS = 2
+ORGANIZE_DATA_INTERVAL_SECONDS = env_integer("RAY_ORGANIZE_DATA_INTERVAL_SECONDS", 2)
Contributor

Suggested change
-ORGANIZE_DATA_INTERVAL_SECONDS = env_integer("RAY_ORGANIZE_DATA_INTERVAL_SECONDS", 2)
+ORGANIZE_DATA_INTERVAL_SECONDS = env_integer("RAY_DASHBOARD_STATS_UPDATING_INTERVAL", 2)


@@ -125,6 +125,10 @@ def __init__(
         self._modules_to_load = modules_to_load
         self._modules_loaded = False

+        self._thread_pool_executor = ThreadPoolExecutor(
Contributor

Check comment above

@rynewang (Contributor, Author)

todo:

  1. revert datacenter.py changes
  2. move node_stats change back to async ctx

Signed-off-by: Ruiyang Wang <rywang014@gmail.com>
@rynewang changed the title from "Unify ThreadPoolExecutor usage in all Dashboard(Agent)Head and DataOrganizer." to "Unify ThreadPoolExecutor usage in all Dashboard(Agent)Head." on Aug 24, 2024
Signed-off-by: Ruiyang Wang <rywang014@gmail.com>
Signed-off-by: Ruiyang Wang <rywang014@gmail.com>
@rynewang (Contributor, Author)

@jjyao @alexeykudinkin pls take a look

# Offloads the blocking operation to a thread pool executor. This also
# yields to the event loop.
workers = await get_or_create_event_loop().run_in_executor(
    thread_pool_executor, cls.get_node_workers, node_id
)
Collaborator

get_node_workers uses DataSource which is not thread safe, so we cannot just run it in a different thread.

For this PR, let's just call it on the main event loop thread and yield inside the for loop.

We can optimize get_node_workers by stopping using ImmutableDict in a separate PR
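A sketch of that suggestion, with the DataSource access reduced to a plain callable for illustration (function and parameter names here are made up):

import asyncio


async def organize_node_workers(get_node_workers, node_ids):
    """Sketch: stay on the event loop thread, but yield every iteration so a
    long pass over many nodes cannot starve other coroutines."""
    node_workers = {}
    for node_id in node_ids:
        # Not offloaded to a worker thread: get_node_workers reads structures
        # that are not thread safe.
        node_workers[node_id] = get_node_workers(node_id)
        # Cooperative yield back to the event loop.
        await asyncio.sleep(0)
    return node_workers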

python/ray/dashboard/head.py (review thread outdated, resolved)
python/ray/dashboard/modules/node/node_head.py (review thread outdated, resolved)
@jjyao (Collaborator) commented Aug 25, 2024

After this PR, we should check every call of dashboard_utils.message_to_dict and run it in the background thread.
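A sketch of what that follow-up could look like, assuming the usual import of ray.dashboard.utils as dashboard_utils; the wrapper name is made up:

import asyncio

import ray.dashboard.utils as dashboard_utils


async def message_to_dict_in_executor(executor, message, *args, **kwargs):
    """Sketch: run the CPU-bound protobuf-to-dict conversion on the shared
    thread pool instead of the event loop thread."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        executor,
        lambda: dashboard_utils.message_to_dict(message, *args, **kwargs),
    )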

Signed-off-by: Ruiyang Wang <rywang014@gmail.com>
Signed-off-by: Ruiyang Wang <rywang014@gmail.com>
Signed-off-by: Ruiyang Wang <rywang014@gmail.com>
Signed-off-by: Ruiyang Wang <rywang014@gmail.com>
@@ -317,7 +313,7 @@ async def upload_package(self, req: Request):
        try:
            data = await req.read()
            await get_or_create_event_loop().run_in_executor(
                self._upload_package_thread_pool_executor,
Collaborator

Is it critical enough to deserve its own thread pool?

Contributor Author

I don't think so because it's not latency critical either.

@rynewang enabled auto-merge (squash) August 26, 2024 23:22
@rynewang merged commit 6c7da02 into ray-project:master Aug 27, 2024
6 checks passed
@rynewang deleted the tpe-for-all branch August 27, 2024 07:02
Labels
go add ONLY when ready to merge, run all tests
3 participants