Skip to content

Commit

Permalink
Get rid of AsyncMode and rely solely on “appendTimeout”
Browse files Browse the repository at this point in the history
Block/drop behaviour is entirely controlled by the value of the “appendTimeout” property:
- `-1` to disable timeout and wait until enough space becomes available
- `0` for no timeout at all and drop the event immediately when the buffer is full
- `> 0` to retry during the specified amount of time
  • Loading branch information
brenuart committed Jul 10, 2021
1 parent 64901f5 commit 184a265
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,17 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

import ch.qos.logback.core.status.OnConsoleStatusListener;
import ch.qos.logback.core.status.Status;
import ch.qos.logback.core.util.Duration;

import net.logstash.logback.appender.listener.AppenderListener;
import net.logstash.logback.status.LevelFilteringStatusListener;

import ch.qos.logback.access.spi.IAccessEvent;
import ch.qos.logback.classic.AsyncAppender;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.UnsynchronizedAppenderBase;
import ch.qos.logback.core.spi.DeferredProcessingAware;
import net.logstash.logback.status.LevelFilteringStatusListener;

import ch.qos.logback.core.status.OnConsoleStatusListener;
import ch.qos.logback.core.status.Status;
import ch.qos.logback.core.util.Duration;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
Expand All @@ -50,7 +49,6 @@
import com.lmax.disruptor.PhasedBackoffWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
Expand Down Expand Up @@ -258,41 +256,29 @@ public abstract class AsyncDisruptorAppender<Event extends DeferredProcessingAwa
*/
protected final List<Listener> listeners = new ArrayList<>();

public enum AsyncMode {
/**
* Appender thread is blocked until space is available in the ring buffer
* or the retry timeout expires.
*/
BLOCK,

/**
* Event is dropped when the ring buffer is full
*/
DROP
}
private AsyncMode asyncMode = AsyncMode.DROP;

/**
* Delay (in millis) between consecutive attempts to append an event in the ring buffer when full.
* Applicable only when {@link #asyncMode} is set to {@link AsyncMode#DROP}.
* Maximum time to wait when appending events to the ring buffer when full before the event
* is dropped. Use the following values:
* <ul>
* <li>{@code -1} to disable timeout and wait until space becomes available.
* <li>{@code 0} for no timeout and drop the event immediately when the buffer is full.
* <li>{@code > 0} to retry during the specified amount of time.
* </ul>
*/
private long retryMillis = 100;
private Duration appendTimeout = Duration.buildByMilliseconds(0);

/**
* Maximum time to wait for space in the ring buffer before dropping the event.
* Applicable only when {@link #asyncMode} is set to {@link AsyncMode#DROP}.
*
* <p>Use {@code -1} for no timeout, i.e. block until space is available.
* Delay (in millis) between consecutive attempts to append an event in the ring buffer when
* full.
*/
private Duration retryTimeout = Duration.buildByMilliseconds(1000);
private Duration appendRetryFrequency = Duration.buildByMilliseconds(100);

/**
* How long to wait for in-flight events during shutdown.
*/
private Duration shutdownGracePeriod = Duration.buildByMinutes(1);



/**
* Event wrapper object used for each element of the {@link RingBuffer}.
*/
Expand Down Expand Up @@ -406,7 +392,6 @@ public void onShutdown() {

}

@SuppressWarnings("unchecked")
@Override
public void start() {
if (addDefaultStatusListener && getStatusManager() != null && getStatusManager().getCopyOfStatusListenerList().isEmpty()) {
Expand Down Expand Up @@ -561,28 +546,13 @@ protected void prepareForDeferredProcessing(Event event) {
}

/**
* Enqueue the given {@code event} in the ring buffer according to the configured {@link #asyncMode}.
* Enqueue the given {@code event} in the ring buffer.
*
* @param event the {@link Event} to enqueue
* @return {@code true} when the even is successfully enqueued in the ring buffer
*/
protected boolean enqueueEvent(Event event) {
if (this.asyncMode == AsyncMode.BLOCK) {
return enqueueEventBlock(event);
} else {
return enqueueEventDrop(event);
}
}

/**
* Enqueue the given {@code event} in the ring buffer, blocking until enough space
* is available or the {@link #retryTimeout} expires (if configured).
*
* @param event the {@link Event} to enqueue
* @return {@code true} when the even is successfully enqueued in the ring buffer
*/
private boolean enqueueEventBlock(Event event) {
long timeout = this.retryTimeout.getMilliseconds() <= 0 ? Long.MAX_VALUE : System.currentTimeMillis() + this.retryTimeout.getMilliseconds();
long timeout = this.appendTimeout.getMilliseconds() < 0 ? Long.MAX_VALUE : System.currentTimeMillis() + this.appendTimeout.getMilliseconds();

while (isStarted() && !this.disruptor.getRingBuffer().tryPublishEvent(this.eventTranslator, event)) {
// Check for timeout
Expand All @@ -593,25 +563,14 @@ private boolean enqueueEventBlock(Event event) {

// Wait before retry
//
long waitDuration = Math.min(this.retryMillis, System.currentTimeMillis() - timeout);
long waitDuration = Math.min(this.appendRetryFrequency.getMilliseconds(), System.currentTimeMillis() - timeout);
if (waitDuration > 0) {
LockSupport.parkNanos(waitDuration * 1_000_000L);
}
}

return true;
}

/**
* Attempt to enqueue the given {@code event} in the ring buffer without blocking. Drop the event
* if the ring buffer is full.
*
* @param event the {@link Event} to enqueue
* @return {@code true} when the even is successfully enqueued in the ring buffer
*/
private boolean enqueueEventDrop(Event event) {
return this.disruptor.getRingBuffer().tryPublishEvent(this.eventTranslator, event);
}

protected String calculateThreadName() {
List<Object> threadNameFormatParams = getThreadNameFormatParams();
Expand Down Expand Up @@ -719,26 +678,19 @@ public void setWaitStrategy(WaitStrategy waitStrategy) {
public void setWaitStrategyType(String waitStrategyType) {
setWaitStrategy(WaitStrategyFactory.createWaitStrategyFromString(waitStrategyType));
}

public AsyncMode getAsyncMode() {
return asyncMode;
}
public void setAsyncMode(AsyncMode asyncMode) {
this.asyncMode = asyncMode;
}

public long getRetryMillis() {
return retryMillis;
public Duration getAppendRetryFrequency() {
return appendRetryFrequency;
}
public void setRetryMillis(long retryMillis) {
this.retryMillis = retryMillis;
public void setAppendRetryFrequency(Duration appendRetryFrequency) {
this.appendRetryFrequency = Objects.requireNonNull(appendRetryFrequency);
}

public Duration getRetryTimeout() {
return retryTimeout;
public Duration getAppendTimeout() {
return appendTimeout;
}
public void setRetryTimeout(Duration retryTimeout) {
this.retryTimeout = Objects.requireNonNull(retryTimeout);
public void setAppendTimeout(Duration appendTimeout) {
this.appendTimeout = Objects.requireNonNull(appendTimeout);
}

public void setShutdownGracePeriod(Duration shutdownGracePeriod) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import net.logstash.logback.appender.AsyncDisruptorAppender.AsyncMode;
import net.logstash.logback.appender.AsyncDisruptorAppender.LogEvent;
import net.logstash.logback.appender.listener.AppenderListener;

Expand Down Expand Up @@ -240,7 +239,7 @@ public void appendBlockingWhenFull() {
try {
TestEventHandler eventHandler = new TestEventHandler(eventHandlerWaiter);
appender.setRingBufferSize(1);
appender.setAsyncMode(AsyncMode.BLOCK);
appender.setAppendTimeout(toLogback(Duration.ofMillis(-1))); // block until space is available
appender.setEventHandler(eventHandler);
appender.start();

Expand Down Expand Up @@ -284,8 +283,7 @@ public void appendBlockingWithTimeout() throws Exception {
try {
TestEventHandler eventHandler = new TestEventHandler(eventHandlerWaiter);
appender.setRingBufferSize(1);
appender.setAsyncMode(AsyncMode.BLOCK);
appender.setRetryTimeout(toLogback(timeout));
appender.setAppendTimeout(toLogback(timeout));
appender.setEventHandler(eventHandler);
appender.start();

Expand Down Expand Up @@ -336,7 +334,7 @@ public void appendBlockingReleasedOnStop() {
try {
TestEventHandler eventHandler = new TestEventHandler(eventHandlerWaiter);
appender.setRingBufferSize(1);
appender.setAsyncMode(AsyncMode.BLOCK);
appender.setAppendTimeout(toLogback(Duration.ofMillis(-1))); // block until space is available
appender.setShutdownGracePeriod(toLogback(Duration.ofMillis(0))); // don't want to wait for inflight events...
appender.setEventHandler(eventHandler);
appender.start();
Expand Down

0 comments on commit 184a265

Please sign in to comment.