MSQ WF: Pass a flag from broker to determine operator chain transformation #17443

Merged
DataSourcePlan.java
@@ -21,6 +21,7 @@

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntSet;
@@ -44,11 +45,13 @@
import org.apache.druid.msq.kernel.StageDefinition;
import org.apache.druid.msq.kernel.StageDefinitionBuilder;
import org.apache.druid.msq.querykit.common.SortMergeJoinFrameProcessorFactory;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.FilteredDataSource;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.JoinDataSource;
import org.apache.druid.query.LookupDataSource;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.TableDataSource;
@@ -197,6 +200,7 @@ public static DataSourcePlan forDataSource(
return forQuery(
queryKitSpec,
(QueryDataSource) dataSource,
queryContext,
minStageNumber,
broadcast
);
@@ -234,6 +238,7 @@ public static DataSourcePlan forDataSource(
case SORT_MERGE:
return forSortMergeJoin(
queryKitSpec,
queryContext,
(JoinDataSource) dataSource,
querySegmentSpec,
minStageNumber,
@@ -413,15 +418,25 @@ private static DataSourcePlan forLookup(
private static DataSourcePlan forQuery(
final QueryKitSpec queryKitSpec,
final QueryDataSource dataSource,
final QueryContext queryContext,
final int minStageNumber,
final boolean broadcast
)
{
final Query query = dataSource.getQuery()
// Subqueries ignore SQL_INSERT_SEGMENT_GRANULARITY, even if set in the context. It's only used for the
// outermost query, and setting it for the subquery makes us erroneously add bucketing where it doesn't belong.
.withOverriddenContext(CONTEXT_MAP_NO_SEGMENT_GRANULARITY)
// This flag is to ensure backward compatibility, as brokers are upgraded after indexers/middlemanagers.
.withOverriddenContext(
ImmutableMap.of(
MultiStageQueryContext.WINDOW_FUNCTION_OPERATOR_TRANSFORMATION,
MultiStageQueryContext.isWindowFunctionOperatorTransformationEnabled(queryContext)
)
);
final QueryDefinition subQueryDef = queryKitSpec.getQueryKit().makeQueryDefinition(
queryKitSpec,
// Subqueries ignore SQL_INSERT_SEGMENT_GRANULARITY, even if set in the context. It's only used for the
// outermost query, and setting it for the subquery makes us erroneously add bucketing where it doesn't belong.
dataSource.getQuery().withOverriddenContext(CONTEXT_MAP_NO_SEGMENT_GRANULARITY),
query,
ShuffleSpecFactories.globalSortWithMaxPartitionCount(queryKitSpec.getNumPartitionsForShuffle()),
minStageNumber
);
@@ -630,6 +645,7 @@ private static DataSourcePlan forBroadcastHashJoin(
*/
private static DataSourcePlan forSortMergeJoin(
final QueryKitSpec queryKitSpec,
final QueryContext queryContext,
final JoinDataSource dataSource,
final QuerySegmentSpec querySegmentSpec,
final int minStageNumber,
@@ -652,6 +668,7 @@ private static DataSourcePlan forSortMergeJoin(
final DataSourcePlan leftPlan = forQuery(
queryKitSpec,
(QueryDataSource) dataSource.getLeft(),
queryContext,
Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()),
false
);
@@ -663,6 +680,7 @@ private static DataSourcePlan forSortMergeJoin(
final DataSourcePlan rightPlan = forQuery(
queryKitSpec,
(QueryDataSource) dataSource.getRight(),
queryContext,
Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()),
false
);
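The forQuery() change above pins the broker's decision onto every subquery context before planning it, so a worker planning a nested query never falls back to the flag's default. A minimal sketch of that layering, using plain maps as stand-ins for Druid's Query#withOverriddenContext and the MultiStageQueryContext helper (the class and method names below are illustrative, not Druid's API):

import java.util.HashMap;
import java.util.Map;

public class FlagPropagationSketch
{
  static final String FLAG = "windowFunctionOperatorTransformation";

  // Stand-in for Query#withOverriddenContext: entries in overrides win.
  static Map<String, Object> withOverriddenContext(Map<String, Object> context, Map<String, Object> overrides)
  {
    final Map<String, Object> merged = new HashMap<>(context);
    merged.putAll(overrides);
    return merged;
  }

  // Stand-in for MultiStageQueryContext#isWindowFunctionOperatorTransformationEnabled.
  static boolean isEnabled(Map<String, Object> context)
  {
    return Boolean.TRUE.equals(context.get(FLAG));
  }

  public static void main(String[] args)
  {
    final Map<String, Object> outerContext = Map.of(FLAG, true); // set by an upgraded broker
    final Map<String, Object> subQueryContext = Map.of();        // the subquery starts without the flag

    // forQuery() copies the outer decision onto the subquery before planning it.
    final Map<String, Object> planned =
        withOverriddenContext(subQueryContext, Map.of(FLAG, isEnabled(outerContext)));

    System.out.println(isEnabled(planned)); // true
  }
}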
WindowOperatorQueryFrameProcessor.java
@@ -39,13 +39,9 @@
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.indexing.error.TooManyRowsInAWindowFault;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.operator.AbstractPartitioningOperatorFactory;
import org.apache.druid.query.operator.AbstractSortOperatorFactory;
import org.apache.druid.query.operator.GlueingPartitioningOperatorFactory;
import org.apache.druid.query.operator.OffsetLimit;
import org.apache.druid.query.operator.Operator;
import org.apache.druid.query.operator.OperatorFactory;
import org.apache.druid.query.operator.PartitionSortOperatorFactory;
import org.apache.druid.query.operator.WindowOperatorQuery;
import org.apache.druid.query.rowsandcols.ConcatRowsAndColumns;
import org.apache.druid.query.rowsandcols.LazilyDecoratedRowsAndColumns;
@@ -101,7 +97,7 @@ public WindowOperatorQueryFrameProcessor(
this.frameWriterFactory = frameWriterFactory;
this.resultRowAndCols = new ArrayList<>();
this.maxRowsMaterialized = MultiStageQueryContext.getMaxRowsMaterializedInWindow(query.context());
this.operatorFactoryList = getOperatorFactoryListForStageDefinition(operatorFactoryList);
this.operatorFactoryList = operatorFactoryList;
this.frameRowsAndColsBuilder = new RowsAndColumnsBuilder(this.maxRowsMaterialized);

this.frameReader = frameReader;
@@ -403,36 +399,4 @@ private void clearRACBuffers()
resultRowAndCols.clear();
rowId.set(0);
}

/**
* This method converts the operator chain received from native plan into MSQ plan.
* (NaiveSortOperator -> Naive/GlueingPartitioningOperator -> WindowOperator) is converted into (GlueingPartitioningOperator -> PartitionSortOperator -> WindowOperator).
* We rely on MSQ's shuffling to do the clustering on partitioning keys for us at every stage.
* This conversion allows us to blindly read N rows from input channel and push them into the operator chain, and repeat until the input channel isn't finished.
* @param operatorFactoryListFromQuery
* @return
*/
private List<OperatorFactory> getOperatorFactoryListForStageDefinition(List<OperatorFactory> operatorFactoryListFromQuery)
{
final List<OperatorFactory> operatorFactoryList = new ArrayList<>();
final List<OperatorFactory> sortOperatorFactoryList = new ArrayList<>();
for (OperatorFactory operatorFactory : operatorFactoryListFromQuery) {
if (operatorFactory instanceof AbstractPartitioningOperatorFactory) {
AbstractPartitioningOperatorFactory partition = (AbstractPartitioningOperatorFactory) operatorFactory;
operatorFactoryList.add(new GlueingPartitioningOperatorFactory(partition.getPartitionColumns(), this.maxRowsMaterialized));
} else if (operatorFactory instanceof AbstractSortOperatorFactory) {
AbstractSortOperatorFactory sortOperatorFactory = (AbstractSortOperatorFactory) operatorFactory;
sortOperatorFactoryList.add(new PartitionSortOperatorFactory(sortOperatorFactory.getSortColumns()));
} else {
// Add all the PartitionSortOperator(s) before every window operator.
operatorFactoryList.addAll(sortOperatorFactoryList);
sortOperatorFactoryList.clear();
operatorFactoryList.add(operatorFactory);
}
}

operatorFactoryList.addAll(sortOperatorFactoryList);
sortOperatorFactoryList.clear();
return operatorFactoryList;
}
}
WindowOperatorQueryKit.java
@@ -37,7 +37,9 @@
import org.apache.druid.query.operator.AbstractPartitioningOperatorFactory;
import org.apache.druid.query.operator.AbstractSortOperatorFactory;
import org.apache.druid.query.operator.ColumnWithDirection;
import org.apache.druid.query.operator.GlueingPartitioningOperatorFactory;
import org.apache.druid.query.operator.OperatorFactory;
import org.apache.druid.query.operator.PartitionSortOperatorFactory;
import org.apache.druid.query.operator.WindowOperatorQuery;
import org.apache.druid.query.operator.window.WindowOperatorFactory;
import org.apache.druid.segment.column.ColumnType;
@@ -172,6 +174,9 @@ public QueryDefinition makeQueryDefinition(
.flatMap(of -> of.getPartitionColumns().stream())
.collect(Collectors.toList());

final List<OperatorFactory> operatorFactories = MultiStageQueryContext.isWindowFunctionOperatorTransformationEnabled(originalQuery.context())
? getTransformedOperatorFactoryListForStageDefinition(operatorList.get(i), maxRowsMaterialized)
: operatorList.get(i);

queryDefBuilder.add(
StageDefinition.builder(firstStageNumber + i)
@@ -181,7 +186,7 @@
.shuffleSpec(nextShuffleSpec)
.processorFactory(new WindowOperatorQueryFrameProcessorFactory(
queryToRun,
operatorList.get(i),
operatorFactories,
stageRowSignature,
maxRowsMaterialized,
partitionColumnNames
@@ -325,4 +330,40 @@ private static RowSignature computeSignatureForFinalWindowStage(RowSignature row
finalWindowClusterBy.getColumns()
);
}

/**
* Converts the operator chain received from the native plan into the MSQ plan:
* (NaiveSortOperator -> Naive/GlueingPartitioningOperator -> WindowOperator) is converted into (GlueingPartitioningOperator -> PartitionSortOperator -> WindowOperator).
* We rely on MSQ's shuffling to do the clustering on partitioning keys for us at every stage.
* This conversion allows us to blindly read N rows from the input channel, push them into the operator chain, and repeat until the input channel is finished.
* @param operatorFactoryListFromQuery the operator chain from the native plan
* @param maxRowsMaterializedInWindow the maximum number of rows that may be materialized in a window
* @return the transformed operator chain
*/
private List<OperatorFactory> getTransformedOperatorFactoryListForStageDefinition(
List<OperatorFactory> operatorFactoryListFromQuery,
int maxRowsMaterializedInWindow
)
{
final List<OperatorFactory> operatorFactoryList = new ArrayList<>();
final List<OperatorFactory> sortOperatorFactoryList = new ArrayList<>();
for (OperatorFactory operatorFactory : operatorFactoryListFromQuery) {
if (operatorFactory instanceof AbstractPartitioningOperatorFactory) {
AbstractPartitioningOperatorFactory partition = (AbstractPartitioningOperatorFactory) operatorFactory;
operatorFactoryList.add(new GlueingPartitioningOperatorFactory(partition.getPartitionColumns(), maxRowsMaterializedInWindow));
} else if (operatorFactory instanceof AbstractSortOperatorFactory) {
AbstractSortOperatorFactory sortOperatorFactory = (AbstractSortOperatorFactory) operatorFactory;
sortOperatorFactoryList.add(new PartitionSortOperatorFactory(sortOperatorFactory.getSortColumns()));
} else {
// Add all the PartitionSortOperator(s) before every window operator.
operatorFactoryList.addAll(sortOperatorFactoryList);
sortOperatorFactoryList.clear();
operatorFactoryList.add(operatorFactory);
}
}

operatorFactoryList.addAll(sortOperatorFactoryList);
sortOperatorFactoryList.clear();
return operatorFactoryList;
}
}
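The method added above (moved here from WindowOperatorQueryFrameProcessor, per the removal in the previous file) buffers every sort operator, replaces every partitioning operator with a glueing one, and re-emits the buffered sorts as partition sorts immediately before the next window operator. The same loop is easy to see on toy operator types; below is a self-contained sketch (the Sort/Partition/Window records are illustrative stand-ins, not Druid's OperatorFactory classes):

import java.util.ArrayList;
import java.util.List;

public class OperatorRewriteSketch
{
  interface Op {}
  record Sort(String column) implements Op {}
  record Partition(String column) implements Op {}
  record GlueingPartition(String column) implements Op {}
  record PartitionSort(String column) implements Op {}
  record Window(String name) implements Op {}

  static List<Op> transform(List<Op> input)
  {
    final List<Op> out = new ArrayList<>();
    final List<Op> pendingSorts = new ArrayList<>();
    for (Op op : input) {
      if (op instanceof Partition partition) {
        out.add(new GlueingPartition(partition.column()));
      } else if (op instanceof Sort sort) {
        pendingSorts.add(new PartitionSort(sort.column()));
      } else {
        // Emit the buffered sorts right before the window operator.
        out.addAll(pendingSorts);
        pendingSorts.clear();
        out.add(op);
      }
    }
    out.addAll(pendingSorts);
    return out;
  }

  public static void main(String[] args)
  {
    // (Sort -> Partition -> Window) becomes (GlueingPartition -> PartitionSort -> Window),
    // matching the conversion described in the javadoc above.
    System.out.println(transform(List.of(new Sort("d"), new Partition("p"), new Window("w"))));
  }
}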
MSQTaskQueryMaker.java
@@ -289,6 +289,9 @@ public static MSQSpec makeQuerySpec(
// Add appropriate finalization to native query context.
nativeQueryContextOverrides.put(QueryContexts.FINALIZE_KEY, finalizeAggregations);

// This flag is to ensure backward compatibility, as brokers are upgraded after indexers/middlemanagers.
nativeQueryContextOverrides.put(MultiStageQueryContext.WINDOW_FUNCTION_OPERATOR_TRANSFORMATION, true);

final MSQSpec querySpec =
MSQSpec.builder()
.query(druidQuery.getQuery().withOverriddenContext(nativeQueryContextOverrides))
MultiStageQueryContext.java
@@ -191,6 +191,8 @@ public class MultiStageQueryContext

public static final String MAX_ROWS_MATERIALIZED_IN_WINDOW = "maxRowsMaterializedInWindow";

public static final String WINDOW_FUNCTION_OPERATOR_TRANSFORMATION = "windowFunctionOperatorTransformation";

public static final String CTX_SKIP_TYPE_VERIFICATION = "skipTypeVerification";

/**
@@ -217,6 +219,14 @@ public static int getMaxRowsMaterializedInWindow(final QueryContext queryContext
);
}

public static boolean isWindowFunctionOperatorTransformationEnabled(final QueryContext queryContext)
{
return queryContext.getBoolean(
WINDOW_FUNCTION_OPERATOR_TRANSFORMATION,
false
);
}

public static int getMaxConcurrentStagesWithDefault(
final QueryContext queryContext,
final int defaultMaxConcurrentStages
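Defaulting the flag to false is what makes the rolling upgrade safe: brokers are upgraded after indexers/middleManagers, so an upgraded worker can still receive a query from an old broker whose context lacks the key, in which case it must keep the legacy operator chain. Only an upgraded broker sends the flag as true (the MSQTaskQueryMaker change above). A sketch of the gating, using a plain map instead of Druid's QueryContext:

import java.util.Map;

public class UpgradeDefaultSketch
{
  // Mirrors QueryContext#getBoolean(key, defaultValue) with a false default.
  static boolean isTransformationEnabled(Map<String, Object> queryContext)
  {
    final Object value = queryContext.get("windowFunctionOperatorTransformation");
    return value instanceof Boolean enabled ? enabled : false;
  }

  public static void main(String[] args)
  {
    // Old broker: no flag in the context, so the worker keeps the legacy chain.
    System.out.println(isTransformationEnabled(Map.of())); // false
    // Upgraded broker: always sets the flag to true.
    System.out.println(isTransformationEnabled(Map.of("windowFunctionOperatorTransformation", true))); // true
  }
}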
MSQTestBase.java
@@ -273,6 +273,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
.put(MultiStageQueryContext.CTX_MAX_NUM_TASKS, 2)
.put(MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED, 0)
.put(MSQTaskQueryMaker.USER_KEY, "allowAll")
.put(MultiStageQueryContext.WINDOW_FUNCTION_OPERATOR_TRANSFORMATION, true)
.build();

public static final Map<String, Object> DURABLE_STORAGE_MSQ_CONTEXT =
(expected-results test file)
@@ -95,7 +95,8 @@ order by 1;
"maxParseExceptions" : 0,
"plannerStrategy" : "DECOUPLED",
"sqlQueryId" : __SQL_QUERY_ID__
"sqlStringifyArrays" : false
"sqlStringifyArrays" : false,
"windowFunctionOperatorTransformation" : true
}
}
},
@@ -201,7 +202,8 @@ order by 1;
"maxParseExceptions" : 0,
"plannerStrategy" : "DECOUPLED",
"sqlQueryId" : __SQL_QUERY_ID__
"sqlStringifyArrays" : false
"sqlStringifyArrays" : false,
"windowFunctionOperatorTransformation" : true
}
}
},