From 0a254cf223cd48201c6f552a3f04835ed73d69fd Mon Sep 17 00:00:00 2001
From: Armin Braun
Date: Fri, 8 May 2020 23:16:07 +0200
Subject: [PATCH] Serialize Monitoring Bulk Request Compressed (#56410) (#56442)

Even with the changes from #48854 we're still seeing significant (as in
tens and hundreds of MB) buffer usage for bulk exports in some cases,
which destabilizes master nodes. Since we need to know the serialized
length of the bulk body up front, we can't do the serialization in a
streaming manner (it's also not easily doable with the HTTP client API
we're using anyway).
=> let's at least serialize on heap in compressed form and decompress
as we're streaming to the HTTP connection. For small requests this adds
negligible overhead, but for large requests it reduces the size of the
payload field by about an order of magnitude (empirically determined),
which is a massive reduction when considering O(100MB) bulk requests.
---
 .../exporter/http/HttpExportBulk.java         | 63 ++++++++++++++++---
 1 file changed, 55 insertions(+), 8 deletions(-)

diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExportBulk.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExportBulk.java
index 3468a49423dff..9410f4f67aedb 100644
--- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExportBulk.java
+++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExportBulk.java
@@ -15,6 +15,7 @@
 import org.elasticsearch.client.ResponseListener;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.compress.CompressorFactory;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.time.DateFormatter;
@@ -28,7 +29,9 @@
 import org.elasticsearch.xpack.monitoring.exporter.ExportBulk;
 import org.elasticsearch.xpack.monitoring.exporter.ExportException;
 
+import java.io.FilterOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.time.format.DateTimeFormatter;
 import java.util.Collection;
 import java.util.Map;
@@ -56,10 +59,15 @@ class HttpExportBulk extends ExportBulk {
     private final DateFormatter formatter;
 
     /**
-     * The bytes payload that represents the bulk body is created via {@link #doAdd(Collection)}.
+     * The compressed bytes payload that represents the bulk body is created via {@link #doAdd(Collection)}.
      */
     private BytesReference payload = null;
 
+    /**
+     * Uncompressed length of {@link #payload} contents.
+     */
+    private long payloadLength = -1L;
+
     HttpExportBulk(final String name, final RestClient client, final Map<String, String> parameters,
                    final DateFormatter dateTimeFormatter, final ThreadContext threadContext) {
         super(name, threadContext);
@@ -73,14 +81,17 @@ class HttpExportBulk extends ExportBulk {
     public void doAdd(Collection<MonitoringDoc> docs) throws ExportException {
         try {
             if (docs != null && docs.isEmpty() == false) {
-                try (BytesStreamOutput payload = new BytesStreamOutput()) {
+                final BytesStreamOutput scratch = new BytesStreamOutput();
+                final CountingOutputStream countingStream;
+                try (StreamOutput payload = CompressorFactory.COMPRESSOR.streamOutput(scratch)) {
+                    countingStream = new CountingOutputStream(payload);
                     for (MonitoringDoc monitoringDoc : docs) {
-                        writeDocument(monitoringDoc, payload);
+                        writeDocument(monitoringDoc, countingStream);
                     }
-
-                    // store the payload until we flush
-                    this.payload = payload.bytes();
                 }
+                payloadLength = countingStream.bytesWritten;
+                // store the payload until we flush
+                this.payload = scratch.bytes();
             }
         } catch (Exception e) {
             throw new ExportException("failed to add documents to export bulk [{}]", e, name);
@@ -97,7 +108,8 @@ public void doFlush(ActionListener<Void> listener) throws ExportException {
             request.addParameter(param.getKey(), param.getValue());
         }
         try {
-            request.setEntity(new InputStreamEntity(payload.streamInput(), payload.length(), ContentType.APPLICATION_JSON));
+            request.setEntity(new InputStreamEntity(
+                CompressorFactory.COMPRESSOR.streamInput(payload.streamInput()), payloadLength, ContentType.APPLICATION_JSON));
         } catch (IOException e) {
             listener.onFailure(e);
             return;
@@ -127,7 +139,7 @@ public void onFailure(Exception exception) {
         }
     }
 
-    private void writeDocument(MonitoringDoc doc, StreamOutput out) throws IOException {
+    private void writeDocument(MonitoringDoc doc, OutputStream out) throws IOException {
         final XContentType xContentType = XContentType.JSON;
         final XContent xContent = xContentType.xContent();
 
@@ -166,4 +178,39 @@ private void writeDocument(MonitoringDoc doc, StreamOutput out) throws IOException {
             name, index, id, doc.getType()
         );
     }
+
+    // Counting output stream used to record the uncompressed size of the bulk payload when writing it to a compressed stream
+    private static final class CountingOutputStream extends FilterOutputStream {
+        private long bytesWritten = 0;
+
+        CountingOutputStream(final OutputStream out) {
+            super(out);
+        }
+
+        @Override
+        public void write(final int b) throws IOException {
+            out.write(b);
+            count(1);
+        }
+        @Override
+        public void write(final byte[] b) throws IOException {
+            write(b, 0, b.length);
+        }
+        @Override
+        public void write(final byte[] b, final int off, final int len) throws IOException {
+            out.write(b, off, len);
+            count(len);
+        }
+
+        @Override
+        public void close() {
+            // don't close nested stream
+        }
+
+        protected void count(final long written) {
+            if (written != -1) {
+                bytesWritten += written;
+            }
+        }
+    }
 }
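
Illustration (not part of the patch): the pattern above is "compress while buffering, count the uncompressed bytes, decompress again while streaming out", so the payload only ever sits on heap in compressed form while the HTTP client still gets the exact uncompressed content length it needs up front. Below is a minimal self-contained sketch of that idea using plain java.util.zip DEFLATE streams as a stand-in for Elasticsearch's CompressorFactory.COMPRESSOR; the class name and sample documents are hypothetical.

// CompressedPayloadSketch.java -- hypothetical, standalone illustration only.
// Buffers a "bulk body" on heap in compressed form while counting the
// uncompressed bytes, then decompresses on the fly when the body is read,
// mirroring the doAdd()/doFlush() split in the patch above.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

public final class CompressedPayloadSketch {
    public static void main(String[] args) throws IOException {
        // Phase 1 (doAdd): serialize documents into a compressed on-heap
        // buffer, counting uncompressed bytes since the length is needed later.
        final ByteArrayOutputStream scratch = new ByteArrayOutputStream();
        long uncompressedLength = 0;
        try (DeflaterOutputStream compressed = new DeflaterOutputStream(scratch)) {
            for (String doc : new String[] { "{\"index\":{}}\n{\"field\":1}\n", "{\"index\":{}}\n{\"field\":2}\n" }) {
                final byte[] bytes = doc.getBytes(StandardCharsets.UTF_8);
                compressed.write(bytes);
                uncompressedLength += bytes.length;
            }
        } // closing the stream finishes the DEFLATE output
        final byte[] payload = scratch.toByteArray();

        // Phase 2 (doFlush): hand out a decompressing stream plus the recorded
        // uncompressed length; only the compressed bytes were ever buffered.
        try (InputStream body = new InflaterInputStream(new ByteArrayInputStream(payload))) {
            final byte[] restored = body.readAllBytes();
            System.out.println("compressed on heap: " + payload.length + " bytes");
            System.out.println("declared length:    " + uncompressedLength + " bytes");
            System.out.println("restored matches:   " + (restored.length == uncompressedLength));
        }
    }
}

The sketch exhibits the same trade-off the commit message describes: small payloads pay a little CPU for compression, while large, repetitive JSON bodies shrink substantially for the time they are buffered.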