diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/DispatchAgent.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/DispatchAgent.java index 057dbf97a2..fc15bb933e 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/DispatchAgent.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/DispatchAgent.java @@ -310,7 +310,7 @@ public DispatchAgent( this.timerWheel = new DeadlineTimerWheel(MILLISECONDS, currentTimeMillis(), 512, 1024); this.tasksByTimerId = new Long2ObjectHashMap<>(); this.futuresById = new Long2ObjectHashMap<>(); - this.signaler = new ElektronSignaler(executor); + this.signaler = new ElektronSignaler(executor, Math.max(config.bufferSlotCapacity(), 512)); this.poller = new Poller(); @@ -1672,9 +1672,10 @@ public Affinity resolveAffinity( return affinity; } - private static SignalFW.Builder newSignalRW() + private static SignalFW.Builder newSignalRW( + int capacity) { - MutableDirectBuffer buffer = new UnsafeBuffer(new byte[512]); + MutableDirectBuffer buffer = new UnsafeBuffer(new byte[capacity]); return new SignalFW.Builder().wrap(buffer, 0, buffer.capacity()); } @@ -1691,16 +1692,18 @@ private Int2ObjectHashMap[] initDispatcher() private final class ElektronSignaler implements Signaler { - private final ThreadLocal signalRW = withInitial(DispatchAgent::newSignalRW); + private final ThreadLocal signalRW; private final ExecutorService executorService; private long nextFutureId; private ElektronSignaler( - ExecutorService executorService) + ExecutorService executorService, + int slotCapacity) { this.executorService = executorService; + signalRW = withInitial(() -> newSignalRW(slotCapacity)); } public void executeTaskAt(