Skip to content

Commit

Permalink
[Core][ConcurrencyGroup] Fix blocking task in default group block tas…
Browse files Browse the repository at this point in the history
…ks in other group. (#20525)

Why are these changes needed?
If max concurrency is 1 in default group, a blocking task executing in default group will block the following tasks in different group. See reproduction script in #20475

The issue is due to tasks executing in the default concurrent group run in the main task execution thread, and tasks in other concurrent groups will be blocked if the main task execution thread is blocked.

This PR only changes concurrent actor behavior that default group will not block other groups.

Related issue number
Fix #20475
  • Loading branch information
jovany-wang authored Nov 25, 2021
1 parent d725457 commit cd2b83a
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 3 deletions.
30 changes: 30 additions & 0 deletions java/test/src/main/java/io/ray/test/ConcurrencyGroupTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.ray.api.concurrencygroup.ConcurrencyGroup;
import io.ray.api.concurrencygroup.ConcurrencyGroupBuilder;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.testng.Assert;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -159,4 +160,33 @@ public void testMaxConcurrencyForGroups() {
Assert.assertTrue(ret7.get());
Assert.assertTrue(ret8.get());
}

private static class ConcurrencyActor2 {

public String f1() throws InterruptedException {
TimeUnit.MINUTES.sleep(100);
return "never returned";
}

public String f2() {
return "ok";
}
}

/// This case tests that blocking task in default group will block other groups.
/// See https://github.com/ray-project/ray/issues/20475
@Test(groups = {"cluster"})
public void testDefaultCgDoNotBlockOthers() {
ConcurrencyGroup group =
new ConcurrencyGroupBuilder<ConcurrencyActor2>()
.setName("group")
.setMaxConcurrency(1)
.addMethod(ConcurrencyActor2::f2)
.build();

ActorHandle<ConcurrencyActor2> myActor =
Ray.actor(ConcurrencyActor2::new).setConcurrencyGroups(group).remote();
myActor.task(ConcurrencyActor2::f1).remote();
Assert.assertEquals(myActor.task(ConcurrencyActor2::f2).remote().get(), "ok");
}
}
9 changes: 6 additions & 3 deletions src/ray/core_worker/transport/thread_pool_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,12 @@ PoolManager::PoolManager(const std::vector<ConcurrencyGroup> &concurrency_groups
}
name_to_thread_pool_index_[name] = pool;
}
// If max concurrency of default group is 1, the tasks of default group
// will be performed in main thread instead of any executor pool.
if (default_group_max_concurrency > 1) {
// If max concurrency of default group is 1 and there is no other concurrency group of
// this actor, the tasks of default group will be performed in main thread instead of
// any executor pool, otherwise tasks in any concurrency group should be performed in
// the thread pools instead of main thread.
if (default_group_max_concurrency > 1 || !concurrency_groups.empty()) {
/// The concurrency group is enabled.
default_thread_pool_ =
std::make_shared<BoundedExecutor>(default_group_max_concurrency);
}
Expand Down

0 comments on commit cd2b83a

Please sign in to comment.