Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Handle deleted documents for filter rewrite sub-aggregation optimization ([#19643](https://github.com/opensearch-project/OpenSearch/pull/19643))
- Add bulk collect API for filter rewrite sub-aggregation optimization ([#19933](https://github.com/opensearch-project/OpenSearch/pull/19933))
- Allow collectors take advantage of preaggregated data using collectRange API ([#20009](https://github.com/opensearch-project/OpenSearch/pull/20009))
- Bulk collection logic for metrics and cardinality aggregations ([#20067](https://github.com/opensearch-project/OpenSearch/pull/20067))
- Add pointer based lag metric in pull-based ingestion ([#19635](https://github.com/opensearch-project/OpenSearch/pull/19635))
- Introduced internal API for retrieving metadata about requested indices from transport actions ([#18523](https://github.com/opensearch-project/OpenSearch/pull/18523))
- Add cluster defaults for merge autoThrottle, maxMergeThreads, and maxMergeCount; Add segment size filter to the merged segment warmer ([#19629](https://github.com/opensearch-project/OpenSearch/pull/19629))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,38 +134,64 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc
return new LeafBucketCollectorBase(sub, values) {
@Override
public void collect(int doc, long bucket) throws IOException {
    // Per-doc collection path. Only touch per-bucket state when the document
    // actually has values for this field.
    if (values.advanceExact(doc)) {
        int valueCount = values.docValueCount();
        // Grows the backing arrays and seeds kahanSummation with the bucket's
        // running sum/compensation.
        setKahanSummation(bucket);
        counts.increment(bucket, valueCount);
        // Kahan (compensated) summation is more accurate than naive summation.
        for (int i = 0; i < valueCount; i++) {
            kahanSummation.add(values.nextValue());
        }
        sums.set(bucket, kahanSummation.value());
        compensations.set(bucket, kahanSummation.delta());
    }
}

@Override
public void collect(DocIdStream stream, long owningBucketOrd) throws IOException {
super.collect(stream, owningBucketOrd);
public void collect(DocIdStream stream, long bucket) throws IOException {
setKahanSummation(bucket);
final int[] count = { 0 };
stream.forEach((doc) -> {
if (values.advanceExact(doc)) {
int valueCount = values.docValueCount();
count[0] += valueCount;
for (int i = 0; i < valueCount; i++) {
kahanSummation.add(values.nextValue());
}
}
});
counts.increment(bucket, count[0]);
sums.set(bucket, kahanSummation.value());
compensations.set(bucket, kahanSummation.delta());
}

@Override
public void collectRange(int min, int max) throws IOException {
    // Bulk collection over the doc-id range [min, max); this path always
    // targets bucket ordinal 0, matching the hard-coded 0 below.
    setKahanSummation(0);
    int count = 0;
    for (int docId = min; docId < max; docId++) {
        if (values.advanceExact(docId)) {
            int valueCount = values.docValueCount();
            count += valueCount;
            for (int i = 0; i < valueCount; i++) {
                kahanSummation.add(values.nextValue());
            }
        }
    }
    counts.increment(0, count);
    sums.set(0, kahanSummation.value());
    compensations.set(0, kahanSummation.delta());
}

// Ensures the per-bucket arrays can hold `bucket` and primes the shared
// Kahan accumulator with that bucket's running sum and compensation so
// subsequent add() calls continue the bucket's compensated summation.
private void setKahanSummation(long bucket) {
    counts = bigArrays.grow(counts, bucket + 1);
    sums = bigArrays.grow(sums, bucket + 1);
    compensations = bigArrays.grow(compensations, bucket + 1);
    // Kahan summation is more accurate than naively adding doubles.
    kahanSummation.reset(sums.get(bucket), compensations.get(bucket));
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -579,12 +579,34 @@ public static long memoryOverhead(long maxOrd) {

@Override
public void collect(int doc, long bucketOrd) throws IOException {
    // Delegate to the shared BitArray-based path used by all collect variants.
    collect(doc, getBitArray(bucketOrd));
}

@Override
public void collect(DocIdStream stream, long owningBucketOrd) throws IOException {
    // Resolve the bucket's bit set once, then record every doc in the stream.
    final BitArray bitSet = getBitArray(owningBucketOrd);
    stream.forEach(doc -> collect(doc, bitSet));
}

@Override
public void collectRange(int minDoc, int maxDoc) throws IOException {
    // Bulk collection over [minDoc, maxDoc); always targets bucket ordinal 0.
    final BitArray bitSet = getBitArray(0);
    for (int docId = minDoc; docId < maxDoc; docId++) {
        collect(docId, bitSet);
    }
}

// Grows the per-bucket table if needed and lazily creates the bucket's
// BitArray on first use. (The stale duplicate line referencing the undefined
// `bucketOrd` variable has been removed; only `bucket` is in scope here.)
private BitArray getBitArray(long bucket) {
    visitedOrds = bigArrays.grow(visitedOrds, bucket + 1);
    BitArray bits = visitedOrds.get(bucket);
    if (bits == null) {
        bits = new BitArray(maxOrd, bigArrays);
        visitedOrds.set(bucket, bits);
    }
    return bits;
}

private void collect(final int doc, final BitArray bits) throws IOException {
if (values.advanceExact(doc)) {
int count = values.docValueCount();
long ord;
Expand All @@ -594,16 +616,6 @@ public void collect(int doc, long bucketOrd) throws IOException {
}
}

@Override
public void collect(DocIdStream stream, long owningBucketOrd) throws IOException {
super.collect(stream, owningBucketOrd);
}

@Override
public void collectRange(int min, int max) throws IOException {
super.collectRange(min, max);
}

@Override
public void postCollect() throws IOException {
try (BitArray allVisitedOrds = new BitArray(maxOrd, bigArrays)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,26 @@ public CompensatedSum add(double value, double delta) {
return this;
}

/**
 * Adds the first {@code count} entries of {@code values} to this Kahan sum,
 * updating the correction term to reduce accumulated floating-point error.
 * Inf/NaN inputs simply propagate into the running tally, matching the
 * behavior of the scalar {@code add}.
 */
public void add(double[] values, int count) {
    // Work on locals; write the instance state back once at the end.
    double sum = value;
    double c = delta; // Compensation for lost low-order bits

    for (int i = 0; i < count; i++) {
        double y = values[i] - c;
        double t = sum + y;
        c = (t - sum) - y; // Recover the low-order bits lost in (sum + y)
        sum = t;
    }

    this.value = sum;
    this.delta = c;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,14 +157,9 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc
final SortedNumericDoubleValues allValues = valuesSource.doubleValues(ctx);
final NumericDoubleValues values = MultiValueMode.MAX.select(allValues);
return new LeafBucketCollectorBase(sub, allValues) {

@Override
public void collect(int doc, long bucket) throws IOException {
if (bucket >= maxes.size()) {
long from = maxes.size();
maxes = bigArrays.grow(maxes, bucket + 1);
maxes.fill(from, maxes.size(), Double.NEGATIVE_INFINITY);
}
growMaxes(bucket);
if (values.advanceExact(doc)) {
final double value = values.doubleValue();
double max = maxes.get(bucket);
Expand All @@ -174,13 +169,35 @@ public void collect(int doc, long bucket) throws IOException {
}

@Override
public void collect(DocIdStream stream, long owningBucketOrd) throws IOException {
super.collect(stream, owningBucketOrd);
public void collect(DocIdStream stream, long bucket) throws IOException {
growMaxes(bucket);
final double[] max = { maxes.get(bucket) };
stream.forEach((doc) -> {
if (values.advanceExact(doc)) {
max[0] = Math.max(max[0], values.doubleValue());
}
});
maxes.set(bucket, max[0]);
}

@Override
public void collectRange(int min, int max) throws IOException {
    // Bulk collection over the doc-id range [min, max); targets bucket 0.
    growMaxes(0);
    double maximum = maxes.get(0);
    for (int doc = min; doc < max; doc++) {
        if (values.advanceExact(doc)) {
            maximum = Math.max(maximum, values.doubleValue());
        }
    }
    maxes.set(0, maximum);
}

// Ensures `maxes` can hold `bucket`; newly allocated slots are filled with
// -Infinity, the identity element for max.
private void growMaxes(long bucket) {
    if (bucket < maxes.size()) {
        return; // already large enough
    }
    final long oldSize = maxes.size();
    maxes = bigArrays.grow(maxes, bucket + 1);
    maxes.fill(oldSize, maxes.size(), Double.NEGATIVE_INFINITY);
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,14 +157,9 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc
final SortedNumericDoubleValues allValues = valuesSource.doubleValues(ctx);
final NumericDoubleValues values = MultiValueMode.MIN.select(allValues);
return new LeafBucketCollectorBase(sub, allValues) {

@Override
public void collect(int doc, long bucket) throws IOException {
if (bucket >= mins.size()) {
long from = mins.size();
mins = bigArrays.grow(mins, bucket + 1);
mins.fill(from, mins.size(), Double.POSITIVE_INFINITY);
}
growMins(bucket);
if (values.advanceExact(doc)) {
final double value = values.doubleValue();
double min = mins.get(bucket);
Expand All @@ -174,13 +169,35 @@ public void collect(int doc, long bucket) throws IOException {
}

@Override
public void collect(DocIdStream stream, long owningBucketOrd) throws IOException {
super.collect(stream, owningBucketOrd);
public void collect(DocIdStream stream, long bucket) throws IOException {
growMins(bucket);
final double[] min = { mins.get(bucket) };
stream.forEach((doc) -> {
if (values.advanceExact(doc)) {
min[0] = Math.min(min[0], values.doubleValue());
}
});
mins.set(bucket, min[0]);
}

@Override
public void collectRange(int min, int max) throws IOException {
    // Bulk collection over the doc-id range [min, max); targets bucket 0.
    growMins(0);
    double minimum = mins.get(0);
    for (int doc = min; doc < max; doc++) {
        if (values.advanceExact(doc)) {
            minimum = Math.min(minimum, values.doubleValue());
        }
    }
    mins.set(0, minimum);
}

// Ensures `mins` can hold `bucket`; newly allocated slots are filled with
// +Infinity, the identity element for min.
private void growMins(long bucket) {
    if (bucket < mins.size()) {
        return; // already large enough
    }
    final long oldSize = mins.size();
    mins = bigArrays.grow(mins, bucket + 1);
    mins.fill(oldSize, mins.size(), Double.POSITIVE_INFINITY);
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,28 +107,13 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc
return new LeafBucketCollectorBase(sub, values) {
@Override
public void collect(int doc, long bucket) throws IOException {
if (bucket >= counts.size()) {
final long from = counts.size();
final long overSize = BigArrays.overSize(bucket + 1);
counts = bigArrays.resize(counts, overSize);
sums = bigArrays.resize(sums, overSize);
compensations = bigArrays.resize(compensations, overSize);
mins = bigArrays.resize(mins, overSize);
maxes = bigArrays.resize(maxes, overSize);
mins.fill(from, overSize, Double.POSITIVE_INFINITY);
maxes.fill(from, overSize, Double.NEGATIVE_INFINITY);
}
growStats(bucket);

if (values.advanceExact(doc)) {
final int valuesCount = values.docValueCount();
counts.increment(bucket, valuesCount);
double min = mins.get(bucket);
double max = maxes.get(bucket);
// Compute the sum of double values with Kahan summation algorithm which is more
// accurate than naive summation.
double sum = sums.get(bucket);
double compensation = compensations.get(bucket);
kahanSummation.reset(sum, compensation);

for (int i = 0; i < valuesCount; i++) {
double value = values.nextValue();
Expand All @@ -144,13 +129,73 @@ public void collect(int doc, long bucket) throws IOException {
}

@Override
public void collect(DocIdStream stream, long owningBucketOrd) throws IOException {
super.collect(stream, owningBucketOrd);
public void collect(DocIdStream stream, long bucket) throws IOException {
growStats(bucket);

double[] min = { mins.get(bucket) };
double[] max = { maxes.get(bucket) };
stream.forEach((doc) -> {
if (values.advanceExact(doc)) {
final int valuesCount = values.docValueCount();
counts.increment(bucket, valuesCount);

for (int i = 0; i < valuesCount; i++) {
double value = values.nextValue();
kahanSummation.add(value);
min[0] = Math.min(min[0], value);
max[0] = Math.max(max[0], value);
}
}
});
sums.set(bucket, kahanSummation.value());
compensations.set(bucket, kahanSummation.delta());
mins.set(bucket, min[0]);
maxes.set(bucket, max[0]);
}

@Override
public void collectRange(int min, int max) throws IOException {
    // Bulk stats collection over the doc-id range [min, max); targets bucket 0.
    growStats(0);

    double minimum = mins.get(0);
    double maximum = maxes.get(0);
    // BUG FIX: the loop bound must be the doc-id range end `max`, not the
    // running `maximum` statistic (a double, initially -Infinity), which
    // would make the loop iterate over a nonsensical range or not at all.
    for (int doc = min; doc < max; doc++) {
        if (values.advanceExact(doc)) {
            final int valuesCount = values.docValueCount();
            counts.increment(0, valuesCount);

            for (int i = 0; i < valuesCount; i++) {
                double value = values.nextValue();
                kahanSummation.add(value);
                minimum = Math.min(minimum, value);
                maximum = Math.max(maximum, value);
            }
        }
    }
    sums.set(0, kahanSummation.value());
    compensations.set(0, kahanSummation.delta());
    mins.set(0, minimum);
    maxes.set(0, maximum);
}

// Resizes every per-bucket array to hold `bucket` (over-sizing to amortize
// growth), seeding new min/max slots with their identity elements, then
// primes the shared Kahan accumulator with this bucket's running state.
private void growStats(long bucket) {
    if (bucket >= counts.size()) {
        final long previousSize = counts.size();
        final long newSize = BigArrays.overSize(bucket + 1);
        counts = bigArrays.resize(counts, newSize);
        sums = bigArrays.resize(sums, newSize);
        compensations = bigArrays.resize(compensations, newSize);
        mins = bigArrays.resize(mins, newSize);
        maxes = bigArrays.resize(maxes, newSize);
        mins.fill(previousSize, newSize, Double.POSITIVE_INFINITY);
        maxes.fill(previousSize, newSize, Double.NEGATIVE_INFINITY);
    }

    // Kahan summation is more accurate than naively adding doubles; resume
    // from the bucket's stored sum and compensation.
    kahanSummation.reset(sums.get(bucket), compensations.get(bucket));
}
};
}
Expand Down
Loading
Loading