Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated bulkhead test to properly wait until Task 1 is enqueued #5562

Merged
merged 1 commit into from
Nov 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,22 @@ static <T> CompletableFuture<T> invokeStatic(Supplier<T> supplier) {
return create().invoke(supplier);
}

/**
* Convenience method to avoid having to call {@link #create()}. Also accepts
* an {@code onStart} future to inform of async task startup.
*
* @param supplier supplier of value (or a method reference)
* @param onStart future completed when async task starts
* @param <T> type of returned value
* @return a future that is a "promise" of the future result
*/
static <T> CompletableFuture<T> invokeStatic(Supplier<T> supplier, CompletableFuture<Async> onStart) {
return new Builder()
.onStart(onStart)
.build()
.invoke(supplier);
}

/**
* A new builder to build a customized {@link Async} instance.
* @return a new builder
Expand All @@ -73,6 +89,7 @@ static Builder builder() {
*/
class Builder implements io.helidon.common.Builder<Builder, Async> {
private LazyValue<? extends ExecutorService> executor = FaultTolerance.executor();
private CompletableFuture<Async> onStart;

private Builder() {
}
Expand Down Expand Up @@ -104,8 +121,23 @@ public Builder executor(ExecutorService executor) {
return this;
}

/**
* Configure future that shall be completed when async task starts.
*
* @param onStart future completed when async task starts
* @return updated builder instance
*/
public Builder onStart(CompletableFuture<Async> onStart) {
this.onStart = onStart;
return this;
}

LazyValue<? extends ExecutorService> executor() {
return executor;
}

CompletableFuture<Async> onStart() {
return onStart;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,15 @@
*/
class AsyncImpl implements Async {
private final LazyValue<? extends ExecutorService> executor;
private final CompletableFuture<Async> onStart;

AsyncImpl() {
this(Async.builder());
}

AsyncImpl(Builder builder) {
this.executor = builder.executor();
this.onStart = builder.onStart();
}

@Override
Expand All @@ -54,6 +56,9 @@ public boolean cancel(boolean mayInterruptIfRunning) {
Future<?> future = executor.get().submit(() -> {
Thread thread = Thread.currentThread();
thread.setName(thread.getName() + ": async");
if (onStart != null) {
onStart.complete(this);
}
try {
T t = supplier.get();
result.complete(t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
import io.helidon.logging.common.LogConfig;

import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import static java.lang.System.Logger.Level.INFO;
Expand All @@ -42,24 +40,18 @@
class BulkheadTest {
private static final System.Logger LOGGER = System.getLogger(BulkheadTest.class.getName());

private static final long WAIT_TIMEOUT_MILLIS = 10000;

private CountDownLatch enqueuedSubmitted;
private static final long WAIT_TIMEOUT_MILLIS = 5000;

@BeforeAll
static void setupTest() {
LogConfig.configureRuntime();
}

@BeforeEach
void resetLatch() {
enqueuedSubmitted = new CountDownLatch(1);
}

@Disabled
@Test
void testBulkhead() throws InterruptedException, ExecutionException, java.util.concurrent.TimeoutException {
// Create bulkhead of 1 with queue length 1
String name = "unit:testBulkhead";
CountDownLatch enqueuedSubmitted = new CountDownLatch(1);
Bulkhead bulkhead = Bulkhead.builder()
.limit(1)
.queueLength(1)
Expand Down Expand Up @@ -87,15 +79,19 @@ public <T> void enqueueing(Supplier<? extends T> supplier) {
CompletableFuture<Integer> enqueuedResult = Async.invokeStatic(
() -> bulkhead.invoke(enqueued::run));

// Wait until previous task is "likely" queued
// Wait until previous task is queued
if (!enqueuedSubmitted.await(WAIT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
fail("Task enqueued never submitted");
}
assertEventually(() -> bulkhead.stats().waitingQueueSize() == 1, WAIT_TIMEOUT_MILLIS);

// Submit new task that should be rejected
Task rejected = new Task(2);
CompletableFuture<Async> asyncRejected = new CompletableFuture<>();
CompletableFuture<Integer> rejectedResult = Async.invokeStatic(
() -> bulkhead.invoke(rejected::run));
() -> bulkhead.invoke(rejected::run),
asyncRejected);
asyncRejected.get(); // waits for async to start

assertThat(inProgress.isStarted(), is(true));
assertThat(inProgress.isBlocked(), is(true));
Expand Down Expand Up @@ -258,4 +254,15 @@ CompletableFuture<?> future() {
return future;
}
}

private static void assertEventually(Supplier<Boolean> predicate, long millis) throws InterruptedException {
long start = System.currentTimeMillis();
do {
if (predicate.get()) {
return;
}
Thread.sleep(100);
} while (System.currentTimeMillis() - start <= millis);
fail("Predicate failed after " + millis + " milliseconds");
}
}