forked from logfellow/logstash-logback-encoder
-
Notifications
You must be signed in to change notification settings - Fork 0
/
AsyncDisruptorAppender.java
626 lines (547 loc) · 22.2 KB
/
AsyncDisruptorAppender.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
/*
* Copyright 2013-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.logstash.logback.appender;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Formatter;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import net.logstash.logback.appender.listener.AppenderListener;
import net.logstash.logback.status.LevelFilteringStatusListener;
import ch.qos.logback.access.spi.IAccessEvent;
import ch.qos.logback.classic.AsyncAppender;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.UnsynchronizedAppenderBase;
import ch.qos.logback.core.spi.DeferredProcessingAware;
import ch.qos.logback.core.status.OnConsoleStatusListener;
import ch.qos.logback.core.status.Status;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.LifecycleAware;
import com.lmax.disruptor.PhasedBackoffWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
/**
* An asynchronous appender that uses an LMAX Disruptor {@link RingBuffer}
* as the interthread data exchange mechanism (as opposed to a {@link BlockingQueue}
* used by logback's {@link AsyncAppender}).
* <p>
*
* See the <a href="https://lmax-exchange.github.io/disruptor/">LMAX Disruptor documentation</a>
* for more information about the advantages of using a {@link RingBuffer} over a {@link BlockingQueue}.
* <p>
*
* This appender will never block the logging thread, since it uses
* {@link RingBuffer#tryPublishEvent(EventTranslatorOneArg, Object)}
* to publish events (rather than {@link RingBuffer#publishEvent(EventTranslatorOneArg, Object)}).
* <p>
*
* If the RingBuffer is full, and the event cannot be published,
* the event will be dropped. A warning message will be logged to
* logback's context every {@link #droppedWarnFrequency} consecutive dropped events.
* <p>
*
* A single handler thread will be used to handle the actual handling of the event.
* <p>
*
* Subclasses are required to set the {@link #eventHandler} to define
* the logic that executes in the handler thread.
* For example, {@link DelegatingAsyncDisruptorAppender} for will delegate
* appending of the event to another appender in the handler thread.
* <p>
*
* By default, child threads created by this appender will be daemon threads,
* and therefore allow the JVM to exit gracefully without
* needing to explicitly shut down the appender.
* Note that in this case, it is possible for appended log events to not
* be handled (if the child thread has not had a chance to process them yet).
*
* By setting {@link #setDaemon(boolean)} to false, you can change this behavior.
* When false, child threads created by this appender will not be daemon threads,
* and therefore will prevent the JVM from shutting down
* until the appender is explicitly shut down.
* Set this to false if you want to ensure that every log event
* prior to shutdown is handled.
*
* @param <Event> type of event ({@link ILoggingEvent} or {@link IAccessEvent}).
*/
public abstract class AsyncDisruptorAppender<Event extends DeferredProcessingAware, Listener extends AppenderListener<Event>> extends UnsynchronizedAppenderBase<Event> {
protected static final String APPENDER_NAME_FORMAT = "%1$s";
protected static final String THREAD_INDEX_FORMAT = "%2$d";
public static final String DEFAULT_THREAD_NAME_FORMAT = "logback-appender-" + APPENDER_NAME_FORMAT + "-" + THREAD_INDEX_FORMAT;
public static final int DEFAULT_RING_BUFFER_SIZE = 8192;
public static final ProducerType DEFAULT_PRODUCER_TYPE = ProducerType.MULTI;
public static final WaitStrategy DEFAULT_WAIT_STRATEGY = new BlockingWaitStrategy();
public static final int DEFAULT_DROPPED_WARN_FREQUENCY = 1000;
private static final RingBufferFullException RING_BUFFER_FULL_EXCEPTION = new RingBufferFullException();
static {
RING_BUFFER_FULL_EXCEPTION.setStackTrace(new StackTraceElement[] {new StackTraceElement(AsyncDisruptorAppender.class.getName(), "append(..)", null, -1)});
}
/**
* The size of the {@link RingBuffer}.
* Defaults to {@value #DEFAULT_RING_BUFFER_SIZE}.
* If the handler thread is not as fast as the producing threads,
* then the {@link RingBuffer} will eventually fill up,
* at which point events will be dropped.
* <p>
* Must be a positive power of 2.
*/
private int ringBufferSize = DEFAULT_RING_BUFFER_SIZE;
/**
* The {@link ProducerType} to use to configure the disruptor.
* By default this is {@link ProducerType#MULTI}.
* Only set to {@link ProducerType#SINGLE} if only one thread
* will ever be appending to this appender.
*/
private ProducerType producerType = DEFAULT_PRODUCER_TYPE;
/**
* The {@link WaitStrategy} to used by the RingBuffer
* when pulling events to be processed by {@link #eventHandler}.
* <p>
* By default, a {@link BlockingWaitStrategy} is used, which is the most
* CPU conservative, but results in a higher latency.
* If you need lower latency (at the cost of higher CPU usage),
* consider using a {@link SleepingWaitStrategy} or a {@link PhasedBackoffWaitStrategy}.
*/
private WaitStrategy waitStrategy = DEFAULT_WAIT_STRATEGY;
/**
* Pattern used by the {@link WorkerThreadFactory} to set the
* handler thread name.
* Defaults to {@value #DEFAULT_THREAD_NAME_FORMAT}.
* <p>
*
* If you change the {@link #threadFactory}, then this
* value may not be honored.
* <p>
*
* The string is a format pattern understood by {@link Formatter#format(String, Object...)}.
* {@link Formatter#format(String, Object...)} is used to
* construct the actual thread name prefix.
* The first argument (%1$s) is the string appender name.
* The second argument (%2$d) is the numerical thread index.
* Other arguments can be made available by subclasses.
*/
private String threadNameFormat = DEFAULT_THREAD_NAME_FORMAT;
/**
* When true, child threads created by this appender will be daemon threads,
* and therefore allow the JVM to exit gracefully without
* needing to explicitly shut down the appender.
* Note that in this case, it is possible for log events to not
* be handled.
* <p>
*
* When false, child threads created by this appender will not be daemon threads,
* and therefore will prevent the JVM from shutting down
* until the appender is explicitly shut down.
* Set this to false if you want to ensure that every log event
* prior to shutdown is handled.
* <p>
*
* If you change the {@link #threadFactory}, then this
* value may not be honored.
*/
private boolean useDaemonThread = true;
/**
* When true, if no status listener is registered, then a default {@link OnConsoleStatusListener}
* will be registered, so that error messages are seen on the console.
*/
private boolean addDefaultStatusListener = true;
/**
* For every droppedWarnFrequency consecutive dropped events, log a warning.
* Defaults to {@value #DEFAULT_DROPPED_WARN_FREQUENCY}.
*/
private int droppedWarnFrequency = DEFAULT_DROPPED_WARN_FREQUENCY;
/**
* The {@link ThreadFactory} used to create the handler thread.
*/
private ThreadFactory threadFactory = new WorkerThreadFactory();
/**
* The {@link ScheduledExecutorService} used to execute the handler task.
*/
private ScheduledThreadPoolExecutor executorService;
/**
* Size of the thread pool to create.
*/
private int threadPoolCoreSize = 1;
/**
* The {@link Disruptor} containing the {@link RingBuffer} onto
* which to publish events.
*/
private Disruptor<LogEvent<Event>> disruptor;
/**
* Sets the {@link LogEvent#event} to the logback Event.
* Used when publishing events to the {@link RingBuffer}.
*/
private EventTranslatorOneArg<LogEvent<Event>, Event> eventTranslator = new LogEventTranslator<Event>();
/**
* Used by the handler thread to process the event.
*/
private EventHandler<LogEvent<Event>> eventHandler;
/**
* Defines what happens when there is an exception during
* {@link RingBuffer} processing.
*/
private ExceptionHandler<LogEvent<Event>> exceptionHandler = new LogEventExceptionHandler();
/**
* Consecutive number of dropped events.
*/
private final AtomicLong consecutiveDroppedCount = new AtomicLong();
/**
* The {@link EventFactory} used to create {@link LogEvent}s for the RingBuffer.
*/
private LogEventFactory<Event> eventFactory = new LogEventFactory<Event>();
/**
* Incrementor number used as part of thread names for uniqueness.
*/
private final AtomicInteger threadNumber = new AtomicInteger(1);
/**
* These listeners will be notified when certain events occur on this appender.
*/
protected final List<Listener> listeners = new ArrayList<>();
/**
* Event wrapper object used for each element of the {@link RingBuffer}.
*/
protected static class LogEvent<Event> {
/**
* The logback event.
*/
public volatile Event event;
}
/**
* Factory for creating the initial {@link LogEvent}s to populate
* the {@link RingBuffer}.
*/
private static class LogEventFactory<Event> implements EventFactory<LogEvent<Event>> {
@Override
public LogEvent<Event> newInstance() {
return new LogEvent<Event>();
}
}
/**
* The default {@link ThreadFactory} used to create the handler thread.
*/
private class WorkerThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName(calculateThreadName());
t.setDaemon(useDaemonThread);
return t;
}
}
/**
* Sets the {@link LogEvent#event} to the logback Event.
* Used when publishing events to the {@link RingBuffer}.
*/
protected static class LogEventTranslator<Event> implements EventTranslatorOneArg<LogEvent<Event>, Event> {
@Override
public void translateTo(LogEvent<Event> logEvent, long sequence, Event event) {
logEvent.event = event;
}
}
/**
* Defines what happens when there is an exception during
* {@link RingBuffer} processing.
*
* Currently, just logs to the logback context.
*/
private class LogEventExceptionHandler implements ExceptionHandler<LogEvent<Event>> {
@Override
public void handleEventException(Throwable ex, long sequence, LogEvent<Event> event) {
addError("Unable to process event: " + ex.getMessage(), ex);
}
@Override
public void handleOnStartException(Throwable ex) {
addError("Unable start disruptor", ex);
}
@Override
public void handleOnShutdownException(Throwable ex) {
addError("Unable shutdown disruptor", ex);
}
}
/**
* Clears the event after a delegate event handler has processed the event,
* so that the event can be garbage collected.
*/
private static class EventClearingEventHandler<Event> implements EventHandler<LogEvent<Event>>, LifecycleAware {
private final EventHandler<LogEvent<Event>> delegate;
EventClearingEventHandler(EventHandler<LogEvent<Event>> delegate) {
super();
this.delegate = delegate;
}
@Override
public void onEvent(LogEvent<Event> event, long sequence, boolean endOfBatch) throws Exception {
try {
delegate.onEvent(event, sequence, endOfBatch);
} finally {
/*
* Clear the event so that it can be garbage collected.
*/
event.event = null;
}
}
@Override
public void onStart() {
if (delegate instanceof LifecycleAware) {
((LifecycleAware) delegate).onStart();
}
}
@Override
public void onShutdown() {
if (delegate instanceof LifecycleAware) {
((LifecycleAware) delegate).onShutdown();
}
}
}
@SuppressWarnings("unchecked")
@Override
public void start() {
if (addDefaultStatusListener && getStatusManager() != null && getStatusManager().getCopyOfStatusListenerList().isEmpty()) {
LevelFilteringStatusListener statusListener = new LevelFilteringStatusListener();
statusListener.setLevelValue(Status.WARN);
statusListener.setDelegate(new OnConsoleStatusListener());
statusListener.setContext(getContext());
statusListener.start();
getStatusManager().add(statusListener);
}
if (this.eventHandler == null) {
addError("No eventHandler was configured for appender " + name + ".");
return;
}
this.executorService = new ScheduledThreadPoolExecutor(
getThreadPoolCoreSize(),
this.threadFactory);
/*
* This ensures that cancelled tasks
* (such as the keepAlive task in AbstractLogstashTcpSocketAppender)
* do not hold up shutdown.
*/
this.executorService.setRemoveOnCancelPolicy(true);
this.disruptor = new Disruptor<LogEvent<Event>>(
this.eventFactory,
this.ringBufferSize,
this.executorService,
this.producerType,
this.waitStrategy);
/*
* Define the exceptionHandler first, so that it applies
* to all future eventHandlers.
*/
this.disruptor.setDefaultExceptionHandler(this.exceptionHandler);
this.disruptor.handleEventsWith(new EventClearingEventHandler<Event>(this.eventHandler));
this.disruptor.start();
super.start();
fireAppenderStarted();
}
@Override
public void stop() {
/*
* Check super.isStarted() instead of isStarted() because subclasses
* might override isStarted() to perform other comparisons that we don't
* want to check here. Those should be checked by subclasses
* prior to calling super.stop()
*/
if (!super.isStarted()) {
return;
}
/*
* Don't allow any more events to be appended.
*/
super.stop();
try {
this.disruptor.shutdown(1, TimeUnit.MINUTES);
} catch (TimeoutException e) {
addWarn("Some queued events have not been logged due to requested shutdown");
}
this.executorService.shutdown();
try {
if (!this.executorService.awaitTermination(1, TimeUnit.MINUTES)) {
addWarn("Some queued events have not been logged due to requested shutdown");
}
} catch (InterruptedException e) {
addWarn("Some queued events have not been logged due to requested shutdown", e);
}
fireAppenderStopped();
}
@Override
protected void append(Event event) {
long startTime = System.nanoTime();
try {
prepareForDeferredProcessing(event);
} catch (RuntimeException e) {
addWarn("Unable to prepare event for deferred processing. Event output might be missing data.", e);
}
if (!this.disruptor.getRingBuffer().tryPublishEvent(this.eventTranslator, event)) {
long consecutiveDropped = this.consecutiveDroppedCount.incrementAndGet();
if ((consecutiveDropped) % this.droppedWarnFrequency == 1) {
addWarn("Dropped " + consecutiveDropped + " events (and counting...) due to ring buffer at max capacity [" + this.ringBufferSize + "]");
}
fireEventAppendFailed(event, RING_BUFFER_FULL_EXCEPTION);
} else {
long endTime = System.nanoTime();
long consecutiveDropped = this.consecutiveDroppedCount.get();
if (consecutiveDropped != 0 && this.consecutiveDroppedCount.compareAndSet(consecutiveDropped, 0L)) {
addWarn("Dropped " + consecutiveDropped + " total events due to ring buffer at max capacity [" + this.ringBufferSize + "]");
}
fireEventAppended(event, endTime - startTime);
}
}
protected void prepareForDeferredProcessing(Event event) {
event.prepareForDeferredProcessing();
}
protected String calculateThreadName() {
List<Object> threadNameFormatParams = getThreadNameFormatParams();
return String.format(threadNameFormat, threadNameFormatParams.toArray());
}
protected List<Object> getThreadNameFormatParams() {
return Arrays.<Object>asList(
getName(),
threadNumber.incrementAndGet());
}
protected void fireAppenderStarted() {
for (Listener listener : listeners) {
listener.appenderStarted(this);
}
}
protected void fireAppenderStopped() {
for (Listener listener : listeners) {
listener.appenderStopped(this);
}
}
protected void fireEventAppended(Event event, long durationInNanos) {
for (Listener listener : listeners) {
listener.eventAppended(this, event, durationInNanos);
}
}
protected void fireEventAppendFailed(Event event, Throwable reason) {
for (Listener listener : listeners) {
listener.eventAppendFailed(this, event, reason);
}
}
protected void setEventFactory(LogEventFactory<Event> eventFactory) {
this.eventFactory = eventFactory;
}
protected EventTranslatorOneArg<LogEvent<Event>, Event> getEventTranslator() {
return eventTranslator;
}
protected void setEventTranslator(EventTranslatorOneArg<LogEvent<Event>, Event> eventTranslator) {
this.eventTranslator = eventTranslator;
}
protected ScheduledExecutorService getExecutorService() {
return executorService;
}
protected Disruptor<LogEvent<Event>> getDisruptor() {
return disruptor;
}
protected int getThreadPoolCoreSize() {
return threadPoolCoreSize;
}
protected void setThreadPoolCoreSize(int threadPoolCoreSize) {
this.threadPoolCoreSize = threadPoolCoreSize;
}
public String getThreadNameFormat() {
return threadNameFormat;
}
/**
* Pattern used by the to set the handler thread names.
* Defaults to {@value #DEFAULT_THREAD_NAME_FORMAT}.
* <p>
*
* If you change the {@link #threadFactory}, then this
* value may not be honored.
* <p>
*
* The string is a format pattern understood by {@link Formatter#format(String, Object...)}.
* {@link Formatter#format(String, Object...)} is used to
* construct the actual thread name prefix.
* The first argument (%1$s) is the string appender name.
* The second argument (%2$d) is the numerical thread index.
* Other arguments can be made available by subclasses.
*/
public void setThreadNameFormat(String threadNameFormat) {
this.threadNameFormat = threadNameFormat;
}
public int getRingBufferSize() {
return ringBufferSize;
}
public void setRingBufferSize(int ringBufferSize) {
this.ringBufferSize = ringBufferSize;
}
public ProducerType getProducerType() {
return producerType;
}
public void setProducerType(ProducerType producerType) {
this.producerType = producerType;
}
public WaitStrategy getWaitStrategy() {
return waitStrategy;
}
public void setWaitStrategy(WaitStrategy waitStrategy) {
this.waitStrategy = waitStrategy;
}
public void setWaitStrategyType(String waitStrategyType) {
setWaitStrategy(WaitStrategyFactory.createWaitStrategyFromString(waitStrategyType));
}
public ThreadFactory getThreadFactory() {
return threadFactory;
}
public void setThreadFactory(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
}
public int getDroppedWarnFrequency() {
return droppedWarnFrequency;
}
public void setDroppedWarnFrequency(int droppedWarnFrequency) {
this.droppedWarnFrequency = droppedWarnFrequency;
}
protected EventHandler<LogEvent<Event>> getEventHandler() {
return eventHandler;
}
protected void setEventHandler(EventHandler<LogEvent<Event>> eventHandler) {
this.eventHandler = eventHandler;
}
public boolean isDaemon() {
return useDaemonThread;
}
public void setDaemon(boolean useDaemonThread) {
this.useDaemonThread = useDaemonThread;
}
public void addListener(Listener listener) {
this.listeners.add(listener);
}
public void removeListener(Listener listener) {
this.listeners.remove(listener);
}
public boolean isAddDefaultStatusListener() {
return addDefaultStatusListener;
}
public void setAddDefaultStatusListener(boolean addDefaultStatusListener) {
this.addDefaultStatusListener = addDefaultStatusListener;
}
}