Skip to content

Commit

Permalink
Improvements to Elasticsearch aggregations (#1970)
Browse files Browse the repository at this point in the history
  • Loading branch information
rafaelbey authored Jun 28, 2023
1 parent 22376e3 commit 9c18f50
Show file tree
Hide file tree
Showing 11 changed files with 280 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ private static Function<Object, Object> csvValueExtractor(String type)
case "Integer":
return ((Function<Object, Number>) Number.class::cast).andThen(Number::longValue);
case "Float":
case "Number":
return ((Function<Object, Number>) Number.class::cast).andThen(Number::doubleValue);
case "Decimal":
return BigDecimal.class::cast;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ public TDSColumnWithSerializer(TDSColumn tdsColumn)
case "Integer":
return (ThrowingProcedure2<JsonGenerator, Long>) JsonGenerator::writeNumber;
case "Float":
case "Number":
return (ThrowingProcedure2<JsonGenerator, Double>) JsonGenerator::writeNumber;
case "Decimal":
return (ThrowingProcedure2<JsonGenerator, BigDecimal>) JsonGenerator::writeNumber;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,7 @@ function <<access.private>> meta::pure::tds::aggRows<X,Y>(rows : TDSRow[*], colu
let values = $aggValues->map(aggValue|
let rowMapValues = $rows->map(row|$aggValue.mapFn->eval($row));
let value = $aggValue.aggregateFn->eval($rowMapValues);
$value;
if($value->isEmpty(), |^TDSNull(), |$value);
);
^TDSRow(values = $values, parent=$parent);
}, {|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ private static Function<JsonNode, Object> jsonToTDSValue(TDSColumn column)
case "Integer":
return JsonNode::asLong;
case "Float":
case "Number":
return JsonNode::asDouble;
case "Decimal":
return Functions.chain(JsonNode::asText, BigDecimal::new);
Expand All @@ -154,6 +155,7 @@ private static Function<Object, Object> rawToTDSValue(TDSColumn column)
case "Integer":
return Functions.chain(Number.class::cast, Number::longValue);
case "Float":
case "Number":
return Functions.chain(Number.class::cast, Number::doubleValue);
case "Decimal":
return x ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.finos.legend.engine.protocol.store.elasticsearch.v7.specification.global.search.ResponseBody;
import org.finos.legend.engine.protocol.store.elasticsearch.v7.specification.global.search.SearchRequest;
import org.finos.legend.engine.protocol.store.elasticsearch.v7.specification.global.search.types.Hit;
import org.finos.legend.engine.protocol.store.elasticsearch.v7.specification.global.search.types.TotalHits;
import org.finos.legend.engine.protocol.store.elasticsearch.v7.specification.types.AbstractRequestBaseVisitor;
import org.finos.legend.engine.protocol.store.elasticsearch.v7.specification.types.FieldValue;
import org.finos.legend.engine.protocol.store.elasticsearch.v7.specification.types.RequestBase;
Expand All @@ -64,6 +65,8 @@
import org.finos.legend.engine.protocol.store.elasticsearch.v7.specification.types.aggregations.AggregateBase;
import org.finos.legend.engine.protocol.store.elasticsearch.v7.specification.types.aggregations.AvgAggregate;
import org.finos.legend.engine.protocol.store.elasticsearch.v7.specification.types.aggregations.CompositeAggregate;
import org.finos.legend.engine.protocol.store.elasticsearch.v7.specification.types.aggregations.MaxAggregate;
import org.finos.legend.engine.protocol.store.elasticsearch.v7.specification.types.aggregations.MinAggregate;
import org.finos.legend.engine.protocol.store.elasticsearch.v7.specification.types.aggregations.SumAggregate;
import org.finos.legend.engine.protocol.store.elasticsearch.v7.specification.types.aggregations.ValueCountAggregate;
import org.finos.legend.engine.shared.core.api.request.RequestContext;
Expand Down Expand Up @@ -145,7 +148,7 @@ public Result visit(SearchRequest val)

private Stream<Object[]> processAggregateResponse(ResponseBody<?> responseBody) throws IOException
{
AggregateTDSResultVisitor aggregateTDSResultVisitor = new AggregateTDSResultVisitor();
AggregateTDSResultVisitor aggregateTDSResultVisitor = new AggregateTDSResultVisitor(responseBody.hits.total);
List<TDSColumn> tdsColumns = ((TDSResultType) this.node.resultType).tdsColumns;
List<TDSColumnResultPath> columnResultPaths = ((TDSMetadata) node.metadata).columnResultPaths;

Expand Down Expand Up @@ -251,6 +254,13 @@ private InputStream post(HttpUriRequest request, Span span, long startTime) thro

private static class AggregateTDSResultVisitor extends AbstractAggregateBaseVisitor<Object>
{
private final TotalHits total;

private AggregateTDSResultVisitor(TotalHits total)
{
this.total = total;
}

@Override
protected Object defaultValue(AggregateBase val)
{
Expand All @@ -260,7 +270,7 @@ protected Object defaultValue(AggregateBase val)
@Override
public Object visit(AvgAggregate val)
{
return val.value;
return this.total.value == 0L ? null : val.value;
}

@Override
Expand All @@ -269,6 +279,18 @@ public Object visit(SumAggregate val)
return val.value;
}

@Override
public Object visit(MaxAggregate val)
{
return this.total.value == 0L ? null : val.value;
}

@Override
public Object visit(MinAggregate val)
{
return this.total.value == 0L ? null : val.value;
}

@Override
public Object visit(ValueCountAggregate val)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,21 @@ function meta::external::store::elasticsearch::v7::pureToEs::supportedAggregatio
{
let supported = [
pair(x: Function<Any>[1] | $x == meta::pure::functions::math::sum_Integer_MANY__Integer_1_, {x: FunctionExpression[1], y: String[1] | ^AggregationContainer(sum = ^SumAggregation(field = $y))})
,pair(x: Function<Any>[1] | $x == meta::pure::functions::math::sum_Float_MANY__Float_1_, {x: FunctionExpression[1], y: String[1] | ^AggregationContainer(sum = ^SumAggregation(field = $y))})
,pair(x: Function<Any>[1] | $x == meta::pure::functions::math::sum_Number_MANY__Number_1_, {x: FunctionExpression[1], y: String[1] | ^AggregationContainer(sum = ^SumAggregation(field = $y))})

,pair(x: Function<Any>[1] | $x == meta::pure::functions::math::max_Integer_MANY__Integer_$0_1$_, {x: FunctionExpression[1], y: String[1] | ^AggregationContainer(max = ^MaxAggregation(field = $y))})
,pair(x: Function<Any>[1] | $x == meta::pure::functions::math::max_Float_MANY__Float_$0_1$_, {x: FunctionExpression[1], y: String[1] | ^AggregationContainer(max = ^MaxAggregation(field = $y))})
,pair(x: Function<Any>[1] | $x == meta::pure::functions::math::max_Number_MANY__Number_$0_1$_, {x: FunctionExpression[1], y: String[1] | ^AggregationContainer(max = ^MaxAggregation(field = $y))})

,pair(x: Function<Any>[1] | $x == meta::pure::functions::math::min_Integer_MANY__Integer_$0_1$_, {x: FunctionExpression[1], y: String[1] | ^AggregationContainer(min = ^MinAggregation(field = $y))})
,pair(x: Function<Any>[1] | $x == meta::pure::functions::math::min_Float_MANY__Float_$0_1$_, {x: FunctionExpression[1], y: String[1] | ^AggregationContainer(min = ^MinAggregation(field = $y))})
,pair(x: Function<Any>[1] | $x == meta::pure::functions::math::min_Number_MANY__Number_$0_1$_, {x: FunctionExpression[1], y: String[1] | ^AggregationContainer(min = ^MinAggregation(field = $y))})

,pair(x: Function<Any>[1] | $x == meta::pure::functions::math::average_Integer_MANY__Float_1_, {x: FunctionExpression[1], y: String[1] | ^AggregationContainer(avg = ^AverageAggregation(field = $y))})
,pair(x: Function<Any>[1] | $x == meta::pure::functions::math::average_Float_MANY__Float_1_, {x: FunctionExpression[1], y: String[1] | ^AggregationContainer(avg = ^AverageAggregation(field = $y))})
,pair(x: Function<Any>[1] | $x == meta::pure::functions::math::average_Number_MANY__Float_1_, {x: FunctionExpression[1], y: String[1] | ^AggregationContainer(avg = ^AverageAggregation(field = $y))})

,pair(x: Function<Any>[1] | $x == meta::pure::functions::collection::count_Any_MANY__Integer_1_, {x: FunctionExpression[1], y: String[1] | ^AggregationContainer(value_count = ^ValueCountAggregation(field = $y))})
];
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2023 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import meta::pure::test::*;
import meta::pure::tds::*;
import meta::external::store::elasticsearch::executionTest::testCase::*;
import meta::external::store::elasticsearch::executionTest::testCase::tds::*;
import meta::external::store::elasticsearch::executionTest::test::*;
import meta::external::store::elasticsearch::executionTest::utils::*;

function
<<paramTest.Test>>
{doc.doc = 'Test avg aggregation on Integer field on Elasticsearch'}
meta::external::store::elasticsearch::executionTest::testCase::tds::groupBy::avg::testAverageAggregationIntegerField(config:TestConfig[1]):Boolean[1]
{
$config->testTdsExpression(x|$x->groupBy([], agg('avg', r | $r.getInteger('Budget'), agg | $agg->average())));
}

function
<<paramTest.Test>>
{doc.doc = 'Test avg aggregation on Float field on Elasticsearch'}
meta::external::store::elasticsearch::executionTest::testCase::tds::groupBy::avg::testAverageAggregationFloatField(config:TestConfig[1]):Boolean[1]
{
$config->testTdsExpression(x|$x->groupBy([], agg('avg', r | $r.getFloat('Revenue'), agg | $agg->average())));
}

function
<<paramTest.Test>>
{doc.doc = 'Test avg aggregation on Number field on Elasticsearch'}
meta::external::store::elasticsearch::executionTest::testCase::tds::groupBy::avg::testAverageAggregationNumberField(config:TestConfig[1]):Boolean[1]
{
$config->testTdsExpression(x|$x->groupBy([], agg('avg', r | $r.getNumber('Revenue'), agg | $agg->average())));
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2023 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import meta::pure::test::*;
import meta::pure::tds::*;
import meta::external::store::elasticsearch::executionTest::testCase::*;
import meta::external::store::elasticsearch::executionTest::testCase::tds::*;
import meta::external::store::elasticsearch::executionTest::test::*;
import meta::external::store::elasticsearch::executionTest::utils::*;

function
<<paramTest.Test>>
{doc.doc = 'Test count aggregation on Elasticsearch'}
meta::external::store::elasticsearch::executionTest::testCase::tds::groupBy::count::testCountAggregation(config:TestConfig[1]):Boolean[1]
{
$config->testTdsExpression(x|$x->groupBy([], agg('count', r | $r.getString('MainActor.Name'), agg | $agg->count())));
}

function
<<paramTest.Test>>
{doc.doc = 'Test count no result aggregation on Elasticsearch'}
meta::external::store::elasticsearch::executionTest::testCase::tds::groupBy::count::testEmptyCountAggregation(config:TestConfig[1]):Boolean[1]
{
$config->testTdsExpression(x|$x->filter(r | $r.getString('_id') == 'N/A')->groupBy([], agg('count', r | $r.getString('MainActor.Name'), agg | $agg->count())));
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright 2023 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import meta::pure::test::*;
import meta::pure::tds::*;
import meta::external::store::elasticsearch::executionTest::testCase::*;
import meta::external::store::elasticsearch::executionTest::testCase::tds::*;
import meta::external::store::elasticsearch::executionTest::test::*;
import meta::external::store::elasticsearch::executionTest::utils::*;

function
<<paramTest.Test>>
{doc.doc = 'Test max aggregation on Integer field on Elasticsearch'}
meta::external::store::elasticsearch::executionTest::testCase::tds::groupBy::max::testMaxAggregationIntegerField(config:TestConfig[1]):Boolean[1]
{
$config->testTdsExpression(x|$x->groupBy([], agg('max', r | $r.getInteger('Budget'), agg | $agg->max())));
}

function
<<paramTest.Test>>
{doc.doc = 'Test max aggregation on Float field on Elasticsearch'}
meta::external::store::elasticsearch::executionTest::testCase::tds::groupBy::max::testMaxAggregationFloatField(config:TestConfig[1]):Boolean[1]
{
$config->testTdsExpression(x|$x->groupBy([], agg('max', r | $r.getFloat('Revenue'), agg | $agg->max())));
}

function
<<paramTest.Test>>
{doc.doc = 'Test max aggregation on Number field on Elasticsearch'}
meta::external::store::elasticsearch::executionTest::testCase::tds::groupBy::max::testMaxAggregationNumberField(config:TestConfig[1]):Boolean[1]
{
$config->testTdsExpression(x|$x->groupBy([], agg('max', r | $r.getNumber('Revenue'), agg | $agg->max())));
}

function
<<paramTest.Test>>
{doc.doc = 'Test max aggregation no result on Elasticsearch'}
meta::external::store::elasticsearch::executionTest::testCase::tds::groupBy::max::testEmptyMaxAggregation(config:TestConfig[1]):Boolean[1]
{
$config->testTdsExpression(x|$x->filter(r | $r.getString('_id') == 'N/A')->groupBy([], agg('max', r | $r.getNumber('Revenue'), agg | $agg->max())));
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright 2023 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import meta::pure::test::*;
import meta::pure::tds::*;
import meta::external::store::elasticsearch::executionTest::testCase::*;
import meta::external::store::elasticsearch::executionTest::testCase::tds::*;
import meta::external::store::elasticsearch::executionTest::test::*;
import meta::external::store::elasticsearch::executionTest::utils::*;

function
<<paramTest.Test>>
{doc.doc = 'Test min aggregation on Integer field on Elasticsearch'}
meta::external::store::elasticsearch::executionTest::testCase::tds::groupBy::min::testMinAggregationIntegerField(config:TestConfig[1]):Boolean[1]
{
$config->testTdsExpression(x|$x->groupBy([], agg('min', r | $r.getInteger('Budget'), agg | $agg->min())));
}


function
<<paramTest.Test>>
{doc.doc = 'Test min aggregation on Float field on Elasticsearch'}
meta::external::store::elasticsearch::executionTest::testCase::tds::groupBy::min::testMinAggregationFloatField(config:TestConfig[1]):Boolean[1]
{
$config->testTdsExpression(x|$x->groupBy([], agg('min', r | $r.getFloat('Revenue'), agg | $agg->min())));
}

function
<<paramTest.Test>>
{doc.doc = 'Test min aggregation on Number field on Elasticsearch'}
meta::external::store::elasticsearch::executionTest::testCase::tds::groupBy::min::testMinAggregationNumberField(config:TestConfig[1]):Boolean[1]
{
$config->testTdsExpression(x|$x->groupBy([], agg('min', r | $r.getNumber('Revenue'), agg | $agg->min())));
}

function
<<paramTest.Test>>
{doc.doc = 'Test min aggregation no result on Elasticsearch'}
meta::external::store::elasticsearch::executionTest::testCase::tds::groupBy::min::testEmptyMinAggregation(config:TestConfig[1]):Boolean[1]
{
$config->testTdsExpression(x|$x->filter(r | $r.getString('_id') == 'N/A')->groupBy([], agg('min', r | $r.getNumber('Revenue'), agg | $agg->min())));
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright 2023 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import meta::pure::test::*;
import meta::pure::tds::*;
import meta::external::store::elasticsearch::executionTest::testCase::*;
import meta::external::store::elasticsearch::executionTest::testCase::tds::*;
import meta::external::store::elasticsearch::executionTest::test::*;
import meta::external::store::elasticsearch::executionTest::utils::*;

function
<<paramTest.Test>>
{doc.doc = 'Test sum aggregation on Integer field on Elasticsearch'}
meta::external::store::elasticsearch::executionTest::testCase::tds::groupBy::sum::testSumAggregationIntegerField(config:TestConfig[1]):Boolean[1]
{
$config->testTdsExpression(x|$x->groupBy([], agg('sum', r | $r.getInteger('Budget'), agg | $agg->sum())));
}

function
<<paramTest.Test>>
{doc.doc = 'Test sum aggregation on Float field on Elasticsearch'}
meta::external::store::elasticsearch::executionTest::testCase::tds::groupBy::sum::testSumAggregationFloatField(config:TestConfig[1]):Boolean[1]
{
$config->testTdsExpression(x|$x->groupBy([], agg('sum', r | $r.getFloat('Revenue'), agg | $agg->sum())));
}

function
<<paramTest.Test>>
{doc.doc = 'Test sum aggregation on Number field on Elasticsearch'}
meta::external::store::elasticsearch::executionTest::testCase::tds::groupBy::sum::testSumAggregationNumberField(config:TestConfig[1]):Boolean[1]
{
$config->testTdsExpression(x|$x->groupBy([], agg('sum', r | $r.getNumber('Revenue'), agg | $agg->sum())));
}

function
<<paramTest.Test>>
{doc.doc = 'Test sum aggregation no result on Elasticsearch'}
meta::external::store::elasticsearch::executionTest::testCase::tds::groupBy::sum::testEmptySumAggregation(config:TestConfig[1]):Boolean[1]
{
$config->testTdsExpression(x|$x->filter(r | $r.getString('_id') == 'N/A')->groupBy([], agg('sum', r | $r.getNumber('Revenue'), agg | $agg->sum())));
}

0 comments on commit 9c18f50

Please sign in to comment.