Skip to content

Commit 044404c

Browse files
committed
Merge branch 'upgrade-cluster-endpoints' of https://github.com/ajleong623/OpenSearch into upgrade-cluster-endpoints
2 parents 29c406b + cd2d0d3 commit 044404c

File tree

9 files changed

+421
-93
lines changed

9 files changed

+421
-93
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1818
- Handle deleted documents for filter rewrite sub-aggregation optimization ([#19643](https://github.com/opensearch-project/OpenSearch/pull/19643))
1919
- Add bulk collect API for filter rewrite sub-aggregation optimization ([#19933](https://github.com/opensearch-project/OpenSearch/pull/19933))
2020
- Allow collectors take advantage of preaggregated data using collectRange API ([#20009](https://github.com/opensearch-project/OpenSearch/pull/20009))
21+
- Bulk collection logic for metrics and cardinality aggregations ([#20067](https://github.com/opensearch-project/OpenSearch/pull/20067))
2122
- Add pointer based lag metric in pull-based ingestion ([#19635](https://github.com/opensearch-project/OpenSearch/pull/19635))
2223
- Introduced internal API for retrieving metadata about requested indices from transport actions ([#18523](https://github.com/opensearch-project/OpenSearch/pull/18523))
2324
- Add cluster defaults for merge autoThrottle, maxMergeThreads, and maxMergeCount; Add segment size filter to the merged segment warmer ([#19629](https://github.com/opensearch-project/OpenSearch/pull/19629))

server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java

Lines changed: 42 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -134,38 +134,64 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc
134134
return new LeafBucketCollectorBase(sub, values) {
135135
@Override
136136
public void collect(int doc, long bucket) throws IOException {
137-
counts = bigArrays.grow(counts, bucket + 1);
138-
sums = bigArrays.grow(sums, bucket + 1);
139-
compensations = bigArrays.grow(compensations, bucket + 1);
140-
141137
if (values.advanceExact(doc)) {
142-
final int valueCount = values.docValueCount();
138+
int valueCount = values.docValueCount();
139+
setKahanSummation(bucket);
143140
counts.increment(bucket, valueCount);
144-
// Compute the sum of double values with Kahan summation algorithm which is more
145-
// accurate than naive summation.
146-
double sum = sums.get(bucket);
147-
double compensation = compensations.get(bucket);
148-
149-
kahanSummation.reset(sum, compensation);
150-
151141
for (int i = 0; i < valueCount; i++) {
152142
double value = values.nextValue();
153143
kahanSummation.add(value);
154144
}
155-
156145
sums.set(bucket, kahanSummation.value());
157146
compensations.set(bucket, kahanSummation.delta());
158147
}
159148
}
160149

161150
@Override
162-
public void collect(DocIdStream stream, long owningBucketOrd) throws IOException {
163-
super.collect(stream, owningBucketOrd);
151+
public void collect(DocIdStream stream, long bucket) throws IOException {
152+
setKahanSummation(bucket);
153+
final int[] count = { 0 };
154+
stream.forEach((doc) -> {
155+
if (values.advanceExact(doc)) {
156+
int valueCount = values.docValueCount();
157+
count[0] += valueCount;
158+
for (int i = 0; i < valueCount; i++) {
159+
kahanSummation.add(values.nextValue());
160+
}
161+
}
162+
});
163+
counts.increment(bucket, count[0]);
164+
sums.set(bucket, kahanSummation.value());
165+
compensations.set(bucket, kahanSummation.delta());
164166
}
165167

166168
@Override
167169
public void collectRange(int min, int max) throws IOException {
168-
super.collectRange(min, max);
170+
setKahanSummation(0);
171+
int count = 0;
172+
for (int docId = min; docId < max; docId++) {
173+
if (values.advanceExact(docId)) {
174+
int valueCount = values.docValueCount();
175+
count += valueCount;
176+
for (int i = 0; i < valueCount; i++) {
177+
kahanSummation.add(values.nextValue());
178+
}
179+
}
180+
}
181+
counts.increment(0, count);
182+
sums.set(0, kahanSummation.value());
183+
compensations.set(0, kahanSummation.delta());
184+
}
185+
186+
private void setKahanSummation(long bucket) {
187+
counts = bigArrays.grow(counts, bucket + 1);
188+
sums = bigArrays.grow(sums, bucket + 1);
189+
compensations = bigArrays.grow(compensations, bucket + 1);
190+
// Compute the sum of double values with Kahan summation algorithm which is more
191+
// accurate than naive summation.
192+
double sum = sums.get(bucket);
193+
double compensation = compensations.get(bucket);
194+
kahanSummation.reset(sum, compensation);
169195
}
170196
};
171197
}

server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregator.java

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -579,12 +579,34 @@ public static long memoryOverhead(long maxOrd) {
579579

580580
@Override
581581
public void collect(int doc, long bucketOrd) throws IOException {
582-
visitedOrds = bigArrays.grow(visitedOrds, bucketOrd + 1);
583-
BitArray bits = visitedOrds.get(bucketOrd);
582+
collect(doc, getBitArray(bucketOrd));
583+
}
584+
585+
@Override
586+
public void collect(DocIdStream stream, long owningBucketOrd) throws IOException {
587+
final BitArray bits = getBitArray(owningBucketOrd);
588+
stream.forEach((doc) -> collect(doc, bits));
589+
}
590+
591+
@Override
592+
public void collectRange(int minDoc, int maxDoc) throws IOException {
593+
final BitArray bits = getBitArray(0);
594+
for (int doc = minDoc; doc < maxDoc; ++doc) {
595+
collect(doc, bits);
596+
}
597+
}
598+
599+
private BitArray getBitArray(long bucket) {
600+
visitedOrds = bigArrays.grow(visitedOrds, bucket + 1);
601+
BitArray bits = visitedOrds.get(bucket);
584602
if (bits == null) {
585603
bits = new BitArray(maxOrd, bigArrays);
586-
visitedOrds.set(bucketOrd, bits);
604+
visitedOrds.set(bucket, bits);
587605
}
606+
return bits;
607+
}
608+
609+
private void collect(final int doc, final BitArray bits) throws IOException {
588610
if (values.advanceExact(doc)) {
589611
int count = values.docValueCount();
590612
long ord;
@@ -594,16 +616,6 @@ public void collect(int doc, long bucketOrd) throws IOException {
594616
}
595617
}
596618

597-
@Override
598-
public void collect(DocIdStream stream, long owningBucketOrd) throws IOException {
599-
super.collect(stream, owningBucketOrd);
600-
}
601-
602-
@Override
603-
public void collectRange(int min, int max) throws IOException {
604-
super.collectRange(min, max);
605-
}
606-
607619
@Override
608620
public void postCollect() throws IOException {
609621
try (BitArray allVisitedOrds = new BitArray(maxOrd, bigArrays)) {

server/src/main/java/org/opensearch/search/aggregations/metrics/CompensatedSum.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,26 @@ public CompensatedSum add(double value, double delta) {
112112
return this;
113113
}
114114

115+
/**
116+
* Increments the Kahan sum by adding {@code count} values from the given array, updating the correction term to reduce numeric errors.
117+
*/
118+
public void add(double[] values, int count) {
119+
// If the value is Inf or NaN, just add it to the running tally to "convert" to
120+
// Inf/NaN. This keeps the behavior bwc from before kahan summing
121+
double sum = value;
122+
double c = delta; // Compensation for lost low-order bits
123+
124+
for (int i = 0; i < count; i++) {
125+
double y = values[i] - c;
126+
double t = sum + y;
127+
c = (t - sum) - y; // Calculate the lost part
128+
sum = t;
129+
}
130+
131+
this.value = sum;
132+
this.delta = c;
133+
}
134+
115135
@Override
116136
public boolean equals(Object o) {
117137
if (this == o) return true;

server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -157,14 +157,9 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc
157157
final SortedNumericDoubleValues allValues = valuesSource.doubleValues(ctx);
158158
final NumericDoubleValues values = MultiValueMode.MAX.select(allValues);
159159
return new LeafBucketCollectorBase(sub, allValues) {
160-
161160
@Override
162161
public void collect(int doc, long bucket) throws IOException {
163-
if (bucket >= maxes.size()) {
164-
long from = maxes.size();
165-
maxes = bigArrays.grow(maxes, bucket + 1);
166-
maxes.fill(from, maxes.size(), Double.NEGATIVE_INFINITY);
167-
}
162+
growMaxes(bucket);
168163
if (values.advanceExact(doc)) {
169164
final double value = values.doubleValue();
170165
double max = maxes.get(bucket);
@@ -174,13 +169,35 @@ public void collect(int doc, long bucket) throws IOException {
174169
}
175170

176171
@Override
177-
public void collect(DocIdStream stream, long owningBucketOrd) throws IOException {
178-
super.collect(stream, owningBucketOrd);
172+
public void collect(DocIdStream stream, long bucket) throws IOException {
173+
growMaxes(bucket);
174+
final double[] max = { maxes.get(bucket) };
175+
stream.forEach((doc) -> {
176+
if (values.advanceExact(doc)) {
177+
max[0] = Math.max(max[0], values.doubleValue());
178+
}
179+
});
180+
maxes.set(bucket, max[0]);
179181
}
180182

181183
@Override
182184
public void collectRange(int min, int max) throws IOException {
183-
super.collectRange(min, max);
185+
growMaxes(0);
186+
double maximum = maxes.get(0);
187+
for (int doc = min; doc < max; doc++) {
188+
if (values.advanceExact(doc)) {
189+
maximum = Math.max(maximum, values.doubleValue());
190+
}
191+
}
192+
maxes.set(0, maximum);
193+
}
194+
195+
private void growMaxes(long bucket) {
196+
if (bucket >= maxes.size()) {
197+
long from = maxes.size();
198+
maxes = bigArrays.grow(maxes, bucket + 1);
199+
maxes.fill(from, maxes.size(), Double.NEGATIVE_INFINITY);
200+
}
184201
}
185202
};
186203
}

server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -157,14 +157,9 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc
157157
final SortedNumericDoubleValues allValues = valuesSource.doubleValues(ctx);
158158
final NumericDoubleValues values = MultiValueMode.MIN.select(allValues);
159159
return new LeafBucketCollectorBase(sub, allValues) {
160-
161160
@Override
162161
public void collect(int doc, long bucket) throws IOException {
163-
if (bucket >= mins.size()) {
164-
long from = mins.size();
165-
mins = bigArrays.grow(mins, bucket + 1);
166-
mins.fill(from, mins.size(), Double.POSITIVE_INFINITY);
167-
}
162+
growMins(bucket);
168163
if (values.advanceExact(doc)) {
169164
final double value = values.doubleValue();
170165
double min = mins.get(bucket);
@@ -174,13 +169,35 @@ public void collect(int doc, long bucket) throws IOException {
174169
}
175170

176171
@Override
177-
public void collect(DocIdStream stream, long owningBucketOrd) throws IOException {
178-
super.collect(stream, owningBucketOrd);
172+
public void collect(DocIdStream stream, long bucket) throws IOException {
173+
growMins(bucket);
174+
final double[] min = { mins.get(bucket) };
175+
stream.forEach((doc) -> {
176+
if (values.advanceExact(doc)) {
177+
min[0] = Math.min(min[0], values.doubleValue());
178+
}
179+
});
180+
mins.set(bucket, min[0]);
179181
}
180182

181183
@Override
182184
public void collectRange(int min, int max) throws IOException {
183-
super.collectRange(min, max);
185+
growMins(0);
186+
double minimum = mins.get(0);
187+
for (int doc = min; doc < max; doc++) {
188+
if (values.advanceExact(doc)) {
189+
minimum = Math.min(minimum, values.doubleValue());
190+
}
191+
}
192+
mins.set(0, minimum);
193+
}
194+
195+
private void growMins(long bucket) {
196+
if (bucket >= mins.size()) {
197+
long from = mins.size();
198+
mins = bigArrays.grow(mins, bucket + 1);
199+
mins.fill(from, mins.size(), Double.POSITIVE_INFINITY);
200+
}
184201
}
185202
};
186203
}

server/src/main/java/org/opensearch/search/aggregations/metrics/StatsAggregator.java

Lines changed: 64 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -107,28 +107,13 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc
107107
return new LeafBucketCollectorBase(sub, values) {
108108
@Override
109109
public void collect(int doc, long bucket) throws IOException {
110-
if (bucket >= counts.size()) {
111-
final long from = counts.size();
112-
final long overSize = BigArrays.overSize(bucket + 1);
113-
counts = bigArrays.resize(counts, overSize);
114-
sums = bigArrays.resize(sums, overSize);
115-
compensations = bigArrays.resize(compensations, overSize);
116-
mins = bigArrays.resize(mins, overSize);
117-
maxes = bigArrays.resize(maxes, overSize);
118-
mins.fill(from, overSize, Double.POSITIVE_INFINITY);
119-
maxes.fill(from, overSize, Double.NEGATIVE_INFINITY);
120-
}
110+
growStats(bucket);
121111

122112
if (values.advanceExact(doc)) {
123113
final int valuesCount = values.docValueCount();
124114
counts.increment(bucket, valuesCount);
125115
double min = mins.get(bucket);
126116
double max = maxes.get(bucket);
127-
// Compute the sum of double values with Kahan summation algorithm which is more
128-
// accurate than naive summation.
129-
double sum = sums.get(bucket);
130-
double compensation = compensations.get(bucket);
131-
kahanSummation.reset(sum, compensation);
132117

133118
for (int i = 0; i < valuesCount; i++) {
134119
double value = values.nextValue();
@@ -144,13 +129,73 @@ public void collect(int doc, long bucket) throws IOException {
144129
}
145130

146131
@Override
147-
public void collect(DocIdStream stream, long owningBucketOrd) throws IOException {
148-
super.collect(stream, owningBucketOrd);
132+
public void collect(DocIdStream stream, long bucket) throws IOException {
133+
growStats(bucket);
134+
135+
double[] min = { mins.get(bucket) };
136+
double[] max = { maxes.get(bucket) };
137+
stream.forEach((doc) -> {
138+
if (values.advanceExact(doc)) {
139+
final int valuesCount = values.docValueCount();
140+
counts.increment(bucket, valuesCount);
141+
142+
for (int i = 0; i < valuesCount; i++) {
143+
double value = values.nextValue();
144+
kahanSummation.add(value);
145+
min[0] = Math.min(min[0], value);
146+
max[0] = Math.max(max[0], value);
147+
}
148+
}
149+
});
150+
sums.set(bucket, kahanSummation.value());
151+
compensations.set(bucket, kahanSummation.delta());
152+
mins.set(bucket, min[0]);
153+
maxes.set(bucket, max[0]);
149154
}
150155

151156
@Override
152157
public void collectRange(int min, int max) throws IOException {
153-
super.collectRange(min, max);
158+
growStats(0);
159+
160+
double minimum = mins.get(0);
161+
double maximum = maxes.get(0);
162+
for (int doc = min; doc < max; doc++) {
163+
if (values.advanceExact(doc)) {
164+
final int valuesCount = values.docValueCount();
165+
counts.increment(0, valuesCount);
166+
167+
for (int i = 0; i < valuesCount; i++) {
168+
double value = values.nextValue();
169+
kahanSummation.add(value);
170+
minimum = Math.min(minimum, value);
171+
maximum = Math.max(maximum, value);
172+
}
173+
}
174+
}
175+
sums.set(0, kahanSummation.value());
176+
compensations.set(0, kahanSummation.delta());
177+
mins.set(0, minimum);
178+
maxes.set(0, maximum);
179+
}
180+
181+
private void growStats(long bucket) {
182+
if (bucket >= counts.size()) {
183+
final long from = counts.size();
184+
final long overSize = BigArrays.overSize(bucket + 1);
185+
counts = bigArrays.resize(counts, overSize);
186+
sums = bigArrays.resize(sums, overSize);
187+
compensations = bigArrays.resize(compensations, overSize);
188+
mins = bigArrays.resize(mins, overSize);
189+
maxes = bigArrays.resize(maxes, overSize);
190+
mins.fill(from, overSize, Double.POSITIVE_INFINITY);
191+
maxes.fill(from, overSize, Double.NEGATIVE_INFINITY);
192+
}
193+
194+
// Compute the sum of double values with Kahan summation algorithm which is more
195+
// accurate than naive summation.
196+
double sum = sums.get(bucket);
197+
double compensation = compensations.get(bucket);
198+
kahanSummation.reset(sum, compensation);
154199
}
155200
};
156201
}

0 commit comments

Comments
 (0)