Skip to content

Commit

Permalink
Minor fixes for SequentialExecutor and its tests
Browse files Browse the repository at this point in the history
  • Loading branch information
idelpivnitskiy committed Dec 8, 2023
1 parent d288479 commit 59afaad
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ final class DefaultLoadBalancer<ResolvedAddress, C extends LoadBalancedConnectio

private volatile long nextResubscribeTime = RESUBSCRIBING;

// writes to these fields protected by `executeSequentially` but they can be read from any thread.
// writes to these fields protected by `sequentialExecutor` but they can be read from any thread.
private volatile List<Host<ResolvedAddress, C>> usedHosts = emptyList();
private volatile boolean isClosed;

Expand Down Expand Up @@ -132,11 +132,8 @@ final class DefaultLoadBalancer<ResolvedAddress, C extends LoadBalancedConnectio
this.connectionFactory = requireNonNull(connectionFactory);
this.linearSearchSpace = linearSearchSpace;
this.healthCheckConfig = healthCheckConfig;
this.sequentialExecutor = new SequentialExecutor((uncaughtException) -> {
LOGGER.error("{}: Uncaught exception in SequentialExecutor triggered closing of the load balancer.",
this, uncaughtException);
closeAsync().subscribe();
});
this.sequentialExecutor = new SequentialExecutor((uncaughtException) ->
LOGGER.error("{}: Uncaught exception in SequentialExecutor", this, uncaughtException));
this.asyncCloseable = toAsyncCloseable(this::doClose);
// Maintain a Subscriber so signals are always delivered to replay and new Subscribers get the latest signal.
eventStream.ignoreElements().subscribe();
Expand Down Expand Up @@ -177,8 +174,7 @@ private Completable doClose(final boolean graceful) {
SourceAdapters.toSource((graceful ? compositeCloseable.closeAsyncGracefully() :
// We only want to empty the host list on error if we're closing non-gracefully.
compositeCloseable.closeAsync().beforeOnError(t ->
sequentialExecutor.execute(() -> usedHosts = emptyList()))
)
sequentialExecutor.execute(() -> usedHosts = emptyList())))
// we want to always empty out the host list if we complete successfully
.beforeOnComplete(() -> sequentialExecutor.execute(() -> usedHosts = emptyList())))
.subscribe(processor);
Expand Down Expand Up @@ -268,10 +264,8 @@ public void onNext(@Nullable final Collection<? extends ServiceDiscovererEvent<R
}

private void sequentialOnNext(Collection<? extends ServiceDiscovererEvent<ResolvedAddress>> events) {
assert events != null && !events.isEmpty();

if (isClosed) {
// nothing to do if the load balancer is closed.
if (isClosed || events.isEmpty()) {
// nothing to do if the load balancer is closed or there are no events.
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,25 @@

/**
* A concurrency primitive for providing thread safety without using locks.
*
* <p>
* A {@link SequentialExecutor} is queue of tasks that are executed one at a time in the order they were
* received. This provides a way to serialize work between threads without needing to use locks which can
* result in thread contention and thread deadlock scenarios.
*/
final class SequentialExecutor implements Executor {

/**
* Handler of exceptions thrown by submitted Runnables.
* Handler of exceptions thrown by submitted {@link Runnable}s.
*/
@FunctionalInterface
public interface ExceptionHandler {
interface ExceptionHandler {

/**
* Handle the exception thrown from a submitted Runnable.
* Handle the exception thrown from a submitted {@link Runnable}.
* <p>
* Note that if this method throws the behavior is undefined.
*
* @param ex the Throwable thrown by the Runnable.
* @param ex the {@link Throwable} thrown by the {@link Runnable}.
*/
void onException(Throwable ex);
}
Expand All @@ -56,13 +57,13 @@ public interface ExceptionHandler {

@Override
public void execute(Runnable command) {
// Make sure we propagate any sync contexts.
// Make sure we propagate async contexts.
command = AsyncContext.wrapRunnable(requireNonNull(command, "command"));
final Cell next = new Cell(command);
Cell t = tail.getAndSet(next);
if (t != null) {
Cell prev = tail.getAndSet(next);
if (prev != null) {
// Execution already started. Link the old tail to the new tail.
t.next = next;
prev.next = next;
} else {
// We are the first element in the queue so it's our responsibility to drain.
// Note that the getAndSet establishes the happens before with relation to the previous draining
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,17 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;

class SequentialExecutorTest {


private SequentialExecutor.ExceptionHandler exceptionHandler;
private Executor executor;

Expand All @@ -50,29 +52,28 @@ void setup() {
void tasksAreExecuted() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(2);
// submit two tasks and they should both complete.
executor.execute(() -> latch.countDown());
executor.execute(() -> latch.countDown());
executor.execute(latch::countDown);
executor.execute(latch::countDown);
latch.await();
}

@Test
void firstTaskIsExecutedByCallingThread() {
AtomicReference<Thread> executorThread = new AtomicReference<>();
executor.execute(() -> executorThread.set(Thread.currentThread()));
assertNotNull(executorThread.get());
assertEquals(Thread.currentThread(), executorThread.get());
assertThat(executorThread.get(), is(notNullValue()));
assertThat(executorThread.get(), is(equalTo(Thread.currentThread())));
}

@Test
void thrownExceptionsArePropagatedToTheExceptionHandler() {
AtomicReference<Throwable> caught = new AtomicReference<>();
exceptionHandler = caught::set;
executor = new SequentialExecutor(exceptionHandler);
final RuntimeException ex = new RuntimeException("expected");
executor.execute(() -> {
throw ex;
throw DELIBERATE_EXCEPTION;
});
assertEquals(ex, caught.get());
assertThat(caught.get(), is(sameInstance(DELIBERATE_EXCEPTION)));
}

@Test
Expand All @@ -99,16 +100,16 @@ void queuedTasksAreExecuted() throws InterruptedException {
// the model, the test should be adjusted to conform to the desired behavior.
final AtomicReference<Thread> executingThread = new AtomicReference<>();
executor.execute(() -> executingThread.set(Thread.currentThread()));
assertNull(executingThread.get());
assertThat(executingThread.get(), is(nullValue()));

// Now unblock the initial thread and it should also run the second task.
l2.countDown();
t.join();
assertEquals(t, executingThread.get());
assertThat(executingThread.get(), is(sameInstance(t)));
}

@Test
void tasksAreNotRenentrant() {
void tasksAreNotReentrant() {
Queue<Integer> order = new ArrayDeque<>();
executor.execute(() -> {
// this should be queued for later.
Expand All @@ -120,7 +121,7 @@ void tasksAreNotRenentrant() {
}

@Test
void noStackOverflows() throws Exception {
void noStackOverflows() {
final int maxDepth = 10_000;
// If we substitute `executor` with `(runnable) -> runnable.run()` we get a stack overflow.
final Runnable runnable = new Runnable() {
Expand Down Expand Up @@ -149,7 +150,7 @@ void manyThreadsCanSubmitTasksConcurrently() throws InterruptedException {
try {
ready.countDown();
barrier.await();
executor.execute(() -> completed.countDown());
executor.execute(completed::countDown);
} catch (Exception ex) {
throw new AssertionError("unexpected error", ex);
}
Expand Down Expand Up @@ -189,11 +190,11 @@ void preservesAsyncContext() throws InterruptedException {

AsyncContext.put(key, value);
executor.execute(() -> observedContextValue.set(AsyncContext.context().get(key)));
assertNull(observedContextValue.get());
assertThat(observedContextValue.get(), is(nullValue()));

// Now unblock the initial thread and it should also run the second task.
l2.countDown();
t.join();
assertEquals(value, observedContextValue.get());
assertThat(observedContextValue.get(), is(value));
}
}

0 comments on commit 59afaad

Please sign in to comment.