Skip to content

Commit

Permalink
Optional dispatching with executor instead of blocking (#1083)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored Feb 27, 2024
1 parent fbfef14 commit 6c75530
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 0 deletions.
17 changes: 17 additions & 0 deletions src/main/java/io/nats/client/Options.java
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,11 @@ public class Options {
* {@link Builder#useTimeoutException()}.
*/
public static final String PROP_USE_TIMEOUT_EXCEPTION = PFX + "use.timeout.exception";
/**
* Property used to a dispatcher that dispatches messages via the executor service instead of with a blocking call.
* {@link Builder#useDispatcherWithExecutor()}.
*/
public static final String PROP_USE_DISPATCHER_WITH_EXECUTOR = PFX + "use.dispatcher.with.executor";

// ----------------------------------------------------------------------------------------------------
// PROTOCOL CONNECT OPTION CONSTANTS
Expand Down Expand Up @@ -586,6 +591,7 @@ public class Options {
private final boolean ignoreDiscoveredServers;
private final boolean tlsFirst;
private final boolean useTimeoutException;
private final boolean useDispatcherWithExecutor;

private final AuthHandler authHandler;
private final ReconnectDelayHandler reconnectDelayHandler;
Expand Down Expand Up @@ -696,6 +702,7 @@ public static class Builder {
private boolean ignoreDiscoveredServers = false;
private boolean tlsFirst = false;
private boolean useTimeoutException = false;
private boolean useDispatcherWithExecutor = false;
private ServerPool serverPool = null;
private DispatcherFactory dispatcherFactory = null;

Expand Down Expand Up @@ -823,6 +830,7 @@ public Builder properties(Properties props) {
booleanProperty(props, PROP_IGNORE_DISCOVERED_SERVERS, b -> this.ignoreDiscoveredServers = b);
booleanProperty(props, PROP_TLS_FIRST, b -> this.tlsFirst = b);
booleanProperty(props, PROP_USE_TIMEOUT_EXCEPTION, b -> this.useTimeoutException = b);
booleanProperty(props, PROP_USE_DISPATCHER_WITH_EXECUTOR, b -> this.useDispatcherWithExecutor = b);

classnameProperty(props, PROP_SERVERS_POOL_IMPLEMENTATION_CLASS, o -> this.serverPool = (ServerPool) o);
classnameProperty(props, PROP_DISPATCHER_FACTORY_CLASS, o -> this.dispatcherFactory = (DispatcherFactory) o);
Expand Down Expand Up @@ -1537,6 +1545,11 @@ public Builder useTimeoutException() {
return this;
}

public Builder useDispatcherWithExecutor() {
this.useDispatcherWithExecutor = true;
return this;
}

/**
* Set the ServerPool implementation for connections to use instead of the default implementation
* @param serverPool the implementation
Expand Down Expand Up @@ -1721,6 +1734,7 @@ public Builder(Options o) {
this.ignoreDiscoveredServers = o.ignoreDiscoveredServers;
this.tlsFirst = o.tlsFirst;
this.useTimeoutException = o.useTimeoutException;
this.useDispatcherWithExecutor = o.useDispatcherWithExecutor;

this.serverPool = o.serverPool;
this.dispatcherFactory = o.dispatcherFactory;
Expand Down Expand Up @@ -1785,6 +1799,7 @@ private Options(Builder b) {
this.ignoreDiscoveredServers = b.ignoreDiscoveredServers;
this.tlsFirst = b.tlsFirst;
this.useTimeoutException = b.useTimeoutException;
this.useDispatcherWithExecutor = b.useDispatcherWithExecutor;

this.serverPool = b.serverPool;
this.dispatcherFactory = b.dispatcherFactory;
Expand Down Expand Up @@ -2198,6 +2213,8 @@ public boolean useTimeoutException() {
return useTimeoutException;
}

public boolean useDispatcherWithExecutor() { return useDispatcherWithExecutor; }

/**
* Get the ServerPool implementation. If null, a default implementation is used.
* @return the ServerPool implementation
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/io/nats/client/impl/DispatcherFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
*/
public class DispatcherFactory {
NatsDispatcher createDispatcher(NatsConnection conn, MessageHandler handler) {
if (conn.getOptions().useDispatcherWithExecutor()) {
return new NatsDispatcherWithExecutor(conn, handler);
}
return new NatsDispatcher(conn, handler);
}
}
76 changes: 76 additions & 0 deletions src/main/java/io/nats/client/impl/NatsDispatcherWithExecutor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright 2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package io.nats.client.impl;

import io.nats.client.MessageHandler;

class NatsDispatcherWithExecutor extends NatsDispatcher {

NatsDispatcherWithExecutor(NatsConnection conn, MessageHandler handler) {
super(conn, handler);
}

@Override
public void run() {
try {
while (this.running.get()) { // start

NatsMessage msg = this.incoming.pop(this.waitForMessage);

if (msg != null) {
NatsSubscription sub = msg.getNatsSubscription();
if (sub != null && sub.isActive()) {
MessageHandler handler = subscriptionHandlers.get(sub.getSID());
if (handler == null) {
handler = defaultHandler;
}
// A dispatcher can have a null defaultHandler. You can't subscribe without a handler,
// but messages might come in while the dispatcher is being closed or after unsubscribe
// and the [non-default] handler has already been removed from subscriptionHandlers
if (handler != null) {
sub.incrementDeliveredCount();
this.incrementDeliveredCount();

MessageHandler finalHandler = handler;
connection.getExecutor().execute(() -> {
try {
finalHandler.onMessage(msg);
} catch (Exception exp) {
connection.processException(exp);
}

if (sub.reachedUnsubLimit()) {
connection.invalidate(sub);
}
});
}
}
}

if (breakRunLoop()) {
return;
}
}
}
catch (InterruptedException exp) {
if (this.running.get()){
this.connection.processException(exp);
} //otherwise we did it
}
finally {
this.running.set(false);
this.thread = null;
}
}
}

0 comments on commit 6c75530

Please sign in to comment.