Skip to content

Commit

Permalink
Update JCTools 3.3.0 -> 3.3.1-ea, use unpadded queues (#2257)
Browse files Browse the repository at this point in the history
Motivation:
JCTools 3.3.1-ea provides unpadded queue variants which remove
padding intended to avoid false sharing scenarios. The padding
adds significant overhead when queue sizes are small which is
common for ServiceTalk's queue usage (operators that only queue
when back pressure occurs, pipelining requests which isn't
frequently used, ..)

Modifications:
- Unpadded queue variants currently only exist for the UNSAFE
  variants. Switch PlatformDepedent UNSAFE queues to use them.
  • Loading branch information
Scottmitch authored Jul 2, 2022
1 parent b3ca008 commit 28ca713
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 10 deletions.
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ jaxRsVersion=2.1.6
jerseyVersion=2.35

reactiveStreamsVersion=1.0.3
jcToolsVersion=3.3.0
jcToolsVersion=3.3.1-ea
jacksonVersion=2.13.2.2
# backward compatible with jackson 2.9+, we do not depend on any new features from later versions.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@
import org.jctools.queues.atomic.MpscUnboundedAtomicArrayQueue;
import org.jctools.queues.atomic.SpscGrowableAtomicArrayQueue;
import org.jctools.queues.atomic.SpscUnboundedAtomicArrayQueue;
import org.jctools.queues.ea.unpadded.MpscChunkedUnpaddedArrayQueue;
import org.jctools.queues.ea.unpadded.MpscLinkedUnpaddedQueue;
import org.jctools.queues.ea.unpadded.MpscUnboundedUnpaddedArrayQueue;
import org.jctools.queues.ea.unpadded.SpscChunkedUnpaddedArrayQueue;
import org.jctools.queues.ea.unpadded.SpscUnboundedUnpaddedArrayQueue;
import org.jctools.util.Pow2;
import org.jctools.util.UnsafeAccess;
import org.slf4j.Logger;
Expand All @@ -51,6 +56,7 @@
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import static java.lang.Boolean.getBoolean;
import static java.lang.Math.max;
import static java.lang.Math.min;

Expand Down Expand Up @@ -280,6 +286,26 @@ public static <T> Queue<T> newUnboundedSpscQueue(final int initialCapacity) {

private static final class Queues {
private static final boolean USE_UNSAFE_QUEUES;
private static final boolean USE_UNPADDED_QUEUES;

static {
// Internal property, subject to removal after validation completed.
final String useUnpaddedQueuesName = "io.servicetalk.internal.queues.useUnpadded";
boolean useUnpaddedQueues = getBoolean(useUnpaddedQueuesName);
if (useUnpaddedQueues) {
Queue<Integer> queue = null;
try {
// org.jctools.queues.ea.unpadded classes may be moved to another package name in the future.
// Don't use them if they are not available due to failure to link/initialize the classes.
queue = new MpscLinkedUnpaddedQueue<>();
} catch (Throwable ignored) {
useUnpaddedQueues = false;
}
LOGGER.debug("jctools unpadded: {}available.", queue == null ? "un" : "");
}
LOGGER.debug("{}: {}", useUnpaddedQueuesName, useUnpaddedQueues);
USE_UNPADDED_QUEUES = useUnpaddedQueues;
}

private Queues() {
}
Expand Down Expand Up @@ -311,19 +337,27 @@ static <T> Queue<T> newMpscQueue(final int initialCapacity, final int maxCapacit
// up to the next power of two and so will overflow otherwise.
final int initialCap = max(MIN_ALLOWED_MPSC_CHUNK_SIZE, initialCapacity);
final int capacity = max(min(maxCapacity, MAX_ALLOWED_QUEUE_CAPACITY), MIN_MAX_MPSC_CAPACITY);
return USE_UNSAFE_QUEUES ? new MpscChunkedArrayQueue<>(initialCap, capacity)
: new MpscGrowableAtomicArrayQueue<>(initialCap, capacity);
return USE_UNSAFE_QUEUES ?
USE_UNPADDED_QUEUES ?
new MpscChunkedUnpaddedArrayQueue<>(initialCap, capacity) :
new MpscChunkedArrayQueue<>(initialCap, capacity)
: new MpscGrowableAtomicArrayQueue<>(initialCap, capacity);
}

static <T> Queue<T> newUnboundedMpscQueue(final int initialCapacity) {
return USE_UNSAFE_QUEUES ? new MpscUnboundedArrayQueue<>(max(MIN_ALLOWED_MPSC_CHUNK_SIZE, initialCapacity))
: new MpscUnboundedAtomicArrayQueue<>(
max(MIN_ALLOWED_MPSC_CHUNK_SIZE, initialCapacity));
return USE_UNSAFE_QUEUES ?
USE_UNPADDED_QUEUES ?
new MpscUnboundedUnpaddedArrayQueue<>(max(MIN_ALLOWED_MPSC_CHUNK_SIZE, initialCapacity)) :
new MpscUnboundedArrayQueue<>(max(MIN_ALLOWED_MPSC_CHUNK_SIZE, initialCapacity))
: new MpscUnboundedAtomicArrayQueue<>(max(MIN_ALLOWED_MPSC_CHUNK_SIZE, initialCapacity));
}

static <T> Queue<T> newUnboundedLinkedMpscQueue() {
return USE_UNSAFE_QUEUES ? new MpscLinkedQueue<>()
: new MpscLinkedAtomicQueue<>();
return USE_UNSAFE_QUEUES ?
USE_UNPADDED_QUEUES ?
new MpscLinkedUnpaddedQueue<>() :
new MpscLinkedQueue<>()
: new MpscLinkedAtomicQueue<>();
}

static <T> Queue<T> newSpscQueue(final int initialCapacity, final int maxCapacity) {
Expand All @@ -332,12 +366,18 @@ static <T> Queue<T> newSpscQueue(final int initialCapacity, final int maxCapacit
// up to the next power of two and so will overflow otherwise.
final int initialCap = max(MIN_ALLOWED_SPSC_CHUNK_SIZE, initialCapacity);
final int capacity = max(min(maxCapacity, MAX_ALLOWED_QUEUE_CAPACITY), MIN_MAX_SPSC_CAPACITY);
return USE_UNSAFE_QUEUES ? new SpscChunkedArrayQueue<>(initialCap, capacity)
return USE_UNSAFE_QUEUES ?
USE_UNPADDED_QUEUES ?
new SpscChunkedUnpaddedArrayQueue<>(initialCap, capacity) :
new SpscChunkedArrayQueue<>(initialCap, capacity)
: new SpscGrowableAtomicArrayQueue<>(initialCap, capacity);
}

static <T> Queue<T> newUnboundedSpscQueue(final int initialCapacity) {
return USE_UNSAFE_QUEUES ? new SpscUnboundedArrayQueue<>(initialCapacity)
return USE_UNSAFE_QUEUES ?
USE_UNPADDED_QUEUES ?
new SpscUnboundedUnpaddedArrayQueue<>(initialCapacity) :
new SpscUnboundedArrayQueue<>(initialCapacity)
: new SpscUnboundedAtomicArrayQueue<>(initialCapacity);
}
}
Expand Down

0 comments on commit 28ca713

Please sign in to comment.