From 184a26541cf6e052578a2bee7b204cdff18a779b Mon Sep 17 00:00:00 2001 From: Bertrand Renuart Date: Sat, 10 Jul 2021 18:38:08 +0200 Subject: [PATCH] =?UTF-8?q?Get=20rid=20of=20AsyncMode=20and=20rely=20solel?= =?UTF-8?q?y=20on=20=E2=80=9CappendTimeout=E2=80=9D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Block/drop behaviour is entirely controlled by the value of the “appendTimeout” property: - `-1` to disable timeout and wait until enough space becomes available - `0` for no timeout at all and drop the event immediately when the buffer is full - `> 0` to retry during the specified amount of time --- .../appender/AsyncDisruptorAppender.java | 104 +++++------------- .../appender/AsyncDisruptorAppenderTest.java | 8 +- 2 files changed, 31 insertions(+), 81 deletions(-) diff --git a/src/main/java/net/logstash/logback/appender/AsyncDisruptorAppender.java b/src/main/java/net/logstash/logback/appender/AsyncDisruptorAppender.java index c01b60ed..fc293978 100644 --- a/src/main/java/net/logstash/logback/appender/AsyncDisruptorAppender.java +++ b/src/main/java/net/logstash/logback/appender/AsyncDisruptorAppender.java @@ -29,18 +29,17 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.LockSupport; -import ch.qos.logback.core.status.OnConsoleStatusListener; -import ch.qos.logback.core.status.Status; -import ch.qos.logback.core.util.Duration; - import net.logstash.logback.appender.listener.AppenderListener; +import net.logstash.logback.status.LevelFilteringStatusListener; + import ch.qos.logback.access.spi.IAccessEvent; import ch.qos.logback.classic.AsyncAppender; import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.UnsynchronizedAppenderBase; import ch.qos.logback.core.spi.DeferredProcessingAware; -import net.logstash.logback.status.LevelFilteringStatusListener; - +import ch.qos.logback.core.status.OnConsoleStatusListener; +import ch.qos.logback.core.status.Status; +import ch.qos.logback.core.util.Duration; import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.EventHandler; @@ -50,7 +49,6 @@ import com.lmax.disruptor.PhasedBackoffWaitStrategy; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.SleepingWaitStrategy; -import com.lmax.disruptor.TimeoutException; import com.lmax.disruptor.WaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; @@ -258,33 +256,22 @@ public abstract class AsyncDisruptorAppender listeners = new ArrayList<>(); - public enum AsyncMode { - /** - * Appender thread is blocked until space is available in the ring buffer - * or the retry timeout expires. - */ - BLOCK, - - /** - * Event is dropped when the ring buffer is full - */ - DROP - } - private AsyncMode asyncMode = AsyncMode.DROP; - /** - * Delay (in millis) between consecutive attempts to append an event in the ring buffer when full. - * Applicable only when {@link #asyncMode} is set to {@link AsyncMode#DROP}. + * Maximum time to wait when appending events to the ring buffer when full before the event + * is dropped. Use the following values: + * */ - private long retryMillis = 100; - + private Duration appendTimeout = Duration.buildByMilliseconds(0); + /** - * Maximum time to wait for space in the ring buffer before dropping the event. - * Applicable only when {@link #asyncMode} is set to {@link AsyncMode#DROP}. - * - *

Use {@code -1} for no timeout, i.e. block until space is available. + * Delay (in millis) between consecutive attempts to append an event in the ring buffer when + * full. */ - private Duration retryTimeout = Duration.buildByMilliseconds(1000); + private Duration appendRetryFrequency = Duration.buildByMilliseconds(100); /** * How long to wait for in-flight events during shutdown. @@ -292,7 +279,6 @@ public enum AsyncMode { private Duration shutdownGracePeriod = Duration.buildByMinutes(1); - /** * Event wrapper object used for each element of the {@link RingBuffer}. */ @@ -406,7 +392,6 @@ public void onShutdown() { } - @SuppressWarnings("unchecked") @Override public void start() { if (addDefaultStatusListener && getStatusManager() != null && getStatusManager().getCopyOfStatusListenerList().isEmpty()) { @@ -561,28 +546,13 @@ protected void prepareForDeferredProcessing(Event event) { } /** - * Enqueue the given {@code event} in the ring buffer according to the configured {@link #asyncMode}. + * Enqueue the given {@code event} in the ring buffer. * * @param event the {@link Event} to enqueue * @return {@code true} when the even is successfully enqueued in the ring buffer */ protected boolean enqueueEvent(Event event) { - if (this.asyncMode == AsyncMode.BLOCK) { - return enqueueEventBlock(event); - } else { - return enqueueEventDrop(event); - } - } - - /** - * Enqueue the given {@code event} in the ring buffer, blocking until enough space - * is available or the {@link #retryTimeout} expires (if configured). - * - * @param event the {@link Event} to enqueue - * @return {@code true} when the even is successfully enqueued in the ring buffer - */ - private boolean enqueueEventBlock(Event event) { - long timeout = this.retryTimeout.getMilliseconds() <= 0 ? Long.MAX_VALUE : System.currentTimeMillis() + this.retryTimeout.getMilliseconds(); + long timeout = this.appendTimeout.getMilliseconds() < 0 ? Long.MAX_VALUE : System.currentTimeMillis() + this.appendTimeout.getMilliseconds(); while (isStarted() && !this.disruptor.getRingBuffer().tryPublishEvent(this.eventTranslator, event)) { // Check for timeout @@ -593,7 +563,7 @@ private boolean enqueueEventBlock(Event event) { // Wait before retry // - long waitDuration = Math.min(this.retryMillis, System.currentTimeMillis() - timeout); + long waitDuration = Math.min(this.appendRetryFrequency.getMilliseconds(), System.currentTimeMillis() - timeout); if (waitDuration > 0) { LockSupport.parkNanos(waitDuration * 1_000_000L); } @@ -601,17 +571,6 @@ private boolean enqueueEventBlock(Event event) { return true; } - - /** - * Attempt to enqueue the given {@code event} in the ring buffer without blocking. Drop the event - * if the ring buffer is full. - * - * @param event the {@link Event} to enqueue - * @return {@code true} when the even is successfully enqueued in the ring buffer - */ - private boolean enqueueEventDrop(Event event) { - return this.disruptor.getRingBuffer().tryPublishEvent(this.eventTranslator, event); - } protected String calculateThreadName() { List threadNameFormatParams = getThreadNameFormatParams(); @@ -719,26 +678,19 @@ public void setWaitStrategy(WaitStrategy waitStrategy) { public void setWaitStrategyType(String waitStrategyType) { setWaitStrategy(WaitStrategyFactory.createWaitStrategyFromString(waitStrategyType)); } - - public AsyncMode getAsyncMode() { - return asyncMode; - } - public void setAsyncMode(AsyncMode asyncMode) { - this.asyncMode = asyncMode; - } - public long getRetryMillis() { - return retryMillis; + public Duration getAppendRetryFrequency() { + return appendRetryFrequency; } - public void setRetryMillis(long retryMillis) { - this.retryMillis = retryMillis; + public void setAppendRetryFrequency(Duration appendRetryFrequency) { + this.appendRetryFrequency = Objects.requireNonNull(appendRetryFrequency); } - public Duration getRetryTimeout() { - return retryTimeout; + public Duration getAppendTimeout() { + return appendTimeout; } - public void setRetryTimeout(Duration retryTimeout) { - this.retryTimeout = Objects.requireNonNull(retryTimeout); + public void setAppendTimeout(Duration appendTimeout) { + this.appendTimeout = Objects.requireNonNull(appendTimeout); } public void setShutdownGracePeriod(Duration shutdownGracePeriod) { diff --git a/src/test/java/net/logstash/logback/appender/AsyncDisruptorAppenderTest.java b/src/test/java/net/logstash/logback/appender/AsyncDisruptorAppenderTest.java index 7d024760..2dfb4dc9 100644 --- a/src/test/java/net/logstash/logback/appender/AsyncDisruptorAppenderTest.java +++ b/src/test/java/net/logstash/logback/appender/AsyncDisruptorAppenderTest.java @@ -38,7 +38,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import net.logstash.logback.appender.AsyncDisruptorAppender.AsyncMode; import net.logstash.logback.appender.AsyncDisruptorAppender.LogEvent; import net.logstash.logback.appender.listener.AppenderListener; @@ -240,7 +239,7 @@ public void appendBlockingWhenFull() { try { TestEventHandler eventHandler = new TestEventHandler(eventHandlerWaiter); appender.setRingBufferSize(1); - appender.setAsyncMode(AsyncMode.BLOCK); + appender.setAppendTimeout(toLogback(Duration.ofMillis(-1))); // block until space is available appender.setEventHandler(eventHandler); appender.start(); @@ -284,8 +283,7 @@ public void appendBlockingWithTimeout() throws Exception { try { TestEventHandler eventHandler = new TestEventHandler(eventHandlerWaiter); appender.setRingBufferSize(1); - appender.setAsyncMode(AsyncMode.BLOCK); - appender.setRetryTimeout(toLogback(timeout)); + appender.setAppendTimeout(toLogback(timeout)); appender.setEventHandler(eventHandler); appender.start(); @@ -336,7 +334,7 @@ public void appendBlockingReleasedOnStop() { try { TestEventHandler eventHandler = new TestEventHandler(eventHandlerWaiter); appender.setRingBufferSize(1); - appender.setAsyncMode(AsyncMode.BLOCK); + appender.setAppendTimeout(toLogback(Duration.ofMillis(-1))); // block until space is available appender.setShutdownGracePeriod(toLogback(Duration.ofMillis(0))); // don't want to wait for inflight events... appender.setEventHandler(eventHandler); appender.start();