Skip to content

Commit 56d6a22

Browse files
authored
Merge branch 'main' into dependabot/gradle/plugins/ingest-attachment/org.tukaani-xz-1.11
Signed-off-by: Sandesh Kumar <sandeshkr419@gmail.com>
2 parents fc45024 + 94fdafc commit 56d6a22

File tree

10 files changed

+423
-94
lines changed

10 files changed

+423
-94
lines changed

.github/workflows/detect-breaking-change.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ jobs:
2020
- if: failure()
2121
run: cat server/build/reports/java-compatibility/report.txt
2222
- if: failure()
23-
uses: actions/upload-artifact@v4
23+
uses: actions/upload-artifact@v5
2424
with:
2525
name: java-compatibility-report.html
2626
path: ${{ github.workspace }}/server/build/reports/java-compatibility/report.html

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1818
- Handle deleted documents for filter rewrite sub-aggregation optimization ([#19643](https://github.com/opensearch-project/OpenSearch/pull/19643))
1919
- Add bulk collect API for filter rewrite sub-aggregation optimization ([#19933](https://github.com/opensearch-project/OpenSearch/pull/19933))
2020
- Allow collectors take advantage of preaggregated data using collectRange API ([#20009](https://github.com/opensearch-project/OpenSearch/pull/20009))
21+
- Bulk collection logic for metrics and cardinality aggregations ([#20067](https://github.com/opensearch-project/OpenSearch/pull/20067))
2122
- Add pointer based lag metric in pull-based ingestion ([#19635](https://github.com/opensearch-project/OpenSearch/pull/19635))
2223
- Introduced internal API for retrieving metadata about requested indices from transport actions ([#18523](https://github.com/opensearch-project/OpenSearch/pull/18523))
2324
- Add cluster defaults for merge autoThrottle, maxMergeThreads, and maxMergeCount; Add segment size filter to the merged segment warmer ([#19629](https://github.com/opensearch-project/OpenSearch/pull/19629))
@@ -120,6 +121,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
120121
- Bump `commons-cli:commons-cli` from 1.10.0 to 1.11.0 ([#20022](https://github.com/opensearch-project/OpenSearch/pull/20022))
121122
- Bump `com.squareup.okio:okio` from 3.16.0 to 3.16.3 ([#20025](https://github.com/opensearch-project/OpenSearch/pull/20025))
122123
- Bump `org.tukaani:xz` from 1.10 to 1.11 ([#20082](https://github.com/opensearch-project/OpenSearch/pull/20082))
124+
- Bump `actions/upload-artifact` from 4 to 5 ([#20081](https://github.com/opensearch-project/OpenSearch/pull/20081))
123125

124126
### Deprecated
125127
- Deprecated existing constructors in ThreadPoolStats.Stats in favor of the new Builder ([#19317](https://github.com/opensearch-project/OpenSearch/pull/19317))

server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java

Lines changed: 42 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -134,38 +134,64 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc
134134
return new LeafBucketCollectorBase(sub, values) {
135135
@Override
136136
public void collect(int doc, long bucket) throws IOException {
137-
counts = bigArrays.grow(counts, bucket + 1);
138-
sums = bigArrays.grow(sums, bucket + 1);
139-
compensations = bigArrays.grow(compensations, bucket + 1);
140-
141137
if (values.advanceExact(doc)) {
142-
final int valueCount = values.docValueCount();
138+
int valueCount = values.docValueCount();
139+
setKahanSummation(bucket);
143140
counts.increment(bucket, valueCount);
144-
// Compute the sum of double values with Kahan summation algorithm which is more
145-
// accurate than naive summation.
146-
double sum = sums.get(bucket);
147-
double compensation = compensations.get(bucket);
148-
149-
kahanSummation.reset(sum, compensation);
150-
151141
for (int i = 0; i < valueCount; i++) {
152142
double value = values.nextValue();
153143
kahanSummation.add(value);
154144
}
155-
156145
sums.set(bucket, kahanSummation.value());
157146
compensations.set(bucket, kahanSummation.delta());
158147
}
159148
}
160149

161150
@Override
162-
public void collect(DocIdStream stream, long owningBucketOrd) throws IOException {
163-
super.collect(stream, owningBucketOrd);
151+
public void collect(DocIdStream stream, long bucket) throws IOException {
152+
setKahanSummation(bucket);
153+
final int[] count = { 0 };
154+
stream.forEach((doc) -> {
155+
if (values.advanceExact(doc)) {
156+
int valueCount = values.docValueCount();
157+
count[0] += valueCount;
158+
for (int i = 0; i < valueCount; i++) {
159+
kahanSummation.add(values.nextValue());
160+
}
161+
}
162+
});
163+
counts.increment(bucket, count[0]);
164+
sums.set(bucket, kahanSummation.value());
165+
compensations.set(bucket, kahanSummation.delta());
164166
}
165167

166168
@Override
167169
public void collectRange(int min, int max) throws IOException {
168-
super.collectRange(min, max);
170+
setKahanSummation(0);
171+
int count = 0;
172+
for (int docId = min; docId < max; docId++) {
173+
if (values.advanceExact(docId)) {
174+
int valueCount = values.docValueCount();
175+
count += valueCount;
176+
for (int i = 0; i < valueCount; i++) {
177+
kahanSummation.add(values.nextValue());
178+
}
179+
}
180+
}
181+
counts.increment(0, count);
182+
sums.set(0, kahanSummation.value());
183+
compensations.set(0, kahanSummation.delta());
184+
}
185+
186+
private void setKahanSummation(long bucket) {
187+
counts = bigArrays.grow(counts, bucket + 1);
188+
sums = bigArrays.grow(sums, bucket + 1);
189+
compensations = bigArrays.grow(compensations, bucket + 1);
190+
// Compute the sum of double values with Kahan summation algorithm which is more
191+
// accurate than naive summation.
192+
double sum = sums.get(bucket);
193+
double compensation = compensations.get(bucket);
194+
kahanSummation.reset(sum, compensation);
169195
}
170196
};
171197
}

server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregator.java

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -579,12 +579,34 @@ public static long memoryOverhead(long maxOrd) {
579579

580580
@Override
581581
public void collect(int doc, long bucketOrd) throws IOException {
582-
visitedOrds = bigArrays.grow(visitedOrds, bucketOrd + 1);
583-
BitArray bits = visitedOrds.get(bucketOrd);
582+
collect(doc, getBitArray(bucketOrd));
583+
}
584+
585+
@Override
586+
public void collect(DocIdStream stream, long owningBucketOrd) throws IOException {
587+
final BitArray bits = getBitArray(owningBucketOrd);
588+
stream.forEach((doc) -> collect(doc, bits));
589+
}
590+
591+
@Override
592+
public void collectRange(int minDoc, int maxDoc) throws IOException {
593+
final BitArray bits = getBitArray(0);
594+
for (int doc = minDoc; doc < maxDoc; ++doc) {
595+
collect(doc, bits);
596+
}
597+
}
598+
599+
private BitArray getBitArray(long bucket) {
600+
visitedOrds = bigArrays.grow(visitedOrds, bucket + 1);
601+
BitArray bits = visitedOrds.get(bucket);
584602
if (bits == null) {
585603
bits = new BitArray(maxOrd, bigArrays);
586-
visitedOrds.set(bucketOrd, bits);
604+
visitedOrds.set(bucket, bits);
587605
}
606+
return bits;
607+
}
608+
609+
private void collect(final int doc, final BitArray bits) throws IOException {
588610
if (values.advanceExact(doc)) {
589611
int count = values.docValueCount();
590612
long ord;
@@ -594,16 +616,6 @@ public void collect(int doc, long bucketOrd) throws IOException {
594616
}
595617
}
596618

597-
@Override
598-
public void collect(DocIdStream stream, long owningBucketOrd) throws IOException {
599-
super.collect(stream, owningBucketOrd);
600-
}
601-
602-
@Override
603-
public void collectRange(int min, int max) throws IOException {
604-
super.collectRange(min, max);
605-
}
606-
607619
@Override
608620
public void postCollect() throws IOException {
609621
try (BitArray allVisitedOrds = new BitArray(maxOrd, bigArrays)) {

server/src/main/java/org/opensearch/search/aggregations/metrics/CompensatedSum.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,26 @@ public CompensatedSum add(double value, double delta) {
112112
return this;
113113
}
114114

115+
/**
116+
* Increments the Kahan sum by adding the first {@code count} entries of {@code values}, updating the correction term to reduce numeric errors.
117+
*/
118+
public void add(double[] values, int count) {
119+
// If the value is Inf or NaN, just add it to the running tally to "convert" to
120+
// Inf/NaN. This keeps the behavior bwc from before kahan summing
121+
double sum = value;
122+
double c = delta; // Compensation for lost low-order bits
123+
124+
for (int i = 0; i < count; i++) {
125+
double y = values[i] - c;
126+
double t = sum + y;
127+
c = (t - sum) - y; // Calculate the lost part
128+
sum = t;
129+
}
130+
131+
this.value = sum;
132+
this.delta = c;
133+
}
134+
115135
@Override
116136
public boolean equals(Object o) {
117137
if (this == o) return true;

server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -157,14 +157,9 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc
157157
final SortedNumericDoubleValues allValues = valuesSource.doubleValues(ctx);
158158
final NumericDoubleValues values = MultiValueMode.MAX.select(allValues);
159159
return new LeafBucketCollectorBase(sub, allValues) {
160-
161160
@Override
162161
public void collect(int doc, long bucket) throws IOException {
163-
if (bucket >= maxes.size()) {
164-
long from = maxes.size();
165-
maxes = bigArrays.grow(maxes, bucket + 1);
166-
maxes.fill(from, maxes.size(), Double.NEGATIVE_INFINITY);
167-
}
162+
growMaxes(bucket);
168163
if (values.advanceExact(doc)) {
169164
final double value = values.doubleValue();
170165
double max = maxes.get(bucket);
@@ -174,13 +169,35 @@ public void collect(int doc, long bucket) throws IOException {
174169
}
175170

176171
@Override
177-
public void collect(DocIdStream stream, long owningBucketOrd) throws IOException {
178-
super.collect(stream, owningBucketOrd);
172+
public void collect(DocIdStream stream, long bucket) throws IOException {
173+
growMaxes(bucket);
174+
final double[] max = { maxes.get(bucket) };
175+
stream.forEach((doc) -> {
176+
if (values.advanceExact(doc)) {
177+
max[0] = Math.max(max[0], values.doubleValue());
178+
}
179+
});
180+
maxes.set(bucket, max[0]);
179181
}
180182

181183
@Override
182184
public void collectRange(int min, int max) throws IOException {
183-
super.collectRange(min, max);
185+
growMaxes(0);
186+
double maximum = maxes.get(0);
187+
for (int doc = min; doc < max; doc++) {
188+
if (values.advanceExact(doc)) {
189+
maximum = Math.max(maximum, values.doubleValue());
190+
}
191+
}
192+
maxes.set(0, maximum);
193+
}
194+
195+
private void growMaxes(long bucket) {
196+
if (bucket >= maxes.size()) {
197+
long from = maxes.size();
198+
maxes = bigArrays.grow(maxes, bucket + 1);
199+
maxes.fill(from, maxes.size(), Double.NEGATIVE_INFINITY);
200+
}
184201
}
185202
};
186203
}

server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -157,14 +157,9 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc
157157
final SortedNumericDoubleValues allValues = valuesSource.doubleValues(ctx);
158158
final NumericDoubleValues values = MultiValueMode.MIN.select(allValues);
159159
return new LeafBucketCollectorBase(sub, allValues) {
160-
161160
@Override
162161
public void collect(int doc, long bucket) throws IOException {
163-
if (bucket >= mins.size()) {
164-
long from = mins.size();
165-
mins = bigArrays.grow(mins, bucket + 1);
166-
mins.fill(from, mins.size(), Double.POSITIVE_INFINITY);
167-
}
162+
growMins(bucket);
168163
if (values.advanceExact(doc)) {
169164
final double value = values.doubleValue();
170165
double min = mins.get(bucket);
@@ -174,13 +169,35 @@ public void collect(int doc, long bucket) throws IOException {
174169
}
175170

176171
@Override
177-
public void collect(DocIdStream stream, long owningBucketOrd) throws IOException {
178-
super.collect(stream, owningBucketOrd);
172+
public void collect(DocIdStream stream, long bucket) throws IOException {
173+
growMins(bucket);
174+
final double[] min = { mins.get(bucket) };
175+
stream.forEach((doc) -> {
176+
if (values.advanceExact(doc)) {
177+
min[0] = Math.min(min[0], values.doubleValue());
178+
}
179+
});
180+
mins.set(bucket, min[0]);
179181
}
180182

181183
@Override
182184
public void collectRange(int min, int max) throws IOException {
183-
super.collectRange(min, max);
185+
growMins(0);
186+
double minimum = mins.get(0);
187+
for (int doc = min; doc < max; doc++) {
188+
if (values.advanceExact(doc)) {
189+
minimum = Math.min(minimum, values.doubleValue());
190+
}
191+
}
192+
mins.set(0, minimum);
193+
}
194+
195+
private void growMins(long bucket) {
196+
if (bucket >= mins.size()) {
197+
long from = mins.size();
198+
mins = bigArrays.grow(mins, bucket + 1);
199+
mins.fill(from, mins.size(), Double.POSITIVE_INFINITY);
200+
}
184201
}
185202
};
186203
}

0 commit comments

Comments
 (0)