Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Transform] add support for geo_line aggregation in pivot function #69299

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/reference/rest-api/common-parms.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,7 @@ currently supported:
* <<search-aggregations-bucket-filter-aggregation,Filter>>
* <<search-aggregations-metrics-geobounds-aggregation,Geo bounds>>
* <<search-aggregations-metrics-geocentroid-aggregation,Geo centroid>>
* <<search-aggregations-metrics-geo-line,Geo line>>
* <<search-aggregations-metrics-max-aggregation,Max>>
* <<search-aggregations-metrics-median-absolute-deviation-aggregation,Median absolute deviation>>
* <<search-aggregations-metrics-min-aggregation,Min>>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.core.spatial.search.aggregations;

import java.util.Map;

/**
* This interface provides a way for spatial aggs to easily provide appropriately formatted geoJSON geometry to describe their
* aggregated results.
*/
public interface GeoShapeMetricAggregation {
/**
* Provides the geometry calculated by the aggregation in an indexible format.
*
* @return geometry as a geoJSON object
*/
Map<String, Object> geoJSONGeometry();
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,20 @@
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xpack.core.spatial.search.aggregations.GeoShapeMetricAggregation;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
* A single line string representing a sorted sequence of geo-points
*/
public class InternalGeoLine extends InternalAggregation {
public class InternalGeoLine extends InternalAggregation implements GeoShapeMetricAggregation {
private static final double SCALE = Math.pow(10, 6);

private long[] line;
Expand Down Expand Up @@ -147,21 +149,9 @@ public int size() {

@Override
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
final List<double[]> coordinates = new ArrayList<>();
for (int i = 0; i < line.length; i++) {
int x = (int) (line[i] >> 32);
int y = (int) line[i];
coordinates.add(new double[] {
roundDegrees(GeoEncodingUtils.decodeLongitude(x)),
roundDegrees(GeoEncodingUtils.decodeLatitude(y))
});
}
builder
.field("type", "Feature")
.startObject("geometry")
.field("type", "LineString")
.array("coordinates", coordinates.toArray())
.endObject()
.field("geometry", geoJSONGeometry())
.startObject("properties")
.field("complete", isComplete());
if (includeSorts) {
Expand Down Expand Up @@ -212,4 +202,21 @@ public boolean equals(Object obj) {
&& Objects.equals(size, that.size);

}

@Override
public Map<String, Object> geoJSONGeometry() {
final List<double[]> coordinates = new ArrayList<>();
for (int i = 0; i < line.length; i++) {
int x = (int) (line[i] >> 32);
int y = (int) line[i];
coordinates.add(new double[] {
roundDegrees(GeoEncodingUtils.decodeLongitude(x)),
roundDegrees(GeoEncodingUtils.decodeLatitude(y))
});
}
final Map<String, Object> geoJSON = new HashMap<>();
geoJSON.put("type", "LineString");
geoJSON.put("coordinates", coordinates.toArray());
return geoJSON;
}
}
1 change: 1 addition & 0 deletions x-pack/plugin/transform/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ dependencies {
testImplementation(testArtifact(project(xpackModule('core'))))
testImplementation project(path: xpackModule('analytics'))
testImplementation project(path: ':modules:aggs-matrix-stats')
testImplementation project(path: xpackModule('spatial'))
}

addQaCheckDependencies()
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
Expand Down Expand Up @@ -1241,6 +1242,65 @@ public void testPivotWithGeoCentroidAgg() throws Exception {
assertEquals((4 + 15), Double.valueOf(latlon[1]), 0.000001);
}

@SuppressWarnings("unchecked")
public void testPivotWithGeoLineAgg() throws Exception {
String transformId = "geo_line_pivot";
String transformIndex = "geo_line_pivot_reviews";
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformIndex);

final Request createTransformRequest = createRequestWithAuth(
"PUT",
getTransformEndpoint() + transformId,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS
);

String config = "{"
+ " \"source\": {\"index\":\""
+ REVIEWS_INDEX_NAME
+ "\"},"
+ " \"dest\": {\"index\":\""
+ transformIndex
+ "\"},";

config += " \"pivot\": {"
+ " \"group_by\": {"
+ " \"reviewer\": {"
+ " \"terms\": {"
+ " \"field\": \"user_id\""
+ " } } },"
+ " \"aggregations\": {"
+ " \"avg_rating\": {"
+ " \"avg\": {"
+ " \"field\": \"stars\""
+ " } },"
+ " \"location\": {"
+ " \"geo_line\": {\"point\": {\"field\":\"location\"}, \"sort\": {\"field\": \"timestamp\"}}"
+ " } } }"
+ "}";

createTransformRequest.setJsonEntity(config);
Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));

startAndWaitForTransform(transformId, transformIndex, BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS);
assertTrue(indexExists(transformIndex));

// we expect 27 documents as there shall be 27 user_id's
Map<String, Object> indexStats = getAsMap(transformIndex + "/_stats");
assertEquals(27, XContentMapValues.extractValue("_all.total.docs.count", indexStats));

// get and check some users
Map<String, Object> searchResult = getAsMap(transformIndex + "/_search?q=reviewer:user_4");
assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
Number actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.avg_rating", searchResult)).get(0);
assertEquals(3.878048780, actual.doubleValue(), 0.000001);
Map<String, Object> actualString = (Map<String, Object>) ((List<?>) XContentMapValues.extractValue(
"hits.hits._source.location",
searchResult
)).get(0);
assertThat(actualString, hasEntry("type", "LineString"));
}

@SuppressWarnings("unchecked")
public void testPivotWithGeotileGroupBy() throws Exception {
String transformId = "geotile_grid_group_by";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.search.aggregations.metrics.Percentile;
import org.elasticsearch.search.aggregations.metrics.Percentiles;
import org.elasticsearch.search.aggregations.metrics.ScriptedMetric;
import org.elasticsearch.xpack.core.spatial.search.aggregations.GeoShapeMetricAggregation;
import org.elasticsearch.xpack.core.transform.TransformField;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
import org.elasticsearch.xpack.core.transform.transforms.pivot.GeoTileGroupSource;
Expand Down Expand Up @@ -61,6 +62,7 @@ public final class AggregationResultUtils {
tempMap.put(Percentiles.class.getName(), new PercentilesAggExtractor());
tempMap.put(SingleBucketAggregation.class.getName(), new SingleBucketAggExtractor());
tempMap.put(MultiBucketsAggregation.class.getName(), new MultiBucketsAggExtractor());
tempMap.put(GeoShapeMetricAggregation.class.getName(), new GeoShapeMetricAggExtractor());
TYPE_VALUE_EXTRACTOR_MAP = Collections.unmodifiableMap(tempMap);
}

Expand Down Expand Up @@ -155,6 +157,8 @@ static AggValueExtractor getExtractor(Aggregation aggregation) {
return TYPE_VALUE_EXTRACTOR_MAP.get(SingleBucketAggregation.class.getName());
} else if (aggregation instanceof MultiBucketsAggregation) {
return TYPE_VALUE_EXTRACTOR_MAP.get(MultiBucketsAggregation.class.getName());
} else if (aggregation instanceof GeoShapeMetricAggregation) {
return TYPE_VALUE_EXTRACTOR_MAP.get(GeoShapeMetricAggregation.class.getName());
} else {
// Execution should never reach this point!
// Creating transforms with unsupported aggregations shall not be possible
Expand Down Expand Up @@ -210,6 +214,10 @@ public static class AggregationExtractionException extends ElasticsearchExceptio
AggregationExtractionException(String msg, Object... args) {
super(msg, args);
}

AggregationExtractionException(String msg, Throwable cause, Object... args) {
super(msg, cause, args);
}
}

/**
Expand Down Expand Up @@ -390,6 +398,20 @@ public Object value(Aggregation agg, Map<String, String> fieldTypeMap, String lo
}
}

static class GeoShapeMetricAggExtractor implements AggValueExtractor {

@Override
public Object value(Aggregation aggregation, Map<String, String> fieldTypeMap, String lookupFieldPrefix) {
assert aggregation instanceof GeoShapeMetricAggregation
: "Unexpected type ["
+ aggregation.getClass().getName()
+ "] for aggregation ["
+ aggregation.getName()
+ "]";
return ((GeoShapeMetricAggregation) aggregation).geoJSONGeometry();
}
}

static class GeoTileBucketKeyExtractor implements BucketKeyExtractor {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,9 @@ enum AggregationType {
MAX("max", SOURCE),
MIN("min", SOURCE),
SUM("sum", DOUBLE),
GEO_CENTROID("geo_centroid", GEO_POINT),
GEO_BOUNDS("geo_bounds", GEO_SHAPE),
GEO_CENTROID("geo_centroid", GEO_POINT),
GEO_LINE("geo_line", GEO_SHAPE),
SCRIPTED_METRIC("scripted_metric", DYNAMIC),
WEIGHTED_AVG("weighted_avg", DOUBLE),
BUCKET_SELECTOR("bucket_selector", DYNAMIC),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.SearchModule;
Expand All @@ -38,6 +39,7 @@
import org.elasticsearch.xpack.core.transform.transforms.pivot.GroupConfig;
import org.elasticsearch.xpack.core.transform.transforms.pivot.GroupConfigTests;
import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfig;
import org.elasticsearch.xpack.spatial.SpatialPlugin;
import org.elasticsearch.xpack.transform.Transform;
import org.elasticsearch.xpack.transform.transforms.Function;
import org.elasticsearch.xpack.transform.transforms.pivot.TransformAggregations.AggregationType;
Expand All @@ -54,7 +56,6 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.util.Collections.emptyList;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.anyOf;
Expand All @@ -75,7 +76,7 @@ public class PivotTests extends ESTestCase {
@Before
public void registerAggregationNamedObjects() throws Exception {
// register aggregations as NamedWriteable
SearchModule searchModule = new SearchModule(Settings.EMPTY, emptyList());
SearchModule searchModule = new SearchModule(Settings.EMPTY, List.of(new TestSpatialPlugin()));
namedXContentRegistry = new NamedXContentRegistry(searchModule.getNamedXContents());
}

Expand Down Expand Up @@ -305,6 +306,11 @@ private AggregationConfig getAggregationConfig(String agg) throws IOException {
"{" + "\"pivot_filter\": {" + " \"filter\": {" + " \"term\": {\"field\": \"value\"}" + " }" + "}" + "}"
);
}
if (agg.equals(AggregationType.GEO_LINE.getName())) {
return parseAggregations(
"{\"pivot_geo_line\": {\"geo_line\": {\"point\": {\"field\": \"values\"}, \"sort\":{\"field\": \"timestamp\"}}}}"
);
}

return parseAggregations(
"{\n" + " \"pivot_" + agg + "\": {\n" + " \"" + agg + "\": {\n" + " \"field\": \"values\"\n" + " }\n" + " }" + "}"
Expand Down Expand Up @@ -345,4 +351,14 @@ private static void validate(Client client, SourceConfig source, Function pivot,
fail("Expected config to be invalid");
}
}

// This is to pass license checks :)
private static class TestSpatialPlugin extends SpatialPlugin {

@Override
protected XPackLicenseState getLicenseState() {
return new XPackLicenseState(Settings.EMPTY, System::currentTimeMillis);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ public void testResolveTargetMapping() {
assertEquals("geo_shape", TransformAggregations.resolveTargetMapping("geo_bounds", "geo_shape"));
assertEquals("geo_shape", TransformAggregations.resolveTargetMapping("geo_bounds", null));

// geo_line
assertEquals("geo_shape", TransformAggregations.resolveTargetMapping("geo_line", "geo_shape"));
assertEquals("geo_shape", TransformAggregations.resolveTargetMapping("geo_line", null));

// scripted_metric
assertEquals("_dynamic", TransformAggregations.resolveTargetMapping("scripted_metric", null));
assertEquals("_dynamic", TransformAggregations.resolveTargetMapping("scripted_metric", "int"));
Expand Down