Pass metrics object for Scan, Timeseries and GroupBy queries during cursor creation (#12484)

* Pass metrics object for Scan, Timeseries and GroupBy queries during cursor creation

* fixup! Pass metrics object for Scan, Timeseries and GroupBy queries during cursor creation

* Document vectorized dimension
rohangarg authored May 9, 2022
1 parent eb6de94 commit 2dd073c
Showing 23 changed files with 169 additions and 45 deletions.
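The change is mechanical but broad: every engine that previously hard-coded `null` as the final argument of `StorageAdapter.makeCursors` (or the vectorized cursor factory) now forwards the per-query metrics object instead. A minimal sketch of the resulting call shape, assuming only the `makeCursors` signature visible in the diffs below — the helper class and variable names are illustrative, not part of the commit:

```java
import javax.annotation.Nullable;

import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.QueryMetrics;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.VirtualColumns;
import org.joda.time.Interval;

// Illustrative helper (not in the commit): forwards the caller's metrics
// object to cursor creation, where it was previously hard-coded to null.
public class CursorMetricsSketch
{
  public static Sequence<Cursor> cursorsWithMetrics(
      final StorageAdapter adapter,
      final Interval interval,
      @Nullable final QueryMetrics<?> queryMetrics
  )
  {
    return adapter.makeCursors(
        null,                 // no filter, for brevity
        interval,
        VirtualColumns.EMPTY,
        Granularities.ALL,
        false,                // ascending time order
        queryMetrics          // was: null
    );
  }
}
```

Passing `null` remains legal wherever the parameter is `@Nullable`, so callers without a metrics object keep the old behavior.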
2 changes: 1 addition & 1 deletion docs/operations/metrics.md
@@ -64,7 +64,7 @@ Metrics may have additional dimensions beyond those listed above.
|Metric|Description|Dimensions|Normal Value|
|------|-----------|----------|------------|
|`query/time`|Milliseconds taken to complete a query.|Common: dataSource, type, interval, hasFilters, duration, context, remoteAddress, id. Aggregation Queries: numMetrics, numComplexMetrics. GroupBy: numDimensions. TopN: threshold, dimension.|< 1s|
-|`query/segment/time`|Milliseconds taken to query individual segment. Includes time to page in the segment from disk.|id, status, segment.|several hundred milliseconds|
+|`query/segment/time`|Milliseconds taken to query individual segment. Includes time to page in the segment from disk.|id, status, segment, vectorized.|several hundred milliseconds|
|`query/wait/time`|Milliseconds spent waiting for a segment to be scanned.|id, segment.|< several hundred milliseconds|
|`segment/scan/pending`|Number of segments in queue waiting to be scanned.||Close to 0|
|`query/segmentAndCache/time`|Milliseconds taken to query individual segment or hit the cache (if it is enabled on the Historical process).|id, segment.|several hundred milliseconds|
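The newly documented `vectorized` dimension lets operators separate vectorized from non-vectorized segment scans in `query/segment/time`. A hedged sketch of where the dimension comes from, assuming the `QueryMetrics#vectorized(boolean)` setter is what storage adapters invoke once the metrics object reaches cursor creation (the helper class here is illustrative):

```java
import javax.annotation.Nullable;

import org.apache.druid.query.QueryMetrics;

// Illustrative only: record whether this segment was scanned with a
// vectorized cursor, so query/segment/time events carry the dimension.
public class VectorizedDimensionSketch
{
  public static void markVectorized(@Nullable final QueryMetrics<?> queryMetrics, final boolean usedVectorCursor)
  {
    if (queryMetrics != null) {
      queryMetrics.vectorized(usedVectorCursor);  // assumed setter on QueryMetrics
    }
  }
}
```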
DistinctCountTimeseriesQueryTest.java
@@ -28,6 +28,7 @@
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.Result;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.timeseries.DefaultTimeseriesQueryMetrics;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
@@ -100,7 +101,7 @@ public void testTimeseriesWithDistinctCountAgg() throws Exception
.build();

final Iterable<Result<TimeseriesResultValue>> results =
-engine.process(query, new IncrementalIndexStorageAdapter(index)).toList();
+engine.process(query, new IncrementalIndexStorageAdapter(index), new DefaultTimeseriesQueryMetrics()).toList();

List<Result<TimeseriesResultValue>> expectedResults = Collections.singletonList(
new Result<>(
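Call sites that have no metrics object of their own, such as this test, construct a default one. A self-contained sketch of the updated three-argument entry point — the query contents and class name are illustrative; only the `process(query, adapter, metrics)` shape comes from the diff:

```java
import java.util.Collections;
import java.util.List;

import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.Druids;
import org.apache.druid.query.Result;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.timeseries.DefaultTimeseriesQueryMetrics;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
import org.apache.druid.segment.StorageAdapter;

// Illustrative: run a trivial count query through the engine, supplying a
// default metrics object as the new third argument.
public class TimeseriesMetricsSketch
{
  public static List<Result<TimeseriesResultValue>> run(
      final TimeseriesQueryEngine engine,
      final StorageAdapter adapter   // e.g. an IncrementalIndexStorageAdapter
  )
  {
    final TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
        .dataSource("example_datasource")                          // hypothetical name
        .intervals(Collections.singletonList(Intervals.ETERNITY))
        .granularity(Granularities.ALL)
        .aggregators(Collections.singletonList(new CountAggregatorFactory("rows")))
        .build();

    return engine.process(query, adapter, new DefaultTimeseriesQueryMetrics()).toList();
  }
}
```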
QueryRunnerHelper.java
@@ -31,6 +31,7 @@
import org.apache.druid.segment.VirtualColumns;
import org.joda.time.Interval;

+import javax.annotation.Nullable;
import java.io.Closeable;
import java.util.List;
import java.util.Objects;
@@ -46,7 +47,8 @@ public static <T> Sequence<Result<T>> makeCursorBasedQuery(
final VirtualColumns virtualColumns,
final boolean descending,
final Granularity granularity,
-    final Function<Cursor, Result<T>> mapFn
+    final Function<Cursor, Result<T>> mapFn,
+    @Nullable final QueryMetrics<?> queryMetrics
)
{
Preconditions.checkArgument(
@@ -55,7 +57,7 @@ public static <T> Sequence<Result<T>> makeCursorBasedQuery(

return Sequences.filter(
Sequences.map(
-adapter.makeCursors(filter, queryIntervals.get(0), virtualColumns, granularity, descending, null),
+adapter.makeCursors(filter, queryIntervals.get(0), virtualColumns, granularity, descending, queryMetrics),
mapFn
),
Objects::nonNull
GroupByQueryEngine.java
@@ -85,7 +85,11 @@ public GroupByQueryEngine(
this.intermediateResultsBufferPool = intermediateResultsBufferPool;
}

-public Sequence<Row> process(final GroupByQuery query, final StorageAdapter storageAdapter)
+public Sequence<Row> process(
+    final GroupByQuery query,
+    final StorageAdapter storageAdapter,
+    @Nullable final GroupByQueryMetrics groupByQueryMetrics
+)
{
if (storageAdapter == null) {
throw new ISE(
@@ -112,7 +116,7 @@ public Sequence<Row> process(final GroupByQuery query, final StorageAdapter stor
query.getVirtualColumns(),
query.getGranularity(),
false,
-null
+groupByQueryMetrics
);

final ResourceHolder<ByteBuffer> bufferHolder = intermediateResultsBufferPool.take();
GroupByQueryRunnerFactory.java
@@ -102,7 +102,9 @@ public Sequence<ResultRow> run(QueryPlus<ResultRow> queryPlus, ResponseContext r
throw new ISE("Got a [%s] which isn't a %s", query.getClass(), GroupByQuery.class);
}

-return strategySelector.strategize((GroupByQuery) query).process((GroupByQuery) query, adapter);
+return strategySelector
+    .strategize((GroupByQuery) query)
+    .process((GroupByQuery) query, adapter, (GroupByQueryMetrics) queryPlus.getQueryMetrics());
}
}

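Note the cast: `QueryPlus` carries a generic `QueryMetrics<?>`, and the groupBy runner narrows it to `GroupByQueryMetrics` on the assumption that the groupBy toolchest created it. A sketch of just that narrowing step (class and method names are illustrative):

```java
import javax.annotation.Nullable;

import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.groupby.GroupByQueryMetrics;
import org.apache.druid.query.groupby.ResultRow;

// Illustrative: narrow the generic metrics carried by QueryPlus to the
// groupBy-specific subtype before handing it to the strategy.
public class GroupByMetricsNarrowingSketch
{
  @Nullable
  public static GroupByQueryMetrics metricsOf(final QueryPlus<ResultRow> queryPlus)
  {
    // Safe when the metrics object was created by the groupBy toolchest;
    // may be null if no metrics were attached to the query.
    return (GroupByQueryMetrics) queryPlus.getQueryMetrics();
  }
}
```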
GroupByQueryEngineV2.java
@@ -41,6 +41,7 @@
import org.apache.druid.query.filter.Filter;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
+import org.apache.druid.query.groupby.GroupByQueryMetrics;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.epinephelinae.column.ArrayDoubleGroupByColumnSelectorStrategy;
import org.apache.druid.query.groupby.epinephelinae.column.ArrayLongGroupByColumnSelectorStrategy;
@@ -90,7 +91,7 @@
* This code runs on data servers, like Historicals.
*
* Used by
- * {@link GroupByStrategyV2#process(GroupByQuery, StorageAdapter)}.
+ * {@link GroupByStrategyV2#process(GroupByQuery, StorageAdapter, GroupByQueryMetrics)}.
*/
public class GroupByQueryEngineV2
{
@@ -119,7 +120,8 @@ public static Sequence<ResultRow> process(
final GroupByQuery query,
@Nullable final StorageAdapter storageAdapter,
final NonBlockingPool<ByteBuffer> intermediateResultsBufferPool,
-    final GroupByQueryConfig querySpecificConfig
+    final GroupByQueryConfig querySpecificConfig,
+    @Nullable final GroupByQueryMetrics groupByQueryMetrics
)
{
if (storageAdapter == null) {
@@ -161,7 +163,8 @@
fudgeTimestamp,
filter,
interval,
-    querySpecificConfig
+    querySpecificConfig,
+    groupByQueryMetrics
);
} else {
result = processNonVectorized(
@@ -171,7 +174,8 @@
fudgeTimestamp,
querySpecificConfig,
filter,
-    interval
+    interval,
+    groupByQueryMetrics
);
}

@@ -190,7 +194,8 @@ private static Sequence<ResultRow> processNonVectorized(
@Nullable final DateTime fudgeTimestamp,
final GroupByQueryConfig querySpecificConfig,
@Nullable final Filter filter,
-    final Interval interval
+    final Interval interval,
+    @Nullable final GroupByQueryMetrics groupByQueryMetrics
)
{
final Sequence<Cursor> cursors = storageAdapter.makeCursors(
@@ -199,7 +204,7 @@ private static Sequence<ResultRow> processNonVectorized(
query.getVirtualColumns(),
query.getGranularity(),
false,
-null
+groupByQueryMetrics
);

return cursors.flatMap(
VectorGroupByEngine.java
@@ -33,6 +33,7 @@
import org.apache.druid.query.filter.Filter;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
+import org.apache.druid.query.groupby.GroupByQueryMetrics;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.epinephelinae.AggregateResult;
import org.apache.druid.query.groupby.epinephelinae.BufferArrayGrouper;
@@ -128,7 +129,8 @@ public static Sequence<ResultRow> process(
@Nullable final DateTime fudgeTimestamp,
@Nullable final Filter filter,
final Interval interval,
-    final GroupByQueryConfig config
+    final GroupByQueryConfig config,
+    @Nullable final GroupByQueryMetrics groupByQueryMetrics
)
{
if (!canVectorize(query, storageAdapter, filter)) {
@@ -147,7 +149,7 @@ public CloseableIterator<ResultRow> make()
query.getVirtualColumns(),
false,
QueryContexts.getVectorSize(query),
-null
+groupByQueryMetrics
);

if (cursor == null) {
GroupByStrategy.java
@@ -27,6 +27,7 @@
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.query.groupby.GroupByQueryMetrics;
import org.apache.druid.query.groupby.GroupByQueryQueryToolChest;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.resource.GroupByQueryResource;
@@ -164,7 +165,7 @@ Sequence<ResultRow> processSubtotalsSpec(
/**
* Merge a variety of single-segment query runners into a combined runner. Used by
* {@link org.apache.druid.query.groupby.GroupByQueryRunnerFactory#mergeRunners(QueryProcessingPool, Iterable)}. In
- * that sense, it is intended to go along with {@link #process(GroupByQuery, StorageAdapter)} (the runners created
+ * that sense, it is intended to go along with {@link #process(GroupByQuery, StorageAdapter, GroupByQueryMetrics)} (the runners created
* by that method will be fed into this method).
* <p>
* This method is only called on data servers, like Historicals (not the Broker).
@@ -187,7 +188,10 @@ Sequence<ResultRow> processSubtotalsSpec(
*
* @return result sequence for the storage adapter
*/
-Sequence<ResultRow> process(GroupByQuery query, StorageAdapter storageAdapter);
+Sequence<ResultRow> process(
+    GroupByQuery query,
+    StorageAdapter storageAdapter,
+    @Nullable GroupByQueryMetrics groupByQueryMetrics);

/**
* Returns whether this strategy supports pushing down outer queries. This is used by
GroupByStrategyV1.java
@@ -41,6 +41,7 @@
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.GroupByQueryEngine;
import org.apache.druid.query.groupby.GroupByQueryHelper;
+import org.apache.druid.query.groupby.GroupByQueryMetrics;
import org.apache.druid.query.groupby.GroupByQueryQueryToolChest;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.orderby.NoopLimitSpec;
@@ -51,6 +52,7 @@
import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter;
import org.joda.time.Interval;

+import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
@@ -233,7 +235,8 @@ public Sequence<ResultRow> apply(Interval interval)
outerQuery.withQuerySegmentSpec(
new MultipleIntervalSegmentSpec(ImmutableList.of(interval))
),
-    new IncrementalIndexStorageAdapter(innerQueryResultIndex)
+    new IncrementalIndexStorageAdapter(innerQueryResultIndex),
+    null
);
}
}
@@ -269,10 +272,14 @@ public QueryRunner<ResultRow> mergeRunners(
}

@Override
-public Sequence<ResultRow> process(final GroupByQuery query, final StorageAdapter storageAdapter)
+public Sequence<ResultRow> process(
+    final GroupByQuery query,
+    final StorageAdapter storageAdapter,
+    @Nullable final GroupByQueryMetrics groupByQueryMetrics
+)
{
return Sequences.map(
-engine.process(query, storageAdapter),
+engine.process(query, storageAdapter, groupByQueryMetrics),
row -> GroupByQueryHelper.toResultRow(query, row)
);
}
GroupByStrategyV2.java
@@ -59,6 +59,7 @@
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
+import org.apache.druid.query.groupby.GroupByQueryMetrics;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.epinephelinae.GroupByBinaryFnV2;
import org.apache.druid.query.groupby.epinephelinae.GroupByMergingQueryRunnerV2;
@@ -73,6 +74,7 @@
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.utils.CloseableUtils;

+import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Comparator;
@@ -690,13 +692,18 @@ public QueryRunner<ResultRow> mergeRunners(
}

@Override
-public Sequence<ResultRow> process(GroupByQuery query, StorageAdapter storageAdapter)
+public Sequence<ResultRow> process(
+    GroupByQuery query,
+    StorageAdapter storageAdapter,
+    @Nullable GroupByQueryMetrics groupByQueryMetrics
+)
{
return GroupByQueryEngineV2.process(
query,
storageAdapter,
bufferPool,
-    configSupplier.get().withOverrides(query)
+    configSupplier.get().withOverrides(query),
+    groupByQueryMetrics
);
}

ScanQueryEngine.java
@@ -32,6 +32,7 @@
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.QueryContexts;
+import org.apache.druid.query.QueryMetrics;
import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.filter.Filter;
@@ -44,6 +45,7 @@
import org.apache.druid.timeline.SegmentId;
import org.joda.time.Interval;

+import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -61,7 +63,8 @@ public class ScanQueryEngine
public Sequence<ScanResultValue> process(
final ScanQuery query,
final Segment segment,
-    final ResponseContext responseContext
+    final ResponseContext responseContext,
+    @Nullable final QueryMetrics<?> queryMetrics
)
{
if (segment.asQueryableIndex() != null && segment.asQueryableIndex().isFromTombstone()) {
@@ -135,7 +138,7 @@ public Sequence<ScanResultValue> process(
Granularities.ALL,
query.getTimeOrder().equals(ScanQuery.Order.DESCENDING) ||
(query.getTimeOrder().equals(ScanQuery.Order.NONE) && query.isDescending()),
-null
+queryMetrics
)
.map(cursor -> new BaseSequence<>(
new BaseSequence.IteratorMaker<ScanResultValue, Iterator<ScanResultValue>>()
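The scan path gains the same nullable parameter, threading it from the runner down to `makeCursors`. A sketch of the new four-argument entry point — the wrapper class is illustrative; the signature follows the diff above, and `null` keeps the old no-metrics behavior:

```java
import javax.annotation.Nullable;

import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.QueryMetrics;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.scan.ScanQueryEngine;
import org.apache.druid.query.scan.ScanResultValue;
import org.apache.druid.segment.Segment;

// Illustrative: the engine now accepts the per-query metrics object and
// threads it into cursor creation; null means "report nothing", as before.
public class ScanWithMetricsSketch
{
  public static Sequence<ScanResultValue> scan(
      final ScanQueryEngine engine,
      final ScanQuery query,
      final Segment segment,
      final ResponseContext responseContext,
      @Nullable final QueryMetrics<?> queryMetrics
  )
  {
    return engine.process(query, segment, responseContext, queryMetrics);
  }
}
```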
ScanQueryRunnerFactory.java
@@ -373,7 +373,7 @@ public Sequence<ScanResultValue> run(QueryPlus<ScanResultValue> queryPlus, Respo
if (timeoutAt == null || timeoutAt == 0L) {
responseContext.putTimeoutTime(JodaUtils.MAX_INSTANT);
}
-return engine.process((ScanQuery) query, segment, responseContext);
+return engine.process((ScanQuery) query, segment, responseContext, queryPlus.getQueryMetrics());
}
}
}
TimeBoundaryQueryRunnerFactory.java
@@ -120,7 +121,8 @@ private DateTime getTimeBoundary(StorageAdapter adapter, TimeBoundaryQuery legac
VirtualColumns.EMPTY,
descending,
Granularities.ALL,
-    this.skipToFirstMatching
+    this.skipToFirstMatching,
+    null
);
final List<Result<DateTime>> resultList = resultSequence.limit(1).toList();
if (resultList.size() > 0) {
Expand Down