Skip to content

Commit

Permalink
feat(nats-connection): implement named executor thread factories (#1254)
Browse files Browse the repository at this point in the history
* feat(nats-connection): implement named executor thread factories

* Refactor executor creation to use configurable thread factories PR#1254

* Fix executor tests PR#1254

---------

Co-authored-by: Scott Fauerbach <scottfauerbach@gmail.com>
  • Loading branch information
krzysiekigry and scottf authored Nov 26, 2024
1 parent a7f40d8 commit a76dd02
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 6 deletions.
52 changes: 52 additions & 0 deletions src/main/java/io/nats/client/Options.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

import static io.nats.client.support.Encoding.*;
import static io.nats.client.support.NatsConstants.*;
Expand Down Expand Up @@ -226,6 +227,11 @@ public class Options {
*/
public static final boolean DEFAULT_DISCARD_MESSAGES_WHEN_OUTGOING_QUEUE_FULL = false;

/**
* Default supplier for creating a single-threaded executor service.
*/
public static final Supplier<ExecutorService> DEFAULT_SINGLE_THREAD_EXECUTOR = Executors::newSingleThreadExecutor;

// ----------------------------------------------------------------------------------------------------
// ENVIRONMENT PROPERTIES
// ----------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -644,6 +650,8 @@ public class Options {
private final boolean traceConnection;

private final ExecutorService executor;
private final ThreadFactory connectThreadFactory;
private final ThreadFactory callbackThreadFactory;
private final ServerPool serverPool;
private final DispatcherFactory dispatcherFactory;

Expand Down Expand Up @@ -759,6 +767,8 @@ public static class Builder {
private StatisticsCollector statisticsCollector = null;
private String dataPortType = DEFAULT_DATA_PORT_TYPE;
private ExecutorService executor;
private ThreadFactory connectThreadFactory;
private ThreadFactory callbackThreadFactory;
private List<java.util.function.Consumer<HttpRequest>> httpRequestInterceptors;
private Proxy proxy;

Expand Down Expand Up @@ -1553,6 +1563,28 @@ public Builder executor(ExecutorService executor) {
return this;
}

/**
* Sets custom thread factory for the executor service
*
* @param threadFactory the thread factory to use for the executor service
* @return the Builder for chaining
*/
public Builder connectThreadFactory(ThreadFactory threadFactory) {
this.connectThreadFactory = threadFactory;
return this;
}

/**
* Sets custom thread factory for the executor service
*
* @param threadFactory the thread factory to use for the executor service
* @return the Builder for chaining
*/
public Builder callbackThreadFactory(ThreadFactory threadFactory) {
this.callbackThreadFactory = threadFactory;
return this;
}

/**
* Add an HttpRequest interceptor which can be used to modify the HTTP request when using websockets
*
Expand Down Expand Up @@ -1914,6 +1946,8 @@ public Builder(Options o) {
this.dataPortType = o.dataPortType;
this.trackAdvancedStats = o.trackAdvancedStats;
this.executor = o.executor;
this.callbackThreadFactory = o.callbackThreadFactory;
this.connectThreadFactory = o.connectThreadFactory;
this.httpRequestInterceptors = o.httpRequestInterceptors;
this.proxy = o.proxy;

Expand Down Expand Up @@ -1979,6 +2013,8 @@ private Options(Builder b) {
this.dataPortType = b.dataPortType;
this.trackAdvancedStats = b.trackAdvancedStats;
this.executor = b.executor;
this.callbackThreadFactory = b.callbackThreadFactory;
this.connectThreadFactory = b.connectThreadFactory;
this.httpRequestInterceptors = b.httpRequestInterceptors;
this.proxy = b.proxy;

Expand All @@ -2002,6 +2038,22 @@ public ExecutorService getExecutor() {
return this.executor;
}

/**
* @return the callback executor, see {@link Builder#callbackThreadFactory(ThreadFactory) callbackThreadFactory()} in the builder doc
*/
public ExecutorService getCallbackExecutor() {
return this.callbackThreadFactory == null ?
DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(this.callbackThreadFactory);
}

/**
* @return the connect executor, see {@link Builder#connectThreadFactory(ThreadFactory) connectThreadFactory()} in the builder doc
*/
public ExecutorService getConnectExecutor() {
return this.connectThreadFactory == null ?
DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(this.connectThreadFactory);
}

/**
* @return the list of HttpRequest interceptors.
*/
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/nats/client/impl/NatsConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,8 @@ class NatsConnection implements Connection {

timeTraceLogger.trace("creating executors");
this.executor = options.getExecutor();
this.callbackRunner = Executors.newSingleThreadExecutor();
this.connectExecutor = Executors.newSingleThreadExecutor();
this.callbackRunner = options.getCallbackExecutor();
this.connectExecutor = options.getConnectExecutor();

timeTraceLogger.trace("creating reader and writer");
this.reader = new NatsConnectionReader(this);
Expand Down
29 changes: 25 additions & 4 deletions src/test/java/io/nats/client/OptionsTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,7 @@
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;

import static io.nats.client.Options.*;
import static io.nats.client.support.Encoding.base64UrlEncodeToString;
Expand Down Expand Up @@ -980,6 +977,30 @@ public void testDefaultExecutor() throws Exception {
assertTrue(name.startsWith(Options.DEFAULT_THREAD_NAME_PREFIX));
}

@Test
public void testCallbackExecutor() throws ExecutionException, InterruptedException, TimeoutException {
ThreadFactory threadFactory = r -> new Thread(r, "test");
Options options = new Options.Builder()
.callbackThreadFactory(threadFactory)
.build();
Future<?> callbackFuture = options.getCallbackExecutor().submit(() -> {
assertEquals("test", Thread.currentThread().getName());
});
callbackFuture.get(5, TimeUnit.SECONDS);
}

@Test
public void testConnectExecutor() throws ExecutionException, InterruptedException, TimeoutException {
ThreadFactory threadFactory = r -> new Thread(r, "test");
Options options = new Options.Builder()
.connectThreadFactory(threadFactory)
.build();
Future<?> connectFuture = options.getConnectExecutor().submit(() -> {
assertEquals("test", Thread.currentThread().getName());
});
connectFuture.get(5, TimeUnit.SECONDS);
}

String[] schemes = new String[] { "NATS", "unk", "tls", "opentls", "ws", "wss", "nats"};
boolean[] secures = new boolean[] { false, false, true, true, false, true, false};
boolean[] wses = new boolean[] { false, false, false, false, true, true, false};
Expand Down

0 comments on commit a76dd02

Please sign in to comment.