Pass metrics object for Scan, Timeseries and GroupBy queries during cursor creation #12484

Merged
merged 3 commits into from
May 9, 2022
Merged
2 changes: 1 addition & 1 deletion docs/operations/metrics.md
@@ -64,7 +64,7 @@ Metrics may have additional dimensions beyond those listed above.
|Metric|Description|Dimensions|Normal Value|
|------|-----------|----------|------------|
|`query/time`|Milliseconds taken to complete a query.|Common: dataSource, type, interval, hasFilters, duration, context, remoteAddress, id. Aggregation Queries: numMetrics, numComplexMetrics. GroupBy: numDimensions. TopN: threshold, dimension.|< 1s|
- |`query/segment/time`|Milliseconds taken to query individual segment. Includes time to page in the segment from disk.|id, status, segment.|several hundred milliseconds|
+ |`query/segment/time`|Milliseconds taken to query individual segment. Includes time to page in the segment from disk.|id, status, segment, vectorized.|several hundred milliseconds|
|`query/wait/time`|Milliseconds spent waiting for a segment to be scanned.|id, segment.|< several hundred milliseconds|
|`segment/scan/pending`|Number of segments in queue waiting to be scanned.||Close to 0|
|`query/segmentAndCache/time`|Milliseconds taken to query individual segment or hit the cache (if it is enabled on the Historical process).|id, segment.|several hundred milliseconds|
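With this change, `query/segment/time` carries a `vectorized` dimension, so operators can see whether a segment was scanned with the vectorized engine. A minimal sketch of how the dimension flows through a `QueryMetrics` object (illustrative only; the variables `query`, `segmentId`, `segmentTimeNs`, and `emitter` are assumed to be in scope, and method names follow the `QueryMetrics` interface used elsewhere in this PR):

```java
// Record per-segment dimensions, including the new "vectorized" flag set
// at cursor-creation time, then emit the query/segment/time timer.
DefaultTimeseriesQueryMetrics metrics = new DefaultTimeseriesQueryMetrics();
metrics.query(query);        // common dimensions: dataSource, type, interval, id, ...
metrics.segment(segmentId);  // per-segment dimension
metrics.vectorized(true);    // set while the cursor is being created
metrics.reportSegmentTime(segmentTimeNs).emit(emitter);
```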
@@ -28,6 +28,7 @@
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.Result;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
+ import org.apache.druid.query.timeseries.DefaultTimeseriesQueryMetrics;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
@@ -100,7 +101,7 @@ public void testTimeseriesWithDistinctCountAgg() throws Exception
.build();

final Iterable<Result<TimeseriesResultValue>> results =
- engine.process(query, new IncrementalIndexStorageAdapter(index)).toList();
+ engine.process(query, new IncrementalIndexStorageAdapter(index), new DefaultTimeseriesQueryMetrics()).toList();
suneet-s (Contributor) commented on May 2, 2022:

nit: Can you verify the metrics returned by calling engine.process(..)

The PR author (Member) replied:

The verification is done in the *QueryRunnerTests for all the query types. This is mostly to exercise the metrics code more.
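For reference, one way such an in-test verification could look (a sketch only, not code from this PR; it assumes the storage adapter reports `vectorized(false)` when it creates non-vectorized cursors):

```java
// Hypothetical capturing subclass: records the vectorized flag that the
// storage adapter sets on the metrics object during cursor creation.
class CapturingTimeseriesQueryMetrics extends DefaultTimeseriesQueryMetrics
{
  Boolean capturedVectorized; // stays null until vectorized(...) is called

  @Override
  public void vectorized(final boolean vectorized)
  {
    this.capturedVectorized = vectorized;
    super.vectorized(vectorized);
  }
}

final CapturingTimeseriesQueryMetrics metrics = new CapturingTimeseriesQueryMetrics();
engine.process(query, new IncrementalIndexStorageAdapter(index), metrics).toList();
// Incremental-index cursors are not vectorized, so false is expected here.
Assert.assertEquals(Boolean.FALSE, metrics.capturedVectorized);
```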


List<Result<TimeseriesResultValue>> expectedResults = Collections.singletonList(
new Result<>(
@@ -31,6 +31,7 @@
import org.apache.druid.segment.VirtualColumns;
import org.joda.time.Interval;

+ import javax.annotation.Nullable;
import java.io.Closeable;
import java.util.List;
import java.util.Objects;
@@ -46,7 +47,8 @@ public static <T> Sequence<Result<T>> makeCursorBasedQuery(
final VirtualColumns virtualColumns,
final boolean descending,
final Granularity granularity,
- final Function<Cursor, Result<T>> mapFn
+ final Function<Cursor, Result<T>> mapFn,
+ @Nullable final QueryMetrics<?> queryMetrics
)
{
Preconditions.checkArgument(
@@ -55,7 +57,7 @@ public static <T> Sequence<Result<T>> makeCursorBasedQuery(

return Sequences.filter(
Sequences.map(
- adapter.makeCursors(filter, queryIntervals.get(0), virtualColumns, granularity, descending, null),
+ adapter.makeCursors(filter, queryIntervals.get(0), virtualColumns, granularity, descending, queryMetrics),
mapFn
),
Objects::nonNull
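This pattern repeats across all the engines touched by this PR: the metrics object is handed to `StorageAdapter.makeCursors(...)` instead of `null`, so the adapter can record cursor-level facts at creation time. A sketch of what an adapter might do with the parameter (an illustrative fragment under that assumption; real adapters differ):

```java
// Illustrative StorageAdapter fragment: report whether this cursor path is
// vectorized as soon as the cursors are created.
@Override
public Sequence<Cursor> makeCursors(
    @Nullable final Filter filter,
    final Interval interval,
    final VirtualColumns virtualColumns,
    final Granularity gran,
    final boolean descending,
    @Nullable final QueryMetrics<?> queryMetrics
)
{
  if (queryMetrics != null) {
    queryMetrics.vectorized(false); // this path builds non-vectorized cursors
  }
  // ... build and return the actual cursor sequence ...
  return Sequences.empty(); // placeholder for the sketch
}
```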
@@ -85,7 +85,11 @@ public GroupByQueryEngine(
this.intermediateResultsBufferPool = intermediateResultsBufferPool;
}

- public Sequence<Row> process(final GroupByQuery query, final StorageAdapter storageAdapter)
+ public Sequence<Row> process(
+     final GroupByQuery query,
+     final StorageAdapter storageAdapter,
+     @Nullable final GroupByQueryMetrics groupByQueryMetrics
+ )
{
if (storageAdapter == null) {
throw new ISE(
@@ -112,7 +116,7 @@ public Sequence<Row> process(final GroupByQuery query, final StorageAdapter stor
query.getVirtualColumns(),
query.getGranularity(),
false,
- null
+ groupByQueryMetrics
);

final ResourceHolder<ByteBuffer> bufferHolder = intermediateResultsBufferPool.take();
@@ -102,7 +102,9 @@ public Sequence<ResultRow> run(QueryPlus<ResultRow> queryPlus, ResponseContext r
throw new ISE("Got a [%s] which isn't a %s", query.getClass(), GroupByQuery.class);
}

- return strategySelector.strategize((GroupByQuery) query).process((GroupByQuery) query, adapter);
+ return strategySelector
+     .strategize((GroupByQuery) query)
+     .process((GroupByQuery) query, adapter, (GroupByQueryMetrics) queryPlus.getQueryMetrics());
}
}

@@ -41,6 +41,7 @@
import org.apache.druid.query.filter.Filter;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
+ import org.apache.druid.query.groupby.GroupByQueryMetrics;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.epinephelinae.column.ArrayDoubleGroupByColumnSelectorStrategy;
import org.apache.druid.query.groupby.epinephelinae.column.ArrayLongGroupByColumnSelectorStrategy;
@@ -90,7 +91,7 @@
* This code runs on data servers, like Historicals.
*
* Used by
- * {@link GroupByStrategyV2#process(GroupByQuery, StorageAdapter)}.
+ * {@link GroupByStrategyV2#process(GroupByQuery, StorageAdapter, GroupByQueryMetrics)}.
*/
public class GroupByQueryEngineV2
{
@@ -119,7 +120,8 @@ public static Sequence<ResultRow> process(
final GroupByQuery query,
@Nullable final StorageAdapter storageAdapter,
final NonBlockingPool<ByteBuffer> intermediateResultsBufferPool,
- final GroupByQueryConfig querySpecificConfig
+ final GroupByQueryConfig querySpecificConfig,
+ @Nullable final GroupByQueryMetrics groupByQueryMetrics
)
{
if (storageAdapter == null) {
@@ -161,7 +163,8 @@ public static Sequence<ResultRow> process(
fudgeTimestamp,
filter,
interval,
- querySpecificConfig
+ querySpecificConfig,
+ groupByQueryMetrics
);
} else {
result = processNonVectorized(
@@ -171,7 +174,8 @@ private static Sequence<ResultRow> processNonVectorized(
fudgeTimestamp,
querySpecificConfig,
filter,
- interval
+ interval,
+ groupByQueryMetrics
);
}

@@ -190,7 +194,8 @@ private static Sequence<ResultRow> processNonVectorized(
@Nullable final DateTime fudgeTimestamp,
final GroupByQueryConfig querySpecificConfig,
@Nullable final Filter filter,
- final Interval interval
+ final Interval interval,
+ @Nullable final GroupByQueryMetrics groupByQueryMetrics
)
{
final Sequence<Cursor> cursors = storageAdapter.makeCursors(
Expand All @@ -199,7 +204,7 @@ private static Sequence<ResultRow> processNonVectorized(
query.getVirtualColumns(),
query.getGranularity(),
false,
- null
+ groupByQueryMetrics
);

return cursors.flatMap(
@@ -34,6 +34,7 @@
import org.apache.druid.query.filter.Filter;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
+ import org.apache.druid.query.groupby.GroupByQueryMetrics;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.epinephelinae.AggregateResult;
import org.apache.druid.query.groupby.epinephelinae.BufferArrayGrouper;
@@ -128,7 +129,8 @@ public static Sequence<ResultRow> process(
@Nullable final DateTime fudgeTimestamp,
@Nullable final Filter filter,
final Interval interval,
- final GroupByQueryConfig config
+ final GroupByQueryConfig config,
+ @Nullable final GroupByQueryMetrics groupByQueryMetrics
)
{
if (!canVectorize(query, storageAdapter, filter)) {
@@ -147,7 +149,7 @@ public CloseableIterator<ResultRow> make()
query.getVirtualColumns(),
false,
QueryContexts.getVectorSize(query),
- null
+ groupByQueryMetrics
);

if (cursor == null) {
@@ -27,6 +27,7 @@
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.groupby.GroupByQuery;
+ import org.apache.druid.query.groupby.GroupByQueryMetrics;
import org.apache.druid.query.groupby.GroupByQueryQueryToolChest;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.resource.GroupByQueryResource;
@@ -164,7 +165,7 @@ Sequence<ResultRow> processSubtotalsSpec(
/**
* Merge a variety of single-segment query runners into a combined runner. Used by
* {@link org.apache.druid.query.groupby.GroupByQueryRunnerFactory#mergeRunners(QueryProcessingPool, Iterable)}. In
- * that sense, it is intended to go along with {@link #process(GroupByQuery, StorageAdapter)} (the runners created
+ * that sense, it is intended to go along with {@link #process(GroupByQuery, StorageAdapter, GroupByQueryMetrics)} (the runners created
* by that method will be fed into this method).
* <p>
* This method is only called on data servers, like Historicals (not the Broker).
@@ -187,7 +188,10 @@ Sequence<ResultRow> processSubtotalsSpec(
*
* @return result sequence for the storage adapter
*/
- Sequence<ResultRow> process(GroupByQuery query, StorageAdapter storageAdapter);
A reviewer (Contributor) commented:

Is there a reason not to wire in the GroupByQueryMetrics in processSubqueryResult, processSubtotalsSpec, etc.?

The PR author (Member) replied:

I didn't do it in those methods since they take the subquery/subtotal result sequence as an argument. I think an inlined subquery should execute using a separate query object and hence should emit its own metrics object. For pushed-down subqueries, I'm not sure about their metrics framework as of now, but my guess is that they should use the same metrics object as the outer query. Will check and update.

The reviewer (Contributor) replied:

> I think the inlined subquery should execute using a separate query object and hence should emit their own metrics object.

Is there a test we can implement easily to validate this? If so, that would be great!
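A sketch of the kind of check being suggested (hypothetical; the names and wiring below are illustrative, and a real test would sit in the existing GroupBy query runner test harness):

```java
// Capture every event the query stack emits, then check that the inlined
// subquery and the outer query reported metrics under distinct query ids.
final List<Event> events = new CopyOnWriteArrayList<>();
final ServiceEmitter capturingEmitter = new ServiceEmitter("test", "localhost", null)
{
  @Override
  public void emit(final Event event)
  {
    events.add(event); // null delegate is fine only because emit is overridden
  }
};
// ... run an outer group-by containing an inlined subquery, with its
// metrics wired to capturingEmitter ...
final Set<Object> queryIds = events.stream()
    .map(event -> event.toMap().get("id"))
    .collect(Collectors.toSet());
Assert.assertEquals("inner and outer queries emit separately", 2, queryIds.size());
```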

+ Sequence<ResultRow> process(
+     GroupByQuery query,
+     StorageAdapter storageAdapter,
+     @Nullable GroupByQueryMetrics groupByQueryMetrics);

/**
* Returns whether this strategy supports pushing down outer queries. This is used by
@@ -41,6 +41,7 @@
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.GroupByQueryEngine;
import org.apache.druid.query.groupby.GroupByQueryHelper;
+ import org.apache.druid.query.groupby.GroupByQueryMetrics;
import org.apache.druid.query.groupby.GroupByQueryQueryToolChest;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.orderby.NoopLimitSpec;
@@ -51,6 +52,7 @@
import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter;
import org.joda.time.Interval;

+ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
@@ -233,7 +235,8 @@ public Sequence<ResultRow> apply(Interval interval)
outerQuery.withQuerySegmentSpec(
new MultipleIntervalSegmentSpec(ImmutableList.of(interval))
),
- new IncrementalIndexStorageAdapter(innerQueryResultIndex)
+ new IncrementalIndexStorageAdapter(innerQueryResultIndex),
+ null
A reviewer (Contributor) commented:

why not pass in queryMetrics here?

The PR author (Member) replied:

I passed null here since passing the query metrics needed changes in the interface, and since this execution is synthetic over the inner query results and would always be non-vectorized (due to the storage adapter used), I skipped it. If you think it's worthwhile, I can try to make that change.

The reviewer (Contributor) replied:

I think passing this object around would be useful in the future for adding other dimensions.

For example, let's say we wanted to know how many group by queries are executed using the v1 strategy vs the v2 strategy, then making the interface changes to pass in the queryMetrics object could be useful.

Since the scope of this PR is just to add better support for seeing whether or not queries are vectorized, I think it isn't needed to do this as part of this change, but it could be done in a future change
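As an illustration of that possible future change (not part of this PR; the interface method below is hypothetical), the strategy could be recorded as one more dimension on the metrics object:

```java
// Hypothetical extension of GroupByQueryMetrics: record which strategy
// (v1 or v2) executed the query as a metric dimension.
public interface GroupByQueryMetricsWithStrategy extends GroupByQueryMetrics
{
  void groupByStrategy(String strategy); // e.g. "v1" or "v2"
}

// Each strategy's process(...) would then report itself, e.g. in GroupByStrategyV2:
// if (groupByQueryMetrics instanceof GroupByQueryMetricsWithStrategy) {
//   ((GroupByQueryMetricsWithStrategy) groupByQueryMetrics).groupByStrategy("v2");
// }
```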

);
}
}
@@ -269,10 +272,14 @@ public QueryRunner<ResultRow> mergeRunners(
}

@Override
- public Sequence<ResultRow> process(final GroupByQuery query, final StorageAdapter storageAdapter)
+ public Sequence<ResultRow> process(
+     final GroupByQuery query,
+     final StorageAdapter storageAdapter,
+     @Nullable final GroupByQueryMetrics groupByQueryMetrics
+ )
{
return Sequences.map(
- engine.process(query, storageAdapter),
+ engine.process(query, storageAdapter, groupByQueryMetrics),
row -> GroupByQueryHelper.toResultRow(query, row)
);
}
@@ -59,6 +59,7 @@
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
+ import org.apache.druid.query.groupby.GroupByQueryMetrics;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.epinephelinae.GroupByBinaryFnV2;
import org.apache.druid.query.groupby.epinephelinae.GroupByMergingQueryRunnerV2;
@@ -73,6 +74,7 @@
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.utils.CloseableUtils;

+ import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Comparator;
@@ -690,13 +692,18 @@ public QueryRunner<ResultRow> mergeRunners(
}

@Override
- public Sequence<ResultRow> process(GroupByQuery query, StorageAdapter storageAdapter)
+ public Sequence<ResultRow> process(
+     GroupByQuery query,
+     StorageAdapter storageAdapter,
+     @Nullable GroupByQueryMetrics groupByQueryMetrics
+ )
{
return GroupByQueryEngineV2.process(
query,
storageAdapter,
bufferPool,
- configSupplier.get().withOverrides(query)
+ configSupplier.get().withOverrides(query),
+ groupByQueryMetrics
);
}

@@ -32,6 +32,7 @@
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.QueryContexts;
+ import org.apache.druid.query.QueryMetrics;
import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.filter.Filter;
@@ -44,6 +45,7 @@
import org.apache.druid.timeline.SegmentId;
import org.joda.time.Interval;

+ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -61,7 +63,8 @@ public class ScanQueryEngine
public Sequence<ScanResultValue> process(
final ScanQuery query,
final Segment segment,
- final ResponseContext responseContext
+ final ResponseContext responseContext,
+ @Nullable final QueryMetrics<?> queryMetrics
)
{
if (segment.asQueryableIndex() != null && segment.asQueryableIndex().isFromTombstone()) {
@@ -135,7 +138,7 @@ public Sequence<ScanResultValue> process(
Granularities.ALL,
query.getTimeOrder().equals(ScanQuery.Order.DESCENDING) ||
(query.getTimeOrder().equals(ScanQuery.Order.NONE) && query.isDescending()),
- null
+ queryMetrics
)
.map(cursor -> new BaseSequence<>(
new BaseSequence.IteratorMaker<ScanResultValue, Iterator<ScanResultValue>>()
@@ -373,7 +373,7 @@ public Sequence<ScanResultValue> run(QueryPlus<ScanResultValue> queryPlus, Respo
if (timeoutAt == null || timeoutAt == 0L) {
responseContext.putTimeoutTime(JodaUtils.MAX_INSTANT);
}
- return engine.process((ScanQuery) query, segment, responseContext);
+ return engine.process((ScanQuery) query, segment, responseContext, queryPlus.getQueryMetrics());
}
}
}
@@ -118,7 +118,8 @@ private DateTime getTimeBoundary(StorageAdapter adapter, TimeBoundaryQuery legac
VirtualColumns.EMPTY,
descending,
Granularities.ALL,
- this.skipToFirstMatching
+ this.skipToFirstMatching,
+ null
);
final List<Result<DateTime>> resultList = resultSequence.limit(1).toList();
if (resultList.size() > 0) {