Skip to content

Commit

Permalink
Implement a pool of reusable JsonGenerator (#631)
Browse files Browse the repository at this point in the history
Implement a pool of reusable JsonGenerator similar to the ReusableByteBufferPool.
`CompositeJsonFormatter` now acts as a factory creating JsonFormatter instances bound to an OutputStream. The output stream is backed by a ReusableByteBuffer.
CompositeJsonEncoder/CompositeJsonLayout acquire a reusable JsonFormatter from the pool, write the event to it then flush the underlying buffer into a byte array, a string or an another output stream if it implements StreamingEncoder.
This approach reuses the JsonGenerator and the associated memory buffers. It also reduces the number of memory copy operations required to move data to destination.

Jackson buffer recycling is also disabled. It works by maintaining a pool of buffers per thread. This strategy works best when one JsonGenerator is created per thread, typically in J2EE environments - which is not the case here. Each JsonFormatter uses its own instance of JsonGenerator and is reused multiple times possibly on different threads. The memory buffers allocated by the JsonGenerator do not belong to a particular thread - hence the recycling feature should be disabled.
  • Loading branch information
brenuart authored Sep 9, 2021
1 parent b4b7eb2 commit 03cc481
Show file tree
Hide file tree
Showing 7 changed files with 365 additions and 184 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
*/
package net.logstash.logback.composite;

import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Writer;
import java.util.Objects;
import java.util.ServiceConfigurationError;

Expand All @@ -34,6 +34,7 @@
import ch.qos.logback.core.spi.LifeCycle;
import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonFactory.Feature;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
Expand Down Expand Up @@ -71,7 +72,7 @@ public abstract class CompositeJsonFormatter<Event extends DeferredProcessingAwa
/**
* The providers that are used to populate the output JSON object.
*/
private JsonProviders<Event> jsonProviders = new JsonProviders<Event>();
private JsonProviders<Event> jsonProviders = new JsonProviders<>();

private JsonEncoding encoding = JsonEncoding.UTF8;

Expand Down Expand Up @@ -118,6 +119,41 @@ public boolean isStarted() {
return started;
}

/**
* Create a reusable {@link JsonFormatter} bound to the given {@link OutputStream}.
*
* @param outputStream the output stream used by the {@link JsonFormatter}
* @return {@link JsonFormatter} writing JSON content in the output stream
* @throws IOException thrown when unable to write in the output stream or when Jackson fails to produce JSON content
*/
public JsonFormatter createJsonFormatter(OutputStream outputStream) throws IOException {
if (!isStarted()) {
throw new IllegalStateException("Formatter is not started");
}

JsonGenerator generator = createGenerator(outputStream);
return new JsonFormatter(generator);
}


public class JsonFormatter implements Closeable {
private final JsonGenerator generator;

public JsonFormatter(JsonGenerator generator) {
this.generator = Objects.requireNonNull(generator);
}

public void writeEvent(Event event) throws IOException {
writeEventToGenerator(generator, event);
}

@Override
public void close() throws IOException {
this.generator.close();
}
}


private JsonFactory createJsonFactory() {
ObjectMapper objectMapper = new ObjectMapper()
/*
Expand All @@ -133,27 +169,23 @@ private JsonFactory createJsonFactory() {
}
}

return this.jsonFactoryDecorator.decorate(objectMapper.getFactory());
return decorateFactory(objectMapper.getFactory());
}

public void writeEventToOutputStream(Event event, OutputStream outputStream) throws IOException {
try (JsonGenerator generator = createGenerator(outputStream)) {
writeEventToGenerator(generator, event);
}
/*
* Do not flush the outputStream.
*
* Allow something higher in the stack (e.g. the encoder/appender)
* to determine appropriate times to flush.
*/
}

public void writeEventToWriter(Event event, Writer writer) throws IOException {
try (JsonGenerator generator = createGenerator(writer)) {
writeEventToGenerator(generator, event);
}
private JsonFactory decorateFactory(JsonFactory factory) {
return this.jsonFactoryDecorator.decorate(factory)
/*
* Jackson buffer recycling works by maintaining a pool of buffers per thread. This
* feature works best when one JsonGenerator is created per thread, typically in J2EE
* environments.
*
* Each JsonFormatter uses its own instance of JsonGenerator and is reused multiple times
* possibly on different threads. The memory buffers allocated by the JsonGenerator do
* not belong to a particular thread - hence the recycling feature should be disabled.
*/
.disable(Feature.USE_THREAD_LOCAL_FOR_BUFFER_RECYCLING);
}

protected void writeEventToGenerator(JsonGenerator generator, Event event) throws IOException {
if (!isStarted()) {
throw new IllegalStateException("Encoding attempted before starting.");
Expand All @@ -172,10 +204,6 @@ protected void prepareForDeferredProcessing(Event event) {
private JsonGenerator createGenerator(OutputStream outputStream) throws IOException {
return decorateGenerator(jsonFactory.createGenerator(outputStream, encoding));
}

private JsonGenerator createGenerator(Writer writer) throws IOException {
return decorateGenerator(jsonFactory.createGenerator(writer));
}

private JsonGenerator decorateGenerator(JsonGenerator generator) {
return this.jsonGeneratorDecorator.decorate(generator)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.util.Objects;

import net.logstash.logback.composite.CompositeJsonFormatter;
import net.logstash.logback.composite.JsonProviders;
import net.logstash.logback.decorate.JsonFactoryDecorator;
import net.logstash.logback.decorate.JsonGeneratorDecorator;
import net.logstash.logback.util.ReusableByteBuffer;
import net.logstash.logback.util.ReusableByteBufferPool;
import net.logstash.logback.util.ReusableJsonFormatterPool;

import ch.qos.logback.core.encoder.Encoder;
import ch.qos.logback.core.encoder.EncoderBase;
Expand All @@ -46,16 +46,12 @@ public abstract class CompositeJsonEncoder<Event extends DeferredProcessingAware
* unnecessary memory allocations and reduce pressure on the garbage collector.
*/
private int minBufferSize = 1024;

/**
* Provides reusable byte buffers (initialized when the encoder is started).
*/
private ReusableByteBufferPool bufferPool;

private Encoder<Event> prefix;
private Encoder<Event> suffix;

private final CompositeJsonFormatter<Event> formatter;
private ReusableJsonFormatterPool<Event> formatterPool;

private String lineSeparator = System.lineSeparator();

Expand All @@ -65,7 +61,7 @@ public abstract class CompositeJsonEncoder<Event extends DeferredProcessingAware

public CompositeJsonEncoder() {
super();
this.formatter = createFormatter();
this.formatter = Objects.requireNonNull(createFormatter());
}

protected abstract CompositeJsonFormatter<Event> createFormatter();
Expand All @@ -75,12 +71,11 @@ public void encode(Event event, OutputStream outputStream) throws IOException {
if (!isStarted()) {
throw new IllegalStateException("Encoder is not started");
}

encode(prefix, event, outputStream);
formatter.writeEventToOutputStream(event, outputStream);
encode(suffix, event, outputStream);

outputStream.write(lineSeparatorBytes);
try (ReusableJsonFormatterPool<Event>.ReusableJsonFormatter cachedFormatter = formatterPool.acquire()) {
encode(cachedFormatter, event);
cachedFormatter.getBuffer().writeTo(outputStream);
}
}

@Override
Expand All @@ -89,18 +84,23 @@ public byte[] encode(Event event) {
throw new IllegalStateException("Encoder is not started");
}

ReusableByteBuffer buffer = bufferPool.acquire();
try {
encode(event, buffer);
return buffer.toByteArray();
try (ReusableJsonFormatterPool<Event>.ReusableJsonFormatter cachedFormatter = formatterPool.acquire()) {
encode(cachedFormatter, event);
return cachedFormatter.getBuffer().toByteArray();

} catch (IOException e) {
addWarn("Error encountered while encoding log event. Event: " + event, e);
return EMPTY_BYTES;
} finally {
bufferPool.release(buffer);
}
}

private void encode(ReusableJsonFormatterPool<Event>.ReusableJsonFormatter cachedFormatter, Event event) throws IOException {
encode(prefix, event, cachedFormatter.getBuffer());
cachedFormatter.write(event);
encode(suffix, event, cachedFormatter.getBuffer());
cachedFormatter.getBuffer().write(lineSeparatorBytes);
}

private void encode(Encoder<Event> encoder, Event event, OutputStream outputStream) throws IOException {
if (encoder != null) {
byte[] data = encoder.encode(event);
Expand All @@ -117,7 +117,7 @@ public void start() {
}

super.start();
this.bufferPool = new ReusableByteBufferPool(this.minBufferSize);

formatter.setContext(getContext());
formatter.start();
charset = Charset.forName(formatter.getEncoding());
Expand All @@ -126,6 +126,8 @@ public void start() {
: this.lineSeparator.getBytes(charset);
startWrapped(prefix);
startWrapped(suffix);

this.formatterPool = new ReusableJsonFormatterPool<>(formatter, minBufferSize);
}

@SuppressWarnings({ "unchecked", "rawtypes" })
Expand Down Expand Up @@ -168,6 +170,8 @@ public void stop() {
formatter.stop();
stopWrapped(prefix);
stopWrapped(suffix);

this.formatterPool = null;
}
}

Expand Down
58 changes: 33 additions & 25 deletions src/main/java/net/logstash/logback/layout/CompositeJsonLayout.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.util.Objects;

import net.logstash.logback.composite.CompositeJsonFormatter;
import net.logstash.logback.composite.JsonProviders;
import net.logstash.logback.decorate.JsonFactoryDecorator;
import net.logstash.logback.decorate.JsonGeneratorDecorator;
import net.logstash.logback.encoder.CompositeJsonEncoder;
import net.logstash.logback.encoder.SeparatorParser;
import net.logstash.logback.util.ReusableByteBuffer;
import net.logstash.logback.util.ReusableByteBufferPool;
import net.logstash.logback.util.ReusableJsonFormatterPool;

import ch.qos.logback.core.Layout;
import ch.qos.logback.core.LayoutBase;
Expand Down Expand Up @@ -61,17 +61,13 @@ public abstract class CompositeJsonLayout<Event extends DeferredProcessingAware>
*/
private int minBufferSize = 1024;

/**
* Provides reusable byte buffers (initialized when layout is started)
*/
private ReusableByteBufferPool bufferPool;


private final CompositeJsonFormatter<Event> formatter;

private ReusableJsonFormatterPool<Event> formatterPool;

public CompositeJsonLayout() {
super();
this.formatter = createFormatter();
this.formatter = Objects.requireNonNull(createFormatter());
}

protected abstract CompositeJsonFormatter<Event> createFormatter();
Expand All @@ -82,26 +78,29 @@ public String doLayout(Event event) {
throw new IllegalStateException("Layout is not started");
}

ReusableByteBuffer buffer = this.bufferPool.acquire();
try (OutputStreamWriter writer = new OutputStreamWriter(buffer)) {
try (ReusableJsonFormatterPool<Event>.ReusableJsonFormatter cachedFormatter = formatterPool.acquire()) {
writeEvent(cachedFormatter, event);
return new String(cachedFormatter.getBuffer().toByteArray());

} catch (IOException e) {
addWarn("Error formatting logging event", e);
return null;

}
}

private void writeEvent(ReusableJsonFormatterPool<Event>.ReusableJsonFormatter cachedFormatter, Event event) throws IOException {
try (Writer writer = new OutputStreamWriter(cachedFormatter.getBuffer())) {
writeLayout(prefix, writer, event);
writeFormatter(writer, event);
cachedFormatter.write(event);
writeLayout(suffix, writer, event);

if (lineSeparator != null) {
writer.write(lineSeparator);
}
writer.flush();

return new String(buffer.toByteArray());
} catch (IOException e) {
addWarn("Error formatting logging event", e);
return null;
} finally {
bufferPool.release(buffer);
}
}


private void writeLayout(Layout<Event> wrapped, Writer writer, Event event) throws IOException {
if (wrapped == null) {
Expand All @@ -111,22 +110,25 @@ private void writeLayout(Layout<Event> wrapped, Writer writer, Event event) thro
String str = wrapped.doLayout(event);
if (str != null) {
writer.write(str);
writer.flush();
}
}

private void writeFormatter(Writer writer, Event event) throws IOException {
this.formatter.writeEventToWriter(event, writer);
}


@Override
public void start() {
if (isStarted()) {
return;
}

super.start();
this.bufferPool = new ReusableByteBufferPool(this.minBufferSize);
formatter.setContext(getContext());
formatter.start();

startWrapped(prefix);
startWrapped(suffix);

this.formatterPool = new ReusableJsonFormatterPool<>(formatter, minBufferSize);
}

private void startWrapped(Layout<Event> wrapped) {
Expand All @@ -151,10 +153,16 @@ private void startWrapped(Layout<Event> wrapped) {

@Override
public void stop() {
if (!isStarted()) {
return;
}

super.stop();
formatter.stop();
stopWrapped(prefix);
stopWrapped(suffix);

this.formatterPool = null;
}

private void stopWrapped(Layout<Event> wrapped) {
Expand Down
Loading

0 comments on commit 03cc481

Please sign in to comment.