Skip to content

Commit

Permalink
Drop sorting records by time
Browse files Browse the repository at this point in the history
  • Loading branch information
nehaev committed Nov 1, 2024
1 parent ea9bd9b commit 1812eed
Show file tree
Hide file tree
Showing 11 changed files with 10 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,6 @@

public final class AsyncBufferPipeline {

private static final Comparator<LogRecord> compareByTime = (e1, e2) -> {
var tsCmp = Long.compare(e1.timestampMs, e2.timestampMs);
return tsCmp == 0 ? Integer.compare(e1.nanosInMs, e2.nanosInMs) : tsCmp;
};

private static final Comparator<LogRecord> compareByStream = (e1, e2) ->
Long.compare(e1.stream.hash, e2.stream.hash);

Expand Down Expand Up @@ -98,14 +93,10 @@ public final class AsyncBufferPipeline {
private ScheduledFuture<?> drainScheduledFuture;

public AsyncBufferPipeline(PipelineConfig conf) {
Optional<Comparator<LogRecord>> logRecordComparator = Optional.empty();
if (conf.staticLabels) {
if (conf.sortByTime)
logRecordComparator = Optional.of(compareByTime);
} else {
logRecordComparator = Optional.of(
conf.sortByTime ? compareByStream.thenComparing(compareByTime) : compareByStream);
}
Optional<Comparator<LogRecord>> logRecordComparator = conf.staticLabels
? Optional.empty()
: Optional.of(compareByStream);

ByteBufferFactory bufferFactory = new ByteBufferFactory(conf.useDirectBuffers);

batcher = new Batcher(conf.batchMaxItems, conf.batchMaxBytes, conf.batchTimeoutMs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,6 @@ public static HttpConfig.Builder java(long innerThreadsExpirationMs) {
*/
public final long batchTimeoutMs;

/**
* If true, log records in batch are sorted by timestamp.
* If false, records will be sent to Loki in arrival order.
* Turn this on if you see 'entry out of order' error from Loki.
*/
public final boolean sortByTime;

/**
* If you use only one label for all log records, you can
* set this flag to true and save some CPU time on grouping records by label.
Expand Down Expand Up @@ -162,7 +155,6 @@ private PipelineConfig(
int batchMaxItems,
int batchMaxBytes,
long batchTimeoutMs,
boolean sortByTime,
boolean staticLabels,
long sendQueueMaxBytes,
int maxRetries,
Expand All @@ -181,7 +173,6 @@ private PipelineConfig(
this.batchMaxItems = batchMaxItems;
this.batchMaxBytes = batchMaxBytes;
this.batchTimeoutMs = batchTimeoutMs;
this.sortByTime = sortByTime;
this.staticLabels = staticLabels;
this.sendQueueMaxBytes = sendQueueMaxBytes;
this.maxRetries = maxRetries;
Expand Down Expand Up @@ -209,7 +200,6 @@ public static class Builder {
private int batchMaxItems = 1000;
private int batchMaxBytes = 4 * 1024 * 1024;
private long batchTimeoutMs = 60 * 1000;
private boolean sortByTime = false;
private boolean staticLabels = false;
private long sendQueueMaxBytes = batchMaxBytes * 10;
private int maxRetries = 2;
Expand All @@ -232,7 +222,6 @@ public PipelineConfig build() {
batchMaxItems,
batchMaxBytes,
batchTimeoutMs,
sortByTime,
staticLabels,
sendQueueMaxBytes,
maxRetries,
Expand Down Expand Up @@ -270,11 +259,6 @@ public Builder setBatchTimeoutMs(long batchTimeoutMs) {
return this;
}

public Builder setSortByTime(boolean sortByTime) {
this.sortByTime = sortByTime;
return this;
}

public Builder setStaticLabels(boolean staticLabels) {
this.staticLabels = staticLabels;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,6 @@ public void setStreamCache(Cache<String, LogRecordStream> streamCache) {

private LabelCfg label = new LabelCfg();

/**
* If true, log records in batch are sorted by timestamp.
* If false, records will be sent to Loki in arrival order.
* Turn this on if you see 'entry out of order' error from Loki.
*/
private boolean sortByTime = false;

/**
* If you use only one label for all log records, you can
* set this flag to true and save some CPU time on grouping records by label.
Expand Down Expand Up @@ -293,13 +286,6 @@ public void setMessage(Layout<ILoggingEvent> message) {
this.messageLayout = message;
}

public boolean getSortByTime() {
return sortByTime;
}
public void setSortByTime(boolean sortByTime) {
this.sortByTime = sortByTime;
}

public boolean getStaticLabels() {
return staticLabels;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,6 @@ public void start() {
.setBatchMaxItems(batchMaxItems)
.setBatchMaxBytes(batchMaxBytes)
.setBatchTimeoutMs(batchTimeoutMs)
.setSortByTime(encoder.getSortByTime())
.setStaticLabels(encoder.getStaticLabels())
.setSendQueueMaxBytes(sendQueueMaxBytes)
.setMaxRetries(maxRetries)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ public interface Loki4jEncoder extends ContextAware, LifeCycle {

WriterFactory getWriterFactory();

boolean getSortByTime();

boolean getStaticLabels();

}
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ public void testLabelParsingFailed() {
var encoder1 = toStringEncoder(
labelCfg("level=%level,app=", ",", "=", true, false),
plainTextMsgLayout("l=%level c=%logger{20} t=%thread | %msg %ex{1}"),
true,
false);
var sender = dummySender();
assertThrows("KV separation failed", IllegalArgumentException.class, () ->
Expand All @@ -91,7 +90,6 @@ public void testLabelParsingFailed() {
var encoder2 = toStringEncoder(
labelCfg("level=%lev{,app=x", ",", "=", true, false),
plainTextMsgLayout("l=%level c=%logger{20} t=%thread | %msg %ex{1}"),
true,
false);
assertThrows("Converter parsing failed", IllegalArgumentException.class, () ->
withAppender(appender(30, 400L, encoder2, sender), appender -> {
Expand All @@ -106,7 +104,6 @@ public void testLogRecordStreams() {
var encoder = toStringEncoder(
labelCfg("level=%level,app=my-app,thread=%thread", ",", "=", true, false),
plainTextMsgLayout("l=%level | %msg %ex{1}"),
true,
false);
encoder.setContext(new LoggerContext());
encoder.start();
Expand All @@ -131,7 +128,6 @@ public void testLabelValuesUnaffectedByKVSeparation() {
var encoder = toStringEncoder(
labelCfg("level=%level.class=%logger.thread=%thread", ".", "=", true, false),
plainTextMsgLayout("l=%level | %msg %ex{1}"),
true,
false);
encoder.setContext(new LoggerContext());
encoder.start();
Expand Down Expand Up @@ -162,7 +158,7 @@ public void testLabelMarker() {
appender(
4,
1000L,
toStringEncoder(labelCfg("l=%level", ",", "=", true, true), plainTextMsgLayout("%level | %msg"), false, false),
toStringEncoder(labelCfg("l=%level", ",", "=", true, true), plainTextMsgLayout("%level | %msg"), false),
sender), appender -> {
appender.append(events);
appender.waitAllAppended();
Expand Down Expand Up @@ -200,7 +196,7 @@ public void testMetadataMarker() {
appender(
4,
1000L,
toStringEncoder(labelMetadataCfg("l=%level", "t=%thread,c=%logger", true), plainTextMsgLayout("%level | %msg"), false, false),
toStringEncoder(labelMetadataCfg("l=%level", "t=%thread,c=%logger", true), plainTextMsgLayout("%level | %msg"), false),
sender), appender -> {
appender.append(events);
appender.waitAllAppended();
Expand Down Expand Up @@ -233,7 +229,7 @@ public void testLabelAndMetadataMarker() {
appender(
4,
1000L,
toStringEncoder(labelMetadataCfg("l=%level", "t=%thread,c=%logger", true), plainTextMsgLayout("%level | %msg"), false, false),
toStringEncoder(labelMetadataCfg("l=%level", "t=%thread,c=%logger", true), plainTextMsgLayout("%level | %msg"), false),
sender), appender -> {
appender.append(events);
appender.waitAllAppended();
Expand Down Expand Up @@ -265,7 +261,7 @@ public void testOrdering() {
appender(
6,
1000L,
toStringEncoder(labelCfg("l=%level", ",", "=", true, false), plainTextMsgLayout("%level | %msg"), false, true),
toStringEncoder(labelCfg("l=%level", ",", "=", true, false), plainTextMsgLayout("%level | %msg"), true),
sender), appender -> {
appender.append(eventsToOrder);
appender.waitAllAppended();
Expand All @@ -288,30 +284,7 @@ public void testOrdering() {
appender(
6,
1000L,
toStringEncoder(labelCfg("l=%level", ",", "=", true, false), plainTextMsgLayout("%level | %msg"), true, true),
sender), appender -> {
appender.append(eventsToOrder);
appender.waitAllAppended();
assertEquals(
"static labels, sort by time",
StringPayload.builder()
.stream("[l, INFO]",
"ts=100 INFO | Test message 3",
"ts=103 DEBUG | Test message 2",
"ts=103 ERROR | Test message 5",
"ts=104 WARN | Test message 4",
"ts=105 INFO | Test message 1",
"ts=110 INFO | Test message 6")
.build(),
StringPayload.parse(sender.lastSendData()));
return null;
});

withAppender(
appender(
6,
1000L,
toStringEncoder(labelCfg("l=%level", ",", "=", true, false), plainTextMsgLayout("%level | %msg"), false, false),
toStringEncoder(labelCfg("l=%level", ",", "=", true, false), plainTextMsgLayout("%level | %msg"), false),
sender), appender -> {
appender.append(eventsToOrder);
appender.waitAllAppended();
Expand All @@ -332,31 +305,5 @@ public void testOrdering() {
StringPayload.parse(sender.lastSendData()));
return null;
});

withAppender(
appender(
6,
1000L,
toStringEncoder(labelCfg("l=%level", ",", "=", true, false), plainTextMsgLayout("%level | %msg"), true, false),
sender), appender -> {
appender.append(eventsToOrder);
appender.waitAllAppended();
assertEquals(
"dynamic labels, sort by time",
StringPayload.builder()
.stream("[l, INFO]",
"ts=100 INFO | Test message 3",
"ts=105 INFO | Test message 1",
"ts=110 INFO | Test message 6")
.stream("[l, DEBUG]",
"ts=103 DEBUG | Test message 2")
.stream("[l, WARN]",
"ts=104 WARN | Test message 4")
.stream("[l, ERROR]",
"ts=103 ERROR | Test message 5")
.build(),
StringPayload.parse(sender.lastSendData()));
return null;
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ public static JsonEncoder jsonEncoder(boolean staticLabels, String testLabel, La
encoder.setStaticLabels(staticLabels);
encoder.setLabel(labelCfg("test=" + testLabel + ",level=%level,service_name=my-app", ",", "=", true, false));
encoder.setMessage(msgLayout);
encoder.setSortByTime(true);
return encoder;
}

Expand Down Expand Up @@ -129,7 +128,6 @@ public static AbstractLoki4jEncoder defaultToStringEncoder() {
return toStringEncoder(
labelCfg("level=%level,app=my-app", ",", "=", true, false),
plainTextMsgLayout("l=%level c=%logger{20} t=%thread | %msg %ex{1}"),
true,
false);
}

Expand All @@ -151,7 +149,6 @@ public static AbstractLoki4jEncoder wrapToEncoder(
BiFunction<Integer, ByteBufferFactory, Writer> writerFactory,
AbstractLoki4jEncoder.LabelCfg label,
Layout<ILoggingEvent> messageLayout,
boolean sortByTime,
boolean staticLabels) {
var encoder = new AbstractLoki4jEncoder() {
@Override
Expand All @@ -161,17 +158,15 @@ public PipelineConfig.WriterFactory getWriterFactory() {
};
encoder.setLabel(label);
encoder.setMessage(messageLayout);
encoder.setSortByTime(sortByTime);
encoder.setStaticLabels(staticLabels);
return encoder;
}

public static AbstractLoki4jEncoder toStringEncoder(
AbstractLoki4jEncoder.LabelCfg label,
Layout<ILoggingEvent> messageLayout,
boolean sortByTime,
boolean staticLabels) {
return wrapToEncoder(Generators::stringWriter, label, messageLayout, sortByTime, staticLabels);
return wrapToEncoder(Generators::stringWriter, label, messageLayout, staticLabels);
}

public static AbstractLoki4jEncoder.LabelCfg labelCfg(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ public void testWorksInAppender() {
var encoder = toStringEncoder(
labelCfg("app=my-app", ",", "=", true, false),
jsonMsgLayout(),
false,
false
);
var sender = dummySender();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ public void testDrainOnStopWhileEncoderFails() {
},
labelCfg("level=%level,app=my-app", ",", "=", true, false),
plainTextMsgLayout("l=%level c=%logger{20} t=%thread | %msg %ex{1}"),
true,
false);
var sender = dummySender();
var appender = appender(4, 4000L, encoder, sender);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ private static AbstractLoki4jEncoder initEncoder(Cache<String, LogRecordStream>
var e = toStringEncoder(
labelCfg("level=%level,app=my-app,date=%date{HH:mm:ss.SSS}", ",", "=", true, false),
plainTextMsgLayout("l=%level c=%logger{20} t=%thread | %msg %ex{1}"),
false,
false);
e.setContext(new LoggerContext());
e.getLabel().setStreamCache(streamCache);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ private static AppenderWrapper initApp(LabelCfg labelCfg) {
var e = toStringEncoder(
labelCfg,
plainTextMsgLayout("l=%level c=%logger{20} t=%thread | %msg %ex{1}"),
true,
false);
var a = appender(1000, 60_000L, e, dummySender());
a.setSendQueueMaxBytes(Long.MAX_VALUE);
Expand Down

0 comments on commit 1812eed

Please sign in to comment.