Skip to content

Commit 4169bb0

Browse files
committed
Restore support for Disruptor 3.x
1 parent ee58635 commit 4169bb0

File tree

8 files changed

+140
-59
lines changed

8 files changed

+140
-59
lines changed

log4j-core/src/main/java/org/apache/logging/log4j/core/async/AsyncLoggerConfigDisruptor.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.lmax.disruptor.ExceptionHandler;
2323
import com.lmax.disruptor.RingBuffer;
2424
import com.lmax.disruptor.Sequence;
25+
import com.lmax.disruptor.SequenceReportingEventHandler;
2526
import com.lmax.disruptor.TimeoutException;
2627
import com.lmax.disruptor.WaitStrategy;
2728
import com.lmax.disruptor.dsl.Disruptor;
@@ -97,7 +98,9 @@ private static class Log4jEventWrapperHandler implements EventHandler<Log4jEvent
9798
private Sequence sequenceCallback;
9899
private int counter;
99100

100-
@Override
101+
/*
102+
* Overrides a method from Disruptor 4.x. Do not remove.
103+
*/
101104
public void setSequenceCallback(final Sequence sequenceCallback) {
102105
this.sequenceCallback = sequenceCallback;
103106
}
@@ -124,6 +127,12 @@ private void notifyIntermediateProgress(final long sequence) {
124127
}
125128
}
126129

130+
/**
131+
* A version of Log4jEventWrapperHandler for LMAX Disruptor 3.x.
132+
*/
133+
private static final class Log4jEventWrapperHandler3 extends Log4jEventWrapperHandler
134+
implements SequenceReportingEventHandler<Log4jEventWrapper> {}
135+
127136
/**
128137
* Factory used to populate the RingBuffer with events. These event objects are then re-used during the life of the
129138
* RingBuffer.
@@ -155,6 +164,10 @@ private void notifyIntermediateProgress(final long sequence) {
155164
ringBufferElement.loggerConfig = loggerConfig;
156165
};
157166

167+
private Log4jEventWrapperHandler createEventHandler() {
168+
return DisruptorUtil.DISRUPTOR_VERSION == 3 ? new Log4jEventWrapperHandler3() : new Log4jEventWrapperHandler();
169+
}
170+
158171
private int ringBufferSize;
159172
private AsyncQueueFullPolicy asyncQueueFullPolicy;
160173
private Boolean mutable = Boolean.FALSE;
@@ -220,7 +233,7 @@ public Thread newThread(final Runnable r) {
220233
final ExceptionHandler<Log4jEventWrapper> errorHandler = DisruptorUtil.getAsyncLoggerConfigExceptionHandler();
221234
disruptor.setDefaultExceptionHandler(errorHandler);
222235

223-
final Log4jEventWrapperHandler[] handlers = {new Log4jEventWrapperHandler()};
236+
final Log4jEventWrapperHandler[] handlers = {createEventHandler()};
224237
disruptor.handleEventsWith(handlers);
225238

226239
LOGGER.debug(

log4j-core/src/main/java/org/apache/logging/log4j/core/async/AsyncLoggerDisruptor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ public Thread newThread(final Runnable r) {
122122
final ExceptionHandler<RingBufferLogEvent> errorHandler = DisruptorUtil.getAsyncLoggerExceptionHandler();
123123
disruptor.setDefaultExceptionHandler(errorHandler);
124124

125-
final RingBufferLogEventHandler[] handlers = {new RingBufferLogEventHandler()};
125+
final RingBufferLogEventHandler4[] handlers = {RingBufferLogEventHandler4.create()};
126126
disruptor.handleEventsWith(handlers);
127127

128128
LOGGER.debug(

log4j-core/src/main/java/org/apache/logging/log4j/core/async/DefaultAsyncWaitStrategyFactory.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,19 @@ static WaitStrategy createDefaultWaitStrategy(final String propertyName) {
7474
LOGGER.trace(
7575
"DefaultAsyncWaitStrategyFactory creating TimeoutBlockingWaitStrategy(timeout={}, unit=MILLIS)",
7676
timeoutMillis);
77+
// Check for the v 4.x version of the strategy
78+
try {
79+
final Class<? extends WaitStrategy> strategyClass =
80+
(Class<? extends WaitStrategy>) Class.forName("com.lmax.disruptor.TimeoutBlockingWaitStrategy");
81+
return strategyClass
82+
.getConstructor(long.class, TimeUnit.class)
83+
.newInstance(timeoutMillis, TimeUnit.MILLISECONDS);
84+
} catch (final ReflectiveOperationException e) {
85+
LOGGER.debug(
86+
"DefaultAsyncWaitStrategyFactory failed to load 'com.lmax.disruptor.TimeoutBlockingWaitStrategy', using '{}' instead.",
87+
TimeoutBlockingWaitStrategy.class.getName());
88+
}
89+
// Use our version
7790
return new TimeoutBlockingWaitStrategy(timeoutMillis, TimeUnit.MILLISECONDS);
7891
}
7992

log4j-core/src/main/java/org/apache/logging/log4j/core/async/DisruptorUtil.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@ final class DisruptorUtil {
5050
static final boolean ASYNC_CONFIG_SYNCHRONIZE_ENQUEUE_WHEN_QUEUE_FULL = PropertiesUtil.getProperties()
5151
.getBooleanProperty("AsyncLoggerConfig.SynchronizeEnqueueWhenQueueFull", true);
5252

53+
static final int DISRUPTOR_VERSION =
54+
LoaderUtil.isClassAvailable("com.lmax.disruptor.SequenceReportingEventHandler") ? 3 : 4;
55+
5356
private DisruptorUtil() {}
5457

5558
static WaitStrategy createWaitStrategy(

log4j-core/src/main/java/org/apache/logging/log4j/core/async/RingBufferLogEventHandler.java

Lines changed: 7 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -16,68 +16,21 @@
1616
*/
1717
package org.apache.logging.log4j.core.async;
1818

19-
import com.lmax.disruptor.EventHandler;
20-
import com.lmax.disruptor.Sequence;
19+
import com.lmax.disruptor.LifecycleAware;
20+
import com.lmax.disruptor.SequenceReportingEventHandler;
2121

2222
/**
2323
* This event handler gets passed messages from the RingBuffer as they become
2424
* available. Processing of these messages is done in a separate thread,
2525
* controlled by the {@code Executor} passed to the {@code Disruptor}
2626
* constructor.
2727
*/
28-
public class RingBufferLogEventHandler implements EventHandler<RingBufferLogEvent> {
29-
30-
private static final int NOTIFY_PROGRESS_THRESHOLD = 50;
31-
private Sequence sequenceCallback;
32-
private int counter;
33-
private long threadId = -1;
34-
35-
@Override
36-
public void setSequenceCallback(final Sequence sequenceCallback) {
37-
this.sequenceCallback = sequenceCallback;
38-
}
39-
40-
@Override
41-
public void onEvent(final RingBufferLogEvent event, final long sequence, final boolean endOfBatch)
42-
throws Exception {
43-
try {
44-
// RingBufferLogEvents are populated by an EventTranslator. If an exception is thrown during event
45-
// translation, the event may not be fully populated, but Disruptor requires that the associated sequence
46-
// still be published since a slot has already been claimed in the ring buffer. Ignore any such unpopulated
47-
// events. The exception that occurred during translation will have already been propagated.
48-
if (event.isPopulated()) {
49-
event.execute(endOfBatch);
50-
}
51-
} finally {
52-
event.clear();
53-
// notify the BatchEventProcessor that the sequence has progressed.
54-
// Without this callback the sequence would not be progressed
55-
// until the batch has completely finished.
56-
notifyCallback(sequence);
57-
}
58-
}
59-
60-
private void notifyCallback(final long sequence) {
61-
if (++counter > NOTIFY_PROGRESS_THRESHOLD) {
62-
sequenceCallback.set(sequence);
63-
counter = 0;
64-
}
65-
}
28+
public class RingBufferLogEventHandler extends RingBufferLogEventHandler4
29+
implements SequenceReportingEventHandler<RingBufferLogEvent>, LifecycleAware {
6630

6731
/**
68-
* Returns the thread ID of the background consumer thread, or {@code -1} if the background thread has not started
69-
* yet.
70-
* @return the thread ID of the background consumer thread, or {@code -1}
32+
* @deprecated Use the {@link RingBufferLogEventHandler4#create()} factory method instead.
7133
*/
72-
public long getThreadId() {
73-
return threadId;
74-
}
75-
76-
@Override
77-
public void onStart() {
78-
threadId = Thread.currentThread().getId();
79-
}
80-
81-
@Override
82-
public void onShutdown() {}
34+
@Deprecated
35+
public RingBufferLogEventHandler() {}
8336
}
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to you under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.logging.log4j.core.async;
18+
19+
import com.lmax.disruptor.EventHandler;
20+
import com.lmax.disruptor.Sequence;
21+
22+
/**
23+
* This event handler gets passed messages from the RingBuffer as they become
24+
* available. Processing of these messages is done in a separate thread,
25+
* controlled by the {@code Executor} passed to the {@code Disruptor}
26+
* constructor.
27+
*/
28+
class RingBufferLogEventHandler4 implements EventHandler<RingBufferLogEvent> {
29+
30+
private static final int NOTIFY_PROGRESS_THRESHOLD = 50;
31+
private Sequence sequenceCallback;
32+
private int counter;
33+
private long threadId = -1;
34+
35+
/**
36+
* Returns the appropriate {@link EventHandler} for the version of LMAX Disruptor used.
37+
*/
38+
public static RingBufferLogEventHandler4 create() {
39+
return DisruptorUtil.DISRUPTOR_VERSION == 3
40+
? new RingBufferLogEventHandler()
41+
: new RingBufferLogEventHandler4();
42+
}
43+
44+
/*
45+
* Overrides a method from Disruptor 4.x. Do not remove.
46+
*/
47+
public void setSequenceCallback(final Sequence sequenceCallback) {
48+
this.sequenceCallback = sequenceCallback;
49+
}
50+
51+
@Override
52+
public void onEvent(final RingBufferLogEvent event, final long sequence, final boolean endOfBatch)
53+
throws Exception {
54+
try {
55+
// RingBufferLogEvents are populated by an EventTranslator. If an exception is thrown during event
56+
// translation, the event may not be fully populated, but Disruptor requires that the associated sequence
57+
// still be published since a slot has already been claimed in the ring buffer. Ignore any such unpopulated
58+
// events. The exception that occurred during translation will have already been propagated.
59+
if (event.isPopulated()) {
60+
event.execute(endOfBatch);
61+
}
62+
} finally {
63+
event.clear();
64+
// notify the BatchEventProcessor that the sequence has progressed.
65+
// Without this callback the sequence would not be progressed
66+
// until the batch has completely finished.
67+
notifyCallback(sequence);
68+
}
69+
}
70+
71+
private void notifyCallback(final long sequence) {
72+
if (++counter > NOTIFY_PROGRESS_THRESHOLD) {
73+
sequenceCallback.set(sequence);
74+
counter = 0;
75+
}
76+
}
77+
78+
/**
79+
* Returns the thread ID of the background consumer thread, or {@code -1} if the background thread has not started
80+
* yet.
81+
*
82+
* @return the thread ID of the background consumer thread, or {@code -1}
83+
*/
84+
public long getThreadId() {
85+
return threadId;
86+
}
87+
88+
/*
89+
* Overrides a method from Disruptor 4.x. Do not remove.
90+
*/
91+
public void onStart() {
92+
threadId = Thread.currentThread().getId();
93+
}
94+
95+
/*
96+
* Overrides a method from Disruptor 4.x. Do not remove.
97+
*/
98+
public void onShutdown() {}
99+
}

log4j-core/src/main/java/org/apache/logging/log4j/core/async/package-info.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
* Provides Asynchronous Logger classes and interfaces for low-latency logging.
1919
*/
2020
@Export
21-
@Version("2.21.0")
21+
@Version("2.23.0")
2222
package org.apache.logging.log4j.core.async;
2323

2424
import org.osgi.annotation.bundle.Export;

log4j-parent/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@
7979
<commons-logging.version>1.3.0</commons-logging.version>
8080
<!-- `com.conversantmedia:disruptor` version 1.2.16 requires Java 9: -->
8181
<conversant.disruptor.version>1.2.15</conversant.disruptor.version>
82-
<disruptor.version>4.0.0</disruptor.version>
82+
<disruptor.version>3.4.4</disruptor.version>
8383
<elasticsearch-java.version>8.11.2</elasticsearch-java.version>
8484
<embedded-ldap.version>0.9.0</embedded-ldap.version>
8585
<felix.version>7.0.5</felix.version>

0 commit comments

Comments
 (0)