Skip to content

Commit

Permalink
Atomic OnSubscribe fixes for MonoSendMany
Browse files Browse the repository at this point in the history
Slight ReactorNetty constant order tweak
  • Loading branch information
Stephane Maldini authored and smaldini committed May 6, 2019
1 parent 608e5b0 commit 9f65dd0
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 102 deletions.
197 changes: 104 additions & 93 deletions src/main/java/reactor/netty/ReactorNetty.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,110 @@
*/
public final class ReactorNetty {

// System properties names


/**
* Specifies whether the channel ID will be prepended to the log message when possible.
* By default it will be prepended.
*/
static final boolean LOG_CHANNEL_INFO =
Boolean.parseBoolean(System.getProperty("reactor.netty.logChannelInfo", "true"));

/**
* Default worker thread count, fallback to available processor
* (but with a minimum value of 4)
*/
public static final String IO_WORKER_COUNT = "reactor.netty.ioWorkerCount";
/**
* Default selector thread count, fallback to -1 (no selector thread)
*/
public static final String IO_SELECT_COUNT = "reactor.netty.ioSelectCount";
/**
* Default worker thread count for UDP, fallback to available processor
* (but with a minimum value of 4)
*/
public static final String UDP_IO_THREAD_COUNT = "reactor.netty.udp.ioThreadCount";


/**
* Default value whether the native transport (epoll, kqueue) will be preferred,
* fallback it will be preferred when available
*/
public static final String NATIVE = "reactor.netty.native";


/**
* Default max connections, if -1 will never wait to acquire before opening a new
* connection in an unbounded fashion. Fallback to
* available number of processors (but with a minimum value of 16)
*/
public static final String POOL_MAX_CONNECTIONS = "reactor.netty.pool.maxConnections";
/**
* Default acquisition timeout (milliseconds) before error. If -1 will never wait to
* acquire before opening a new
* connection in an unbounded fashion. Fallback 45 seconds
*/
public static final String POOL_ACQUIRE_TIMEOUT = "reactor.netty.pool.acquireTimeout";


/**
* Default SSL handshake timeout (milliseconds), fallback to 10 seconds
*/
public static final String SSL_HANDSHAKE_TIMEOUT = "reactor.netty.tcp.sslHandshakeTimeout";
/**
* Default value whether the SSL debugging on the client side will be enabled/disabled,
* fallback to SSL debugging disabled
*/
public static final String SSL_CLIENT_DEBUG = "reactor.netty.tcp.ssl.client.debug";
/**
* Default value whether the SSL debugging on the server side will be enabled/disabled,
* fallback to SSL debugging disabled
*/
public static final String SSL_SERVER_DEBUG = "reactor.netty.tcp.ssl.server.debug";


/**
* Specifies whether the Http Server access log will be enabled.
* By default it is disabled.
*/
public static final String ACCESS_LOG_ENABLED = "reactor.netty.http.server.accessLogEnabled";


/**
* Try to call {@link ReferenceCounted#release()} if the specified message implements {@link ReferenceCounted}.
* If the specified message doesn't implement {@link ReferenceCounted} or it is already released,
* this method does nothing.
*/
public static void safeRelease(Object msg) {
if (msg instanceof ReferenceCounted) {
ReferenceCounted referenceCounted = (ReferenceCounted) msg;
if (referenceCounted.refCnt() > 0) {
referenceCounted.release();
}
}
}

/**
* Append channel ID to a log message for correlated traces
* @param channel current channel associated with the msg
* @param msg the log msg
* @return a formatted msg
*/
public static String format(Channel channel, String msg) {
if (LOG_CHANNEL_INFO) {
String channelStr = channel.toString();
return new StringBuilder(channelStr.length() + 1 + msg.length())
.append(channel)
.append(' ')
.append(msg)
.toString();
}
else {
return msg;
}
}

static void addChunkedWriter(Connection c){
if (c.channel()
.pipeline()
Expand Down Expand Up @@ -672,101 +776,8 @@ public Mono<Void> then() {
};


/**
* Specifies whether the channel ID will be prepended to the log message when possible.
* By default it will be prepended.
*/
static final boolean LOG_CHANNEL_INFO =
Boolean.parseBoolean(System.getProperty("reactor.netty.logChannelInfo", "true"));

public static String format(Channel channel, String msg) {
if (LOG_CHANNEL_INFO) {
String channelStr = channel.toString();
return new StringBuilder(channelStr.length() + 1 + msg.length())
.append(channel)
.append(' ')
.append(msg)
.toString();
}
else {
return msg;
}
}


/**
* Try to call {@link ReferenceCounted#release()} if the specified message implements {@link ReferenceCounted}.
* If the specified message doesn't implement {@link ReferenceCounted} or it is already released,
* this method does nothing.
*/
public static void safeRelease(Object msg) {
if (msg instanceof ReferenceCounted) {
ReferenceCounted referenceCounted = (ReferenceCounted) msg;
if (referenceCounted.refCnt() > 0) {
referenceCounted.release();
}
}
}


// System properties names

/**
* Default worker thread count, fallback to available processor
* (but with a minimum value of 4)
*/
public static final String IO_WORKER_COUNT = "reactor.netty.ioWorkerCount";
/**
* Default selector thread count, fallback to -1 (no selector thread)
*/
public static final String IO_SELECT_COUNT = "reactor.netty.ioSelectCount";
/**
* Default worker thread count for UDP, fallback to available processor
* (but with a minimum value of 4)
*/
public static final String UDP_IO_THREAD_COUNT = "reactor.netty.udp.ioThreadCount";


/**
* Default value whether the native transport (epoll, kqueue) will be preferred,
* fallback it will be preferred when available
*/
public static final String NATIVE = "reactor.netty.native";


/**
* Default max connections, if -1 will never wait to acquire before opening a new
* connection in an unbounded fashion. Fallback to
* available number of processors (but with a minimum value of 16)
*/
public static final String POOL_MAX_CONNECTIONS = "reactor.netty.pool.maxConnections";
/**
* Default acquisition timeout (milliseconds) before error. If -1 will never wait to
* acquire before opening a new
* connection in an unbounded fashion. Fallback 45 seconds
*/
public static final String POOL_ACQUIRE_TIMEOUT = "reactor.netty.pool.acquireTimeout";


/**
* Default SSL handshake timeout (milliseconds), fallback to 10 seconds
*/
public static final String SSL_HANDSHAKE_TIMEOUT = "reactor.netty.tcp.sslHandshakeTimeout";
/**
* Default value whether the SSL debugging on the client side will be enabled/disabled,
* fallback to SSL debugging disabled
*/
public static final String SSL_CLIENT_DEBUG = "reactor.netty.tcp.ssl.client.debug";
/**
* Default value whether the SSL debugging on the server side will be enabled/disabled,
* fallback to SSL debugging disabled
*/
public static final String SSL_SERVER_DEBUG = "reactor.netty.tcp.ssl.server.debug";


/**
* Specifies whether the Http Server access log will be enabled.
* By default it is disabled.
*/
public static final String ACCESS_LOG_ENABLED = "reactor.netty.http.server.accessLogEnabled";
}
9 changes: 6 additions & 3 deletions src/main/java/reactor/netty/channel/MonoSend.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ abstract class MonoSend<I, O> extends Mono<Void> {
final ChannelHandlerContext ctx;
final Function<? super I, ? extends O> transformer;
final Consumer<? super I> sourceCleanup;
final ToIntFunction<O> sizeOf;
final ToIntFunction<? super O> sizeOf; //FIXME use MessageSizeEstimator ?

MonoSend(Channel channel,
Function<? super I, ? extends O> transformer,
Consumer<? super I> sourceCleanup,
ToIntFunction<O> sizeOf) {
ToIntFunction<? super O> sizeOf) {
this.transformer = Objects.requireNonNull(transformer, "source transformer cannot be null");
this.sourceCleanup = Objects.requireNonNull(sourceCleanup, "source cleanup handler cannot be null");
this.sizeOf = Objects.requireNonNull(sizeOf, "message size mapper cannot be null");
Expand Down Expand Up @@ -79,7 +79,10 @@ static <O> ToIntFunction<O> defaultSizeOf() {
return ((ByteBuf) msg).readableBytes();
}
if (msg instanceof FileRegion) {
return (int) Math.min(Integer.MAX_VALUE, ((FileRegion) msg).count());
// aligns with DefaultMessageSizeEstimator.DEFAULT
return 0;
// alternatively could have used
// return (int) Math.min(Integer.MAX_VALUE, ((FileRegion) msg).count());
}
return -1;
};
Expand Down
8 changes: 2 additions & 6 deletions src/main/java/reactor/netty/channel/MonoSendMany.java
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,7 @@ public void onNext(I t) {

@Override
public void onSubscribe(Subscription s) {
if (Operators.validate(this.s, s)) {
this.s = s;

if (Operators.setOnce(SUBSCRIPTION, this, s)) {
if (s instanceof QueueSubscription) {
@SuppressWarnings("unchecked") QueueSubscription<I> f =
(QueueSubscription<I>) s;
Expand Down Expand Up @@ -237,9 +235,7 @@ public void request(long n) {

@Override
public void operationComplete(ChannelFuture future) {
Subscription s;
if ((s = SUBSCRIPTION.getAndSet(this, Operators.cancelledSubscription())) != Operators.cancelledSubscription()) {
s.cancel();
if (Operators.terminate(SUBSCRIPTION, this)) {
if (WIP.getAndIncrement(this) == 0) {
cleanup();
}
Expand Down

0 comments on commit 9f65dd0

Please sign in to comment.