From a6bfcc5bfd77df809d667da03050bdf2b8a450a8 Mon Sep 17 00:00:00 2001 From: binlijin Date: Thu, 7 Jan 2016 14:51:00 +0800 Subject: [PATCH 1/3] Allow change minTopNThreshold per topN query --- .../io/druid/query/topn/TopNQueryQueryToolChest.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java index 82649b17aa54..7a53817091e6 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java @@ -470,7 +470,7 @@ public QueryRunner> postMergeQueryDecoration(final Query { final ThresholdAdjustingQueryRunner thresholdRunner = new ThresholdAdjustingQueryRunner( runner, - config.getMinTopNThreshold() + config ); return new QueryRunner>() { @@ -535,15 +535,15 @@ public Ordering> getOrdering() private static class ThresholdAdjustingQueryRunner implements QueryRunner> { private final QueryRunner> runner; - private final int minTopNThreshold; + private final TopNQueryConfig config; public ThresholdAdjustingQueryRunner( QueryRunner> runner, - int minTopNThreshold + TopNQueryConfig config ) { this.runner = runner; - this.minTopNThreshold = minTopNThreshold; + this.config = config; } @Override @@ -557,6 +557,7 @@ public Sequence> run( } final TopNQuery query = (TopNQuery) input; + final int minTopNThreshold = query.getContextValue("minTopNThreshold", config.getMinTopNThreshold()); if (query.getThreshold() > minTopNThreshold) { return runner.run(query, responseContext); } From 010c6e959c846e1e1c2ff2edd4931d541c74fa32 Mon Sep 17 00:00:00 2001 From: binlijin Date: Thu, 7 Jan 2016 18:01:46 +0800 Subject: [PATCH 2/3] add test --- .../query/topn/TopNQueryQueryToolChest.java | 2 +- .../topn/TopNQueryQueryToolChestTest.java | 80 +++++++++++++++++++ 2 files changed, 81 insertions(+), 1 deletion(-) diff --git 
a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java index 7a53817091e6..4cd3e4ca9fde 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java @@ -532,7 +532,7 @@ public Ordering> getOrdering() return Ordering.natural(); } - private static class ThresholdAdjustingQueryRunner implements QueryRunner> + static class ThresholdAdjustingQueryRunner implements QueryRunner> { private final QueryRunner> runner; private final TopNQueryConfig config; diff --git a/processing/src/test/java/io/druid/query/topn/TopNQueryQueryToolChestTest.java b/processing/src/test/java/io/druid/query/topn/TopNQueryQueryToolChestTest.java index 73ff4b4909f2..9d08223b4055 100644 --- a/processing/src/test/java/io/druid/query/topn/TopNQueryQueryToolChestTest.java +++ b/processing/src/test/java/io/druid/query/topn/TopNQueryQueryToolChestTest.java @@ -22,25 +22,37 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import com.metamx.common.guava.Sequence; import io.druid.granularity.QueryGranularity; import io.druid.jackson.DefaultObjectMapper; import io.druid.query.CacheStrategy; +import io.druid.query.Query; +import io.druid.query.QueryRunner; +import io.druid.query.QueryRunnerFactory; +import io.druid.query.QueryRunnerTestHelper; import io.druid.query.Result; import io.druid.query.TableDataSource; +import io.druid.query.TestQueryRunners; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.dimension.DefaultDimensionSpec; import io.druid.query.spec.MultipleIntervalSegmentSpec; +import io.druid.segment.IncrementalIndexSegment; +import io.druid.segment.TestIndex; import 
org.joda.time.DateTime; import org.joda.time.Interval; import org.junit.Assert; import org.junit.Test; import java.util.Arrays; +import java.util.Map; public class TopNQueryQueryToolChestTest { + private static final String segmentId = "testSegment"; + @Test public void testCacheStrategy() throws Exception { @@ -93,4 +105,72 @@ public void testCacheStrategy() throws Exception Assert.assertEquals(result, fromCacheResult); } + + @Test + public void testMinTopNThreshold() throws Exception + { + TopNQueryConfig config = new TopNQueryConfig(); + final TopNQueryQueryToolChest chest = new TopNQueryQueryToolChest( + config, + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() + ); + QueryRunnerFactory factory = new TopNQueryRunnerFactory( + TestQueryRunners.getPool(), + chest, + QueryRunnerTestHelper.NOOP_QUERYWATCHER + ); + QueryRunner> runner = QueryRunnerTestHelper.makeQueryRunner( + factory, + new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(false), segmentId) + ); + + Map context = Maps.newHashMap(); + context.put("minTopNThreshold", 500); + + TopNQueryBuilder builder = new TopNQueryBuilder() + .dataSource(QueryRunnerTestHelper.dataSource) + .granularity(QueryRunnerTestHelper.allGran) + .dimension(QueryRunnerTestHelper.placementishDimension) + .metric(QueryRunnerTestHelper.indexMetric) + .intervals(QueryRunnerTestHelper.fullOnInterval) + .aggregators(QueryRunnerTestHelper.commonAggregators); + + TopNQuery query1 = builder.threshold(10).context(null).build(); + MockQueryRunner mockRunner = new MockQueryRunner(runner); + new TopNQueryQueryToolChest.ThresholdAdjustingQueryRunner(mockRunner, config) + .run(query1, ImmutableMap.of()); + Assert.assertEquals(1000, mockRunner.query.getThreshold()); + + TopNQuery query2 = builder.threshold(10).context(context).build(); + + new TopNQueryQueryToolChest.ThresholdAdjustingQueryRunner(mockRunner, config) + .run(query2, ImmutableMap.of()); + Assert.assertEquals(500, mockRunner.query.getThreshold()); + + 
TopNQuery query3 = builder.threshold(2000).context(context).build(); + new TopNQueryQueryToolChest.ThresholdAdjustingQueryRunner(mockRunner, config) + .run(query3, ImmutableMap.of()); + Assert.assertEquals(2000, mockRunner.query.getThreshold()); + } + + static class MockQueryRunner implements QueryRunner> + { + private final QueryRunner> runner; + TopNQuery query = null; + + MockQueryRunner(QueryRunner> runner) + { + this.runner = runner; + } + + @Override + public Sequence> run( + Query> query, + Map responseContext + ) + { + this.query = (TopNQuery) query; + return query.run(runner, responseContext); + } + } } From 2751f785f8fb6bdc9a7202605b1814a0d63e3c5d Mon Sep 17 00:00:00 2001 From: binlijin Date: Tue, 12 Jan 2016 11:25:11 +0800 Subject: [PATCH 3/3] add doc --- docs/content/querying/query-context.md | 22 +++++++++++----------- docs/content/querying/topnquery.md | 2 +- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/docs/content/querying/query-context.md b/docs/content/querying/query-context.md index 1c67e5e3959c..7b4707013c15 100644 --- a/docs/content/querying/query-context.md +++ b/docs/content/querying/query-context.md @@ -7,14 +7,14 @@ Query Context The query context is used for various query configuration parameters. -|property |default | description | -|--------------|---------------------|----------------------| -|timeout | `0` (no timeout) | Query timeout in milliseconds, beyond which unfinished queries will be cancelled. | -|priority | `0` | Query Priority. Queries with higher priority get precedence for computational resources.| -|queryId | auto-generated | Unique identifier given to this query. If a query ID is set or known, this can be used to cancel the query | -|useCache | `true` | Flag indicating whether to leverage the query cache for this query. This may be overridden in the broker or historical node configuration | -|populateCache | `true` | Flag indicating whether to save the results of the query to the query cache. 
Primarily used for debugging. This may be overriden in the broker or historical node configuration | -|bySegment | `false` | Return "by segment" results. Primarily used for debugging, setting it to `true` returns results associated with the data segment they came from | -|finalize | `true` | Flag indicating whether to "finalize" aggregation results. Primarily used for debugging. For instance, the `hyperUnique` aggregator will return the full HyperLogLog sketch instead of the estimated cardinality when this flag is set to `false` | -|chunkPeriod | `0` (off) | At the broker node level, long interval queries (of any type) may be broken into shorter interval queries, reducing the impact on resources. Use ISO 8601 periods. For example, if this property is set to `P1M` (one month), then a query covering a year would be broken into 12 smaller queries. All the query chunks will be processed asynchronously inside query processing executor service. Make sure "druid.processing.numThreads" is configured appropriately on the broker. | - +|property |default | description | +|-----------------|---------------------|----------------------| +|timeout | `0` (no timeout) | Query timeout in milliseconds, beyond which unfinished queries will be cancelled. | +|priority | `0` | Query Priority. Queries with higher priority get precedence for computational resources.| +|queryId | auto-generated | Unique identifier given to this query. If a query ID is set or known, this can be used to cancel the query | +|useCache | `true` | Flag indicating whether to leverage the query cache for this query. This may be overridden in the broker or historical node configuration | +|populateCache | `true` | Flag indicating whether to save the results of the query to the query cache. Primarily used for debugging. This may be overriden in the broker or historical node configuration | +|bySegment | `false` | Return "by segment" results. 
Primarily used for debugging, setting it to `true` returns results associated with the data segment they came from | +|finalize | `true` | Flag indicating whether to "finalize" aggregation results. Primarily used for debugging. For instance, the `hyperUnique` aggregator will return the full HyperLogLog sketch instead of the estimated cardinality when this flag is set to `false` | +|chunkPeriod | `0` (off) | At the broker node level, long interval queries (of any type) may be broken into shorter interval queries, reducing the impact on resources. Use ISO 8601 periods. For example, if this property is set to `P1M` (one month), then a query covering a year would be broken into 12 smaller queries. All the query chunks will be processed asynchronously inside query processing executor service. Make sure "druid.processing.numThreads" is configured appropriately on the broker. | +|minTopNThreshold | `1000` | The top minTopNThreshold local results from each segment are returned for merging to determine the global topN. | diff --git a/docs/content/querying/topnquery.md b/docs/content/querying/topnquery.md index d7166e4b8c63..e9d967d76057 100644 --- a/docs/content/querying/topnquery.md +++ b/docs/content/querying/topnquery.md @@ -131,7 +131,7 @@ The format of the results would look like so: ### Aliasing The current TopN algorithm is an approximate algorithm. The top 1000 local results from each segment are returned for merging to determine the global topN. As such, the topN algorithm is approximate in both rank and results. Approximate results *ONLY APPLY WHEN THERE ARE MORE THAN 1000 DIM VALUES*. A topN over a dimension with fewer than 1000 unique dimension values can be considered accurate in rank and accurate in aggregates. 
-The threshold can be modified from it's default 1000 via the server parameter `druid.query.topN.minTopNThreshold` +The threshold can be modified from its default of 1000 via the server parameter `druid.query.topN.minTopNThreshold`, which requires a server restart to take effect, or by setting `minTopNThreshold` in the query context, which takes effect on a per-query basis. If you are wanting the top 100 of a high cardinality, uniformly distributed dimension ordered by some low-cardinality, uniformly distributed dimension, you are potentially going to get aggregates back that are missing data.