Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add value_count mode to rate agg #63687

Merged
merged 1 commit into from
Oct 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 80 additions & 2 deletions docs/reference/aggregations/metrics/rate-aggregation.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ be automatically calculated by multiplying monthly rate by 12.
// TESTRESPONSE[s/\.\.\./"took": $body.took,"timed_out": false,"_shards": $body._shards,"hits": $body.hits,/]

Instead of counting the number of documents, it is also possible to calculate a sum of all values of the fields in the documents in each
bucket. The following request will group all sales records into monthly bucket and than calculate the total monthly sales and convert them
into average daily sales.
bucket or the number of values in each bucket. The following request will group all sales records into monthly bucket and than calculate
the total monthly sales and convert them into average daily sales.

[source,console]
--------------------------------------------------
Expand Down Expand Up @@ -164,6 +164,84 @@ The response will contain the average daily sale prices for each month.
--------------------------------------------------
// TESTRESPONSE[s/\.\.\./"took": $body.took,"timed_out": false,"_shards": $body._shards,"hits": $body.hits,/]

By adding the `mode` parameter with the value `value_count`, we can change the calculation from `sum` to the number of values of the field:

[source,console]
--------------------------------------------------
GET sales/_search
{
"size": 0,
"aggs": {
"by_date": {
"date_histogram": {
"field": "date",
"calendar_interval": "month" <1>
},
"aggs": {
"avg_number_of_sales_per_year": {
"rate": {
"field": "price", <2>
"unit": "year", <3>
"mode": "value_count" <4>
}
}
}
}
}
}
--------------------------------------------------
// TEST[setup:sales]
<1> Histogram is grouped by month.
<2> Calculate number of of all sale prices
<3> Convert to annual counts
<4> Changing the mode to value count

The response will contain the average daily sale prices for each month.

[source,console-result]
--------------------------------------------------
{
...
"aggregations" : {
"by_date" : {
"buckets" : [
{
"key_as_string" : "2015/01/01 00:00:00",
"key" : 1420070400000,
"doc_count" : 3,
"avg_number_of_sales_per_year" : {
"value" : 36.0
}
},
{
"key_as_string" : "2015/02/01 00:00:00",
"key" : 1422748800000,
"doc_count" : 2,
"avg_number_of_sales_per_year" : {
"value" : 24.0
}
},
{
"key_as_string" : "2015/03/01 00:00:00",
"key" : 1425168000000,
"doc_count" : 2,
"avg_number_of_sales_per_year" : {
"value" : 24.0
}
}
]
}
}
}
--------------------------------------------------
// TESTRESPONSE[s/\.\.\./"took": $body.took,"timed_out": false,"_shards": $body._shards,"hits": $body.hits,/]

By default `sum` mode is used.

`"mode": "sum"`:: calculate the sum of all values field
`"mode": "value_count"`:: use the number of values in the field

The `mode` parameter can only be used with fields and scripts.

==== Relationship between bucket sizes and rate

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public abstract class AbstractRateAggregator extends NumericMetricsAggregator.Si
protected final ValuesSource valuesSource;
private final DocValueFormat format;
private final Rounding.DateTimeUnit rateUnit;
protected final RateMode rateMode;
private final SizedBucketAggregator sizedBucketAggregator;

protected DoubleArray sums;
Expand All @@ -35,6 +36,7 @@ public AbstractRateAggregator(
String name,
ValuesSourceConfig valuesSourceConfig,
Rounding.DateTimeUnit rateUnit,
RateMode rateMode,
SearchContext context,
Aggregator parent,
Map<String, Object> metadata
Expand All @@ -45,8 +47,12 @@ public AbstractRateAggregator(
if (valuesSource != null) {
sums = context.bigArrays().newDoubleArray(1, true);
compensations = context.bigArrays().newDoubleArray(1, true);
if (rateMode == null) {
rateMode = RateMode.SUM;
}
}
this.rateUnit = rateUnit;
this.rateMode = rateMode;
this.sizedBucketAggregator = findSizedBucketAncestor();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ public HistogramRateAggregator(
String name,
ValuesSourceConfig valuesSourceConfig,
Rounding.DateTimeUnit rateUnit,
RateMode rateMode,
SearchContext context,
Aggregator parent,
Map<String, Object> metadata
) throws IOException {
super(name, valuesSourceConfig, rateUnit, context, parent, metadata);
super(name, valuesSourceConfig, rateUnit, rateMode, context, parent, metadata);
}

@Override
Expand All @@ -51,7 +52,18 @@ public void collect(int doc, long bucket) throws IOException {
double sum = sums.get(bucket);
double compensation = compensations.get(bucket);
kahanSummation.reset(sum, compensation);
kahanSummation.add(sketch.value());
final double value;
switch (rateMode) {
case SUM:
value = sketch.value();
break;
case VALUE_COUNT:
value = sketch.count();
break;
default:
throw new IllegalArgumentException("Unsupported rate mode " + rateMode);
}
kahanSummation.add(value);
compensations.set(bucket, kahanSummation.delta());
sums.set(bucket, kahanSummation.value());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@ public NumericRateAggregator(
String name,
ValuesSourceConfig valuesSourceConfig,
Rounding.DateTimeUnit rateUnit,
RateMode rateMode,
SearchContext context,
Aggregator parent,
Map<String, Object> metadata
) throws IOException {
super(name, valuesSourceConfig, rateUnit, context, parent, metadata);
super(name, valuesSourceConfig, rateUnit, rateMode, context, parent, metadata);
}

@Override
Expand All @@ -51,10 +52,17 @@ public void collect(int doc, long bucket) throws IOException {
double sum = sums.get(bucket);
double compensation = compensations.get(bucket);
kahanSummation.reset(sum, compensation);

for (int i = 0; i < valuesCount; i++) {
double value = values.nextValue();
kahanSummation.add(value);
switch (rateMode) {
case SUM:
for (int i = 0; i < valuesCount; i++) {
kahanSummation.add(values.nextValue());
}
break;
case VALUE_COUNT:
kahanSummation.add(valuesCount);
break;
default:
throw new IllegalArgumentException("Unsupported rate mode " + rateMode);
}

compensations.set(bucket, kahanSummation.delta());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@
*/
package org.elasticsearch.xpack.analytics.rate;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;

import org.elasticsearch.Version;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Rounding;
import org.elasticsearch.common.io.stream.StreamInput;
Expand All @@ -24,13 +29,10 @@
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
import org.elasticsearch.search.aggregations.support.ValuesSourceType;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;

public class RateAggregationBuilder extends ValuesSourceAggregationBuilder.LeafOnly<ValuesSource, RateAggregationBuilder> {
public static final String NAME = "rate";
public static final ParseField UNIT_FIELD = new ParseField("unit");
public static final ParseField MODE_FIELD = new ParseField("mode");
public static final ValuesSourceRegistry.RegistryKey<RateAggregatorSupplier> REGISTRY_KEY = new ValuesSourceRegistry.RegistryKey<>(
NAME,
RateAggregatorSupplier.class
Expand All @@ -40,9 +42,11 @@ public class RateAggregationBuilder extends ValuesSourceAggregationBuilder.LeafO
static {
ValuesSourceAggregationBuilder.declareFields(PARSER, true, true, false, false);
PARSER.declareString(RateAggregationBuilder::rateUnit, UNIT_FIELD);
PARSER.declareString(RateAggregationBuilder::rateMode, MODE_FIELD);
}

Rounding.DateTimeUnit rateUnit;
RateMode rateMode;

public static void registerAggregators(ValuesSourceRegistry.Builder builder) {
RateAggregatorFactory.registerAggregators(builder);
Expand All @@ -58,6 +62,8 @@ protected RateAggregationBuilder(
Map<String, Object> metadata
) {
super(clone, factoriesBuilder, metadata);
this.rateUnit = clone.rateUnit;
this.rateMode = clone.rateMode;
}

@Override
Expand All @@ -76,6 +82,11 @@ public RateAggregationBuilder(StreamInput in) throws IOException {
} else {
rateUnit = null;
}
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
if (in.readBoolean()) {
rateMode = in.readEnum(RateMode.class);
}
}
}

@Override
Expand All @@ -90,6 +101,14 @@ protected void innerWriteTo(StreamOutput out) throws IOException {
} else {
out.writeByte((byte) 0);
}
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
if (rateMode != null) {
out.writeBoolean(true);
out.writeEnum(rateMode);
} else {
out.writeBoolean(false);
}
}
}

@Override
Expand All @@ -104,14 +123,22 @@ protected RateAggregatorFactory innerBuild(
AggregatorFactory parent,
AggregatorFactories.Builder subFactoriesBuilder
) throws IOException {
return new RateAggregatorFactory(name, config, rateUnit, context, parent, subFactoriesBuilder, metadata);
if (field() == null && script() == null) {
if (rateMode != null) {
throw new IllegalArgumentException("The mode parameter is only supported with field or script");
}
}
return new RateAggregatorFactory(name, config, rateUnit, rateMode, context, parent, subFactoriesBuilder, metadata);
}

@Override
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
if (rateUnit != null) {
builder.field(UNIT_FIELD.getPreferredName(), rateUnit.shortName());
}
if (rateMode != null) {
builder.field(MODE_FIELD.getPreferredName(), rateMode.value());
}
return builder;
}

Expand All @@ -129,6 +156,15 @@ public RateAggregationBuilder rateUnit(Rounding.DateTimeUnit rateUnit) {
return this;
}

public RateAggregationBuilder rateMode(String rateMode) {
return rateMode(RateMode.resolve(rateMode));
}

public RateAggregationBuilder rateMode(RateMode rateMode) {
this.rateMode = rateMode;
return this;
}

static Rounding.DateTimeUnit parse(String rateUnit) {
Rounding.DateTimeUnit parsedRate = DateHistogramAggregationBuilder.DATE_FIELD_UNITS.get(rateUnit);
if (parsedRate == null) {
Expand All @@ -140,17 +176,7 @@ static Rounding.DateTimeUnit parse(String rateUnit) {
@Override
protected ValuesSourceConfig resolveConfig(AggregationContext context) {
if (field() == null && script() == null) {
return new ValuesSourceConfig(
CoreValuesSourceType.NUMERIC,
null,
true,
null,
null,
1.0,
null,
DocValueFormat.RAW,
context
);
return new ValuesSourceConfig(CoreValuesSourceType.NUMERIC, null, true, null, null, 1.0, null, DocValueFormat.RAW, context);
} else {
return super.resolveConfig(context);
}
Expand All @@ -162,11 +188,11 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
RateAggregationBuilder that = (RateAggregationBuilder) o;
return rateUnit == that.rateUnit;
return rateUnit == that.rateUnit && rateMode == that.rateMode;
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), rateUnit);
return Objects.hash(super.hashCode(), rateUnit, rateMode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,21 @@ class RateAggregatorFactory extends ValuesSourceAggregatorFactory {

private final Rounding.DateTimeUnit rateUnit;

private final RateMode rateMode;

RateAggregatorFactory(
String name,
ValuesSourceConfig config,
Rounding.DateTimeUnit rateUnit,
RateMode rateMode,
AggregationContext context,
AggregatorFactory parent,
AggregatorFactories.Builder subFactoriesBuilder,
Map<String, Object> metadata
) throws IOException {
super(name, config, context, parent, subFactoriesBuilder, metadata);
this.rateUnit = rateUnit;
this.rateMode = rateMode;
}

static void registerAggregators(ValuesSourceRegistry.Builder builder) {
Expand All @@ -59,7 +63,7 @@ static void registerAggregators(ValuesSourceRegistry.Builder builder) {

@Override
protected Aggregator createUnmapped(SearchContext searchContext, Aggregator parent, Map<String, Object> metadata) throws IOException {
return new AbstractRateAggregator(name, config, rateUnit, searchContext, parent, metadata) {
return new AbstractRateAggregator(name, config, rateUnit, rateMode, searchContext, parent, metadata) {
@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) {
return LeafBucketCollector.NO_OP_COLLECTOR;
Expand All @@ -76,6 +80,6 @@ protected Aggregator doCreateInternal(
) throws IOException {
return context.getValuesSourceRegistry()
.getAggregator(RateAggregationBuilder.REGISTRY_KEY, config)
.build(name, config, rateUnit, searchContext, parent, metadata);
.build(name, config, rateUnit, rateMode, searchContext, parent, metadata);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Aggregator build(
String name,
ValuesSourceConfig valuesSourceConfig,
Rounding.DateTimeUnit rateUnit,
RateMode rateMode,
SearchContext context,
Aggregator parent,
Map<String, Object> metadata
Expand Down
Loading