Skip to content

Commit

Permalink
[Transform] add support for terms agg in transforms (elastic#56696)
Browse files Browse the repository at this point in the history
This adds support for `terms` and `rare_terms` aggs in transforms. 

The default behavior is that the results are collapsed in the following manner:
`<AGG_NAME>.<BUCKET_NAME>.<SUBAGGS...>...`
Or, if no sub-aggregations exist:
`<AGG_NAME>.<BUCKET_NAME>.<_doc_count>`

The mapping is also defined as `flattened` by default. This is to avoid field explosion while still providing (limited) search and aggregation capabilities.
  • Loading branch information
benwtrent authored May 15, 2020
1 parent 0cc2345 commit fd812d2
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 7 deletions.
2 changes: 2 additions & 0 deletions docs/reference/rest-api/common-parms.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,8 @@ are supported:
* <<search-aggregations-metrics-weight-avg-aggregation,Weighted average>>
* <<search-aggregations-metrics-cardinality-aggregation,Cardinality>>
* <<search-aggregations-bucket-filter-aggregation,Filter>>
* <<search-aggregations-bucket-rare-terms-aggregation,Rare terms>>
* <<search-aggregations-bucket-terms-aggregation,Terms>>
* <<search-aggregations-metrics-geobounds-aggregation,Geo bounds>>
* <<search-aggregations-metrics-geocentroid-aggregation,Geo centroid>>
* <<search-aggregations-metrics-max-aggregation,Max>>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,12 +265,12 @@ setup:
"group_by": {
"time": {"date_histogram": {"fixed_interval": "1h", "field": "time"}}},
"aggs": {
"vals": {"terms": {"field":"airline"}}
"vals": {"significant_terms": {"field":"airline"}}
}
}
}
- do:
catch: /Unsupported aggregation type \[terms\]/
catch: /Unsupported aggregation type \[significant_terms\]/
transform.preview_transform:
body: >
{
Expand All @@ -280,7 +280,7 @@ setup:
"group_by": {
"time": {"date_histogram": {"fixed_interval": "1h", "field": "time"}}},
"aggs": {
"vals": {"terms": {"field":"airline"}}
"vals": {"significant_terms": {"field":"airline"}}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,20 @@
import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
Expand Down Expand Up @@ -508,6 +510,97 @@ public void testDateHistogramPivotNanos() throws Exception {
assertDateHistogramPivot(REVIEWS_DATE_NANO_INDEX_NAME);
}

// Pivots reviews into 2-star histogram buckets with `terms` and `rare_terms` sub-aggregations
// and verifies the bucketed results are collapsed into nested maps in the destination index.
@SuppressWarnings("unchecked")
public void testPivotWithTermsAgg() throws Exception {
    String transformId = "simple_terms_agg_pivot";
    String transformIndex = "pivot_reviews_via_histogram_with_terms_agg";
    setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformIndex);

    final Request createTransformRequest = createRequestWithAuth(
        "PUT",
        getTransformEndpoint() + transformId,
        BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS
    );

    String config = "{"
        + " \"source\": {\"index\":\""
        + REVIEWS_INDEX_NAME
        + "\"},"
        + " \"dest\": {\"index\":\""
        + transformIndex
        + "\"},";

    config += " \"pivot\": {"
        + " \"group_by\": {"
        + " \"every_2\": {"
        + " \"histogram\": {"
        + " \"interval\": 2,\"field\":\"stars\""
        + " } } },"
        + " \"aggregations\": {"
        + " \"common_users\": {"
        + " \"terms\": {"
        + " \"field\": \"user_id\","
        + " \"size\": 2"
        + " },"
        + " \"aggs\" : {"
        + " \"common_businesses\": {"
        + " \"terms\": {"
        + " \"field\": \"business_id\","
        + " \"size\": 2"
        + " }}"
        + " } "
        + " },"
        + " \"rare_users\": {"
        + " \"rare_terms\": {"
        + " \"field\": \"user_id\""
        + " } } } }"
        + "}";

    createTransformRequest.setJsonEntity(config);
    Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
    assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));

    startAndWaitForTransform(transformId, transformIndex);
    assertTrue(indexExists(transformIndex));

    // we expect 3 documents as there shall be 5 unique star values and we are bucketing every 2 starting at 0
    Map<String, Object> indexStats = getAsMap(transformIndex + "/_stats");
    assertEquals(3, XContentMapValues.extractValue("_all.total.docs.count", indexStats));

    // get and check some term results
    Map<String, Object> searchResult = getAsMap(transformIndex + "/_search?q=every_2:2.0");

    assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
    // Terms results are collapsed to nested maps, so the values are either doc counts (Integer)
    // or sub-aggregation maps — hence Map<String, Object>, not Map<String, Integer>.
    Map<String, Object> commonUsers = (Map<String, Object>) ((List<?>) XContentMapValues.extractValue(
        "hits.hits._source.common_users",
        searchResult
    )).get(0);
    Map<String, Object> rareUsers = (Map<String, Object>) ((List<?>) XContentMapValues.extractValue(
        "hits.hits._source.rare_users",
        searchResult
    )).get(0);

    // Immutable Map factories instead of double-brace initialization, which creates anonymous
    // inner classes that capture the enclosing test instance.
    Map<String, Object> expectedCommonUsers = Map.of(
        "user_10",
        Collections.singletonMap("common_businesses", Map.of("business_12", 6, "business_9", 4)),
        "user_0",
        Collections.singletonMap("common_businesses", Map.of("business_0", 35))
    );
    Map<String, Object> expectedRareUsers = Map.of("user_5", 1, "user_12", 1);

    assertThat(commonUsers, is(not(nullValue())));
    assertThat(commonUsers, equalTo(expectedCommonUsers));
    assertThat(rareUsers, is(not(nullValue())));
    assertThat(rareUsers, equalTo(expectedRareUsers));
}

private void assertDateHistogramPivot(String indexName) throws Exception {
String transformId = "simple_date_histogram_pivot_" + indexName;
String transformIndex = "pivot_reviews_via_date_histogram_" + indexName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
import org.elasticsearch.search.aggregations.metrics.GeoBounds;
Expand Down Expand Up @@ -52,6 +53,7 @@ public final class AggregationResultUtils {
tempMap.put(GeoBounds.class.getName(), new GeoBoundsAggExtractor());
tempMap.put(Percentiles.class.getName(), new PercentilesAggExtractor());
tempMap.put(SingleBucketAggregation.class.getName(), new SingleBucketAggExtractor());
tempMap.put(MultiBucketsAggregation.class.getName(), new MultiBucketsAggExtractor());
TYPE_VALUE_EXTRACTOR_MAP = Collections.unmodifiableMap(tempMap);
}

Expand Down Expand Up @@ -120,6 +122,8 @@ static AggValueExtractor getExtractor(Aggregation aggregation) {
return TYPE_VALUE_EXTRACTOR_MAP.get(Percentiles.class.getName());
} else if (aggregation instanceof SingleBucketAggregation) {
return TYPE_VALUE_EXTRACTOR_MAP.get(SingleBucketAggregation.class.getName());
} else if (aggregation instanceof MultiBucketsAggregation) {
return TYPE_VALUE_EXTRACTOR_MAP.get(MultiBucketsAggregation.class.getName());
} else {
// Execution should never reach this point!
// Creating transforms with unsupported aggregations shall not be possible
Expand Down Expand Up @@ -246,6 +250,35 @@ public Object value(Aggregation agg, Map<String, String> fieldTypeMap, String lo
}
}

/**
 * Extracts the value of a multi-bucket aggregation (e.g. {@code terms}, {@code rare_terms})
 * into a nested map. A bucket with no sub-aggregations collapses to
 * {@code bucketKey -> docCount}; otherwise it becomes
 * {@code bucketKey -> { subAggName -> extracted sub-agg value }}.
 */
static class MultiBucketsAggExtractor implements AggValueExtractor {
    @Override
    public Object value(Aggregation agg, Map<String, String> fieldTypeMap, String lookupFieldPrefix) {
        MultiBucketsAggregation aggregation = (MultiBucketsAggregation) agg;

        // The field-type lookup path for sub-aggregations is the same for every bucket,
        // so compute it once instead of once per sub-aggregation per bucket.
        String subAggLookupPrefix = lookupFieldPrefix.isEmpty() ? agg.getName() : lookupFieldPrefix + "." + agg.getName();

        Map<String, Object> nested = new HashMap<>();
        for (MultiBucketsAggregation.Bucket bucket : aggregation.getBuckets()) {
            if (bucket.getAggregations().iterator().hasNext() == false) {
                // Leaf bucket: collapse to its document count.
                nested.put(bucket.getKeyAsString(), bucket.getDocCount());
            } else {
                Map<String, Object> nestedBucketObject = new HashMap<>();
                for (Aggregation subAgg : bucket.getAggregations()) {
                    nestedBucketObject.put(subAgg.getName(), getExtractor(subAgg).value(subAgg, fieldTypeMap, subAggLookupPrefix));
                }
                nested.put(bucket.getKeyAsString(), nestedBucketObject);
            }
        }
        return nested;
    }
}

static class ScriptedMetricAggExtractor implements AggValueExtractor {
@Override
public Object value(Aggregation agg, Map<String, String> fieldTypeMap, String lookupFieldPrefix) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public final class Aggregations {
private static final String SOURCE = "_source";

public static final String FLOAT = "float";
public static final String FLATTENED = "flattened";
public static final String SCALED_FLOAT = "scaled_float";
public static final String DOUBLE = "double";
public static final String LONG = "long";
Expand Down Expand Up @@ -69,14 +70,12 @@ public final class Aggregations {
"nested",
"percentile_ranks",
"range",
"rare_terms",
"reverse_nested",
"sampler",
"significant_terms", // https://github.com/elastic/elasticsearch/issues/51073
"significant_text",
"stats", // https://github.com/elastic/elasticsearch/issues/51925
"string_stats", // https://github.com/elastic/elasticsearch/issues/51925
"terms", // https://github.com/elastic/elasticsearch/issues/51073
"top_hits",
"top_metrics", // https://github.com/elastic/elasticsearch/issues/52236
"t_test" // https://github.com/elastic/elasticsearch/issues/54503
Expand Down Expand Up @@ -107,7 +106,9 @@ enum AggregationType {
BUCKET_SELECTOR("bucket_selector", DYNAMIC),
BUCKET_SCRIPT("bucket_script", DYNAMIC),
PERCENTILES("percentiles", DOUBLE),
FILTER("filter", LONG);
FILTER("filter", LONG),
TERMS("terms", FLATTENED),
RARE_TERMS("rare_terms", FLATTENED);

private final String aggregationType;
private final String targetMapping;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,16 @@ public void testResolveTargetMapping() {
assertEquals("long", Aggregations.resolveTargetMapping("filter", "long"));
assertEquals("long", Aggregations.resolveTargetMapping("filter", "double"));

// terms
assertEquals("flattened", Aggregations.resolveTargetMapping("terms", null));
assertEquals("flattened", Aggregations.resolveTargetMapping("terms", "keyword"));
assertEquals("flattened", Aggregations.resolveTargetMapping("terms", "text"));

// rare_terms
assertEquals("flattened", Aggregations.resolveTargetMapping("rare_terms", null));
assertEquals("flattened", Aggregations.resolveTargetMapping("rare_terms", "text"));
assertEquals("flattened", Aggregations.resolveTargetMapping("rare_terms", "keyword"));

// corner case: source type null
assertEquals(null, Aggregations.resolveTargetMapping("min", null));
}
Expand Down

0 comments on commit fd812d2

Please sign in to comment.