Skip to content

Commit

Permalink
96 greater timestamp precision (#97)
Browse files Browse the repository at this point in the history
* Re 96: Using fine precision event time provided by log4j for log line timestamps. Loki as well as our serializer support it.

* Re #96: Fixed tests.

* Re #96: Fixed tests.

* Re #96: Improved tests.
  • Loading branch information
tkowalcz authored Aug 26, 2022
1 parent a8b80c7 commit 4b8d644
Show file tree
Hide file tree
Showing 19 changed files with 92 additions and 36 deletions.
20 changes: 14 additions & 6 deletions core/src/main/java/pl/tkowalcz/tjahzi/LogBufferSerializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,26 @@ public int calculateRequiredSize(
) {
int logLineSize = Integer.BYTES + line.remaining();

int timestampSize = Long.BYTES;
int nanosecondResolutionTimestampSize = 2 * Long.BYTES;
int labelsCountSize = Integer.BYTES;
int headerAndLabelsSize = timestampSize + labelsCountSize + labelSerializer.sizeBytes();
int headerAndLabelsSize = nanosecondResolutionTimestampSize
+ labelsCountSize
+ labelSerializer.sizeBytes();

return headerAndLabelsSize + logLineSize;
}

public void writeTo(
int cursor,
long timestamp,
long epochMillisecond,
long nanoOfMillisecond,
LabelSerializer serializedLabels,
ByteBuffer line
) {
cursor = writeHeader(
cursor,
timestamp,
epochMillisecond,
nanoOfMillisecond,
serializedLabels
);

Expand All @@ -43,10 +47,14 @@ public void writeTo(

private int writeHeader(
int cursor,
long timestamp,
long epochMillisecond,
long nanoOfMillisecond,
LabelSerializer serializedLabels
) {
buffer.putLong(cursor, timestamp);
buffer.putLong(cursor, epochMillisecond);
cursor += Long.BYTES;

buffer.putLong(cursor, nanoOfMillisecond);
cursor += Long.BYTES;

buffer.putInt(cursor, serializedLabels.getLabelsCount());
Expand Down
12 changes: 10 additions & 2 deletions core/src/main/java/pl/tkowalcz/tjahzi/LogBufferTranscoder.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ public LogBufferTranscoder(Map<String, String> staticLabels, AtomicBuffer buffer
}

public void deserializeIntoByteBuf(DirectBuffer buffer, int index, OutputBuffer outputBuffer) {
long timestamp = buffer.getLong(index);
long epochMillisecond = buffer.getLong(index);
index += Long.BYTES;

long nanoOfMillisecond = buffer.getLong(index);
index += Long.BYTES;

TextBuilder labelsBuilder = TextBuilders.threadLocal();
Expand All @@ -49,7 +52,12 @@ public void deserializeIntoByteBuf(DirectBuffer buffer, int index, OutputBuffer
);

logLineHolder.readerIndex(index);
outputBuffer.addLogLine(actualLabels, timestamp, logLineHolder);
outputBuffer.addLogLine(
actualLabels,
epochMillisecond,
nanoOfMillisecond,
logLineHolder
);
}

private int readLabels(
Expand Down
10 changes: 8 additions & 2 deletions core/src/main/java/pl/tkowalcz/tjahzi/OutputBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,16 @@ public void clear() {
target.clear();
}

public void addLogLine(CharSequence labels, long timestamp, ByteBuf logLine) {
public void addLogLine(
CharSequence labels,
long epochMillisecond,
long nanoOfMillisecond,
ByteBuf logLine
) {
PushRequestSerializer.serialize(target);
StreamSerializer.serialize(
timestamp,
epochMillisecond,
nanoOfMillisecond,
logLine,
labels,
target
Expand Down
18 changes: 12 additions & 6 deletions core/src/main/java/pl/tkowalcz/tjahzi/TjahziLogger.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@ public TjahziLogger(ManyToOneRingBuffer logBuffer, MonitoringModule monitoringMo
this.serializer = new LogBufferSerializer(logBuffer.buffer());
}

public TjahziLogger log(long timestamp,
LabelSerializer serializedLabels,
ByteBuffer line) {
public TjahziLogger log(
long epochMillisecond,
long nanoOfMillisecond,
LabelSerializer serializedLabels,
ByteBuffer line
) {
int requiredSize = serializer.calculateRequiredSize(
serializedLabels,
line
Expand All @@ -32,7 +35,8 @@ public TjahziLogger log(long timestamp,
int claim = logBuffer.tryClaim(LOG_MESSAGE_TYPE_ID, requiredSize);
if (claim > 0) {
putMessageOnRing(
timestamp,
epochMillisecond,
nanoOfMillisecond,
serializedLabels,
line,
claim
Expand All @@ -45,7 +49,8 @@ public TjahziLogger log(long timestamp,
}

private void putMessageOnRing(
long timestamp,
long epochMillisecond,
long nanoOfMillisecond,
LabelSerializer serializedLabels,
ByteBuffer line,
int claim
Expand All @@ -54,7 +59,8 @@ private void putMessageOnRing(
serializer
.writeTo(
claim,
timestamp,
epochMillisecond,
nanoOfMillisecond,
serializedLabels,
line
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,16 @@ public class EntrySerializer {
public static final int LOG_LINE_FIELD_NUMBER = 2;

public static void serialize(
long timestamp,
long epochMillisecond,
long nanoOfMillisecond,
ByteBuf logLine,
ByteBuf target
) {
int messageStartIndex = target.writerIndex();
target.writeInt(0);

target.writeByte(TIMESTAMP_FIELD_NUMBER << 3 | LENGTH_DELIMITED_TYPE);
TimestampSerializer.serialize(timestamp, target);
TimestampSerializer.serialize(epochMillisecond, nanoOfMillisecond, target);

target.writeByte(LOG_LINE_FIELD_NUMBER << 3 | LENGTH_DELIMITED_TYPE);
StringSerializer.serialize(logLine, target);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ public class StreamSerializer {
public static final int ENTRY_FIELD_NUMBER = 2;

public static void serialize(
long timestamp,
long epochMillisecond,
long nanoOfMillisecond,
ByteBuf logLine,
CharSequence labels,
ByteBuf target
Expand All @@ -22,7 +23,7 @@ public static void serialize(
StringSerializer.serialize(labels, target);

target.writeByte(ENTRY_FIELD_NUMBER << 3 | LENGTH_DELIMITED_TYPE);
EntrySerializer.serialize(timestamp, logLine, target);
EntrySerializer.serialize(epochMillisecond, nanoOfMillisecond, logLine, target);

Protobuf.writeSize(target, messageStartIndex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@
public class TimestampSerializer {

public static void serialize(
long timestamp,
long epochMillisecond,
long nanoOfMillisecond,
ByteBuf target
) {
long timestampSeconds = timestamp / 1000;
int timestampNanos = (int) (timestamp % 1000) * 1000_000;
long timestampSeconds = epochMillisecond / 1000;
long timestampNanos = (epochMillisecond % 1000) * 1000_000 + nanoOfMillisecond;

int messageStartIndex = target.writerIndex();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ void shouldDeserialize() throws IOException {
serializer.writeTo(
0,
System.currentTimeMillis(),
0,
labelSerializer,
ByteBuffer.wrap("Test".getBytes())
);
Expand Down
3 changes: 3 additions & 0 deletions core/src/test/java/pl/tkowalcz/tjahzi/HeadersTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ void shouldIncludeAdditionalHeaders() {
TjahziLogger logger = loggingSystem.createLogger();
logger.log(
System.currentTimeMillis(),
0,
labelSerializer,
ByteBuffer.wrap("Test".getBytes())
);
Expand Down Expand Up @@ -152,6 +153,7 @@ void shouldHandleCaseWithNoAdditionalHeaders() {
TjahziLogger logger = loggingSystem.createLogger();
logger.log(
System.currentTimeMillis(),
0,
labelSerializer,
ByteBuffer.wrap("Test".getBytes())
);
Expand Down Expand Up @@ -207,6 +209,7 @@ void shouldNotOverrideCrucialHeaders() {
TjahziLogger logger = loggingSystem.createLogger();
logger.log(
System.currentTimeMillis(),
0,
labelSerializer,
ByteBuffer.wrap("Test".getBytes())
);
Expand Down
5 changes: 5 additions & 0 deletions core/src/test/java/pl/tkowalcz/tjahzi/LogBufferAgentTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ void shouldSendDataIfOverSizeLimit() throws IOException {
for (int i = 0; i < 100; i++) {
logger.log(
42L,
0,
LabelSerializerCreator.from(Map.of()),
ByteBuffer.wrap((
"Cupcake ipsum dolor sit amet cake wafer. " +
Expand Down Expand Up @@ -118,6 +119,7 @@ void shouldNotSendDataBelowSizeLimit() throws IOException {

logger.log(
42L,
0,
LabelSerializerCreator.from(Map.of()),
ByteBuffer.wrap("Test".getBytes())
);
Expand Down Expand Up @@ -207,6 +209,7 @@ void shouldSendDataBelowSizeLimitIfTimeoutExpires() throws IOException {
// When
logger.log(
42L,
0,
LabelSerializerCreator.from(Map.of()),
ByteBuffer.wrap("Test".getBytes())
);
Expand All @@ -215,6 +218,7 @@ void shouldSendDataBelowSizeLimitIfTimeoutExpires() throws IOException {

logger.log(
42L,
0,
LabelSerializerCreator.from(Map.of()),
ByteBuffer.wrap("Test".getBytes())
);
Expand Down Expand Up @@ -263,6 +267,7 @@ void shouldDrainAllMessagesOnClose() throws IOException {
for (int i = 0; i < LogShipperAgent.MAX_MESSAGES_TO_RETRIEVE * 2 + 1; i++) {
logger.log(
42L,
0,
LabelSerializerCreator.from(Map.of()),
ByteBuffer.wrap("Test".getBytes())
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ static Stream<Arguments> variousMessageConfigurations() {
+ 21 // log level
+ 45 // labels string sizes
+ 6 * 4 // labels string length indicators
+ 8 // timestamp
+ 8 // timestamp millis
+ 8 // timestamp nanos
+ 4 // labels count
),
Arguments.of(
Expand All @@ -48,7 +49,8 @@ static Stream<Arguments> variousMessageConfigurations() {
"[Mando] You have something I want.",
38 // log line
+ 21 // log level
+ 8 // timestamp
+ 8 // timestamp millis
+ 8 // timestamp nanos
+ 4 // labels count
),
Arguments.of(
Expand All @@ -63,7 +65,8 @@ static Stream<Arguments> variousMessageConfigurations() {
38 // log line
+ 45 // labels string sizes
+ 6 * 4 // labels string length indicators
+ 8 // timestamp
+ 8 // timestamp millis
+ 8 // timestamp nanos
+ 4 // labels count
)
);
Expand Down Expand Up @@ -93,6 +96,7 @@ void shouldSerializeMessage(
serializer.writeTo(
0,
32042L,
882L,
labelSerializer,
ByteBuffer.wrap(logLine.getBytes())
);
Expand Down Expand Up @@ -120,7 +124,7 @@ void shouldSerializeMessage(
assertThat(stream.getEntriesList().get(0).getTimestamp()).isEqualTo(
Timestamp.newBuilder()
.setSeconds(32)
.setNanos(42_000_000)
.setNanos(42_000_882)
.build()
);
assertThat(stream.getEntriesList().get(0).getLine()).isEqualTo(logLine);
Expand All @@ -130,9 +134,9 @@ void shouldSerializeMessage(

assertThat(stream.getLabels()).isEqualToIgnoringWhitespace(
Streams.concat(
incomingLabelsStream,
logLevelStream
)
incomingLabelsStream,
logLevelStream
)
.collect(joining(",", "{", "}"))
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ void shouldDeserializeMessageAndAddStaticLabels(
serializer.writeTo(
0,
32042L,
882L,
labelSerializer,
ByteBuffer.wrap(logLine.getBytes())
);
Expand Down Expand Up @@ -80,7 +81,7 @@ void shouldDeserializeMessageAndAddStaticLabels(
assertThat(stream.getEntriesList().get(0).getTimestamp()).isEqualTo(
Timestamp.newBuilder()
.setSeconds(32)
.setNanos(42_000_000)
.setNanos(42_000_882)
.build()
);
assertThat(stream.getEntriesList().get(0).getLine()).isEqualTo(logLine);
Expand Down Expand Up @@ -122,6 +123,7 @@ void shouldOverrideStaticLabelsWithIncoming() throws InvalidProtocolBufferExcept
serializer.writeTo(
0,
32042L,
0,
labelSerializer,
ByteBuffer.wrap("[Mando] You have something I want.".getBytes())
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ void sendData() {
// When
logger.log(
timestamp,
882L,
labelSerializer,
ByteBuffer.wrap("Test".getBytes())
);
Expand Down Expand Up @@ -124,7 +125,7 @@ void sendData() {
.body("data.result[0].stream.server", equalTo("127.0.0.1"))
.body("data.result[0].stream.version", equalTo("0.43"))
.body("data.result[0].stream.level", equalTo("warn"))
.body("data.result[0].values[0]", hasItems("" + (timestamp * 1000_000), "Test"));
.body("data.result[0].values[0]", hasItems("" + (timestamp * 1000_000 +882), "Test"));
});
}
}
3 changes: 2 additions & 1 deletion core/src/test/java/pl/tkowalcz/tjahzi/LoggingSystemTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ void sendData() {
for (int i = 0; i < 1000; i++) {
logger.log(
timestamp + i,
9974,
LabelSerializerCreator.from(
Map.of("level", "warn")
),
Expand Down Expand Up @@ -149,7 +150,7 @@ public boolean matches(Object o) {
}

long actualTimestamp = Long.parseLong(list.get(0).toString());
long expectedTimestamp = (timestamp + index) * 1000_000;
long expectedTimestamp = (timestamp + index) * 1000_000 + 9974;

String actualLogLine = list.get(1).toString();
Object expectedLogLine = "Test" + index;
Expand Down
Loading

0 comments on commit 4b8d644

Please sign in to comment.