Skip to content

Commit

Permalink
Use long in Centroid count (#99491)
Browse files Browse the repository at this point in the history
* Use long in Centroid count

Centroids currently use integers to track how many samples their mean
tracks. This can overflow in case the digest tracks billions of samples
or more.

TDigestState already serializes the count as VLong, so it can be read as
VInt without compatibility issues.

Fixes #80153

* Update docs/changelog/99491.yaml

* More test fixes

* Bump TransportVersion

* Revert TransportVersion change
  • Loading branch information
kkrik-es authored Sep 13, 2023
1 parent 4e1fb3f commit 0247cfe
Show file tree
Hide file tree
Showing 13 changed files with 41 additions and 35 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/99491.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 99491
summary: Use long in Centroid count
area: Aggregations
type: bug
issues:
- 80153
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@
final class AVLGroupTree extends AbstractCollection<Centroid> {
/* For insertions into the tree */
private double centroid;
private int count;
private long count;
private double[] centroids;
private int[] counts;
private int[] aggregatedCounts;
private long[] counts;
private long[] aggregatedCounts;
private final IntAVLTree tree;

AVLGroupTree() {
Expand Down Expand Up @@ -78,8 +78,8 @@ protected void fixAggregates(int node) {

};
centroids = new double[tree.capacity()];
counts = new int[tree.capacity()];
aggregatedCounts = new int[tree.capacity()];
counts = new long[tree.capacity()];
aggregatedCounts = new long[tree.capacity()];
}

/**
Expand Down Expand Up @@ -113,14 +113,14 @@ public double mean(int node) {
/**
* Return the count for the provided node.
*/
public int count(int node) {
public long count(int node) {
return counts[node];
}

/**
* Add the provided centroid to the tree.
*/
public void add(double centroid, int count) {
public void add(double centroid, long count) {
this.centroid = centroid;
this.count = count;
tree.add();
Expand All @@ -135,7 +135,7 @@ public boolean add(Centroid centroid) {
/**
* Update values associated with a node, readjusting the tree if necessary.
*/
public void update(int node, double centroid, int count) {
public void update(int node, double centroid, long count) {
// have to do full scale update
this.centroid = centroid;
this.count = count;
Expand Down Expand Up @@ -242,7 +242,7 @@ public void remove() {
/**
* Return the total count of points that have been added to the tree.
*/
public int sum() {
public long sum() {
return aggregatedCounts[tree.root()];
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public int centroidCount() {
}

@Override
public void add(double x, int w) {
public void add(double x, long w) {
checkValue(x);
needsCompression = true;

Expand All @@ -84,7 +84,7 @@ public void add(double x, int w) {
}

if (start == NIL) { // empty summary
assert summary.size() == 0;
assert summary.isEmpty();
summary.add(x, w);
count = w;
} else {
Expand Down Expand Up @@ -127,7 +127,7 @@ public void add(double x, int w) {
// if the nearest point was not unique, then we may not be modifying the first copy
// which means that ordering can change
double centroid = summary.mean(closest);
int count = summary.count(closest);
long count = summary.count(closest);
centroid = weightedAverage(centroid, count, x, w);
count += w;
summary.update(closest, centroid, count);
Expand Down Expand Up @@ -189,7 +189,7 @@ public long size() {
@Override
public double cdf(double x) {
AVLGroupTree values = summary;
if (values.size() == 0) {
if (values.isEmpty()) {
return Double.NaN;
}
if (values.size() == 1) {
Expand Down Expand Up @@ -272,7 +272,7 @@ public double quantile(double q) {
}

AVLGroupTree values = summary;
if (values.size() == 0) {
if (values.isEmpty()) {
// no centroids means no data, no way to get a quantile
return Double.NaN;
} else if (values.size() == 1) {
Expand All @@ -293,7 +293,7 @@ public double quantile(double q) {
}

int currentNode = values.first();
int currentWeight = values.count(currentNode);
long currentWeight = values.count(currentNode);

// Total mass to the left of the center of the current node.
double weightSoFar = currentWeight / 2.0;
Expand All @@ -305,7 +305,7 @@ public double quantile(double q) {

for (int i = 0; i < values.size() - 1; i++) {
int nextNode = values.next(currentNode);
int nextWeight = values.count(nextNode);
long nextWeight = values.count(nextNode);
// this is the mass between current center and next center
double dw = (currentWeight + nextWeight) / 2.0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class Centroid implements Comparable<Centroid> {
private static final AtomicInteger uniqueCount = new AtomicInteger(1);

private double centroid = 0;
private int count = 0;
private long count = 0;

// The ID is transient because it must be unique within a given JVM. A new
// ID should be generated from uniqueCount when a Centroid is deserialized.
Expand All @@ -45,22 +45,22 @@ public Centroid(double x) {
start(x, 1, uniqueCount.getAndIncrement());
}

public Centroid(double x, int w) {
public Centroid(double x, long w) {
this();
start(x, w, uniqueCount.getAndIncrement());
}

public Centroid(double x, int w, int id) {
public Centroid(double x, long w, int id) {
this();
start(x, w, id);
}

private void start(double x, int w, int id) {
private void start(double x, long w, int id) {
this.id = id;
add(x, w);
}

public void add(double x, int w) {
public void add(double x, long w) {
count += w;
centroid += w * (x - centroid) / count;
}
Expand All @@ -69,7 +69,7 @@ public double mean() {
return centroid;
}

public int count() {
public long count() {
return count;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public class HybridDigest extends AbstractTDigest {
}

@Override
public void add(double x, int w) {
public void add(double x, long w) {
reserve(w);
if (mergingDigest != null) {
mergingDigest.add(x, w);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ public MergingDigest(double compression, int bufferSize, int size) {
}

@Override
public void add(double x, int w) {
public void add(double x, long w) {
checkValue(x);
if (tempUsed >= tempWeight.length - lastUsedCell - 1) {
mergeNewValues();
Expand Down Expand Up @@ -514,7 +514,7 @@ public boolean hasNext() {

@Override
public Centroid next() {
Centroid rc = new Centroid(mean[i], (int) weight[i]);
Centroid rc = new Centroid(mean[i], (long) weight[i]);
i++;
return rc;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class SortingDigest extends AbstractTDigest {
private boolean isSorted = true;

@Override
public void add(double x, int w) {
public void add(double x, long w) {
checkValue(x);
isSorted = isSorted && (values.isEmpty() || values.get(values.size() - 1) <= x);
for (int i = 0; i < w; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public static TDigest createHybridDigest(double compression) {
* @param x The value to add.
* @param w The weight of this point.
*/
public abstract void add(double x, int w);
public abstract void add(double x, long w);

/**
* Add a single sample to this TDigest.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public void testSingletonsAtEnds() {
d.add(x);
}
}
int last = 0;
long last = 0;
for (Centroid centroid : d.centroids()) {
if (last == 0) {
assertEquals(1, centroid.count());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public EmptyTDigestState() {
}

@Override
public void add(double x, int w) {
public void add(double x, long w) {
throw new UnsupportedOperationException("Immutable Empty TDigest");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public static TDigestState read(StreamInput in) throws IOException {
state.tdigest.reserve(size);
}
for (int i = 0; i < n; i++) {
state.add(in.readDouble(), in.readVInt());
state.add(in.readDouble(), in.readVLong());
}
return state;
}
Expand Down Expand Up @@ -189,7 +189,7 @@ public int hashCode() {
h = 31 * h + Integer.hashCode(centroidCount());
for (Centroid centroid : centroids()) {
h = 31 * h + Double.hashCode(centroid.mean());
h = 31 * h + centroid.count();
h = 31 * h + (int) centroid.count();
}
h = 31 * h + Double.hashCode(getMax());
h = 31 * h + Double.hashCode(getMin());
Expand All @@ -205,7 +205,7 @@ public void add(TDigestState other) {
tdigest.add(other.tdigest);
}

public void add(double x, int w) {
public void add(double x, long w) {
tdigest.add(x, w);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public static BinaryDocValuesField histogramFieldDocValues(String fieldName, dou
BytesStreamOutput streamOutput = new BytesStreamOutput();
histogram.compress();
for (Centroid centroid : histogram.centroids()) {
streamOutput.writeVInt(centroid.count());
streamOutput.writeVLong(centroid.count());
streamOutput.writeDouble(centroid.mean());
}
return new BinaryDocValuesField(fieldName, streamOutput.bytes().toBytesRef());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ private void setupTDigestHistogram(int compression) throws Exception {
client().bulk(bulkRequest);
bulkRequest = new BulkRequest();
List<Double> values = new ArrayList<>();
List<Integer> counts = new ArrayList<>();
List<Long> counts = new ArrayList<>();
Collection<Centroid> centroids = histogram.centroids();
for (Centroid centroid : centroids) {
values.add(centroid.mean());
Expand All @@ -196,7 +196,7 @@ private void setupTDigestHistogram(int compression) throws Exception {
.startObject("inner")
.startObject("data")
.field("values", values.toArray(new Double[values.size()]))
.field("counts", counts.toArray(new Integer[counts.size()]))
.field("counts", counts.toArray(new Long[counts.size()]))
.endObject()
.endObject()
.endObject();
Expand Down

0 comments on commit 0247cfe

Please sign in to comment.