Skip to content

Commit

Permalink
[Transform] add support for "missing" aggregation (#63651)
Browse files Browse the repository at this point in the history
add support for the missing (bucket) aggregation (counts docs with a configured missing field value)
in transform. The output is mapped to name:count, the mapping type is long.
  • Loading branch information
Hendrik Muhs authored Oct 15, 2020
1 parent 37b261b commit d1e39e8
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 5 deletions.
1 change: 1 addition & 0 deletions docs/reference/rest-api/common-parms.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,7 @@ supported:
* <<search-aggregations-metrics-geocentroid-aggregation,Geo centroid>>
* <<search-aggregations-metrics-max-aggregation,Max>>
* <<search-aggregations-metrics-min-aggregation,Min>>
* <<search-aggregations-bucket-missing-aggregation,Missing>>
* <<search-aggregations-metrics-percentile-aggregation,Percentiles>>
* <<search-aggregations-bucket-rare-terms-aggregation, Rare Terms>>
* <<search-aggregations-metrics-scripted-metric-aggregation,Scripted metric>>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,13 @@ public void testSimplePivot() throws Exception {

// get and check some users
assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_0", 3.776978417);
assertOneCount(transformIndex + "/_search?q=reviewer:user_0", "hits.hits._source.affiliate_missing", 0);
assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_5", 3.72);
assertOneCount(transformIndex + "/_search?q=reviewer:user_5", "hits.hits._source.affiliate_missing", 25);
assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_11", 3.846153846);
assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_20", 3.769230769);
assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_26", 3.918918918);
assertOneCount(transformIndex + "/_search?q=reviewer:user_26", "hits.hits._source.affiliate_missing", 0);
}

public void testSimpleDataStreamPivot() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ protected void createReviewsIndex(
long user = Math.round(Math.pow(i * 31 % 1000, distributionTable[i % distributionTable.length]) % 27);
int stars = distributionTable[(i * 33) % distributionTable.length];
long business = Math.round(Math.pow(user * stars, distributionTable[i % distributionTable.length]) % 13);
long affiliate = Math.round(Math.pow(user * stars, distributionTable[i % distributionTable.length]) % 11);

if (i % 12 == 0) {
hour = 10 + (i % 13);
}
Expand Down Expand Up @@ -133,6 +135,9 @@ protected void createReviewsIndex(
if ((user == userWithMissingBuckets && missingBucketField.equals("timestamp")) == false) {
bulk.append("\"timestamp\":\"").append(date_string).append("\",");
}
if ((user == userWithMissingBuckets && missingBucketField.equals("affiliate_id")) == false) {
bulk.append("\"affiliate_id\":\"").append("affiliate_").append(affiliate).append("\",");
}

// always add @timestamp to avoid complicated logic regarding ','
bulk.append("\"@timestamp\":\"").append(date_string).append("\"");
Expand Down Expand Up @@ -185,6 +190,9 @@ protected void putReviewsIndex(String indexName, String dateType, boolean isData
.startObject("location")
.field("type", "geo_point")
.endObject()
.startObject("affiliate_id")
.field("type", "keyword")
.endObject()
.endObject()
.endObject();
}
Expand Down Expand Up @@ -221,7 +229,7 @@ protected void createReviewsIndex() throws IOException {
}

protected void createReviewsIndex(String indexName) throws IOException {
createReviewsIndex(indexName, 1000, "date", false, -1, null);
createReviewsIndex(indexName, 1000, "date", false, 5, "affiliate_id");
}

protected void createPivotReviewsTransform(String transformId, String transformIndex, String query) throws IOException {
Expand Down Expand Up @@ -298,6 +306,11 @@ protected void createPivotReviewsTransform(
+ " \"avg_rating\": {"
+ " \"avg\": {"
+ " \"field\": \"stars\""
+ " } },"
+ " \"affiliate_missing\": {"
+ " \"missing\": {"
+ " \"field\": \"affiliate_id\""

+ " } } } },"
+ "\"frequency\":\"1s\""
+ "}";
Expand Down Expand Up @@ -480,8 +493,13 @@ public void wipeTransforms() throws IOException {

// the configuration index should be empty
Request request = new Request("GET", TransformInternalIndexConstants.LATEST_INDEX_NAME + "/_search");
request.setOptions(expectWarnings("this request accesses system indices: [" + TransformInternalIndexConstants.LATEST_INDEX_NAME +
"], but in a future major version, direct access to system indices will be prevented by default"));
request.setOptions(
expectWarnings(
"this request accesses system indices: ["
+ TransformInternalIndexConstants.LATEST_INDEX_NAME
+ "], but in a future major version, direct access to system indices will be prevented by default"
)
);
try {
Response searchResponse = adminClient().performRequest(request);
Map<String, Object> searchResult = entityAsMap(searchResponse);
Expand Down Expand Up @@ -541,6 +559,14 @@ protected void assertOnePivotValue(String query, double expected) throws IOExcep
assertEquals(expected, actual, 0.000001);
}

protected void assertOneCount(String query, String field, int expected) throws IOException {
Map<String, Object> searchResult = getAsMap(query);

assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
int actual = (Integer) ((List<?>) XContentMapValues.extractValue(field, searchResult)).get(0);
assertEquals(expected, actual);
}

protected static String getTransformEndpoint() {
return useDeprecatedEndpoints ? TransformField.REST_BASE_PATH_TRANSFORMS_DEPRECATED : TransformField.REST_BASE_PATH_TRANSFORMS;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ public final class TransformAggregations {
"ip_range",
"matrix_stats",
"median_absolute_deviation",
"missing",
"nested",
"percentile_ranks",
"range",
Expand Down Expand Up @@ -110,7 +109,8 @@ enum AggregationType {
PERCENTILES("percentiles", DOUBLE),
FILTER("filter", LONG),
TERMS("terms", FLATTENED),
RARE_TERMS("rare_terms", FLATTENED);
RARE_TERMS("rare_terms", FLATTENED),
MISSING("missing", LONG);

private final String aggregationType;
private final String targetMapping;
Expand Down

0 comments on commit d1e39e8

Please sign in to comment.