From ee566e4f130a37310f6dd2edfe6ea6653969cb7b Mon Sep 17 00:00:00 2001 From: Oleksandr Kolomiiets Date: Wed, 24 Apr 2024 14:08:21 -0700 Subject: [PATCH 1/3] [TEST] Use GET API instead of search in range field synthetic source tests (#107874) --- .../test/range/20_synthetic_source.yml | 296 ++++++++++++++---- 1 file changed, 240 insertions(+), 56 deletions(-) diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/range/20_synthetic_source.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/range/20_synthetic_source.yml index eac0fb9a52aa2..60c61ddbb698e 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/range/20_synthetic_source.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/range/20_synthetic_source.yml @@ -73,28 +73,58 @@ setup: indices.refresh: {} - do: - search: + get: index: synthetic_source_test - - match: { hits.total.value: 7 } + id: "1" - match: - hits.hits.0._source: + _source: integer_range: { "gte": 1, "lte": 5 } + + - do: + get: + index: synthetic_source_test + id: "2" - match: - hits.hits.1._source: + _source: integer_range: { "gte": 2, "lte": 3 } + + - do: + get: + index: synthetic_source_test + id: "3" - match: - hits.hits.2._source: + _source: integer_range: { "gte": 4, "lte": 4 } + + - do: + get: + index: synthetic_source_test + id: "4" - match: - hits.hits.3._source: + _source: integer_range: [ { "gte": 5, "lte": 6 }, { "gte": 5, "lte": 7 } ] + + - do: + get: + index: synthetic_source_test + id: "5" - match: - hits.hits.4._source: {} + _source: {} + + - do: + get: + index: synthetic_source_test + id: "6" - match: - hits.hits.5._source: + _source: integer_range: { "gte": -2147483648, "lte": 10 } + + - do: + get: + index: synthetic_source_test + id: "7" - match: - hits.hits.6._source: + _source: integer_range: { "gte": 1, "lte": 2147483647 } --- @@ -146,28 +176,58 @@ setup: indices.refresh: {} - do: - search: + get: index: synthetic_source_test - - match: { hits.total.value: 7 } + id: "1" - match: - hits.hits.0._source: + _source: long_range: { "gte": 1, "lte": 5 } + + - do: + get: + index: synthetic_source_test + id: "2" - match: - hits.hits.1._source: + _source: long_range: { "gte": 2, "lte": 3 } + + - do: + get: + index: synthetic_source_test + id: "3" - match: - hits.hits.2._source: + _source: long_range: { "gte": 4, "lte": 4 } + + - do: + get: + index: synthetic_source_test + id: "4" - match: - hits.hits.3._source: + _source: long_range: [ { "gte": 5, "lte": 6 }, { "gte": 5, "lte": 7 } ] + + - do: + get: + index: synthetic_source_test + id: "5" - match: - hits.hits.4._source: {} + _source: {} + + - do: + get: + index: synthetic_source_test + id: "6" - match: - hits.hits.5._source: + _source: long_range: { "gte": -9223372036854775808, "lte": 10 } + + - do: + get: + index: synthetic_source_test + id: "7" - match: - hits.hits.6._source: + _source: long_range: { "gte": 1, "lte": 9223372036854775807 } --- @@ -213,25 +273,50 @@ setup: indices.refresh: {} - do: - search: + get: index: synthetic_source_test - - match: { hits.total.value: 6 } + id: "1" - match: - hits.hits.0._source: + _source: float_range: { "gte": 1.0, "lte": 5.0 } + + - do: + get: + index: synthetic_source_test + id: "2" - match: - hits.hits.1._source: + _source: float_range: { "gte": 4.0, "lte": 5.0 } + + - do: + get: + index: synthetic_source_test + id: "3" - match: - hits.hits.2._source: + _source: float_range: [ { "gte": 4.0, "lte": 7.0 }, { "gte": 4.0, "lte": 8.0 } ] + + - do: + get: + index: synthetic_source_test + id: "4" - 
match: - hits.hits.3._source: {} + _source: {} + + - do: + get: + index: synthetic_source_test + id: "5" - match: - hits.hits.4._source: + _source: float_range: { "gte": "-Infinity", "lte": 10.0 } + + - do: + get: + index: synthetic_source_test + id: "6" - match: - hits.hits.5._source: + _source: float_range: { "gte": 1.0, "lte": "Infinity" } --- @@ -277,25 +362,50 @@ setup: indices.refresh: {} - do: - search: + get: index: synthetic_source_test - - match: { hits.total.value: 6 } + id: "1" - match: - hits.hits.0._source: + _source: double_range: { "gte": 1.0, "lte": 5.0 } + + - do: + get: + index: synthetic_source_test + id: "2" - match: - hits.hits.1._source: + _source: double_range: { "gte": 4.0, "lte": 5.0 } + + - do: + get: + index: synthetic_source_test + id: "3" - match: - hits.hits.2._source: + _source: double_range: [ { "gte": 4.0, "lte": 7.0 }, { "gte": 4.0, "lte": 8.0 } ] + + - do: + get: + index: synthetic_source_test + id: "4" - match: - hits.hits.3._source: {} + _source: {} + + - do: + get: + index: synthetic_source_test + id: "5" - match: - hits.hits.4._source: + _source: double_range: { "gte": "-Infinity", "lte": 10.0 } + + - do: + get: + index: synthetic_source_test + id: "6" - match: - hits.hits.5._source: + _source: double_range: { "gte": 1.0, "lte": "Infinity" } --- @@ -353,31 +463,66 @@ setup: indices.refresh: {} - do: - search: + get: index: synthetic_source_test - - match: { hits.total.value: 8 } + id: "1" - match: - hits.hits.0._source: + _source: ip_range: { "gte": "192.168.0.1", "lte": "192.168.0.5" } + + - do: + get: + index: synthetic_source_test + id: "2" - match: - hits.hits.1._source: + _source: ip_range: { "gte": "192.168.0.2", "lte": "192.168.0.3" } + + - do: + get: + index: synthetic_source_test + id: "3" - match: - hits.hits.2._source: + _source: ip_range: { "gte": "192.168.0.4", "lte": "192.168.0.4" } + + - do: + get: + index: synthetic_source_test + id: "4" - match: - hits.hits.3._source: + _source: ip_range: { "gte": "2001:db8::1", "lte": "200a:ff:ffff:ffff:ffff:ffff:ffff:ffff" } + + - do: + get: + index: synthetic_source_test + id: "5" - match: - hits.hits.4._source: + _source: ip_range: { "gte": "74.125.227.0", "lte": "74.125.227.127" } + + - do: + get: + index: synthetic_source_test + id: "6" - match: - hits.hits.5._source: {} + _source: {} + + - do: + get: + index: synthetic_source_test + id: "7" - match: - hits.hits.6._source: + _source: ip_range: { "gte": "0.0.0.0", "lte": "10.10.10.10" } + + - do: + get: + index: synthetic_source_test + id: "8" - match: - hits.hits.7._source: + _source: ip_range: { "gte": "2001:db8::", "lte": "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff" } --- @@ -441,33 +586,72 @@ setup: indices.refresh: {} - do: - search: + get: index: synthetic_source_test - - match: { hits.total.value: 9 } + id: "1" - match: - hits.hits.0._source: + _source: date_range: { "gte": "2017-09-01T00:00:00.000Z", "lte": "2017-09-05T00:00:00.000Z" } + + - do: + get: + index: synthetic_source_test + id: "2" - match: - hits.hits.1._source: + _source: date_range: { "gte": "2017-09-01T00:00:00.001Z", "lte": "2017-09-03T00:00:00.000Z" } + + - do: + get: + index: synthetic_source_test + id: "3" - match: - hits.hits.2._source: + _source: date_range: { "gte": "2017-09-04T00:00:00.000Z", "lte": "2017-09-04T23:59:59.999Z" } + + - do: + get: + index: synthetic_source_test + id: "4" - match: - hits.hits.3._source: + _source: date_range: [ { "gte": "2017-09-04T00:00:00.001Z", "lte": "2017-09-06T23:59:59.999Z" }, { "gte": "2017-09-04T00:00:00.001Z", "lte": 
"2017-09-07T23:59:59.999Z" } ] + + - do: + get: + index: synthetic_source_test + id: "5" - match: - hits.hits.4._source: + _source: date_range: { "gte": "2017-09-01T00:00:00.000Z", "lte": "2017-09-05T00:00:00.000Z" } + + - do: + get: + index: synthetic_source_test + id: "6" - match: - hits.hits.5._source: + _source: date_range: { "gte": "2017-09-01T10:20:30.123Z", "lte": "2017-09-05T03:04:05.789Z" } + + - do: + get: + index: synthetic_source_test + id: "7" - match: - hits.hits.6._source: {} + _source: {} + + - do: + get: + index: synthetic_source_test + id: "8" - match: - hits.hits.7._source: + _source: date_range: { "gte": "-292275055-05-16T16:47:04.192Z", "lte": "2017-09-05T00:00:00.000Z" } + + - do: + get: + index: synthetic_source_test + id: "9" - match: - hits.hits.8._source: + _source: date_range: { "gte": "2017-09-05T00:00:00.000Z", "lte": "+292278994-08-17T07:12:55.807Z" } - From 0719c906be0f66bb0b74e2b549b2e30d0e21dfc4 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Wed, 24 Apr 2024 17:39:24 -0600 Subject: [PATCH 2/3] Gather unassigned replicas corresponding to newly-created primary uniquely (#107794) Related to the work in #101638 this changes the way we calculate whether all replicas are unassigned when corresponding to newly created primaries. While this doesn't affect anything in Stateful ES on its own, it's a building-block used for object-store-based ES (Serverless). Semi-related to #99951, though it does not solve (and does not strive to solve) that issue. --- ...rdsAvailabilityHealthIndicatorService.java | 29 ++- ...ailabilityHealthIndicatorServiceTests.java | 182 +++++++++++++++++- 2 files changed, 208 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/shards/ShardsAvailabilityHealthIndicatorService.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/shards/ShardsAvailabilityHealthIndicatorService.java index 74da033fd8811..7c176f65599a9 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/shards/ShardsAvailabilityHealthIndicatorService.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/shards/ShardsAvailabilityHealthIndicatorService.java @@ -13,6 +13,7 @@ import org.elasticsearch.cluster.ClusterInfo; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.cluster.health.ClusterShardHealth; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.NodesShutdownMetadata; @@ -440,7 +441,8 @@ public class ShardAllocationCounts { public void increment(ShardRouting routing, ClusterState state, NodesShutdownMetadata shutdowns, boolean verbose) { boolean isNew = isUnassignedDueToNewInitialization(routing, state); boolean isRestarting = isUnassignedDueToTimelyRestart(routing, shutdowns); - boolean allUnavailable = areAllShardsOfThisTypeUnavailable(routing, state); + boolean allUnavailable = areAllShardsOfThisTypeUnavailable(routing, state) + && isNewlyCreatedAndInitializingReplica(routing, state) == false; if (allUnavailable) { indicesWithAllShardsUnavailable.add(routing.getIndexName()); } @@ -498,7 +500,7 @@ private void addDefinition(Diagnosis.Definition diagnosisDefinition, String inde * example: if a replica is passed then this will return true if ALL replicas are unassigned, * but if at least one is assigned, it will return false. 
*/ - private boolean areAllShardsOfThisTypeUnavailable(ShardRouting routing, ClusterState state) { + boolean areAllShardsOfThisTypeUnavailable(ShardRouting routing, ClusterState state) { return StreamSupport.stream( state.routingTable().allActiveShardsGrouped(new String[] { routing.getIndexName() }, true).spliterator(), false @@ -509,6 +511,29 @@ private boolean areAllShardsOfThisTypeUnavailable(ShardRouting routing, ClusterS .allMatch(ShardRouting::unassigned); } + /** + * Returns true if the given shard is a replica that is only unassigned due to its primary being + * newly created. See {@link ClusterShardHealth#getInactivePrimaryHealth(ShardRouting)} for more + * information. + * + * We use this information when considering whether a cluster should turn red. For some cases + * (a newly created index having unassigned replicas for example), we don't want the cluster + * to turn "unhealthy" for the tiny amount of time before the shards are allocated. + */ + static boolean isNewlyCreatedAndInitializingReplica(ShardRouting routing, ClusterState state) { + if (routing.active()) { + return false; + } + if (routing.primary()) { + return false; + } + ShardRouting primary = state.routingTable().shardRoutingTable(routing.shardId()).primaryShard(); + if (primary.active()) { + return false; + } + return ClusterShardHealth.getInactivePrimaryHealth(primary) == ClusterHealthStatus.YELLOW; + } + private static boolean isUnassignedDueToTimelyRestart(ShardRouting routing, NodesShutdownMetadata shutdowns) { var info = routing.unassignedInfo(); if (info == null || info.getReason() != UnassignedInfo.Reason.NODE_RESTARTING) { diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/shards/ShardsAvailabilityHealthIndicatorServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/shards/ShardsAvailabilityHealthIndicatorServiceTests.java index bb7523661a0fa..77b1fd8988d63 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/shards/ShardsAvailabilityHealthIndicatorServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/shards/ShardsAvailabilityHealthIndicatorServiceTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.allocation.AllocateUnassignedDecision; import org.elasticsearch.cluster.routing.allocation.AllocationService; @@ -407,6 +408,95 @@ public void testAllReplicasUnassigned() { ); assertTrue(status.replicas.doAnyIndicesHaveAllUnavailable()); } + { + ClusterState clusterState = createClusterStateWith( + List.of( + indexNewlyCreated( + "myindex", + new ShardAllocation( + randomNodeId(), + CREATING, + new UnassignedInfo( + UnassignedInfo.Reason.NODE_LEFT, + "message", + null, + 0, + 0, + 0, + false, + UnassignedInfo.AllocationStatus.NO_ATTEMPT, + Set.of(), + null + ) + ), // Primary 1 + new ShardAllocation(randomNodeId(), UNAVAILABLE) // Replica 1 + ) + ), + List.of() + ); + var service = createShardsAvailabilityIndicatorService(clusterState); + ShardAllocationStatus status = service.createNewStatus(clusterState.metadata()); + ShardsAvailabilityHealthIndicatorService.updateShardAllocationStatus( + status, + clusterState, + NodesShutdownMetadata.EMPTY, + randomBoolean() + ); + // Here because the 
replica is unassigned due to the primary being created, it's treated as though the replica can be ignored. + assertFalse( + "an unassigned replica from a newly created and initializing primary " + + "should not be treated as an index with all replicas unavailable", + status.replicas.doAnyIndicesHaveAllUnavailable() + ); + } + + /* + A couple of tests for + {@link ShardsAvailabilityHealthIndicatorService#areAllShardsOfThisTypeUnavailable(ShardRouting, ClusterState)} + */ + { + IndexRoutingTable routingTable = indexWithTwoPrimaryOneReplicaShard( + "myindex", + new ShardAllocation(randomNodeId(), AVAILABLE), // Primary 1 + new ShardAllocation(randomNodeId(), AVAILABLE), // Replica 1 + new ShardAllocation(randomNodeId(), AVAILABLE), // Primary 2 + new ShardAllocation(randomNodeId(), UNAVAILABLE) // Replica 2 + ); + ClusterState clusterState = createClusterStateWith(List.of(routingTable), List.of()); + var service = createShardsAvailabilityIndicatorService(clusterState); + ShardAllocationStatus status = service.createNewStatus(clusterState.metadata()); + ShardsAvailabilityHealthIndicatorService.updateShardAllocationStatus( + status, + clusterState, + NodesShutdownMetadata.EMPTY, + randomBoolean() + ); + var shardRouting = routingTable.shardsWithState(ShardRoutingState.UNASSIGNED).get(0); + assertTrue(service.areAllShardsOfThisTypeUnavailable(shardRouting, clusterState)); + } + { + ClusterState clusterState = createClusterStateWith( + List.of( + index( + "myindex", + new ShardAllocation(randomNodeId(), AVAILABLE), + new ShardAllocation(randomNodeId(), AVAILABLE), + new ShardAllocation(randomNodeId(), UNAVAILABLE) + ) + ), + List.of() + ); + var service = createShardsAvailabilityIndicatorService(clusterState); + ShardAllocationStatus status = service.createNewStatus(clusterState.metadata()); + ShardsAvailabilityHealthIndicatorService.updateShardAllocationStatus( + status, + clusterState, + NodesShutdownMetadata.EMPTY, + randomBoolean() + ); + var shardRouting = clusterState.routingTable().index("myindex").shardsWithState(ShardRoutingState.UNASSIGNED).get(0); + assertFalse(service.areAllShardsOfThisTypeUnavailable(shardRouting, clusterState)); + } } public void testShouldBeRedWhenThereAreUnassignedPrimariesAndNoReplicas() { @@ -1913,6 +2003,72 @@ public void testMappedFieldsForTelemetry() { } } + public void testIsNewlyCreatedAndInitializingReplica() { + ShardId id = new ShardId("index", "uuid", 0); + IndexMetadata idxMeta = IndexMetadata.builder("index") + .numberOfShards(1) + .numberOfReplicas(1) + .settings( + Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 1) + .put("index.version.created", IndexVersion.current()) + .put("index.uuid", "uuid") + .build() + ) + .build(); + ShardRouting primary = createShardRouting(id, true, new ShardAllocation("node", AVAILABLE)); + var state = createClusterStateWith(List.of(index("index", new ShardAllocation("node", AVAILABLE))), List.of()); + assertFalse(ShardsAvailabilityHealthIndicatorService.isNewlyCreatedAndInitializingReplica(primary, state)); + + ShardRouting replica = createShardRouting(id, false, new ShardAllocation("node", AVAILABLE)); + state = createClusterStateWith(List.of(index("index", new ShardAllocation("node", AVAILABLE))), List.of()); + assertFalse(ShardsAvailabilityHealthIndicatorService.isNewlyCreatedAndInitializingReplica(replica, state)); + + ShardRouting unassignedReplica = createShardRouting(id, false, new ShardAllocation("node", UNAVAILABLE)); + state = createClusterStateWith( + 
List.of(idxMeta), + List.of(index("index", "uuid", new ShardAllocation("node", UNAVAILABLE))), + List.of(), + List.of() + ); + assertFalse(ShardsAvailabilityHealthIndicatorService.isNewlyCreatedAndInitializingReplica(unassignedReplica, state)); + + UnassignedInfo.Reason reason = randomFrom(UnassignedInfo.Reason.NODE_LEFT, UnassignedInfo.Reason.NODE_RESTARTING); + ShardAllocation allocation = new ShardAllocation( + "node", + UNAVAILABLE, + new UnassignedInfo( + reason, + "message", + null, + 0, + 0, + 0, + randomBoolean(), + randomFrom(UnassignedInfo.AllocationStatus.values()), + Set.of(), + reason == UnassignedInfo.Reason.NODE_LEFT ? null : randomAlphaOfLength(20) + ) + ); + ShardRouting unallocatedReplica = createShardRouting(id, false, allocation); + state = createClusterStateWith( + List.of(idxMeta), + List.of(index(idxMeta, new ShardAllocation("node", UNAVAILABLE), allocation)), + List.of(), + List.of() + ); + assertFalse(ShardsAvailabilityHealthIndicatorService.isNewlyCreatedAndInitializingReplica(unallocatedReplica, state)); + + state = createClusterStateWith( + List.of(idxMeta), + List.of(index(idxMeta, new ShardAllocation("node", CREATING), allocation)), + List.of(), + List.of() + ); + assertTrue(ShardsAvailabilityHealthIndicatorService.isNewlyCreatedAndInitializingReplica(unallocatedReplica, state)); + } + private HealthIndicatorResult createExpectedResult( HealthStatus status, String symptom, @@ -2038,9 +2194,18 @@ private static Map addDefaults(Map override) { } private static IndexRoutingTable index(String name, ShardAllocation primaryState, ShardAllocation... replicaStates) { + return index(name, "_na_", primaryState, replicaStates); + } + + private static IndexRoutingTable index(String name, String uuid, ShardAllocation primaryState, ShardAllocation... replicaStates) { return index( IndexMetadata.builder(name) - .settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current()).build()) + .settings( + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current()) + .put(IndexMetadata.SETTING_INDEX_UUID, uuid) + .build() + ) .numberOfShards(1) .numberOfReplicas(replicaStates.length) .build(), @@ -2049,6 +2214,21 @@ private static IndexRoutingTable index(String name, ShardAllocation primaryState ); } + private static IndexRoutingTable indexNewlyCreated(String name, ShardAllocation primary1State, ShardAllocation replica1State) { + var indexMetadata = IndexMetadata.builder(name) + .settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current()).build()) + .numberOfShards(1) + .numberOfReplicas(1) + .build(); + var index = indexMetadata.getIndex(); + var shard1Id = new ShardId(index, 0); + + var builder = IndexRoutingTable.builder(index); + builder.addShard(createShardRouting(shard1Id, true, primary1State)); + builder.addShard(createShardRouting(shard1Id, false, replica1State)); + return builder.build(); + } + private static IndexRoutingTable indexWithTwoPrimaryOneReplicaShard( String name, ShardAllocation primary1State, From c0b023c113d6bf0ac8b27b4e4e471c0b5ddd3bab Mon Sep 17 00:00:00 2001 From: Ignacio Vera Date: Thu, 25 Apr 2024 07:54:29 +0200 Subject: [PATCH 3/3] Optimise few metric aggregations for single value fields (#107832) This commit adds two new abstractions for NumericMetricsAggregator which expects implementation for LeafCollectors for single and multi-value fields. 
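The heart of the change is a shared dispatch in the new base classes: resolve the
per-segment doubles once, ask Lucene whether the field can be unwrapped to a
single-valued view, and route to the matching leaf collector. A condensed sketch,
lifted from the NumericMetricsAggregator changes below (not the complete class):

    @Override
    public final LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx, final LeafBucketCollector sub)
        throws IOException {
        // Resolve doubles for this segment, then try to unwrap a singleton view.
        final SortedNumericDoubleValues values = valuesSource.doubleValues(aggCtx.getLeafReaderContext());
        final NumericDoubleValues singleton = FieldData.unwrapSingleton(values);
        return singleton != null ? getLeafCollector(singleton, sub) : getLeafCollector(values, sub);
    }

Each concrete aggregator supplies both abstract getLeafCollector variants; the
NumericDoubleValues one reads a single value per document and skips the
docValueCount() loop entirely.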
--- docs/changelog/107832.yaml | 5 ++ .../AbstractHDRPercentilesAggregator.java | 35 ++++---- .../AbstractTDigestPercentilesAggregator.java | 34 ++++---- .../aggregations/metrics/AvgAggregator.java | 62 +++++++------ .../metrics/ExtendedStatsAggregator.java | 87 +++++++++++-------- .../MedianAbsoluteDeviationAggregator.java | 57 ++++++------ .../metrics/NumericMetricsAggregator.java | 74 ++++++++++++++++ .../aggregations/metrics/StatsAggregator.java | 73 +++++++++------- .../aggregations/metrics/SumAggregator.java | 57 ++++++------ 9 files changed, 306 insertions(+), 178 deletions(-) create mode 100644 docs/changelog/107832.yaml diff --git a/docs/changelog/107832.yaml b/docs/changelog/107832.yaml new file mode 100644 index 0000000000000..491c491736005 --- /dev/null +++ b/docs/changelog/107832.yaml @@ -0,0 +1,5 @@ +pr: 107832 +summary: Optimise few metric aggregations for single value fields +area: Aggregations +type: enhancement +issues: [] diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AbstractHDRPercentilesAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AbstractHDRPercentilesAggregator.java index 670cf08038e03..a1cb547ec0bdd 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AbstractHDRPercentilesAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AbstractHDRPercentilesAggregator.java @@ -9,27 +9,24 @@ package org.elasticsearch.search.aggregations.metrics; import org.HdrHistogram.DoubleHistogram; -import org.apache.lucene.search.ScoreMode; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.ObjectArray; import org.elasticsearch.core.Releasables; +import org.elasticsearch.index.fielddata.NumericDoubleValues; import org.elasticsearch.index.fielddata.SortedNumericDoubleValues; import org.elasticsearch.search.DocValueFormat; -import org.elasticsearch.search.aggregations.AggregationExecutionContext; import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.LeafBucketCollector; import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; import org.elasticsearch.search.aggregations.support.AggregationContext; -import org.elasticsearch.search.aggregations.support.ValuesSource; import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; import java.io.IOException; import java.util.Map; -abstract class AbstractHDRPercentilesAggregator extends NumericMetricsAggregator.MultiValue { +abstract class AbstractHDRPercentilesAggregator extends NumericMetricsAggregator.MultiDoubleValue { protected final double[] keys; - protected final ValuesSource valuesSource; protected final DocValueFormat format; protected ObjectArray states; protected final int numberOfSignificantValueDigits; @@ -46,9 +43,8 @@ abstract class AbstractHDRPercentilesAggregator extends NumericMetricsAggregator DocValueFormat formatter, Map metadata ) throws IOException { - super(name, context, parent, metadata); + super(name, config, context, parent, metadata); assert config.hasValues(); - this.valuesSource = config.getValuesSource(); this.keyed = keyed; this.format = formatter; this.states = context.bigArrays().newObjectArray(1); @@ -57,26 +53,31 @@ abstract class AbstractHDRPercentilesAggregator extends NumericMetricsAggregator } @Override - public ScoreMode scoreMode() { - return valuesSource.needsScores() ? 
ScoreMode.COMPLETE : ScoreMode.COMPLETE_NO_SCORES; - } - - @Override - public LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx, final LeafBucketCollector sub) throws IOException { - final SortedNumericDoubleValues values = ((ValuesSource.Numeric) valuesSource).doubleValues(aggCtx.getLeafReaderContext()); + protected LeafBucketCollector getLeafCollector(SortedNumericDoubleValues values, LeafBucketCollector sub) { return new LeafBucketCollectorBase(sub, values) { @Override public void collect(int doc, long bucket) throws IOException { if (values.advanceExact(doc)) { - DoubleHistogram state = getExistingOrNewHistogram(bigArrays(), bucket); - final int valueCount = values.docValueCount(); - for (int i = 0; i < valueCount; i++) { + final DoubleHistogram state = getExistingOrNewHistogram(bigArrays(), bucket); + for (int i = 0; i < values.docValueCount(); i++) { state.recordValue(values.nextValue()); } } } }; + } + @Override + protected LeafBucketCollector getLeafCollector(NumericDoubleValues values, LeafBucketCollector sub) { + return new LeafBucketCollectorBase(sub, values) { + @Override + public void collect(int doc, long bucket) throws IOException { + if (values.advanceExact(doc)) { + final DoubleHistogram state = getExistingOrNewHistogram(bigArrays(), bucket); + state.recordValue(values.doubleValue()); + } + } + }; } private DoubleHistogram getExistingOrNewHistogram(final BigArrays bigArrays, long bucket) { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AbstractTDigestPercentilesAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AbstractTDigestPercentilesAggregator.java index 5b58d2e26abfb..9d86f6800c47e 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AbstractTDigestPercentilesAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AbstractTDigestPercentilesAggregator.java @@ -8,27 +8,24 @@ package org.elasticsearch.search.aggregations.metrics; -import org.apache.lucene.search.ScoreMode; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.ObjectArray; import org.elasticsearch.core.Releasables; +import org.elasticsearch.index.fielddata.NumericDoubleValues; import org.elasticsearch.index.fielddata.SortedNumericDoubleValues; import org.elasticsearch.search.DocValueFormat; -import org.elasticsearch.search.aggregations.AggregationExecutionContext; import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.LeafBucketCollector; import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; import org.elasticsearch.search.aggregations.support.AggregationContext; -import org.elasticsearch.search.aggregations.support.ValuesSource; import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; import java.io.IOException; import java.util.Map; -abstract class AbstractTDigestPercentilesAggregator extends NumericMetricsAggregator.MultiValue { +abstract class AbstractTDigestPercentilesAggregator extends NumericMetricsAggregator.MultiDoubleValue { protected final double[] keys; - protected final ValuesSource valuesSource; protected final DocValueFormat formatter; protected ObjectArray states; protected final double compression; @@ -47,9 +44,8 @@ abstract class AbstractTDigestPercentilesAggregator extends NumericMetricsAggreg DocValueFormat formatter, Map metadata ) throws IOException { - super(name, context, parent, metadata); + super(name, config, context, 
parent, metadata); assert config.hasValues(); - this.valuesSource = config.getValuesSource(); this.keyed = keyed; this.formatter = formatter; this.states = context.bigArrays().newObjectArray(1); @@ -59,22 +55,28 @@ abstract class AbstractTDigestPercentilesAggregator extends NumericMetricsAggreg } @Override - public ScoreMode scoreMode() { - return valuesSource.needsScores() ? ScoreMode.COMPLETE : ScoreMode.COMPLETE_NO_SCORES; + protected LeafBucketCollector getLeafCollector(SortedNumericDoubleValues values, final LeafBucketCollector sub) { + return new LeafBucketCollectorBase(sub, values) { + @Override + public void collect(int doc, long bucket) throws IOException { + if (values.advanceExact(doc)) { + final TDigestState state = getExistingOrNewHistogram(bigArrays(), bucket); + for (int i = 0; i < values.docValueCount(); i++) { + state.add(values.nextValue()); + } + } + } + }; } @Override - public LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx, final LeafBucketCollector sub) throws IOException { - final SortedNumericDoubleValues values = ((ValuesSource.Numeric) valuesSource).doubleValues(aggCtx.getLeafReaderContext()); + protected LeafBucketCollector getLeafCollector(NumericDoubleValues values, final LeafBucketCollector sub) { return new LeafBucketCollectorBase(sub, values) { @Override public void collect(int doc, long bucket) throws IOException { if (values.advanceExact(doc)) { - TDigestState state = getExistingOrNewHistogram(bigArrays(), bucket); - final int valueCount = values.docValueCount(); - for (int i = 0; i < valueCount; i++) { - state.add(values.nextValue()); - } + final TDigestState state = getExistingOrNewHistogram(bigArrays(), bucket); + state.add(values.doubleValue()); } } }; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AvgAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AvgAggregator.java index 575108951b899..6588592820547 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AvgAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AvgAggregator.java @@ -7,28 +7,24 @@ */ package org.elasticsearch.search.aggregations.metrics; -import org.apache.lucene.search.ScoreMode; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.DoubleArray; import org.elasticsearch.common.util.LongArray; import org.elasticsearch.core.Releasables; +import org.elasticsearch.index.fielddata.NumericDoubleValues; import org.elasticsearch.index.fielddata.SortedNumericDoubleValues; import org.elasticsearch.search.DocValueFormat; -import org.elasticsearch.search.aggregations.AggregationExecutionContext; import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.LeafBucketCollector; import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; import org.elasticsearch.search.aggregations.support.AggregationContext; -import org.elasticsearch.search.aggregations.support.ValuesSource; import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; import java.io.IOException; import java.util.Map; -class AvgAggregator extends NumericMetricsAggregator.SingleValue { - - final ValuesSource.Numeric valuesSource; +class AvgAggregator extends NumericMetricsAggregator.SingleDoubleValue { LongArray counts; DoubleArray sums; @@ -42,9 +38,8 @@ class AvgAggregator extends NumericMetricsAggregator.SingleValue { 
Aggregator parent, Map metadata ) throws IOException { - super(name, context, parent, metadata); + super(name, valuesSourceConfig, context, parent, metadata); assert valuesSourceConfig.hasValues(); - this.valuesSource = (ValuesSource.Numeric) valuesSourceConfig.getValuesSource(); this.format = valuesSourceConfig.format(); final BigArrays bigArrays = context.bigArrays(); counts = bigArrays.newLongArray(1, true); @@ -53,38 +48,41 @@ class AvgAggregator extends NumericMetricsAggregator.SingleValue { } @Override - public ScoreMode scoreMode() { - return valuesSource.needsScores() ? ScoreMode.COMPLETE : ScoreMode.COMPLETE_NO_SCORES; - } - - @Override - public LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx, final LeafBucketCollector sub) throws IOException { - final SortedNumericDoubleValues values = valuesSource.doubleValues(aggCtx.getLeafReaderContext()); + protected LeafBucketCollector getLeafCollector(SortedNumericDoubleValues values, final LeafBucketCollector sub) { final CompensatedSum kahanSummation = new CompensatedSum(0, 0); - return new LeafBucketCollectorBase(sub, values) { @Override public void collect(int doc, long bucket) throws IOException { if (values.advanceExact(doc)) { - if (bucket >= counts.size()) { - counts = bigArrays().grow(counts, bucket + 1); - sums = bigArrays().grow(sums, bucket + 1); - compensations = bigArrays().grow(compensations, bucket + 1); - } + maybeGrow(bucket); final int valueCount = values.docValueCount(); counts.increment(bucket, valueCount); // Compute the sum of double values with Kahan summation algorithm which is more // accurate than naive summation. - double sum = sums.get(bucket); - double compensation = compensations.get(bucket); - - kahanSummation.reset(sum, compensation); - + kahanSummation.reset(sums.get(bucket), compensations.get(bucket)); for (int i = 0; i < valueCount; i++) { - double value = values.nextValue(); - kahanSummation.add(value); + kahanSummation.add(values.nextValue()); } + sums.set(bucket, kahanSummation.value()); + compensations.set(bucket, kahanSummation.delta()); + } + } + }; + } + @Override + protected LeafBucketCollector getLeafCollector(NumericDoubleValues values, final LeafBucketCollector sub) { + final CompensatedSum kahanSummation = new CompensatedSum(0, 0); + return new LeafBucketCollectorBase(sub, values) { + @Override + public void collect(int doc, long bucket) throws IOException { + if (values.advanceExact(doc)) { + maybeGrow(bucket); + counts.increment(bucket, 1L); + // Compute the sum of double values with Kahan summation algorithm which is more + // accurate than naive summation. 
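+ // (For reference, one bare Kahan step looks roughly like:
+ //   double y = value - delta; double t = sum + y;
+ //   delta = (t - sum) - y; sum = t;
+ // CompensatedSum encapsulates that sum/delta pair, so the reset(...) and
+ // add(...) calls below restore and advance the per-bucket running state.)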
+ kahanSummation.reset(sums.get(bucket), compensations.get(bucket)); + kahanSummation.add(values.doubleValue()); sums.set(bucket, kahanSummation.value()); compensations.set(bucket, kahanSummation.delta()); } @@ -92,6 +90,14 @@ public void collect(int doc, long bucket) throws IOException { }; } + private void maybeGrow(long bucket) { + if (bucket >= counts.size()) { + counts = bigArrays().grow(counts, bucket + 1); + sums = bigArrays().grow(sums, bucket + 1); + compensations = bigArrays().grow(compensations, bucket + 1); + } + } + @Override public double metric(long owningBucketOrd) { if (owningBucketOrd >= sums.size()) { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/ExtendedStatsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/ExtendedStatsAggregator.java index 194ec2b641757..3645766f47bdf 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/ExtendedStatsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/ExtendedStatsAggregator.java @@ -7,31 +7,28 @@ */ package org.elasticsearch.search.aggregations.metrics; -import org.apache.lucene.search.ScoreMode; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.DoubleArray; import org.elasticsearch.common.util.LongArray; import org.elasticsearch.core.Releasables; +import org.elasticsearch.index.fielddata.NumericDoubleValues; import org.elasticsearch.index.fielddata.SortedNumericDoubleValues; import org.elasticsearch.search.DocValueFormat; -import org.elasticsearch.search.aggregations.AggregationExecutionContext; import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.LeafBucketCollector; import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; import org.elasticsearch.search.aggregations.support.AggregationContext; -import org.elasticsearch.search.aggregations.support.ValuesSource; import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; import org.elasticsearch.xcontent.ParseField; import java.io.IOException; import java.util.Map; -class ExtendedStatsAggregator extends NumericMetricsAggregator.MultiValue { +class ExtendedStatsAggregator extends NumericMetricsAggregator.MultiDoubleValue { static final ParseField SIGMA_FIELD = new ParseField("sigma"); - final ValuesSource.Numeric valuesSource; final DocValueFormat format; final double sigma; @@ -51,9 +48,8 @@ class ExtendedStatsAggregator extends NumericMetricsAggregator.MultiValue { double sigma, Map metadata ) throws IOException { - super(name, context, parent, metadata); + super(name, config, context, parent, metadata); assert config.hasValues(); - this.valuesSource = (ValuesSource.Numeric) config.getValuesSource(); this.format = config.format(); this.sigma = sigma; final BigArrays bigArrays = context.bigArrays(); @@ -69,13 +65,7 @@ class ExtendedStatsAggregator extends NumericMetricsAggregator.MultiValue { } @Override - public ScoreMode scoreMode() { - return valuesSource.needsScores() ? 
ScoreMode.COMPLETE : ScoreMode.COMPLETE_NO_SCORES; - } - - @Override - public LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx, final LeafBucketCollector sub) throws IOException { - final SortedNumericDoubleValues values = valuesSource.doubleValues(aggCtx.getLeafReaderContext()); + protected LeafBucketCollector getLeafCollector(SortedNumericDoubleValues values, final LeafBucketCollector sub) { final CompensatedSum compensatedSum = new CompensatedSum(0, 0); final CompensatedSum compensatedSumOfSqr = new CompensatedSum(0, 0); return new LeafBucketCollectorBase(sub, values) { @@ -83,32 +73,15 @@ public LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx, @Override public void collect(int doc, long bucket) throws IOException { if (values.advanceExact(doc)) { - if (bucket >= counts.size()) { - final long from = counts.size(); - final long overSize = BigArrays.overSize(bucket + 1); - counts = bigArrays().resize(counts, overSize); - sums = bigArrays().resize(sums, overSize); - compensations = bigArrays().resize(compensations, overSize); - mins = bigArrays().resize(mins, overSize); - maxes = bigArrays().resize(maxes, overSize); - sumOfSqrs = bigArrays().resize(sumOfSqrs, overSize); - compensationOfSqrs = bigArrays().resize(compensationOfSqrs, overSize); - mins.fill(from, overSize, Double.POSITIVE_INFINITY); - maxes.fill(from, overSize, Double.NEGATIVE_INFINITY); - } + maybeGrow(bucket); final int valuesCount = values.docValueCount(); counts.increment(bucket, valuesCount); double min = mins.get(bucket); double max = maxes.get(bucket); // Compute the sum and sum of squares for double values with Kahan summation algorithm // which is more accurate than naive summation. - double sum = sums.get(bucket); - double compensation = compensations.get(bucket); - compensatedSum.reset(sum, compensation); - - double sumOfSqr = sumOfSqrs.get(bucket); - double compensationOfSqr = compensationOfSqrs.get(bucket); - compensatedSumOfSqr.reset(sumOfSqr, compensationOfSqr); + compensatedSum.reset(sums.get(bucket), compensations.get(bucket)); + compensatedSumOfSqr.reset(sumOfSqrs.get(bucket), compensationOfSqrs.get(bucket)); for (int i = 0; i < valuesCount; i++) { double value = values.nextValue(); @@ -126,10 +99,56 @@ public void collect(int doc, long bucket) throws IOException { maxes.set(bucket, max); } } + }; + } + + @Override + protected LeafBucketCollector getLeafCollector(NumericDoubleValues values, final LeafBucketCollector sub) { + final CompensatedSum compensatedSum = new CompensatedSum(0, 0); + return new LeafBucketCollectorBase(sub, values) { + + @Override + public void collect(int doc, long bucket) throws IOException { + if (values.advanceExact(doc)) { + maybeGrow(bucket); + final double value = values.doubleValue(); + counts.increment(bucket, 1L); + // Compute the sum and sum of squares for double values with Kahan summation algorithm + // which is more accurate than naive summation. 
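+ // (Downstream, InternalExtendedStats derives the reported statistics from
+ // these running totals, roughly variance = (sumOfSqrs - sum * sum / count) / count
+ // and std_deviation = sqrt(variance), so the compensated precision of both
+ // sums carries through to every derived metric.)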
+ compensatedSum.reset(sums.get(bucket), compensations.get(bucket)); + compensatedSum.add(value); + sums.set(bucket, compensatedSum.value()); + compensations.set(bucket, compensatedSum.delta()); + + compensatedSum.reset(sumOfSqrs.get(bucket), compensationOfSqrs.get(bucket)); + compensatedSum.add(value * value); + sumOfSqrs.set(bucket, compensatedSum.value()); + compensationOfSqrs.set(bucket, compensatedSum.delta()); + + mins.set(bucket, Math.min(mins.get(bucket), value)); + maxes.set(bucket, Math.max(maxes.get(bucket), value)); + } + } }; } + private void maybeGrow(long bucket) { + if (bucket >= counts.size()) { + final long from = counts.size(); + final long overSize = BigArrays.overSize(bucket + 1); + counts = bigArrays().resize(counts, overSize); + sums = bigArrays().resize(sums, overSize); + compensations = bigArrays().resize(compensations, overSize); + mins = bigArrays().resize(mins, overSize); + maxes = bigArrays().resize(maxes, overSize); + sumOfSqrs = bigArrays().resize(sumOfSqrs, overSize); + compensationOfSqrs = bigArrays().resize(compensationOfSqrs, overSize); + mins.fill(from, overSize, Double.POSITIVE_INFINITY); + maxes.fill(from, overSize, Double.NEGATIVE_INFINITY); + } + } + @Override public boolean hasMetric(String name) { return InternalExtendedStats.Metrics.hasMetric(name); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/MedianAbsoluteDeviationAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/MedianAbsoluteDeviationAggregator.java index 61c2c75a49d7c..4382b07ad5460 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/MedianAbsoluteDeviationAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/MedianAbsoluteDeviationAggregator.java @@ -8,18 +8,17 @@ package org.elasticsearch.search.aggregations.metrics; -import org.apache.lucene.search.ScoreMode; +import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.ObjectArray; import org.elasticsearch.core.Releasables; +import org.elasticsearch.index.fielddata.NumericDoubleValues; import org.elasticsearch.index.fielddata.SortedNumericDoubleValues; import org.elasticsearch.search.DocValueFormat; -import org.elasticsearch.search.aggregations.AggregationExecutionContext; import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.LeafBucketCollector; import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; import org.elasticsearch.search.aggregations.support.AggregationContext; -import org.elasticsearch.search.aggregations.support.ValuesSource; import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; import java.io.IOException; @@ -28,9 +27,8 @@ import static org.elasticsearch.search.aggregations.metrics.InternalMedianAbsoluteDeviation.computeMedianAbsoluteDeviation; -public class MedianAbsoluteDeviationAggregator extends NumericMetricsAggregator.SingleValue { +public class MedianAbsoluteDeviationAggregator extends NumericMetricsAggregator.SingleDoubleValue { - private final ValuesSource.Numeric valuesSource; private final DocValueFormat format; private final double compression; @@ -49,9 +47,8 @@ public class MedianAbsoluteDeviationAggregator extends NumericMetricsAggregator. 
double compression, TDigestExecutionHint executionHint ) throws IOException { - super(name, context, parent, metadata); + super(name, config, context, parent, metadata); assert config.hasValues(); - this.valuesSource = (ValuesSource.Numeric) config.getValuesSource(); this.format = Objects.requireNonNull(format); this.compression = compression; this.executionHint = executionHint; @@ -72,39 +69,43 @@ public double metric(long owningBucketOrd) { } @Override - public ScoreMode scoreMode() { - if (valuesSource.needsScores()) { - return ScoreMode.COMPLETE; - } else { - return ScoreMode.COMPLETE_NO_SCORES; - } + protected LeafBucketCollector getLeafCollector(SortedNumericDoubleValues values, LeafBucketCollector sub) { + return new LeafBucketCollectorBase(sub, values) { + @Override + public void collect(int doc, long bucket) throws IOException { + if (values.advanceExact(doc)) { + final TDigestState valueSketch = getExistingOrNewHistogram(bigArrays(), bucket); + for (int i = 0; i < values.docValueCount(); i++) { + valueSketch.add(values.nextValue()); + } + } + } + }; } @Override - protected LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx, LeafBucketCollector sub) throws IOException { - final SortedNumericDoubleValues values = valuesSource.doubleValues(aggCtx.getLeafReaderContext()); - + protected LeafBucketCollector getLeafCollector(NumericDoubleValues values, LeafBucketCollector sub) { return new LeafBucketCollectorBase(sub, values) { @Override public void collect(int doc, long bucket) throws IOException { if (values.advanceExact(doc)) { - valueSketches = bigArrays().grow(valueSketches, bucket + 1); - - TDigestState valueSketch = valueSketches.get(bucket); - if (valueSketch == null) { - valueSketch = TDigestState.create(compression, executionHint); - valueSketches.set(bucket, valueSketch); - } - final int valueCount = values.docValueCount(); - for (int i = 0; i < valueCount; i++) { - final double value = values.nextValue(); - valueSketch.add(value); - } + final TDigestState valueSketch = getExistingOrNewHistogram(bigArrays(), bucket); + valueSketch.add(values.doubleValue()); } } }; } + private TDigestState getExistingOrNewHistogram(final BigArrays bigArrays, long bucket) { + valueSketches = bigArrays.grow(valueSketches, bucket + 1); + TDigestState state = valueSketches.get(bucket); + if (state == null) { + state = TDigestState.create(compression, executionHint); + valueSketches.set(bucket, state); + } + return state; + } + @Override public InternalAggregation buildAggregation(long bucket) throws IOException { if (hasDataForBucket(bucket)) { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/NumericMetricsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/NumericMetricsAggregator.java index 7853422daac2c..02c7eb3ceba5d 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/NumericMetricsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/NumericMetricsAggregator.java @@ -7,9 +7,17 @@ */ package org.elasticsearch.search.aggregations.metrics; +import org.apache.lucene.search.ScoreMode; import org.elasticsearch.common.util.Comparators; +import org.elasticsearch.index.fielddata.FieldData; +import org.elasticsearch.index.fielddata.NumericDoubleValues; +import org.elasticsearch.index.fielddata.SortedNumericDoubleValues; +import org.elasticsearch.search.aggregations.AggregationExecutionContext; import org.elasticsearch.search.aggregations.Aggregator; +import 
org.elasticsearch.search.aggregations.LeafBucketCollector; import org.elasticsearch.search.aggregations.support.AggregationContext; +import org.elasticsearch.search.aggregations.support.ValuesSource; +import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; import org.elasticsearch.search.sort.SortOrder; import java.io.IOException; @@ -42,6 +50,39 @@ public BucketComparator bucketComparator(String key, SortOrder order) { } } + public abstract static class SingleDoubleValue extends SingleValue { + + private final ValuesSource.Numeric valuesSource; + + protected SingleDoubleValue( + String name, + ValuesSourceConfig valuesSourceConfig, + AggregationContext context, + Aggregator parent, + Map metadata + ) throws IOException { + super(name, context, parent, metadata); + this.valuesSource = (ValuesSource.Numeric) valuesSourceConfig.getValuesSource(); + } + + @Override + public ScoreMode scoreMode() { + return valuesSource.needsScores() ? ScoreMode.COMPLETE : ScoreMode.COMPLETE_NO_SCORES; + } + + @Override + public final LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx, final LeafBucketCollector sub) + throws IOException { + final SortedNumericDoubleValues values = valuesSource.doubleValues(aggCtx.getLeafReaderContext()); + final NumericDoubleValues singleton = FieldData.unwrapSingleton(values); + return singleton != null ? getLeafCollector(singleton, sub) : getLeafCollector(values, sub); + } + + protected abstract LeafBucketCollector getLeafCollector(SortedNumericDoubleValues values, LeafBucketCollector sub); + + protected abstract LeafBucketCollector getLeafCollector(NumericDoubleValues values, LeafBucketCollector sub); + } + public abstract static class MultiValue extends NumericMetricsAggregator { protected MultiValue(String name, AggregationContext context, Aggregator parent, Map metadata) throws IOException { @@ -64,4 +105,37 @@ public BucketComparator bucketComparator(String key, SortOrder order) { return (lhs, rhs) -> Comparators.compareDiscardNaN(metric(key, lhs), metric(key, rhs), order == SortOrder.ASC); } } + + public abstract static class MultiDoubleValue extends MultiValue { + + private final ValuesSource.Numeric valuesSource; + + protected MultiDoubleValue( + String name, + ValuesSourceConfig valuesSourceConfig, + AggregationContext context, + Aggregator parent, + Map metadata + ) throws IOException { + super(name, context, parent, metadata); + this.valuesSource = (ValuesSource.Numeric) valuesSourceConfig.getValuesSource(); + } + + @Override + public ScoreMode scoreMode() { + return valuesSource.needsScores() ? ScoreMode.COMPLETE : ScoreMode.COMPLETE_NO_SCORES; + } + + @Override + public final LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx, final LeafBucketCollector sub) + throws IOException { + final SortedNumericDoubleValues values = valuesSource.doubleValues(aggCtx.getLeafReaderContext()); + final NumericDoubleValues singleton = FieldData.unwrapSingleton(values); + return singleton != null ? 
getLeafCollector(singleton, sub) : getLeafCollector(values, sub); + } + + protected abstract LeafBucketCollector getLeafCollector(SortedNumericDoubleValues values, LeafBucketCollector sub); + + protected abstract LeafBucketCollector getLeafCollector(NumericDoubleValues values, LeafBucketCollector sub); + } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/StatsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/StatsAggregator.java index 8f571a95a145f..61e735901cd2f 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/StatsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/StatsAggregator.java @@ -7,28 +7,25 @@ */ package org.elasticsearch.search.aggregations.metrics; -import org.apache.lucene.search.ScoreMode; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.DoubleArray; import org.elasticsearch.common.util.LongArray; import org.elasticsearch.core.Releasables; +import org.elasticsearch.index.fielddata.NumericDoubleValues; import org.elasticsearch.index.fielddata.SortedNumericDoubleValues; import org.elasticsearch.search.DocValueFormat; -import org.elasticsearch.search.aggregations.AggregationExecutionContext; import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.LeafBucketCollector; import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; import org.elasticsearch.search.aggregations.support.AggregationContext; -import org.elasticsearch.search.aggregations.support.ValuesSource; import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; import java.io.IOException; import java.util.Map; -class StatsAggregator extends NumericMetricsAggregator.MultiValue { +class StatsAggregator extends NumericMetricsAggregator.MultiDoubleValue { - final ValuesSource.Numeric valuesSource; final DocValueFormat format; LongArray counts; @@ -39,9 +36,8 @@ class StatsAggregator extends NumericMetricsAggregator.MultiValue { StatsAggregator(String name, ValuesSourceConfig config, AggregationContext context, Aggregator parent, Map metadata) throws IOException { - super(name, context, parent, metadata); + super(name, config, context, parent, metadata); assert config.hasValues(); - this.valuesSource = (ValuesSource.Numeric) config.getValuesSource(); counts = bigArrays().newLongArray(1, true); sums = bigArrays().newDoubleArray(1, true); compensations = bigArrays().newDoubleArray(1, true); @@ -53,40 +49,20 @@ class StatsAggregator extends NumericMetricsAggregator.MultiValue { } @Override - public ScoreMode scoreMode() { - return valuesSource.needsScores() ? 
ScoreMode.COMPLETE : ScoreMode.COMPLETE_NO_SCORES; - } - - @Override - public LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx, final LeafBucketCollector sub) throws IOException { - final SortedNumericDoubleValues values = valuesSource.doubleValues(aggCtx.getLeafReaderContext()); + public LeafBucketCollector getLeafCollector(SortedNumericDoubleValues values, LeafBucketCollector sub) { final CompensatedSum kahanSummation = new CompensatedSum(0, 0); - return new LeafBucketCollectorBase(sub, values) { @Override public void collect(int doc, long bucket) throws IOException { if (values.advanceExact(doc)) { - if (bucket >= counts.size()) { - final long from = counts.size(); - final long overSize = BigArrays.overSize(bucket + 1); - counts = bigArrays().resize(counts, overSize); - sums = bigArrays().resize(sums, overSize); - compensations = bigArrays().resize(compensations, overSize); - mins = bigArrays().resize(mins, overSize); - maxes = bigArrays().resize(maxes, overSize); - mins.fill(from, overSize, Double.POSITIVE_INFINITY); - maxes.fill(from, overSize, Double.NEGATIVE_INFINITY); - } + maybeGrow(bucket); final int valuesCount = values.docValueCount(); counts.increment(bucket, valuesCount); double min = mins.get(bucket); double max = maxes.get(bucket); // Compute the sum of double values with Kahan summation algorithm which is more // accurate than naive summation. - double sum = sums.get(bucket); - double compensation = compensations.get(bucket); - kahanSummation.reset(sum, compensation); - + kahanSummation.reset(sums.get(bucket), compensations.get(bucket)); for (int i = 0; i < valuesCount; i++) { double value = values.nextValue(); kahanSummation.add(value); @@ -102,6 +78,43 @@ public void collect(int doc, long bucket) throws IOException { }; } + @Override + public LeafBucketCollector getLeafCollector(NumericDoubleValues values, LeafBucketCollector sub) { + final CompensatedSum kahanSummation = new CompensatedSum(0, 0); + return new LeafBucketCollectorBase(sub, values) { + @Override + public void collect(int doc, long bucket) throws IOException { + if (values.advanceExact(doc)) { + maybeGrow(bucket); + counts.increment(bucket, 1L); + // Compute the sum of double values with Kahan summation algorithm which is more + // accurate than naive summation. 
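+ // (Single-valued segments take this collector: exactly one values.doubleValue()
+ // read per matching document and no docValueCount() loop, while count, sum,
+ // min and max are maintained the same way as in the multi-value path above.)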
+ kahanSummation.reset(sums.get(bucket), compensations.get(bucket)); + double value = values.doubleValue(); + kahanSummation.add(value); + sums.set(bucket, kahanSummation.value()); + compensations.set(bucket, kahanSummation.delta()); + mins.set(bucket, Math.min(mins.get(bucket), value)); + maxes.set(bucket, Math.max(maxes.get(bucket), value)); + } + } + }; + } + + private void maybeGrow(long bucket) { + if (bucket >= counts.size()) { + final long from = counts.size(); + final long overSize = BigArrays.overSize(bucket + 1); + counts = bigArrays().resize(counts, overSize); + sums = bigArrays().resize(sums, overSize); + compensations = bigArrays().resize(compensations, overSize); + mins = bigArrays().resize(mins, overSize); + maxes = bigArrays().resize(maxes, overSize); + mins.fill(from, overSize, Double.POSITIVE_INFINITY); + maxes.fill(from, overSize, Double.NEGATIVE_INFINITY); + } + } + @Override public boolean hasMetric(String name) { return InternalStats.Metrics.hasMetric(name); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/SumAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/SumAggregator.java index 94d9f311db621..105e52dbc91c7 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/SumAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/SumAggregator.java @@ -7,26 +7,23 @@ */ package org.elasticsearch.search.aggregations.metrics; -import org.apache.lucene.search.ScoreMode; import org.elasticsearch.common.util.DoubleArray; import org.elasticsearch.core.Releasables; +import org.elasticsearch.index.fielddata.NumericDoubleValues; import org.elasticsearch.index.fielddata.SortedNumericDoubleValues; import org.elasticsearch.search.DocValueFormat; -import org.elasticsearch.search.aggregations.AggregationExecutionContext; import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.LeafBucketCollector; import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; import org.elasticsearch.search.aggregations.support.AggregationContext; -import org.elasticsearch.search.aggregations.support.ValuesSource; import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; import java.io.IOException; import java.util.Map; -public class SumAggregator extends NumericMetricsAggregator.SingleValue { +public class SumAggregator extends NumericMetricsAggregator.SingleDoubleValue { - private final ValuesSource.Numeric valuesSource; private final DocValueFormat format; private DoubleArray sums; @@ -39,43 +36,46 @@ public class SumAggregator extends NumericMetricsAggregator.SingleValue { Aggregator parent, Map metadata ) throws IOException { - super(name, context, parent, metadata); + super(name, valuesSourceConfig, context, parent, metadata); assert valuesSourceConfig.hasValues(); - this.valuesSource = (ValuesSource.Numeric) valuesSourceConfig.getValuesSource(); this.format = valuesSourceConfig.format(); sums = bigArrays().newDoubleArray(1, true); compensations = bigArrays().newDoubleArray(1, true); } @Override - public ScoreMode scoreMode() { - return valuesSource.needsScores() ? 
ScoreMode.COMPLETE : ScoreMode.COMPLETE_NO_SCORES; + protected LeafBucketCollector getLeafCollector(SortedNumericDoubleValues values, final LeafBucketCollector sub) { + final CompensatedSum kahanSummation = new CompensatedSum(0, 0); + return new LeafBucketCollectorBase(sub, values) { + @Override + public void collect(int doc, long bucket) throws IOException { + if (values.advanceExact(doc)) { + maybeGrow(bucket); + // Compute the sum of double values with Kahan summation algorithm which is more + // accurate than naive summation. + kahanSummation.reset(sums.get(bucket), compensations.get(bucket)); + for (int i = 0; i < values.docValueCount(); i++) { + kahanSummation.add(values.nextValue()); + } + compensations.set(bucket, kahanSummation.delta()); + sums.set(bucket, kahanSummation.value()); + } + } + }; } @Override - public LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx, final LeafBucketCollector sub) throws IOException { - final SortedNumericDoubleValues values = valuesSource.doubleValues(aggCtx.getLeafReaderContext()); + protected LeafBucketCollector getLeafCollector(NumericDoubleValues values, final LeafBucketCollector sub) { final CompensatedSum kahanSummation = new CompensatedSum(0, 0); return new LeafBucketCollectorBase(sub, values) { @Override public void collect(int doc, long bucket) throws IOException { if (values.advanceExact(doc)) { - if (bucket >= sums.size()) { - sums = bigArrays().grow(sums, bucket + 1); - compensations = bigArrays().grow(compensations, bucket + 1); - } - final int valuesCount = values.docValueCount(); + maybeGrow(bucket); // Compute the sum of double values with Kahan summation algorithm which is more // accurate than naive summation. - double sum = sums.get(bucket); - double compensation = compensations.get(bucket); - kahanSummation.reset(sum, compensation); - - for (int i = 0; i < valuesCount; i++) { - double value = values.nextValue(); - kahanSummation.add(value); - } - + kahanSummation.reset(sums.get(bucket), compensations.get(bucket)); + kahanSummation.add(values.doubleValue()); compensations.set(bucket, kahanSummation.delta()); sums.set(bucket, kahanSummation.value()); } @@ -83,6 +83,13 @@ public void collect(int doc, long bucket) throws IOException { }; } + private void maybeGrow(long bucket) { + if (bucket >= sums.size()) { + sums = bigArrays().grow(sums, bucket + 1); + compensations = bigArrays().grow(compensations, bucket + 1); + } + } + @Override public double metric(long owningBucketOrd) { if (owningBucketOrd >= sums.size()) {