Add feature flag for sql planning of TimeBoundary queries #12491

Merged
1 change: 1 addition & 0 deletions docs/querying/sql-query-context.md
@@ -41,6 +41,7 @@ Configure Druid SQL query planning using the parameters in the table below.
|`useApproximateCountDistinct`|Whether to use an approximate cardinality algorithm for `COUNT(DISTINCT foo)`.|druid.sql.planner.useApproximateCountDistinct on the Broker (default: true)|
|`useGroupingSetForExactDistinct`|Whether to use grouping sets to execute queries with multiple exact distinct aggregations.|druid.sql.planner.useGroupingSetForExactDistinct on the Broker (default: false)|
|`useApproximateTopN`|Whether to use approximate [TopN queries](topnquery.md) when a SQL query could be expressed as such. If false, exact [GroupBy queries](groupbyquery.md) will be used instead.|druid.sql.planner.useApproximateTopN on the Broker (default: true)|
|`enableTimeBoundaryPlanning`|If true, SQL queries will be converted to TimeBoundary queries wherever possible. TimeBoundary queries are very efficient for min-max calculations on the `__time` column of a datasource.|druid.query.default.context.enableTimeBoundaryPlanning on the Broker (default: false)|

## Setting the query context
The query context parameters can be specified as a "context" object in the [JSON API](sql-api.md) or as a [JDBC connection properties object](sql-jdbc.md).
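As an illustration of the new parameter in such a context object, the sketch below passes `enableTimeBoundaryPlanning` as a JDBC connection property, which Druid forwards as a query context parameter. This is a minimal sketch, not part of this PR; the Broker address, the `wikipedia` datasource, and the class name are assumptions for the example.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Properties;

public class TimeBoundaryPlanningExample
{
  public static void main(String[] args) throws Exception
  {
    // Assumed Broker address; adjust for your deployment.
    final String url = "jdbc:avatica:remote:url=http://localhost:8082/druid/v2/sql/avatica/";

    final Properties connectionProperties = new Properties();
    // Opt in to TimeBoundary planning for this connection (default: false).
    connectionProperties.setProperty("enableTimeBoundaryPlanning", "true");

    try (Connection connection = DriverManager.getConnection(url, connectionProperties);
         Statement statement = connection.createStatement();
         ResultSet resultSet = statement.executeQuery("SELECT MAX(__time) AS maxTime FROM wikipedia")) {
      while (resultSet.next()) {
        // With the flag enabled, this aggregation can be served by a TimeBoundary query.
        System.out.println(resultSet.getTimestamp("maxTime"));
      }
    }
  }
}
```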
@@ -70,6 +70,7 @@ public class QueryContexts
public static final String BY_SEGMENT_KEY = "bySegment";
public static final String BROKER_SERVICE_NAME = "brokerService";
public static final String IN_SUB_QUERY_THRESHOLD_KEY = "inSubQueryThreshold";
public static final String TIME_BOUNDARY_PLANNING_KEY = "enableTimeBoundaryPlanning";

public static final boolean DEFAULT_BY_SEGMENT = false;
public static final boolean DEFAULT_POPULATE_CACHE = true;
@@ -93,6 +94,7 @@ public class QueryContexts
public static final boolean DEFAULT_SECONDARY_PARTITION_PRUNING = true;
public static final boolean DEFAULT_ENABLE_DEBUG = false;
public static final int DEFAULT_IN_SUB_QUERY_THRESHOLD = Integer.MAX_VALUE;
public static final boolean DEFAULT_ENABLE_TIME_BOUNDARY_PLANNING = false;

@SuppressWarnings("unused") // Used by Jackson serialization
public enum Vectorize
@@ -347,6 +349,11 @@ public static int getInSubQueryThreshold(Map<String, Object> context, int defaultValue)
return parseInt(context, IN_SUB_QUERY_THRESHOLD_KEY, defaultValue);
}

public static boolean isTimeBoundaryPlanningEnabled(Map<String, Object> queryContext)
{
return parseBoolean(queryContext, TIME_BOUNDARY_PLANNING_KEY, DEFAULT_ENABLE_TIME_BOUNDARY_PLANNING);
}

public static <T> Query<T> withMaxScatterGatherBytes(Query<T> query, long maxScatterGatherBytesLimit)
{
Object obj = query.getContextValue(MAX_SCATTER_GATHER_BYTES_KEY);
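To make the new helper's behavior concrete, here is a small usage sketch. It assumes, as with the other helpers in this class, that `parseBoolean` accepts both `Boolean` and `String` context values; the wrapper class name is illustrative.

```java
import com.google.common.collect.ImmutableMap;
import org.apache.druid.query.QueryContexts;

public class TimeBoundaryFlagDemo
{
  public static void main(String[] args)
  {
    // Boolean value in the context map: enabled.
    System.out.println(QueryContexts.isTimeBoundaryPlanningEnabled(
        ImmutableMap.of(QueryContexts.TIME_BOUNDARY_PLANNING_KEY, true)));   // true

    // String values are parsed as booleans as well (assumed parseBoolean behavior).
    System.out.println(QueryContexts.isTimeBoundaryPlanningEnabled(
        ImmutableMap.of(QueryContexts.TIME_BOUNDARY_PLANNING_KEY, "true"))); // true

    // Key absent: falls back to DEFAULT_ENABLE_TIME_BOUNDARY_PLANNING, i.e. false.
    System.out.println(QueryContexts.isTimeBoundaryPlanningEnabled(ImmutableMap.of()));
  }
}
```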
@@ -140,6 +140,13 @@ public void testDefaultInSubQueryThreshold()
QueryContexts.getInSubQueryThreshold(ImmutableMap.of()));
}

@Test
public void testDefaultPlanTimeBoundarySql()
{
Assert.assertEquals(QueryContexts.DEFAULT_ENABLE_TIME_BOUNDARY_PLANNING,
QueryContexts.isTimeBoundaryPlanningEnabled(ImmutableMap.of()));
}

@Test
public void testGetEnableJoinLeftScanDirect()
{
@@ -804,7 +804,7 @@ private Query computeQuery(final QueryFeatureInspector queryFeatureInspector)
}
}

-    final TimeBoundaryQuery timeBoundaryQuery = toTimeBoundaryQuery();
+    final TimeBoundaryQuery timeBoundaryQuery = toTimeBoundaryQuery(queryFeatureInspector);
if (timeBoundaryQuery != null) {
return timeBoundaryQuery;
}
@@ -838,9 +838,10 @@ private Query computeQuery(final QueryFeatureInspector queryFeatureInspector)
* @return a TimeBoundaryQuery if possible. null if it is not possible to construct one.
*/
@Nullable
-  private TimeBoundaryQuery toTimeBoundaryQuery()
+  private TimeBoundaryQuery toTimeBoundaryQuery(QueryFeatureInspector queryFeatureInspector)
{
-    if (grouping == null
+    if (!queryFeatureInspector.feature(QueryFeature.CAN_RUN_TIME_BOUNDARY)
+        || grouping == null
|| grouping.getSubtotals().hasEffect(grouping.getDimensionSpecs())
|| grouping.getHavingFilter() != null
|| selectProjection != null) {
@@ -42,6 +42,7 @@
import org.apache.druid.math.expr.Evals;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.filter.BoundDimFilter;
import org.apache.druid.query.filter.DimFilter;
@@ -112,6 +113,8 @@ public boolean feature(QueryFeature feature)
case CAN_READ_EXTERNAL_DATA:
case SCAN_CAN_ORDER_BY_NON_TIME:
return false;
case CAN_RUN_TIME_BOUNDARY:
return QueryContexts.isTimeBoundaryPlanningEnabled(plannerContext.getQueryContext().getMergedParams());
default:
throw new IAE("Unrecognized feature: %s", feature);
}
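The change spans three layers: the context flag, the planner's feature inspector, and the conversion guard in `DruidQuery`. The condensed sketch below shows how they fit together; every name in it is a simplified stand-in, not the exact Druid signature.

```java
import java.util.Map;

// Simplified stand-in for the QueryFeature / QueryFeatureInspector wiring.
enum Feature
{
  CAN_RUN_TIME_BOUNDARY
}

interface FeatureInspector
{
  boolean feature(Feature feature);
}

// The inspector consults the merged query context, defaulting to "off".
class ContextFeatureInspector implements FeatureInspector
{
  private final Map<String, Object> queryContext;

  ContextFeatureInspector(Map<String, Object> queryContext)
  {
    this.queryContext = queryContext;
  }

  @Override
  public boolean feature(Feature feature)
  {
    return feature == Feature.CAN_RUN_TIME_BOUNDARY
           && Boolean.parseBoolean(String.valueOf(
               queryContext.getOrDefault("enableTimeBoundaryPlanning", "false")));
  }
}

class PlannerSketch
{
  // Mirrors the guard added to toTimeBoundaryQuery: bail out early when the
  // feature is unavailable, otherwise attempt the TimeBoundary conversion.
  static boolean canPlanTimeBoundary(FeatureInspector inspector)
  {
    return inspector.feature(Feature.CAN_RUN_TIME_BOUNDARY);
  }
}
```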
@@ -46,4 +46,9 @@ public enum QueryFeature
* other than the "__time" column.
*/
SCAN_CAN_ORDER_BY_NON_TIME,

/**
* Queries of type {@link org.apache.druid.query.timeboundary.TimeBoundaryQuery} are usable.
*/
CAN_RUN_TIME_BOUNDARY
}
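When the feature is available and the SQL shape qualifies (a simple `MIN(__time)` or `MAX(__time)` aggregation without grouping sets, having filters, or projections), the planner emits a plain TimeBoundary query. For reference, here is a sketch of that native query built with the `Druids` builder, mirroring the expected queries in the tests below; the wrapper class is illustrative.

```java
import org.apache.druid.query.Druids;
import org.apache.druid.query.timeboundary.TimeBoundaryQuery;

public class TimeBoundaryPlanSketch
{
  // Roughly what "SELECT MAX(__time) FROM foo" plans to once
  // CAN_RUN_TIME_BOUNDARY is available.
  public static TimeBoundaryQuery maxTimeQuery()
  {
    return Druids.newTimeBoundaryQueryBuilder()
        .dataSource("foo")
        .bound(TimeBoundaryQuery.MAX_TIME)
        .build();
  }
}
```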
@@ -2388,11 +2388,13 @@ public void testInAggregationSubquery(Map<String, Object> queryContext) throws Exception
cannotVectorize();
}

+    Map<String, Object> updatedQueryContext = new HashMap<>(queryContext);
+    updatedQueryContext.put(QueryContexts.TIME_BOUNDARY_PLANNING_KEY, true);
Map<String, Object> maxTimeQueryContext = new HashMap<>(queryContext);
maxTimeQueryContext.put(TimeBoundaryQuery.MAX_TIME_ARRAY_OUTPUT_NAME, "a0");
testQuery(
"SELECT DISTINCT __time FROM druid.foo WHERE __time IN (SELECT MAX(__time) FROM druid.foo)",
-        queryContext,
+        updatedQueryContext,
ImmutableList.of(
GroupByQuery.builder()
.setDataSource(
@@ -2434,11 +2436,13 @@ public void testNotInAggregationSubquery(Map<String, Object> queryContext) throws Exception
// Cannot vectorize JOIN operator.
cannotVectorize();

+    Map<String, Object> updatedQueryContext = new HashMap<>(queryContext);
+    updatedQueryContext.put(QueryContexts.TIME_BOUNDARY_PLANNING_KEY, true);
Map<String, Object> maxTimeQueryContext = new HashMap<>(queryContext);
maxTimeQueryContext.put(TimeBoundaryQuery.MAX_TIME_ARRAY_OUTPUT_NAME, "a0");
testQuery(
"SELECT DISTINCT __time FROM druid.foo WHERE __time NOT IN (SELECT MAX(__time) FROM druid.foo)",
-        queryContext,
+        updatedQueryContext,
ImmutableList.of(
GroupByQuery.builder()
.setDataSource(
@@ -3568,6 +3572,8 @@ public void testTwoSemiJoinsSimultaneously(Map<String, Object> queryContext) throws Exception
cannotVectorize();
}

+    Map<String, Object> updatedQueryContext = new HashMap<>(queryContext);
+    updatedQueryContext.put(QueryContexts.TIME_BOUNDARY_PLANNING_KEY, true);
Map<String, Object> maxTimeQueryContext = new HashMap<>(queryContext);
maxTimeQueryContext.put(TimeBoundaryQuery.MAX_TIME_ARRAY_OUTPUT_NAME, "a0");
testQuery(
@@ -3576,7 +3582,7 @@ public void testTwoSemiJoinsSimultaneously(Map<String, Object> queryContext) throws Exception
+ "AND __time IN (SELECT MAX(__time) FROM foo WHERE cnt = 1)\n"
+ "AND __time IN (SELECT MAX(__time) FROM foo WHERE cnt <> 2)\n"
+ "GROUP BY 1",
-        queryContext,
+        updatedQueryContext,
ImmutableList.of(
GroupByQuery.builder()
.setDataSource(
@@ -3628,6 +3634,8 @@ public void testSemiAndAntiJoinSimultaneouslyUsingWhereInSubquery(Map<String, Object> queryContext) throws Exception
{
cannotVectorize();

+    Map<String, Object> updatedQueryContext = new HashMap<>(queryContext);
+    updatedQueryContext.put(QueryContexts.TIME_BOUNDARY_PLANNING_KEY, true);
Map<String, Object> minTimeQueryContext = new HashMap<>(queryContext);
minTimeQueryContext.put(TimeBoundaryQuery.MIN_TIME_ARRAY_OUTPUT_NAME, "a0");
Map<String, Object> maxTimeQueryContext = new HashMap<>(queryContext);
@@ -3638,7 +3646,7 @@ public void testSemiAndAntiJoinSimultaneouslyUsingWhereInSubquery(Map<String, Object> queryContext) throws Exception
+ "AND __time IN (SELECT MAX(__time) FROM foo)\n"
+ "AND __time NOT IN (SELECT MIN(__time) FROM foo)\n"
+ "GROUP BY 1",
-        queryContext,
+        updatedQueryContext,
ImmutableList.of(
GroupByQuery.builder()
.setDataSource(
@@ -3732,6 +3740,8 @@ public void testSemiAndAntiJoinSimultaneouslyUsingExplicitJoins(Map<String, Object> queryContext) throws Exception
{
cannotVectorize();

+    Map<String, Object> updatedQueryContext = new HashMap<>(queryContext);
+    updatedQueryContext.put(QueryContexts.TIME_BOUNDARY_PLANNING_KEY, true);
Map<String, Object> minTimeQueryContext = new HashMap<>(queryContext);
minTimeQueryContext.put(TimeBoundaryQuery.MIN_TIME_ARRAY_OUTPUT_NAME, "a0");
Map<String, Object> maxTimeQueryContext = new HashMap<>(queryContext);
@@ -3743,7 +3753,7 @@ public void testSemiAndAntiJoinSimultaneouslyUsingExplicitJoins(Map<String, Object> queryContext) throws Exception
+ "LEFT JOIN (SELECT MIN(__time) t FROM foo) t1 on t1.t = foo.__time\n"
+ "WHERE dim1 IN ('abc', 'def') AND t1.t is null\n"
+ "GROUP BY 1",
-        queryContext,
+        updatedQueryContext,
ImmutableList.of(
GroupByQuery.builder()
.setDataSource(
@@ -23,6 +23,7 @@
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.query.Druids;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
import org.apache.druid.query.aggregation.LongMinAggregatorFactory;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
@@ -38,15 +39,18 @@ public class CalciteTimeBoundaryQueryTest extends BaseCalciteQueryTest
@Test
public void testMaxTimeQuery() throws Exception
{
-    HashMap<String, Object> context = new HashMap<>(QUERY_CONTEXT_DEFAULT);
-    context.put(TimeBoundaryQuery.MAX_TIME_ARRAY_OUTPUT_NAME, "a0");
+    HashMap<String, Object> queryContext = new HashMap<>(QUERY_CONTEXT_DEFAULT);
+    queryContext.put(QueryContexts.TIME_BOUNDARY_PLANNING_KEY, true);
+    HashMap<String, Object> expectedContext = new HashMap<>(QUERY_CONTEXT_DEFAULT);
+    expectedContext.put(TimeBoundaryQuery.MAX_TIME_ARRAY_OUTPUT_NAME, "a0");
testQuery(
"SELECT MAX(__time) AS maxTime FROM foo",
+        queryContext,
ImmutableList.of(
Druids.newTimeBoundaryQueryBuilder()
.dataSource("foo")
.bound(TimeBoundaryQuery.MAX_TIME)
-              .context(context)
+              .context(expectedContext)
.build()
),
ImmutableList.of(new Object[]{DateTimes.of("2001-01-03").getMillis()})
@@ -56,15 +60,18 @@ public void testMaxTimeQuery() throws Exception
@Test
public void testMinTimeQuery() throws Exception
{
-    HashMap<String, Object> context = new HashMap<>(QUERY_CONTEXT_DEFAULT);
-    context.put(TimeBoundaryQuery.MIN_TIME_ARRAY_OUTPUT_NAME, "a0");
+    HashMap<String, Object> queryContext = new HashMap<>(QUERY_CONTEXT_DEFAULT);
+    queryContext.put(QueryContexts.TIME_BOUNDARY_PLANNING_KEY, true);
+    HashMap<String, Object> expectedContext = new HashMap<>(QUERY_CONTEXT_DEFAULT);
+    expectedContext.put(TimeBoundaryQuery.MIN_TIME_ARRAY_OUTPUT_NAME, "a0");
testQuery(
"SELECT MIN(__time) AS minTime FROM foo",
+        queryContext,
ImmutableList.of(
Druids.newTimeBoundaryQueryBuilder()
.dataSource("foo")
.bound(TimeBoundaryQuery.MIN_TIME)
-              .context(context)
+              .context(expectedContext)
.build()
),
ImmutableList.of(new Object[]{DateTimes.of("2000-01-01").getMillis()})
@@ -74,10 +81,13 @@ public void testMinTimeQuery() throws Exception
@Test
public void testMinTimeQueryWithFilters() throws Exception
{
-    HashMap<String, Object> context = new HashMap<>(QUERY_CONTEXT_DEFAULT);
-    context.put(TimeBoundaryQuery.MIN_TIME_ARRAY_OUTPUT_NAME, "a0");
+    HashMap<String, Object> queryContext = new HashMap<>(QUERY_CONTEXT_DEFAULT);
+    queryContext.put(QueryContexts.TIME_BOUNDARY_PLANNING_KEY, true);
+    HashMap<String, Object> expectedContext = new HashMap<>(QUERY_CONTEXT_DEFAULT);
+    expectedContext.put(TimeBoundaryQuery.MIN_TIME_ARRAY_OUTPUT_NAME, "a0");
testQuery(
"SELECT MIN(__time) AS minTime FROM foo where __time >= '2001-01-01' and __time < '2003-01-01'",
+        queryContext,
ImmutableList.of(
Druids.newTimeBoundaryQueryBuilder()
.dataSource("foo")
Expand All @@ -87,7 +97,7 @@ public void testMinTimeQueryWithFilters() throws Exception
)
)
.bound(TimeBoundaryQuery.MIN_TIME)
-              .context(context)
+              .context(expectedContext)
.build()
),
ImmutableList.of(new Object[]{DateTimes.of("2001-01-01").getMillis()})
@@ -99,8 +109,11 @@ public void testMinTimeQueryWithFilters() throws Exception
@Test
public void testMinMaxTimeQuery() throws Exception
{
+    HashMap<String, Object> context = new HashMap<>(QUERY_CONTEXT_DEFAULT);
+    context.put(QueryContexts.TIME_BOUNDARY_PLANNING_KEY, true);
testQuery(
"SELECT MIN(__time) AS minTime, MAX(__time) as maxTime FROM foo",
+        context,
ImmutableList.of(
Druids.newTimeseriesQueryBuilder()
.dataSource("foo")
Expand All @@ -109,7 +122,7 @@ public void testMinMaxTimeQuery() throws Exception
new LongMinAggregatorFactory("a0", "__time"),
new LongMaxAggregatorFactory("a1", "__time")
)
-              .context(QUERY_CONTEXT_DEFAULT)
+              .context(context)
.build()
),
ImmutableList.of(new Object[]{
@@ -65,6 +65,7 @@ public boolean feature(final QueryFeature feature)
// INSERT queries should stick to groupBy, scan.
case CAN_RUN_TIMESERIES:
case CAN_RUN_TOPN:
case CAN_RUN_TIME_BOUNDARY:
return false;

// INSERT uses external data.
Expand Down
3 changes: 3 additions & 0 deletions website/.spelling
@@ -558,6 +558,9 @@ useApproximateCountDistinct
useGroupingSetForExactDistinct
useApproximateTopN
wikipedia
enableTimeBoundaryPlanning
TimeBoundary
druid.query.default.context.enableTimeBoundaryPlanning
IEC
- ../docs/comparisons/druid-vs-elasticsearch.md
100x