Skip to content

Commit 50bb6d8

Browse files
Read Aggregations to Pooled Buffers (#72309)
These aggregations can be of considerable size. We must not allocate a single `byte[]` of them. Especially nowadays, using G1GC, contiguous allocations of this size are problematic. This commit makes it so that we take the aggregation bytes as a slice out of the network buffers in a 0-copy fashion, cutting the peak memory use for reading them in half effectively for large allocations.
1 parent 5838e4d commit 50bb6d8

File tree

4 files changed

+43
-18
lines changed

4 files changed

+43
-18
lines changed

server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ private MergeResult partialReduce(QuerySearchResult[] toConsume,
182182
aggsList.add(lastMerge.reducedAggs);
183183
}
184184
for (QuerySearchResult result : toConsume) {
185-
aggsList.add(result.consumeAggs().expand());
185+
aggsList.add(result.consumeAggs());
186186
}
187187
newAggs = InternalAggregations.topLevelReduce(aggsList, aggReduceContextBuilder.forPartialReduction());
188188
} else {
@@ -310,6 +310,7 @@ public void consume(QuerySearchResult result, Runnable next) {
310310
try {
311311
addEstimateAndMaybeBreak(aggsSize);
312312
} catch (Exception exc) {
313+
result.releaseAggs();
313314
onMergeFailure(exc);
314315
next.run();
315316
return;
@@ -458,7 +459,7 @@ public synchronized List<InternalAggregations> consumeAggs() {
458459
aggsList.add(mergeResult.reducedAggs);
459460
}
460461
for (QuerySearchResult result : buffer) {
461-
aggsList.add(result.consumeAggs().expand());
462+
aggsList.add(result.consumeAggs());
462463
}
463464
return aggsList;
464465
}

server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@
99
package org.elasticsearch.common.io.stream;
1010

1111
import org.elasticsearch.Version;
12-
import org.elasticsearch.common.bytes.BytesReference;
12+
import org.elasticsearch.common.bytes.ReleasableBytesReference;
13+
import org.elasticsearch.common.lease.Releasable;
1314

1415
import java.io.IOException;
1516
import java.io.UncheckedIOException;
@@ -32,7 +33,7 @@
3233
* to force their buffering in serialized format by calling
3334
* {@link #asSerialized(Reader, NamedWriteableRegistry)}.
3435
*/
35-
public abstract class DelayableWriteable<T extends Writeable> implements Writeable {
36+
public abstract class DelayableWriteable<T extends Writeable> implements Writeable, Releasable {
3637
/**
3738
* Build a {@linkplain DelayableWriteable} that wraps an existing object
3839
* but is serialized so that deserializing it can be delayed.
@@ -46,7 +47,7 @@ public static <T extends Writeable> DelayableWriteable<T> referencing(T referenc
4647
* when {@link #expand()} is called.
4748
*/
4849
public static <T extends Writeable> DelayableWriteable<T> delayed(Writeable.Reader<T> reader, StreamInput in) throws IOException {
49-
return new Serialized<>(reader, in.getVersion(), in.namedWriteableRegistry(), in.readBytesReference());
50+
return new Serialized<>(reader, in.getVersion(), in.namedWriteableRegistry(), in.readReleasableBytesReference());
5051
}
5152

5253
private DelayableWriteable() {}
@@ -98,7 +99,8 @@ public Serialized<T> asSerialized(Reader<T> reader, NamedWriteableRegistry regis
9899
} catch (IOException e) {
99100
throw new RuntimeException("unexpected error writing writeable to buffer", e);
100101
}
101-
return new Serialized<>(reader, Version.CURRENT, registry, buffer.bytes());
102+
// TODO: this path is currently not used in production code, if it ever is this should start using pooled buffers
103+
return new Serialized<>(reader, Version.CURRENT, registry, ReleasableBytesReference.wrap(buffer.bytes()));
102104
}
103105

104106
@Override
@@ -118,19 +120,25 @@ private BytesStreamOutput writeToBuffer(Version version) throws IOException {
118120
return buffer;
119121
}
120122
}
123+
124+
@Override
125+
public void close() {
126+
//noop
127+
}
121128
}
122129

123130
/**
124-
* A {@link Writeable} stored in serialized form.
131+
* A {@link Writeable} stored in serialized form backed by a {@link ReleasableBytesReference}. Once an instance is no longer used its
132+
* backing memory must be manually released by invoking {@link #close()} on it.
125133
*/
126134
public static class Serialized<T extends Writeable> extends DelayableWriteable<T> {
127135
private final Writeable.Reader<T> reader;
128136
private final Version serializedAtVersion;
129137
private final NamedWriteableRegistry registry;
130-
private final BytesReference serialized;
138+
private final ReleasableBytesReference serialized;
131139

132-
private Serialized(Writeable.Reader<T> reader, Version serializedAtVersion,
133-
NamedWriteableRegistry registry, BytesReference serialized) {
140+
private Serialized(Writeable.Reader<T> reader, Version serializedAtVersion, NamedWriteableRegistry registry,
141+
ReleasableBytesReference serialized) {
134142
this.reader = reader;
135143
this.serializedAtVersion = serializedAtVersion;
136144
this.registry = registry;
@@ -186,6 +194,11 @@ public long getSerializedSize() {
186194
// We're already serialized
187195
return serialized.length();
188196
}
197+
198+
@Override
199+
public void close() {
200+
serialized.close();
201+
}
189202
}
190203

191204
/**

server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -190,13 +190,23 @@ public boolean hasAggs() {
190190
* Returns and nulls out the aggregation for this search results. This allows to free up memory once the aggregation is consumed.
191191
* @throws IllegalStateException if the aggregations have already been consumed.
192192
*/
193-
public DelayableWriteable<InternalAggregations> consumeAggs() {
193+
public InternalAggregations consumeAggs() {
194194
if (aggregations == null) {
195195
throw new IllegalStateException("aggs already consumed");
196196
}
197-
DelayableWriteable<InternalAggregations> aggs = aggregations;
198-
aggregations = null;
199-
return aggs;
197+
try {
198+
return aggregations.expand();
199+
} finally {
200+
aggregations.close();
201+
aggregations = null;
202+
}
203+
}
204+
205+
public void releaseAggs() {
206+
if (aggregations != null) {
207+
aggregations.close();
208+
aggregations = null;
209+
}
200210
}
201211

202212
public void aggregations(InternalAggregations aggregations) {
@@ -233,8 +243,9 @@ public void consumeAll() {
233243
if (hasConsumedTopDocs() == false) {
234244
consumeTopDocs();
235245
}
236-
if (hasAggs()) {
237-
consumeAggs();
246+
if (aggregations != null) {
247+
aggregations.close();
248+
aggregations = null;
238249
}
239250
}
240251

server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,8 @@ public void testSerialization() throws Exception {
7878
assertEquals(querySearchResult.size(), deserialized.size());
7979
assertEquals(querySearchResult.hasAggs(), deserialized.hasAggs());
8080
if (deserialized.hasAggs()) {
81-
Aggregations aggs = querySearchResult.consumeAggs().expand();
82-
Aggregations deserializedAggs = deserialized.consumeAggs().expand();
81+
Aggregations aggs = querySearchResult.consumeAggs();
82+
Aggregations deserializedAggs = deserialized.consumeAggs();
8383
assertEquals(aggs.asList(), deserializedAggs.asList());
8484
}
8585
assertEquals(querySearchResult.terminatedEarly(), deserialized.terminatedEarly());

0 commit comments

Comments
 (0)