Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use long in Centroid count #99491

Merged
merged 7 commits on Sep 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/99491.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 99491
summary: Use long in Centroid count
area: Aggregations
type: bug
issues:
- 80153
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@
final class AVLGroupTree extends AbstractCollection<Centroid> {
/* For insertions into the tree */
private double centroid;
private int count;
private long count;
private double[] centroids;
private int[] counts;
private int[] aggregatedCounts;
private long[] counts;
private long[] aggregatedCounts;
private final IntAVLTree tree;

AVLGroupTree() {
Expand Down Expand Up @@ -78,8 +78,8 @@ protected void fixAggregates(int node) {

};
centroids = new double[tree.capacity()];
counts = new int[tree.capacity()];
aggregatedCounts = new int[tree.capacity()];
counts = new long[tree.capacity()];
aggregatedCounts = new long[tree.capacity()];
}

/**
Expand Down Expand Up @@ -113,14 +113,14 @@ public double mean(int node) {
/**
* Return the count for the provided node.
*/
public int count(int node) {
public long count(int node) {
return counts[node];
}

/**
* Add the provided centroid to the tree.
*/
public void add(double centroid, int count) {
public void add(double centroid, long count) {
this.centroid = centroid;
this.count = count;
tree.add();
Expand All @@ -135,7 +135,7 @@ public boolean add(Centroid centroid) {
/**
* Update values associated with a node, readjusting the tree if necessary.
*/
public void update(int node, double centroid, int count) {
public void update(int node, double centroid, long count) {
// have to do full scale update
this.centroid = centroid;
this.count = count;
Expand Down Expand Up @@ -242,7 +242,7 @@ public void remove() {
/**
* Return the total count of points that have been added to the tree.
*/
public int sum() {
public long sum() {
return aggregatedCounts[tree.root()];
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public int centroidCount() {
}

@Override
public void add(double x, int w) {
public void add(double x, long w) {
checkValue(x);
needsCompression = true;

Expand All @@ -84,7 +84,7 @@ public void add(double x, int w) {
}

if (start == NIL) { // empty summary
assert summary.size() == 0;
assert summary.isEmpty();
summary.add(x, w);
count = w;
} else {
Expand Down Expand Up @@ -127,7 +127,7 @@ public void add(double x, int w) {
// if the nearest point was not unique, then we may not be modifying the first copy
// which means that ordering can change
double centroid = summary.mean(closest);
int count = summary.count(closest);
long count = summary.count(closest);
centroid = weightedAverage(centroid, count, x, w);
count += w;
summary.update(closest, centroid, count);
Expand Down Expand Up @@ -189,7 +189,7 @@ public long size() {
@Override
public double cdf(double x) {
AVLGroupTree values = summary;
if (values.size() == 0) {
if (values.isEmpty()) {
return Double.NaN;
}
if (values.size() == 1) {
Expand Down Expand Up @@ -272,7 +272,7 @@ public double quantile(double q) {
}

AVLGroupTree values = summary;
if (values.size() == 0) {
if (values.isEmpty()) {
// no centroids means no data, no way to get a quantile
return Double.NaN;
} else if (values.size() == 1) {
Expand All @@ -293,7 +293,7 @@ public double quantile(double q) {
}

int currentNode = values.first();
int currentWeight = values.count(currentNode);
long currentWeight = values.count(currentNode);

// Total mass to the left of the center of the current node.
double weightSoFar = currentWeight / 2.0;
Expand All @@ -305,7 +305,7 @@ public double quantile(double q) {

for (int i = 0; i < values.size() - 1; i++) {
int nextNode = values.next(currentNode);
int nextWeight = values.count(nextNode);
long nextWeight = values.count(nextNode);
// this is the mass between current center and next center
double dw = (currentWeight + nextWeight) / 2.0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class Centroid implements Comparable<Centroid> {
private static final AtomicInteger uniqueCount = new AtomicInteger(1);

private double centroid = 0;
private int count = 0;
private long count = 0;

// The ID is transient because it must be unique within a given JVM. A new
// ID should be generated from uniqueCount when a Centroid is deserialized.
Expand All @@ -45,22 +45,22 @@ public Centroid(double x) {
start(x, 1, uniqueCount.getAndIncrement());
}

public Centroid(double x, int w) {
public Centroid(double x, long w) {
this();
start(x, w, uniqueCount.getAndIncrement());
}

public Centroid(double x, int w, int id) {
public Centroid(double x, long w, int id) {
this();
start(x, w, id);
}

private void start(double x, int w, int id) {
private void start(double x, long w, int id) {
this.id = id;
add(x, w);
}

public void add(double x, int w) {
public void add(double x, long w) {
count += w;
centroid += w * (x - centroid) / count;
}
Expand All @@ -69,7 +69,7 @@ public double mean() {
return centroid;
}

public int count() {
public long count() {
return count;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public class HybridDigest extends AbstractTDigest {
}

@Override
public void add(double x, int w) {
public void add(double x, long w) {
reserve(w);
if (mergingDigest != null) {
mergingDigest.add(x, w);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ public MergingDigest(double compression, int bufferSize, int size) {
}

@Override
public void add(double x, int w) {
public void add(double x, long w) {
checkValue(x);
if (tempUsed >= tempWeight.length - lastUsedCell - 1) {
mergeNewValues();
Expand Down Expand Up @@ -514,7 +514,7 @@ public boolean hasNext() {

@Override
public Centroid next() {
Centroid rc = new Centroid(mean[i], (int) weight[i]);
Centroid rc = new Centroid(mean[i], (long) weight[i]);
i++;
return rc;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class SortingDigest extends AbstractTDigest {
private boolean isSorted = true;

@Override
public void add(double x, int w) {
public void add(double x, long w) {
checkValue(x);
isSorted = isSorted && (values.isEmpty() || values.get(values.size() - 1) <= x);
for (int i = 0; i < w; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public static TDigest createHybridDigest(double compression) {
* @param x The value to add.
* @param w The weight of this point.
*/
public abstract void add(double x, int w);
public abstract void add(double x, long w);

/**
* Add a single sample to this TDigest.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public void testSingletonsAtEnds() {
d.add(x);
}
}
int last = 0;
long last = 0;
for (Centroid centroid : d.centroids()) {
if (last == 0) {
assertEquals(1, centroid.count());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public EmptyTDigestState() {
}

@Override
public void add(double x, int w) {
public void add(double x, long w) {
throw new UnsupportedOperationException("Immutable Empty TDigest");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public static TDigestState read(StreamInput in) throws IOException {
state.tdigest.reserve(size);
}
for (int i = 0; i < n; i++) {
state.add(in.readDouble(), in.readVInt());
state.add(in.readDouble(), in.readVLong());
}
return state;
}
Expand Down Expand Up @@ -189,7 +189,7 @@ public int hashCode() {
h = 31 * h + Integer.hashCode(centroidCount());
for (Centroid centroid : centroids()) {
h = 31 * h + Double.hashCode(centroid.mean());
h = 31 * h + centroid.count();
h = 31 * h + (int) centroid.count();
}
h = 31 * h + Double.hashCode(getMax());
h = 31 * h + Double.hashCode(getMin());
Expand All @@ -205,7 +205,7 @@ public void add(TDigestState other) {
tdigest.add(other.tdigest);
}

public void add(double x, int w) {
public void add(double x, long w) {
tdigest.add(x, w);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public static BinaryDocValuesField histogramFieldDocValues(String fieldName, dou
BytesStreamOutput streamOutput = new BytesStreamOutput();
histogram.compress();
for (Centroid centroid : histogram.centroids()) {
streamOutput.writeVInt(centroid.count());
streamOutput.writeVLong(centroid.count());
streamOutput.writeDouble(centroid.mean());
}
return new BinaryDocValuesField(fieldName, streamOutput.bytes().toBytesRef());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ private void setupTDigestHistogram(int compression) throws Exception {
client().bulk(bulkRequest);
bulkRequest = new BulkRequest();
List<Double> values = new ArrayList<>();
List<Integer> counts = new ArrayList<>();
List<Long> counts = new ArrayList<>();
Collection<Centroid> centroids = histogram.centroids();
for (Centroid centroid : centroids) {
values.add(centroid.mean());
Expand All @@ -196,7 +196,7 @@ private void setupTDigestHistogram(int compression) throws Exception {
.startObject("inner")
.startObject("data")
.field("values", values.toArray(new Double[values.size()]))
.field("counts", counts.toArray(new Integer[counts.size()]))
.field("counts", counts.toArray(new Long[counts.size()]))
.endObject()
.endObject()
.endObject();
Expand Down