diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index dd442bfbf8b0..eb86c917025e 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -64,7 +64,7 @@ Metrics may have additional dimensions beyond those listed above. |Metric|Description|Dimensions|Normal Value| |------|-----------|----------|------------| |`query/time`|Milliseconds taken to complete a query.|Common: dataSource, type, interval, hasFilters, duration, context, remoteAddress, id. Aggregation Queries: numMetrics, numComplexMetrics. GroupBy: numDimensions. TopN: threshold, dimension.|< 1s| -|`query/segment/time`|Milliseconds taken to query individual segment. Includes time to page in the segment from disk.|id, status, segment.|several hundred milliseconds| +|`query/segment/time`|Milliseconds taken to query individual segment. Includes time to page in the segment from disk.|id, status, segment, vectorized.|several hundred milliseconds| |`query/wait/time`|Milliseconds spent waiting for a segment to be scanned.|id, segment.|< several hundred milliseconds| |`segment/scan/pending`|Number of segments in queue waiting to be scanned.||Close to 0| |`query/segmentAndCache/time`|Milliseconds taken to query individual segment or hit the cache (if it is enabled on the Historical process).|id, segment.|several hundred milliseconds| diff --git a/extensions-contrib/distinctcount/src/test/java/org/apache/druid/query/aggregation/distinctcount/DistinctCountTimeseriesQueryTest.java b/extensions-contrib/distinctcount/src/test/java/org/apache/druid/query/aggregation/distinctcount/DistinctCountTimeseriesQueryTest.java index c218004375ca..c5faefc65ca6 100644 --- a/extensions-contrib/distinctcount/src/test/java/org/apache/druid/query/aggregation/distinctcount/DistinctCountTimeseriesQueryTest.java +++ b/extensions-contrib/distinctcount/src/test/java/org/apache/druid/query/aggregation/distinctcount/DistinctCountTimeseriesQueryTest.java @@ -28,6 +28,7 @@ import org.apache.druid.query.QueryRunnerTestHelper; import org.apache.druid.query.Result; import org.apache.druid.query.aggregation.CountAggregatorFactory; +import org.apache.druid.query.timeseries.DefaultTimeseriesQueryMetrics; import org.apache.druid.query.timeseries.TimeseriesQuery; import org.apache.druid.query.timeseries.TimeseriesQueryEngine; import org.apache.druid.query.timeseries.TimeseriesResultValue; @@ -100,7 +101,7 @@ public void testTimeseriesWithDistinctCountAgg() throws Exception .build(); final Iterable> results = - engine.process(query, new IncrementalIndexStorageAdapter(index)).toList(); + engine.process(query, new IncrementalIndexStorageAdapter(index), new DefaultTimeseriesQueryMetrics()).toList(); List> expectedResults = Collections.singletonList( new Result<>( diff --git a/processing/src/main/java/org/apache/druid/query/QueryRunnerHelper.java b/processing/src/main/java/org/apache/druid/query/QueryRunnerHelper.java index 8788778157a1..88b12e81c8b8 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryRunnerHelper.java +++ b/processing/src/main/java/org/apache/druid/query/QueryRunnerHelper.java @@ -31,6 +31,7 @@ import org.apache.druid.segment.VirtualColumns; import org.joda.time.Interval; +import javax.annotation.Nullable; import java.io.Closeable; import java.util.List; import java.util.Objects; @@ -46,7 +47,8 @@ public static Sequence> makeCursorBasedQuery( final VirtualColumns virtualColumns, final boolean descending, final Granularity granularity, - final Function> mapFn + final Function> mapFn, + @Nullable final QueryMetrics queryMetrics ) { Preconditions.checkArgument( @@ -55,7 +57,7 @@ public static Sequence> makeCursorBasedQuery( return Sequences.filter( Sequences.map( - adapter.makeCursors(filter, queryIntervals.get(0), virtualColumns, granularity, descending, null), + adapter.makeCursors(filter, queryIntervals.get(0), virtualColumns, granularity, descending, queryMetrics), mapFn ), Objects::nonNull diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryEngine.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryEngine.java index 62a82dc84413..611d0084061d 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryEngine.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryEngine.java @@ -85,7 +85,11 @@ public GroupByQueryEngine( this.intermediateResultsBufferPool = intermediateResultsBufferPool; } - public Sequence process(final GroupByQuery query, final StorageAdapter storageAdapter) + public Sequence process( + final GroupByQuery query, + final StorageAdapter storageAdapter, + @Nullable final GroupByQueryMetrics groupByQueryMetrics + ) { if (storageAdapter == null) { throw new ISE( @@ -112,7 +116,7 @@ public Sequence process(final GroupByQuery query, final StorageAdapter stor query.getVirtualColumns(), query.getGranularity(), false, - null + groupByQueryMetrics ); final ResourceHolder bufferHolder = intermediateResultsBufferPool.take(); diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryRunnerFactory.java index 51a7d1a5aa01..6553f4c4eec4 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryRunnerFactory.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryRunnerFactory.java @@ -102,7 +102,9 @@ public Sequence run(QueryPlus queryPlus, ResponseContext r throw new ISE("Got a [%s] which isn't a %s", query.getClass(), GroupByQuery.class); } - return strategySelector.strategize((GroupByQuery) query).process((GroupByQuery) query, adapter); + return strategySelector + .strategize((GroupByQuery) query) + .process((GroupByQuery) query, adapter, (GroupByQueryMetrics) queryPlus.getQueryMetrics()); } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java index c6fc53b82bcd..a61a592bb203 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java @@ -41,6 +41,7 @@ import org.apache.druid.query.filter.Filter; import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.query.groupby.GroupByQueryConfig; +import org.apache.druid.query.groupby.GroupByQueryMetrics; import org.apache.druid.query.groupby.ResultRow; import org.apache.druid.query.groupby.epinephelinae.column.ArrayDoubleGroupByColumnSelectorStrategy; import org.apache.druid.query.groupby.epinephelinae.column.ArrayLongGroupByColumnSelectorStrategy; @@ -90,7 +91,7 @@ * This code runs on data servers, like Historicals. * * Used by - * {@link GroupByStrategyV2#process(GroupByQuery, StorageAdapter)}. + * {@link GroupByStrategyV2#process(GroupByQuery, StorageAdapter, GroupByQueryMetrics)}. */ public class GroupByQueryEngineV2 { @@ -119,7 +120,8 @@ public static Sequence process( final GroupByQuery query, @Nullable final StorageAdapter storageAdapter, final NonBlockingPool intermediateResultsBufferPool, - final GroupByQueryConfig querySpecificConfig + final GroupByQueryConfig querySpecificConfig, + @Nullable final GroupByQueryMetrics groupByQueryMetrics ) { if (storageAdapter == null) { @@ -161,7 +163,8 @@ public static Sequence process( fudgeTimestamp, filter, interval, - querySpecificConfig + querySpecificConfig, + groupByQueryMetrics ); } else { result = processNonVectorized( @@ -171,7 +174,8 @@ public static Sequence process( fudgeTimestamp, querySpecificConfig, filter, - interval + interval, + groupByQueryMetrics ); } @@ -190,7 +194,8 @@ private static Sequence processNonVectorized( @Nullable final DateTime fudgeTimestamp, final GroupByQueryConfig querySpecificConfig, @Nullable final Filter filter, - final Interval interval + final Interval interval, + @Nullable final GroupByQueryMetrics groupByQueryMetrics ) { final Sequence cursors = storageAdapter.makeCursors( @@ -199,7 +204,7 @@ private static Sequence processNonVectorized( query.getVirtualColumns(), query.getGranularity(), false, - null + groupByQueryMetrics ); return cursors.flatMap( diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java index f5914a565e06..2c7811c8ff15 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java @@ -33,6 +33,7 @@ import org.apache.druid.query.filter.Filter; import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.query.groupby.GroupByQueryConfig; +import org.apache.druid.query.groupby.GroupByQueryMetrics; import org.apache.druid.query.groupby.ResultRow; import org.apache.druid.query.groupby.epinephelinae.AggregateResult; import org.apache.druid.query.groupby.epinephelinae.BufferArrayGrouper; @@ -128,7 +129,8 @@ public static Sequence process( @Nullable final DateTime fudgeTimestamp, @Nullable final Filter filter, final Interval interval, - final GroupByQueryConfig config + final GroupByQueryConfig config, + @Nullable final GroupByQueryMetrics groupByQueryMetrics ) { if (!canVectorize(query, storageAdapter, filter)) { @@ -147,7 +149,7 @@ public CloseableIterator make() query.getVirtualColumns(), false, QueryContexts.getVectorSize(query), - null + groupByQueryMetrics ); if (cursor == null) { diff --git a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategy.java b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategy.java index df48c968724f..87e6a3dcac10 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategy.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategy.java @@ -27,6 +27,7 @@ import org.apache.druid.query.QueryRunnerFactory; import org.apache.druid.query.context.ResponseContext; import org.apache.druid.query.groupby.GroupByQuery; +import org.apache.druid.query.groupby.GroupByQueryMetrics; import org.apache.druid.query.groupby.GroupByQueryQueryToolChest; import org.apache.druid.query.groupby.ResultRow; import org.apache.druid.query.groupby.resource.GroupByQueryResource; @@ -164,7 +165,7 @@ Sequence processSubtotalsSpec( /** * Merge a variety of single-segment query runners into a combined runner. Used by * {@link org.apache.druid.query.groupby.GroupByQueryRunnerFactory#mergeRunners(QueryProcessingPool, Iterable)}. In - * that sense, it is intended to go along with {@link #process(GroupByQuery, StorageAdapter)} (the runners created + * that sense, it is intended to go along with {@link #process(GroupByQuery, StorageAdapter, GroupByQueryMetrics)} (the runners created * by that method will be fed into this method). *

* This method is only called on data servers, like Historicals (not the Broker). @@ -187,7 +188,10 @@ Sequence processSubtotalsSpec( * * @return result sequence for the storage adapter */ - Sequence process(GroupByQuery query, StorageAdapter storageAdapter); + Sequence process( + GroupByQuery query, + StorageAdapter storageAdapter, + @Nullable GroupByQueryMetrics groupByQueryMetrics); /** * Returns whether this strategy supports pushing down outer queries. This is used by diff --git a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV1.java b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV1.java index c25335f7ce7e..aeb3d2b8005e 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV1.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV1.java @@ -41,6 +41,7 @@ import org.apache.druid.query.groupby.GroupByQueryConfig; import org.apache.druid.query.groupby.GroupByQueryEngine; import org.apache.druid.query.groupby.GroupByQueryHelper; +import org.apache.druid.query.groupby.GroupByQueryMetrics; import org.apache.druid.query.groupby.GroupByQueryQueryToolChest; import org.apache.druid.query.groupby.ResultRow; import org.apache.druid.query.groupby.orderby.NoopLimitSpec; @@ -51,6 +52,7 @@ import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter; import org.joda.time.Interval; +import javax.annotation.Nullable; import java.util.ArrayList; import java.util.HashSet; import java.util.Set; @@ -233,7 +235,8 @@ public Sequence apply(Interval interval) outerQuery.withQuerySegmentSpec( new MultipleIntervalSegmentSpec(ImmutableList.of(interval)) ), - new IncrementalIndexStorageAdapter(innerQueryResultIndex) + new IncrementalIndexStorageAdapter(innerQueryResultIndex), + null ); } } @@ -269,10 +272,14 @@ public QueryRunner mergeRunners( } @Override - public Sequence process(final GroupByQuery query, final StorageAdapter storageAdapter) + public Sequence process( + final GroupByQuery query, + final StorageAdapter storageAdapter, + @Nullable final GroupByQueryMetrics groupByQueryMetrics + ) { return Sequences.map( - engine.process(query, storageAdapter), + engine.process(query, storageAdapter, groupByQueryMetrics), row -> GroupByQueryHelper.toResultRow(query, row) ); } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java index f8823d62acc3..abb5cdf65da9 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java @@ -59,6 +59,7 @@ import org.apache.druid.query.dimension.DimensionSpec; import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.query.groupby.GroupByQueryConfig; +import org.apache.druid.query.groupby.GroupByQueryMetrics; import org.apache.druid.query.groupby.ResultRow; import org.apache.druid.query.groupby.epinephelinae.GroupByBinaryFnV2; import org.apache.druid.query.groupby.epinephelinae.GroupByMergingQueryRunnerV2; @@ -73,6 +74,7 @@ import org.apache.druid.segment.VirtualColumns; import org.apache.druid.utils.CloseableUtils; +import javax.annotation.Nullable; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Comparator; @@ -690,13 +692,18 @@ public QueryRunner mergeRunners( } @Override - public Sequence process(GroupByQuery query, StorageAdapter storageAdapter) + public Sequence process( + GroupByQuery query, + StorageAdapter storageAdapter, + @Nullable GroupByQueryMetrics groupByQueryMetrics + ) { return GroupByQueryEngineV2.process( query, storageAdapter, bufferPool, - configSupplier.get().withOverrides(query) + configSupplier.get().withOverrides(query), + groupByQueryMetrics ); } diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java index 112fdeaa1ba3..fccc3a2c86a6 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java @@ -32,6 +32,7 @@ import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.query.QueryContexts; +import org.apache.druid.query.QueryMetrics; import org.apache.druid.query.QueryTimeoutException; import org.apache.druid.query.context.ResponseContext; import org.apache.druid.query.filter.Filter; @@ -44,6 +45,7 @@ import org.apache.druid.timeline.SegmentId; import org.joda.time.Interval; +import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -61,7 +63,8 @@ public class ScanQueryEngine public Sequence process( final ScanQuery query, final Segment segment, - final ResponseContext responseContext + final ResponseContext responseContext, + @Nullable final QueryMetrics queryMetrics ) { if (segment.asQueryableIndex() != null && segment.asQueryableIndex().isFromTombstone()) { @@ -135,7 +138,7 @@ public Sequence process( Granularities.ALL, query.getTimeOrder().equals(ScanQuery.Order.DESCENDING) || (query.getTimeOrder().equals(ScanQuery.Order.NONE) && query.isDescending()), - null + queryMetrics ) .map(cursor -> new BaseSequence<>( new BaseSequence.IteratorMaker>() diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java index 7133241836ce..8aec07679b3a 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java @@ -373,7 +373,7 @@ public Sequence run(QueryPlus queryPlus, Respo if (timeoutAt == null || timeoutAt == 0L) { responseContext.putTimeoutTime(JodaUtils.MAX_INSTANT); } - return engine.process((ScanQuery) query, segment, responseContext); + return engine.process((ScanQuery) query, segment, responseContext, queryPlus.getQueryMetrics()); } } } diff --git a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java index 2fe21dafbf42..71a1938edaca 100644 --- a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java +++ b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java @@ -120,7 +120,8 @@ private DateTime getTimeBoundary(StorageAdapter adapter, TimeBoundaryQuery legac VirtualColumns.EMPTY, descending, Granularities.ALL, - this.skipToFirstMatching + this.skipToFirstMatching, + null ); final List> resultList = resultSequence.limit(1).toList(); if (resultList.size() > 0) { diff --git a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryEngine.java b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryEngine.java index e93cc973c6d7..6981751e121b 100644 --- a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryEngine.java +++ b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryEngine.java @@ -82,7 +82,11 @@ public TimeseriesQueryEngine( * Run a single-segment, single-interval timeseries query on a particular adapter. The query must have been * scoped down to a single interval before calling this method. */ - public Sequence> process(final TimeseriesQuery query, final StorageAdapter adapter) + public Sequence> process( + final TimeseriesQuery query, + final StorageAdapter adapter, + @Nullable final TimeseriesQueryMetrics timeseriesQueryMetrics + ) { if (adapter == null) { throw new SegmentMissingException( @@ -106,9 +110,9 @@ public Sequence> process(final TimeseriesQuery que final Sequence> result; if (doVectorize) { - result = processVectorized(query, adapter, filter, interval, gran, descending); + result = processVectorized(query, adapter, filter, interval, gran, descending, timeseriesQueryMetrics); } else { - result = processNonVectorized(query, adapter, filter, interval, gran, descending); + result = processNonVectorized(query, adapter, filter, interval, gran, descending, timeseriesQueryMetrics); } final int limit = query.getLimit(); @@ -125,7 +129,8 @@ private Sequence> processVectorized( @Nullable final Filter filter, final Interval queryInterval, final Granularity gran, - final boolean descending + final boolean descending, + final TimeseriesQueryMetrics timeseriesQueryMetrics ) { final boolean skipEmptyBuckets = query.isSkipEmptyBuckets(); @@ -137,7 +142,7 @@ private Sequence> processVectorized( query.getVirtualColumns(), descending, QueryContexts.getVectorSize(query), - null + timeseriesQueryMetrics ); if (cursor == null) { @@ -251,7 +256,8 @@ private Sequence> processNonVectorized( @Nullable final Filter filter, final Interval queryInterval, final Granularity gran, - final boolean descending + final boolean descending, + final TimeseriesQueryMetrics timeseriesQueryMetrics ) { final boolean skipEmptyBuckets = query.isSkipEmptyBuckets(); @@ -299,7 +305,8 @@ private Sequence> processNonVectorized( agg.close(); } } - } + }, + timeseriesQueryMetrics ); } } diff --git a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerFactory.java index 2d04905b3b29..fe3d420e5662 100644 --- a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerFactory.java +++ b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerFactory.java @@ -99,7 +99,7 @@ public Sequence> run( throw new ISE("Got a [%s] which isn't a %s", input.getClass(), TimeseriesQuery.class); } - return engine.process((TimeseriesQuery) input, adapter); + return engine.process((TimeseriesQuery) input, adapter, (TimeseriesQueryMetrics) queryPlus.getQueryMetrics()); } } diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapter.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapter.java index fb424f8e2193..9a4951cfc4c1 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapter.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapter.java @@ -262,6 +262,10 @@ public Sequence makeCursors( return Sequences.empty(); } + if (queryMetrics != null) { + queryMetrics.vectorized(false); + } + final Interval dataInterval = new Interval(getMinTime(), gran.bucketEnd(getMaxTime())); if (!interval.overlaps(dataInterval)) { diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstTimeseriesQueryTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstTimeseriesQueryTest.java index 355a25c17729..250525ca1b78 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstTimeseriesQueryTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstTimeseriesQueryTest.java @@ -31,6 +31,7 @@ import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.SerializablePairLongString; import org.apache.druid.query.aggregation.SerializablePairLongStringSerde; +import org.apache.druid.query.timeseries.DefaultTimeseriesQueryMetrics; import org.apache.druid.query.timeseries.TimeseriesQuery; import org.apache.druid.query.timeseries.TimeseriesQueryEngine; import org.apache.druid.query.timeseries.TimeseriesResultValue; @@ -140,11 +141,12 @@ public void testTimeseriesQuery() ) ); + final DefaultTimeseriesQueryMetrics defaultTimeseriesQueryMetrics = new DefaultTimeseriesQueryMetrics(); final Iterable> iiResults = - engine.process(query, new IncrementalIndexStorageAdapter(incrementalIndex)).toList(); + engine.process(query, new IncrementalIndexStorageAdapter(incrementalIndex), defaultTimeseriesQueryMetrics).toList(); final Iterable> qiResults = - engine.process(query, new QueryableIndexStorageAdapter(queryableIndex)).toList(); + engine.process(query, new QueryableIndexStorageAdapter(queryableIndex), defaultTimeseriesQueryMetrics).toList(); TestHelper.assertExpectedResults(expectedResults, iiResults, "incremental index"); TestHelper.assertExpectedResults(expectedResults, qiResults, "queryable index"); diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastTimeseriesQueryTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastTimeseriesQueryTest.java index 235dfdf45023..6c017bab4af0 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastTimeseriesQueryTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastTimeseriesQueryTest.java @@ -31,6 +31,7 @@ import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.SerializablePairLongString; import org.apache.druid.query.aggregation.SerializablePairLongStringSerde; +import org.apache.druid.query.timeseries.DefaultTimeseriesQueryMetrics; import org.apache.druid.query.timeseries.TimeseriesQuery; import org.apache.druid.query.timeseries.TimeseriesQueryEngine; import org.apache.druid.query.timeseries.TimeseriesResultValue; @@ -139,11 +140,12 @@ public void testTimeseriesQuery() ) ); + final DefaultTimeseriesQueryMetrics defaultTimeseriesQueryMetrics = new DefaultTimeseriesQueryMetrics(); final Iterable> iiResults = - engine.process(query, new IncrementalIndexStorageAdapter(incrementalIndex)).toList(); + engine.process(query, new IncrementalIndexStorageAdapter(incrementalIndex), defaultTimeseriesQueryMetrics).toList(); final Iterable> qiResults = - engine.process(query, new QueryableIndexStorageAdapter(queryableIndex)).toList(); + engine.process(query, new QueryableIndexStorageAdapter(queryableIndex), defaultTimeseriesQueryMetrics).toList(); TestHelper.assertExpectedResults(expectedResults, iiResults, "incremental index"); TestHelper.assertExpectedResults(expectedResults, qiResults, "queryable index"); diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java index bd8e4905aeb8..23cc2cdc2b46 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java @@ -50,6 +50,7 @@ import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.js.JavaScriptConfig; import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.query.BySegmentResultValue; @@ -207,6 +208,7 @@ public int getNumThreads() private static final Closer RESOURCE_CLOSER = Closer.create(); private final QueryRunner runner; + private final QueryRunner originalRunner; private final GroupByQueryRunnerFactory factory; private final GroupByQueryConfig config; private final boolean vectorize; @@ -449,7 +451,9 @@ public static Collection constructorFeeder() final String testName = StringUtils.format("config=%s, runner=%s, vectorize=%s", config, runner, vectorize); // Add vectorization tests for any indexes that support it. - if (!vectorize || QueryRunnerTestHelper.isTestRunnerVectorizable(runner)) { + if (!vectorize || + (QueryRunnerTestHelper.isTestRunnerVectorizable(runner) && + config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2))) { constructors.add(new Object[]{testName, config, factory, runner, vectorize}); } } @@ -476,6 +480,7 @@ public GroupByQueryRunnerTest( this.config = config; this.factory = factory; this.runner = factory.mergeRunners(Execs.directExecutor(), ImmutableList.of(runner)); + this.originalRunner = runner; String runnerName = runner.toString(); this.vectorize = vectorize; } @@ -752,7 +757,15 @@ public void testGroupBy() ) ); - Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + StubServiceEmitter serviceEmitter = new StubServiceEmitter("", ""); + Iterable results = GroupByQueryRunnerTestHelper.runQueryWithEmitter( + factory, + originalRunner, + query, + serviceEmitter + ); + Assert.assertEquals(1, serviceEmitter.getEvents().size()); + Assert.assertEquals(vectorize, serviceEmitter.getEvents().get(0).toMap().getOrDefault("vectorized", null)); TestHelper.assertExpectedObjects(expectedResults, results, "groupBy"); } diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTestHelper.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTestHelper.java index 6356a1535a8d..e433a27bee6c 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTestHelper.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTestHelper.java @@ -22,7 +22,9 @@ import com.google.common.base.Preconditions; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.query.FinalizeResultsQueryRunner; +import org.apache.druid.query.MetricsEmittingQueryRunner; import org.apache.druid.query.Query; import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryRunner; @@ -54,6 +56,30 @@ public static Iterable runQuery(QueryRunnerFactory factory, QueryRunner r return queryResult.toList(); } + public static Iterable runQueryWithEmitter( + QueryRunnerFactory factory, + QueryRunner runner, + Query query, + ServiceEmitter serviceEmitter + ) + { + MetricsEmittingQueryRunner metricsEmittingQueryRunner = + new MetricsEmittingQueryRunner( + serviceEmitter, + factory.getToolchest(), + runner, + (obj, lng) -> {}, + (metrics) -> {} + ).withWaitMeasuredFromNow(); + QueryToolChest toolChest = factory.getToolchest(); + QueryRunner theRunner = new FinalizeResultsQueryRunner<>( + toolChest.mergeResults(toolChest.preMergeQueryDecoration(metricsEmittingQueryRunner)), + toolChest + ); + + return theRunner.run(QueryPlus.wrap(query)).toList(); + } + public static ResultRow createExpectedRow(final GroupByQuery query, final String timestamp, Object... vals) { return createExpectedRow(query, DateTimes.of(timestamp), vals); diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java index abd3f78f55a4..f0d87c9e36cc 100644 --- a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java @@ -36,9 +36,11 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.query.DefaultGenericQueryMetricsFactory; import org.apache.druid.query.DirectQueryProcessingPool; import org.apache.druid.query.Druids; +import org.apache.druid.query.MetricsEmittingQueryRunner; import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryRunner; @@ -211,7 +213,16 @@ public void testFullOnSelect() .virtualColumns(EXPR_COLUMN) .build(); - Iterable results = runner.run(QueryPlus.wrap(query)).toList(); + StubServiceEmitter stubServiceEmitter = new StubServiceEmitter("", ""); + MetricsEmittingQueryRunner metricsEmittingQueryRunner = + new MetricsEmittingQueryRunner( + stubServiceEmitter, + TOOL_CHEST, + runner, + (obj, lng) -> {}, + (metrics) -> {} + ).withWaitMeasuredFromNow(); + Iterable results = metricsEmittingQueryRunner.run(QueryPlus.wrap(query)).toList(); List expectedResults = toExpected( toFullEvents(V_0112_0114), @@ -219,6 +230,8 @@ public void testFullOnSelect() 0, 3 ); + Assert.assertEquals(1, stubServiceEmitter.getEvents().size()); + Assert.assertEquals(false, stubServiceEmitter.getEvents().get(0).toMap().getOrDefault("vectorized", null)); verify(expectedResults, populateNullColumnAtLastForQueryableIndexCase(results, "null_column")); } diff --git a/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java index 05848a5e4dd9..74155ce51c54 100644 --- a/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java @@ -34,8 +34,10 @@ import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.granularity.PeriodGranularity; import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.query.Druids; import org.apache.druid.query.FinalizeResultsQueryRunner; +import org.apache.druid.query.MetricsEmittingQueryRunner; import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryRunner; @@ -220,7 +222,16 @@ public void testFullOnTimeseries() .context(makeContext()) .build(); - Iterable> results = runner.run(QueryPlus.wrap(query)).toList(); + StubServiceEmitter stubServiceEmitter = new StubServiceEmitter("", ""); + MetricsEmittingQueryRunner> metricsEmittingQueryRunner = + new MetricsEmittingQueryRunner>( + stubServiceEmitter, + new TimeseriesQueryQueryToolChest(), + runner, + (obj, lng) -> {}, + (metrics) -> {} + ).withWaitMeasuredFromNow(); + Iterable> results = metricsEmittingQueryRunner.run(QueryPlus.wrap(query)).toList(); final String[] expectedIndex = descending ? QueryRunnerTestHelper.EXPECTED_FULL_ON_INDEX_VALUES_DESC : @@ -306,6 +317,11 @@ public void testFullOnTimeseries() ++count; } + Assert.assertEquals(1, stubServiceEmitter.getEvents().size()); + Assert.assertEquals( + vectorize, + stubServiceEmitter.getEvents().get(0).toMap().getOrDefault("vectorized", null) + ); Assert.assertEquals(lastResult.toString(), expectedLast, lastResult.getTimestamp()); } diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java index 20812e7b8884..72e1efd50eaa 100644 --- a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java +++ b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java @@ -158,7 +158,8 @@ public int getMaxIntermediateRows() .addDimension("sally") .addAggregator(new LongSumAggregatorFactory("cnt", "cnt")) .build(), - new IncrementalIndexStorageAdapter(index) + new IncrementalIndexStorageAdapter(index), + null ); final List results = rows.toList(); @@ -236,7 +237,8 @@ public int getMaxIntermediateRows() ) ) .build(), - new IncrementalIndexStorageAdapter(index) + new IncrementalIndexStorageAdapter(index), + null ); final List results = rows.toList(); @@ -406,7 +408,8 @@ public int getMaxIntermediateRows() .addAggregator(new LongSumAggregatorFactory("cnt", "cnt")) .setDimFilter(DimFilters.dimEquals("sally", (String) null)) .build(), - new IncrementalIndexStorageAdapter(index) + new IncrementalIndexStorageAdapter(index), + null ); final List results = rows.toList();