Skip to content

Commit

Permalink
Merge pull request #2221 from binlijin/topN_minTopNThreshold
Browse files Browse the repository at this point in the history
Allow change minTopNThreshold per topN query
  • Loading branch information
fjy committed Jan 13, 2016
2 parents 01a0715 + 2751f78 commit d7ad93d
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 17 deletions.
22 changes: 11 additions & 11 deletions docs/content/querying/query-context.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ Query Context

The query context is used for various query configuration parameters.

|property |default | description |
|--------------|---------------------|----------------------|
|timeout | `0` (no timeout) | Query timeout in milliseconds, beyond which unfinished queries will be cancelled. |
|priority | `0` | Query Priority. Queries with higher priority get precedence for computational resources.|
|queryId | auto-generated | Unique identifier given to this query. If a query ID is set or known, this can be used to cancel the query |
|useCache | `true` | Flag indicating whether to leverage the query cache for this query. This may be overridden in the broker or historical node configuration |
|populateCache | `true` | Flag indicating whether to save the results of the query to the query cache. Primarily used for debugging. This may be overriden in the broker or historical node configuration |
|bySegment | `false` | Return "by segment" results. Primarily used for debugging, setting it to `true` returns results associated with the data segment they came from |
|finalize | `true` | Flag indicating whether to "finalize" aggregation results. Primarily used for debugging. For instance, the `hyperUnique` aggregator will return the full HyperLogLog sketch instead of the estimated cardinality when this flag is set to `false` |
|chunkPeriod | `0` (off) | At the broker node level, long interval queries (of any type) may be broken into shorter interval queries, reducing the impact on resources. Use ISO 8601 periods. For example, if this property is set to `P1M` (one month), then a query covering a year would be broken into 12 smaller queries. All the query chunks will be processed asynchronously inside query processing executor service. Make sure "druid.processing.numThreads" is configured appropriately on the broker. |

|property |default | description |
|-----------------|---------------------|----------------------|
|timeout | `0` (no timeout) | Query timeout in milliseconds, beyond which unfinished queries will be cancelled. |
|priority | `0` | Query Priority. Queries with higher priority get precedence for computational resources.|
|queryId | auto-generated | Unique identifier given to this query. If a query ID is set or known, this can be used to cancel the query |
|useCache | `true` | Flag indicating whether to leverage the query cache for this query. This may be overridden in the broker or historical node configuration |
|populateCache | `true` | Flag indicating whether to save the results of the query to the query cache. Primarily used for debugging. This may be overriden in the broker or historical node configuration |
|bySegment | `false` | Return "by segment" results. Primarily used for debugging, setting it to `true` returns results associated with the data segment they came from |
|finalize | `true` | Flag indicating whether to "finalize" aggregation results. Primarily used for debugging. For instance, the `hyperUnique` aggregator will return the full HyperLogLog sketch instead of the estimated cardinality when this flag is set to `false` |
|chunkPeriod | `0` (off) | At the broker node level, long interval queries (of any type) may be broken into shorter interval queries, reducing the impact on resources. Use ISO 8601 periods. For example, if this property is set to `P1M` (one month), then a query covering a year would be broken into 12 smaller queries. All the query chunks will be processed asynchronously inside query processing executor service. Make sure "druid.processing.numThreads" is configured appropriately on the broker. |
|minTopNThreshold | `1000` | The top minTopNThreshold local results from each segment are returned for merging to determine the global topN. |
2 changes: 1 addition & 1 deletion docs/content/querying/topnquery.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ The format of the results would look like so:
### Aliasing
The current TopN algorithm is an approximate algorithm. The top 1000 local results from each segment are returned for merging to determine the global topN. As such, the topN algorithm is approximate in both rank and results. Approximate results *ONLY APPLY WHEN THERE ARE MORE THAN 1000 DIM VALUES*. A topN over a dimension with fewer than 1000 unique dimension values can be considered accurate in rank and accurate in aggregates.

The threshold can be modified from it's default 1000 via the server parameter `druid.query.topN.minTopNThreshold`
The threshold can be modified from it's default 1000 via the server parameter `druid.query.topN.minTopNThreshold` which need to restart servers to take effect or set `minTopNThreshold` in query context which take effect per query.

If you are wanting the top 100 of a high cardinality, uniformly distributed dimension ordered by some low-cardinality, uniformly distributed dimension, you are potentially going to get aggregates back that are missing data.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ public QueryRunner<Result<TopNResultValue>> postMergeQueryDecoration(final Query
{
final ThresholdAdjustingQueryRunner thresholdRunner = new ThresholdAdjustingQueryRunner(
runner,
config.getMinTopNThreshold()
config
);
return new QueryRunner<Result<TopNResultValue>>()
{
Expand Down Expand Up @@ -532,18 +532,18 @@ public Ordering<Result<TopNResultValue>> getOrdering()
return Ordering.natural();
}

private static class ThresholdAdjustingQueryRunner implements QueryRunner<Result<TopNResultValue>>
static class ThresholdAdjustingQueryRunner implements QueryRunner<Result<TopNResultValue>>
{
private final QueryRunner<Result<TopNResultValue>> runner;
private final int minTopNThreshold;
private final TopNQueryConfig config;

public ThresholdAdjustingQueryRunner(
QueryRunner<Result<TopNResultValue>> runner,
int minTopNThreshold
TopNQueryConfig config
)
{
this.runner = runner;
this.minTopNThreshold = minTopNThreshold;
this.config = config;
}

@Override
Expand All @@ -557,6 +557,7 @@ public Sequence<Result<TopNResultValue>> run(
}

final TopNQuery query = (TopNQuery) input;
final int minTopNThreshold = query.getContextValue("minTopNThreshold", config.getMinTopNThreshold());
if (query.getThreshold() > minTopNThreshold) {
return runner.run(query, responseContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,37 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.metamx.common.guava.Sequence;
import io.druid.granularity.QueryGranularity;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.CacheStrategy;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.Result;
import io.druid.query.TableDataSource;
import io.druid.query.TestQueryRunners;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.spec.MultipleIntervalSegmentSpec;
import io.druid.segment.IncrementalIndexSegment;
import io.druid.segment.TestIndex;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;

import java.util.Arrays;
import java.util.Map;

public class TopNQueryQueryToolChestTest
{

private static final String segmentId = "testSegment";

@Test
public void testCacheStrategy() throws Exception
{
Expand Down Expand Up @@ -93,4 +105,72 @@ public void testCacheStrategy() throws Exception

Assert.assertEquals(result, fromCacheResult);
}

@Test
public void testMinTopNThreshold() throws Exception
{
TopNQueryConfig config = new TopNQueryConfig();
final TopNQueryQueryToolChest chest = new TopNQueryQueryToolChest(
config,
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
);
QueryRunnerFactory factory = new TopNQueryRunnerFactory(
TestQueryRunners.getPool(),
chest,
QueryRunnerTestHelper.NOOP_QUERYWATCHER
);
QueryRunner<Result<TopNResultValue>> runner = QueryRunnerTestHelper.makeQueryRunner(
factory,
new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(false), segmentId)
);

Map<String, Object> context = Maps.newHashMap();
context.put("minTopNThreshold", 500);

TopNQueryBuilder builder = new TopNQueryBuilder()
.dataSource(QueryRunnerTestHelper.dataSource)
.granularity(QueryRunnerTestHelper.allGran)
.dimension(QueryRunnerTestHelper.placementishDimension)
.metric(QueryRunnerTestHelper.indexMetric)
.intervals(QueryRunnerTestHelper.fullOnInterval)
.aggregators(QueryRunnerTestHelper.commonAggregators);

TopNQuery query1 = builder.threshold(10).context(null).build();
MockQueryRunner mockRunner = new MockQueryRunner(runner);
new TopNQueryQueryToolChest.ThresholdAdjustingQueryRunner(mockRunner, config)
.run(query1, ImmutableMap.<String, Object>of());
Assert.assertEquals(1000, mockRunner.query.getThreshold());

TopNQuery query2 = builder.threshold(10).context(context).build();

new TopNQueryQueryToolChest.ThresholdAdjustingQueryRunner(mockRunner, config)
.run(query2, ImmutableMap.<String, Object>of());
Assert.assertEquals(500, mockRunner.query.getThreshold());

TopNQuery query3 = builder.threshold(2000).context(context).build();
new TopNQueryQueryToolChest.ThresholdAdjustingQueryRunner(mockRunner, config)
.run(query3, ImmutableMap.<String, Object>of());
Assert.assertEquals(2000, mockRunner.query.getThreshold());
}

static class MockQueryRunner implements QueryRunner<Result<TopNResultValue>>
{
private final QueryRunner<Result<TopNResultValue>> runner;
TopNQuery query = null;

MockQueryRunner(QueryRunner<Result<TopNResultValue>> runner)
{
this.runner = runner;
}

@Override
public Sequence<Result<TopNResultValue>> run(
Query<Result<TopNResultValue>> query,
Map<String, Object> responseContext
)
{
this.query = (TopNQuery) query;
return query.run(runner, responseContext);
}
}
}

0 comments on commit d7ad93d

Please sign in to comment.