Skip to content

Commit 3a5da00

Browse files
authored
[Composite Terms Aggregation] Optimize by removing unnecessary object allocations (#18531)
* reuse slot objects for lookups — Signed-off-by: Sandesh Kumar <sandeshkr419@gmail.com>
* refactor streams to for loop — Signed-off-by: Sandesh Kumar <sandeshkr419@gmail.com>
---------
Signed-off-by: Sandesh Kumar <sandeshkr419@gmail.com>
1 parent 42a09e1 commit 3a5da00

File tree

3 files changed

+43
-21
lines changed

3 files changed

+43
-21
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
4747
- Prevent shard initialization failure due to streaming consumer errors ([#18877](https://github.com/opensearch-project/OpenSearch/pull/18877))
4848
- APIs for stream transport and new stream-based search api action ([#18722](https://github.com/opensearch-project/OpenSearch/pull/18722))
4949
- Added the core process for warming merged segments in remote-store enabled domains ([#18683](https://github.com/opensearch-project/OpenSearch/pull/18683))
50+
- Optimize Composite Aggregations by removing unnecessary object allocations ([#18531](https://github.com/opensearch-project/OpenSearch/pull/18531))
5051

5152
### Changed
5253
- Update Subject interface to use CheckedRunnable ([#18570](https://github.com/opensearch-project/OpenSearch/issues/18570))

server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,6 @@
9191
import java.util.function.BiConsumer;
9292
import java.util.function.Function;
9393
import java.util.function.LongUnaryOperator;
94-
import java.util.stream.Collectors;
9594

9695
import static org.opensearch.search.aggregations.MultiBucketConsumerService.MAX_BUCKET_SETTING;
9796
import static org.opensearch.search.aggregations.bucket.filterrewrite.AggregatorBridge.segmentMatchAll;
@@ -135,11 +134,7 @@ public final class CompositeAggregator extends BucketsAggregator {
135134
) throws IOException {
136135
super(name, factories, context, parent, CardinalityUpperBound.MANY, metadata);
137136
this.size = size;
138-
this.sourceNames = Arrays.stream(sourceConfigs).map(CompositeValuesSourceConfig::name).collect(Collectors.toList());
139-
this.reverseMuls = Arrays.stream(sourceConfigs).mapToInt(CompositeValuesSourceConfig::reverseMul).toArray();
140-
this.missingOrders = Arrays.stream(sourceConfigs).map(CompositeValuesSourceConfig::missingOrder).toArray(MissingOrder[]::new);
141-
this.formats = Arrays.stream(sourceConfigs).map(CompositeValuesSourceConfig::format).collect(Collectors.toList());
142-
this.sources = new SingleDimensionValuesSource[sourceConfigs.length];
137+
143138
// check that the provided size is not greater than the search.max_buckets setting
144139
int bucketLimit = context.aggregations().multiBucketConsumer().getLimit();
145140
if (size > bucketLimit) {
@@ -155,15 +150,33 @@ public final class CompositeAggregator extends BucketsAggregator {
155150
bucketLimit
156151
);
157152
}
153+
158154
this.sourceConfigs = sourceConfigs;
159-
for (int i = 0; i < sourceConfigs.length; i++) {
160-
this.sources[i] = sourceConfigs[i].createValuesSource(
155+
156+
// Pre-initialize the destination collections with the correct size
157+
final int numSources = sourceConfigs.length;
158+
this.sourceNames = new ArrayList<>(numSources);
159+
this.reverseMuls = new int[numSources];
160+
this.missingOrders = new MissingOrder[numSources];
161+
this.formats = new ArrayList<>(numSources);
162+
this.sources = new SingleDimensionValuesSource[numSources];
163+
164+
// Populate all collections from sourceConfigs
165+
for (int i = 0; i < numSources; i++) {
166+
CompositeValuesSourceConfig sourceConfig = sourceConfigs[i];
167+
this.sourceNames.add(sourceConfig.name());
168+
this.reverseMuls[i] = sourceConfig.reverseMul();
169+
this.missingOrders[i] = sourceConfig.missingOrder();
170+
this.formats.add(sourceConfig.format());
171+
172+
this.sources[i] = sourceConfig.createValuesSource(
161173
context.bigArrays(),
162174
context.searcher().getIndexReader(),
163175
size,
164176
this::addRequestCircuitBreakerBytes
165177
);
166178
}
179+
167180
this.queue = new CompositeValuesCollectorQueue(context.bigArrays(), sources, size, rawAfterKey);
168181
this.rawAfterKey = rawAfterKey;
169182

@@ -230,7 +243,7 @@ protected int getSize() {
230243

231244
@Override
232245
protected Function<Long, Long> bucketOrdProducer() {
233-
return (key) -> bucketOrds.add(0, getRoundingPrepared().round((long) key));
246+
return (key) -> bucketOrds.add(0, getRoundingPrepared().round(key));
234247
}
235248
};
236249
filterRewriteOptimizationContext = new FilterRewriteOptimizationContext(bridge, parent, subAggregators.length, context);
@@ -247,14 +260,14 @@ protected void doClose() {
247260
}
248261

249262
@Override
250-
protected void doPreCollection() throws IOException {
263+
protected void doPreCollection() {
251264
List<BucketCollector> collectors = Arrays.asList(subAggregators);
252265
deferredCollectors = MultiBucketCollector.wrap(collectors);
253266
collectableSubAggregators = BucketCollector.NO_OP_COLLECTOR;
254267
}
255268

256269
@Override
257-
protected void doPostCollection() throws IOException {
270+
protected void doPostCollection() {
258271
finishLeaf();
259272
}
260273

@@ -719,14 +732,7 @@ public void collect(int doc, long zeroBucket) throws IOException {
719732
*
720733
* @opensearch.internal
721734
*/
722-
private static class Entry {
723-
final LeafReaderContext context;
724-
final DocIdSet docIdSet;
725-
726-
Entry(LeafReaderContext context, DocIdSet docIdSet) {
727-
this.context = context;
728-
this.docIdSet = docIdSet;
729-
}
735+
private record Entry(LeafReaderContext context, DocIdSet docIdSet) {
730736
}
731737

732738
@Override

server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,11 @@ private class Slot {
6060
this.value = initial;
6161
}
6262

63+
// This setter is intended only for updating the reusable slot instance
64+
public void set(int newValue) {
65+
this.value = newValue;
66+
}
67+
6368
@Override
6469
public boolean equals(Object o) {
6570
if (this == o) return true;
@@ -82,6 +87,14 @@ public int hashCode() {
8287
private final Map<Slot, Integer> map; // to quickly find the slot for a value
8388
private final SingleDimensionValuesSource<?>[] arrays;
8489

90+
/**
91+
* A reusable, flyweight Slot instance to avoid object allocation and reduce GC pressure
92+
* during map lookups in the high-frequency collect() path. This object is NOT
93+
* thread-safe, but is safe here because each collector instance is confined to a
94+
* single thread.
95+
*/
96+
private final Slot reusableSlot = new Slot(0);
97+
8598
private LongArray docCounts;
8699
private boolean afterKeyIsSet = false;
87100

@@ -125,7 +138,8 @@ boolean isFull() {
125138
* the slot if the candidate is already in the queue or null if the candidate is not present.
126139
*/
127140
Integer getCurrentSlot() {
128-
return map.get(new Slot(CANDIDATE_SLOT));
141+
reusableSlot.set(CANDIDATE_SLOT); // Update the state of the reusable slot
142+
return map.get(reusableSlot); // Use the single reusable slot instance for the lookup
129143
}
130144

131145
/**
@@ -322,7 +336,8 @@ boolean addIfCompetitive(int indexSortSourcePrefix, long inc) {
322336
if (size() >= maxSize) {
323337
// the queue is full, we replace the last key with this candidate
324338
int slot = pop();
325-
map.remove(new Slot(slot));
339+
reusableSlot.set(slot); // Use reusable for remove
340+
map.remove(reusableSlot);
326341
// and we recycle the deleted slot
327342
newSlot = slot;
328343
} else {

0 commit comments

Comments (0)