diff --git a/velox/substrait/CMakeLists.txt b/velox/substrait/CMakeLists.txt index 110edc9d7de6..86260389254e 100644 --- a/velox/substrait/CMakeLists.txt +++ b/velox/substrait/CMakeLists.txt @@ -41,16 +41,8 @@ add_custom_command( add_custom_target(substrait_proto ALL DEPENDS ${PROTO_OUTPUT_FILES}) add_dependencies(substrait_proto protobuf::libprotobuf) -set(SRCS - ${PROTO_SRCS} - SubstraitParser.cpp - SubstraitToVeloxExpr.cpp - SubstraitToVeloxPlan.cpp - TypeUtils.cpp - VeloxToSubstraitExpr.cpp - VeloxToSubstraitPlan.cpp - VeloxToSubstraitType.cpp) - +set(SRCS ${PROTO_SRCS} SubstraitUtils.cpp SubstraitToVeloxPlanValidator.cpp + SubstraitToVeloxExpr.cpp SubstraitToVeloxPlan.cpp TypeUtils.cpp) add_library(velox_substrait_plan_converter ${SRCS}) target_include_directories(velox_substrait_plan_converter PUBLIC ${PROTO_OUTPUT_DIR}) diff --git a/velox/substrait/SubstraitParser.cpp b/velox/substrait/SubstraitParser.cpp index 53b60cae5e24..e9a7af7e0d08 100644 --- a/velox/substrait/SubstraitParser.cpp +++ b/velox/substrait/SubstraitParser.cpp @@ -27,7 +27,7 @@ std::shared_ptr SubstraitParser::parseType( switch (substraitType.kind_case()) { case ::substrait::Type::KindCase::kBool: { typeName = "BOOLEAN"; - nullability = substraitType.bool_().nullability(); + nullability = sType.bool_().nullability(); break; } case ::substrait::Type::KindCase::kI8: { @@ -202,7 +202,7 @@ int SubstraitParser::getIdxFromNodeName(const std::string& nodeName) { } } -const std::string& SubstraitParser::findFunctionSpec( +std::string SubstraitParser::findSubstraitFuncSpec( const std::unordered_map& functionMap, uint64_t id) const { if (functionMap.find(id) == functionMap.end()) { @@ -248,6 +248,30 @@ void SubstraitParser::getFunctionTypes( types.emplace_back(funcTypes); } +void SubstraitParser::getSubFunctionTypes( + const std::string& subFuncSpec, + std::vector& types) const { + // Get the position of ":" in the function name. + std::size_t pos = subFuncSpec.find(":"); + // Get the parameter types. + std::string funcTypes; + if (pos == std::string::npos) { + funcTypes = subFuncSpec; + } else { + if (pos == subFuncSpec.size() - 1) { + return; + } + funcTypes = subFuncSpec.substr(pos + 1); + } + // Split the types with delimiter. + std::string delimiter = "_"; + while ((pos = funcTypes.find(delimiter)) != std::string::npos) { + types.emplace_back(funcTypes.substr(0, pos)); + funcTypes.erase(0, pos + delimiter.length()); + } + types.emplace_back(funcTypes); +} + std::string SubstraitParser::findVeloxFunction( const std::unordered_map& functionMap, uint64_t id) const { @@ -257,8 +281,8 @@ std::string SubstraitParser::findVeloxFunction( } std::string SubstraitParser::mapToVeloxFunction( - const std::string& substraitFunction) const { - auto it = substraitVeloxFunctionMap_.find(substraitFunction); + const std::string& subFunc) const { + auto it = substraitVeloxFunctionMap_.find(subFunc); if (it != substraitVeloxFunctionMap_.end()) { return it->second; } diff --git a/velox/substrait/SubstraitParser.h b/velox/substrait/SubstraitParser.h index e82020e79eb6..c0ff587d1a9f 100644 --- a/velox/substrait/SubstraitParser.h +++ b/velox/substrait/SubstraitParser.h @@ -55,11 +55,10 @@ class SubstraitParser { /// Make node name in the format of n{nodeId}_{colIdx}. std::string makeNodeName(int nodeId, int colIdx); - /// Get the column index from a node name in the format of - /// n{nodeId}_{colIdx}. + /// Used to get the column index from node name. int getIdxFromNodeName(const std::string& nodeName); - /// Find the Substrait function name according to the function id + /// Used to find the Substrait function name according to the function id /// from a pre-constructed function map. The function specification can be /// a simple name or a compound name. The compound name format is: /// :__..._. @@ -79,7 +78,12 @@ class SubstraitParser { const std::string& functionSpec, std::vector& types) const; - /// Find the Velox function name according to the function id + /// This function is used get the types from the compound name. + void getSubFunctionTypes( + const std::string& subFuncSpec, + std::vector& types) const; + + /// Used to find the Velox function name according to the function id /// from a pre-constructed function map. std::string findVeloxFunction( const std::unordered_map& functionMap, diff --git a/velox/substrait/SubstraitToVeloxExpr.cpp b/velox/substrait/SubstraitToVeloxExpr.cpp index fee7c43f4478..9056f45f1b8b 100644 --- a/velox/substrait/SubstraitToVeloxExpr.cpp +++ b/velox/substrait/SubstraitToVeloxExpr.cpp @@ -27,8 +27,8 @@ SubstraitVeloxExprConverter::toVeloxExpr( switch (typeCase) { case ::substrait::Expression::FieldReference::ReferenceTypeCase:: kDirectReference: { - const auto& directRef = substraitField.direct_reference(); - int32_t colIdx = substraitParser_.parseReferenceSegment(directRef); + const auto& dRef = sField.direct_reference(); + int32_t colIdx = subParser_->parseReferenceSegment(dRef); const auto& inputTypes = inputType->children(); const auto& inputNames = inputType->names(); @@ -59,10 +59,18 @@ SubstraitVeloxExprConverter::toVeloxExpr( for (const auto& sArg : substraitFunc.args()) { params.emplace_back(toVeloxExpr(sArg, inputType)); } - const auto& veloxFunction = substraitParser_.findVeloxFunction( - functionMap_, substraitFunc.function_reference()); - const auto& veloxType = toVeloxType( - substraitParser_.parseType(substraitFunc.output_type())->type); + const auto& veloxFunction = + subParser_->findVeloxFunction(functionMap_, sFunc.function_reference()); + const auto& veloxType = + toVeloxType(subParser_->parseType(sFunc.output_type())->type); + + // Omit alias because because name change is not needed. + if (veloxFunction == "alias") { + if (params.size() != 1) { + VELOX_FAIL("Alias expects one parameter."); + } + return params[0]; + } return std::make_shared( veloxType, std::move(params), veloxFunction); @@ -74,17 +82,13 @@ SubstraitVeloxExprConverter::toVeloxExpr( auto typeCase = substraitLit.literal_type_case(); switch (typeCase) { case ::substrait::Expression_Literal::LiteralTypeCase::kBoolean: - return std::make_shared( - variant(substraitLit.boolean())); + return std::make_shared(variant(sLit.boolean())); case ::substrait::Expression_Literal::LiteralTypeCase::kI32: - return std::make_shared( - variant(substraitLit.i32())); + return std::make_shared(variant(sLit.i32())); case ::substrait::Expression_Literal::LiteralTypeCase::kI64: - return std::make_shared( - variant(substraitLit.i64())); + return std::make_shared(variant(sLit.i64())); case ::substrait::Expression_Literal::LiteralTypeCase::kFp64: - return std::make_shared( - variant(substraitLit.fp64())); + return std::make_shared(variant(sLit.fp64())); case ::substrait::Expression_Literal::LiteralTypeCase::kNull: { auto veloxType = toVeloxType(substraitParser_.parseType(substraitLit.null())->type); diff --git a/velox/substrait/SubstraitToVeloxExpr.h b/velox/substrait/SubstraitToVeloxExpr.h index 4f310262edcb..1bf195b77a6e 100644 --- a/velox/substrait/SubstraitToVeloxExpr.h +++ b/velox/substrait/SubstraitToVeloxExpr.h @@ -28,7 +28,7 @@ class SubstraitVeloxExprConverter { /// subParser: A Substrait parser used to convert Substrait representations /// into recognizable representations. functionMap: A pre-constructed map /// storing the relations between the function id and the function name. - explicit SubstraitVeloxExprConverter( + SubstraitVeloxExprConverter( const std::unordered_map& functionMap) : functionMap_(functionMap) {} @@ -59,7 +59,8 @@ class SubstraitVeloxExprConverter { private: /// The Substrait parser used to convert Substrait representations into /// recognizable representations. - SubstraitParser substraitParser_; + std::shared_ptr subParser_ = + std::make_shared(); /// The map storing the relations between the function id and the function /// name. diff --git a/velox/substrait/SubstraitToVeloxPlan.cpp b/velox/substrait/SubstraitToVeloxPlan.cpp index 7ccebd79a913..97825e1baf38 100644 --- a/velox/substrait/SubstraitToVeloxPlan.cpp +++ b/velox/substrait/SubstraitToVeloxPlan.cpp @@ -84,34 +84,38 @@ core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan( } else { VELOX_FAIL("Child Rel is expected in AggregateRel."); } + core::AggregationNode::Step aggStep; + // Get aggregation phase and check if there are input columns need to be + // combined into row. + if (needsRowConstruct(sAgg, aggStep)) { + return toVeloxAggWithRowConstruct(sAgg, childNode, aggStep); + } + return toVeloxAgg(sAgg, childNode, aggStep); +} - // Construct Velox grouping expressions. - auto inputTypes = childNode->outputType(); - std::vector veloxGroupingExprs; - - const auto& groupings = aggRel.groupings(); - int inputPlanNodeId = planNodeId_ - 1; - // The index of output column. - int outIdx = 0; +std::shared_ptr +SubstraitVeloxPlanConverter::toVeloxAggWithRowConstruct( + const ::substrait::AggregateRel& sAgg, + const std::shared_ptr& childNode, + const core::AggregationNode::Step& aggStep) { + // Will add a Project node before Aggregate node to combine columns into row. + std::vector> constructExprs; + const auto& groupings = sAgg.groupings(); + const auto& constructInputType = childNode->outputType(); + + // Handle groupings. + uint32_t groupingOutIdx = 0; for (const auto& grouping : groupings) { - auto groupingExprs = grouping.grouping_expressions(); + const auto& groupingExprs = grouping.grouping_expressions(); for (const auto& groupingExpr : groupingExprs) { - // Velox's groupings are limited to be Field, so groupingExpr is - // expected to be FieldReference. - auto fieldExpr = - exprConverter_->toVeloxExpr(groupingExpr.selection(), inputTypes); - veloxGroupingExprs.emplace_back(fieldExpr); - outIdx += 1; + // Velox's groupings are limited to be Field. + auto fieldExpr = exprConverter_->toVeloxExpr( + groupingExpr.selection(), constructInputType); + constructExprs.push_back(fieldExpr); + groupingOutIdx += 1; } } - // Parse measures to get Aggregation phase and expressions. - bool phaseInited = false; - core::AggregationNode::Step aggStep; - // Project expressions are used to conduct a pre-projection before - // Aggregation if needed. - std::vector projectExprs; - std::vector projectOutNames; std::vector aggExprs; auto aggMeasureSize = aggRel.measures().size(); aggExprs.reserve(aggMeasureSize); @@ -134,116 +138,188 @@ core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan( } aggregateMasks.push_back(aggregateMask); } - - auto aggFunction = measure.measure(); - // Get the params of this Aggregate function. - std::vector aggParams; - auto args = aggFunction.args(); - aggParams.reserve(args.size()); - for (auto arg : args) { - auto typeCase = arg.rex_type_case(); - switch (typeCase) { - case ::substrait::Expression::RexTypeCase::kSelection: { - aggParams.emplace_back( - exprConverter_->toVeloxExpr(arg.selection(), inputTypes)); - break; + // Handle aggregations. + std::vector aggFuncNames; + aggFuncNames.reserve(sAgg.measures().size()); + std::vector aggOutTypes; + aggOutTypes.reserve(sAgg.measures().size()); + + for (const auto& smea : sAgg.measures()) { + const auto& aggFunction = smea.measure(); + std::string funcName = subParser_->findVeloxFunction( + functionMap_, aggFunction.function_reference()); + aggFuncNames.emplace_back(funcName); + aggOutTypes.emplace_back( + toVeloxType(subParser_->parseType(aggFunction.output_type())->type)); + if (funcName == "avg") { + // Will use row constructor to combine the sum and count columns into + // row. + if (aggFunction.args().size() != 2) { + VELOX_FAIL("Final average should have two args."); } - case ::substrait::Expression::RexTypeCase::kCast: { + std::vector> aggParams; + aggParams.reserve(aggFunction.args().size()); + for (const auto& arg : aggFunction.args()) { aggParams.emplace_back( - exprConverter_->toVeloxExpr(arg.cast(), inputTypes)); - break; + exprConverter_->toVeloxExpr(arg, constructInputType)); } - case ::substrait::Expression::RexTypeCase::kLiteral: { - aggParams.emplace_back(exprConverter_->toVeloxExpr(arg.literal())); - break; + auto constructExpr = std::make_shared( + ROW({"sum", "count"}, {DOUBLE(), BIGINT()}), + std::move(aggParams), + "row_constructor"); + constructExprs.emplace_back(constructExpr); + } else { + if (aggFunction.args().size() != 1) { + VELOX_FAIL("Expect only one arg."); } - case ::substrait::Expression::RexTypeCase::kScalarFunction: { - // Pre-projection is needed before Aggregate. - // The input of Aggregatation will be the output of the - // pre-projection. - auto sFunc = arg.scalar_function(); - projectExprs.emplace_back( - exprConverter_->toVeloxExpr(sFunc, inputTypes)); - auto colOutName = substraitParser_->makeNodeName(planNodeId_, outIdx); - projectOutNames.emplace_back(colOutName); - auto outType = substraitParser_->parseType(sFunc.output_type()); - auto aggInputParam = - std::make_shared( - toVeloxType(outType->type), colOutName); - aggParams.emplace_back(aggInputParam); - break; + for (const auto& arg : aggFunction.args()) { + constructExprs.emplace_back( + exprConverter_->toVeloxExpr(arg, constructInputType)); } - default: - VELOX_NYI( - "Substrait conversion not supported for arg type '{}'", typeCase); } } - auto funcId = aggFunction.function_reference(); - auto funcName = substraitParser_->findVeloxFunction(functionMap_, funcId); - auto aggOutType = substraitParser_->parseType(aggFunction.output_type()); - auto aggExpr = std::make_shared( - toVeloxType(aggOutType->type), std::move(aggParams), funcName); - aggExprs.emplace_back(aggExpr); - - // Initialize the Aggregate Step. - if (!phaseInited) { - auto phase = aggFunction.phase(); - switch (phase) { - case ::substrait::AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE: - aggStep = core::AggregationNode::Step::kPartial; - break; - case ::substrait::AGGREGATION_PHASE_INTERMEDIATE_TO_INTERMEDIATE: - aggStep = core::AggregationNode::Step::kIntermediate; - break; - case ::substrait::AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT: - aggStep = core::AggregationNode::Step::kFinal; - break; - case ::substrait::AGGREGATION_PHASE_INITIAL_TO_RESULT: - aggStep = core::AggregationNode::Step::kSingle; - break; - default: - VELOX_NYI("Substrait conversion not supported for phase '{}'", phase); - } - phaseInited = true; + + // Get the output names of row construct. + std::vector constructOutNames; + constructOutNames.reserve(constructExprs.size()); + for (uint32_t colIdx = 0; colIdx < constructExprs.size(); colIdx++) { + constructOutNames.emplace_back( + subParser_->makeNodeName(planNodeId_, colIdx)); } - outIdx += 1; - } - // Construct the Aggregate Node. - bool ignoreNullKeys = false; - std::vector preGroupingExprs; - if (projectOutNames.size() == 0) { - // Conduct Aggregation directly. + uint32_t totalOutColNum = constructExprs.size(); + // Create the row construct node. + auto constructNode = std::make_shared( + nextPlanNodeId(), + std::move(constructOutNames), + std::move(constructExprs), + childNode); + + // Create the Aggregation node. + bool ignoreNullKeys = false; + std::vector> + aggregateMasks(totalOutColNum - groupingOutIdx); + std::vector> + preGroupingExprs = {}; + + // Get the output names of Aggregate node. std::vector aggOutNames; - aggOutNames.reserve(aggMeasureSize); - for (int idx = 0; idx < aggMeasureSize; idx++) { - aggOutNames.emplace_back( - substraitParser_->makeNodeName(planNodeId_, idx)); + aggOutNames.reserve(totalOutColNum - groupingOutIdx); + for (uint32_t idx = groupingOutIdx; idx < totalOutColNum; idx++) { + aggOutNames.emplace_back(subParser_->makeNodeName(planNodeId_, idx)); + } + + // Get the Aggregate expressions. + std::vector> aggExprs; + aggExprs.reserve(totalOutColNum - groupingOutIdx); + const auto& constructOutType = constructNode->outputType(); + for (uint32_t colIdx = groupingOutIdx; colIdx < totalOutColNum; colIdx++) { + std::vector> aggArgs; + aggArgs.reserve(1); + // Use the colIdx to access the columns after grouping columns. + aggArgs.emplace_back(std::make_shared( + constructOutType->childAt(colIdx), + constructOutType->names()[colIdx])); + // Use the another index to access the types and names of aggregation + // columns. + aggExprs.emplace_back(std::make_shared( + aggOutTypes[colIdx - groupingOutIdx], + std::move(aggArgs), + aggFuncNames[colIdx - groupingOutIdx])); } - return std::make_shared( + + // Get the grouping expressions. + std::vector> + groupingExprs; + groupingExprs.reserve(groupingOutIdx); + for (uint32_t colIdx = 0; colIdx < groupingOutIdx; colIdx++) { + // Velox's groupings are limited to be Field. + groupingExprs.emplace_back( + std::make_shared( + constructOutType->childAt(colIdx), + constructOutType->names()[colIdx])); + } + + // Create the Aggregation node. + auto aggNode = std::make_shared( nextPlanNodeId(), aggStep, - veloxGroupingExprs, + groupingExprs, preGroupingExprs, aggOutNames, aggExprs, aggregateMasks, ignoreNullKeys, - childNode); - } else { - // A Project Node is needed before Aggregation. - auto projectNode = std::make_shared( - nextPlanNodeId(), - std::move(projectOutNames), - std::move(projectExprs), - childNode); + constructNode); + return aggNode; + } + + std::shared_ptr SubstraitVeloxPlanConverter::toVeloxAgg( + const ::substrait::AggregateRel& sAgg, + const std::shared_ptr& childNode, + const core::AggregationNode::Step& aggStep) { + const auto& inputType = childNode->outputType(); + std::vector> + veloxGroupingExprs; + + // Get the grouping expressions. + uint32_t groupingOutIdx = 0; + for (const auto& grouping : sAgg.groupings()) { + for (const auto& groupingExpr : grouping.grouping_expressions()) { + // Velox's groupings are limited to be Field. + veloxGroupingExprs.emplace_back( + exprConverter_->toVeloxExpr(groupingExpr.selection(), inputType)); + groupingOutIdx += 1; + } + } + + // Parse measures and get the aggregate expressions. + uint32_t aggOutIdx = groupingOutIdx; + std::vector> aggExprs; + aggExprs.reserve(sAgg.measures().size()); + for (const auto& smea : sAgg.measures()) { + const auto& aggFunction = smea.measure(); + std::string funcName = subParser_->findVeloxFunction( + functionMap_, aggFunction.function_reference()); + std::vector> aggParams; + aggParams.reserve(aggFunction.args().size()); + for (const auto& arg : aggFunction.args()) { + aggParams.emplace_back(exprConverter_->toVeloxExpr(arg, inputType)); + } + auto aggVeloxType = + toVeloxType(subParser_->parseType(aggFunction.output_type())->type); + if (funcName == "avg") { + // Will used sum and count to calculate the partial avg. + auto sumExpr = std::make_shared( + aggVeloxType, aggParams, "sum"); + auto countExpr = std::make_shared( + BIGINT(), aggParams, "count"); + aggExprs.emplace_back(sumExpr); + aggExprs.emplace_back(countExpr); + aggOutIdx += 2; + } else { + auto aggExpr = std::make_shared( + aggVeloxType, std::move(aggParams), funcName); + aggExprs.emplace_back(aggExpr); + aggOutIdx += 1; + } + } + + bool ignoreNullKeys = false; + std::vector> + aggregateMasks(aggOutIdx - groupingOutIdx); + std::vector> + preGroupingExprs = {}; + + // Get the output names of Aggregation. std::vector aggOutNames; - aggOutNames.reserve(aggMeasureSize); - for (int idx = 0; idx < aggMeasureSize; idx++) { - aggOutNames.emplace_back( - substraitParser_->makeNodeName(planNodeId_, idx)); + aggOutNames.reserve(aggOutIdx - groupingOutIdx); + for (int idx = groupingOutIdx; idx < aggOutIdx; idx++) { + aggOutNames.emplace_back(subParser_->makeNodeName(planNodeId_, idx)); } - return std::make_shared( + + // Create Aggregate node. + auto aggNode = std::make_shared( nextPlanNodeId(), aggStep, veloxGroupingExprs, @@ -252,449 +328,554 @@ core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan( aggExprs, aggregateMasks, ignoreNullKeys, - projectNode); - } -} - -core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan( - const ::substrait::ProjectRel& projectRel, - memory::MemoryPool* pool) { - core::PlanNodePtr childNode; - if (projectRel.has_input()) { - childNode = toVeloxPlan(projectRel.input(), pool); - } else { - VELOX_FAIL("Child Rel is expected in ProjectRel."); + childNode); + return aggNode; } - // Construct Velox Expressions. - auto projectExprs = projectRel.expressions(); - std::vector projectNames; - std::vector expressions; - projectNames.reserve(projectExprs.size()); - expressions.reserve(projectExprs.size()); - - const auto& inputType = childNode->outputType(); - int colIdx = 0; - for (const auto& expr : projectExprs) { - expressions.emplace_back(exprConverter_->toVeloxExpr(expr, inputType)); - projectNames.emplace_back( - substraitParser_->makeNodeName(planNodeId_, colIdx)); - colIdx += 1; - } + core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan( + const ::substrait::ProjectRel& projectRel, memory::MemoryPool* pool) { + core::PlanNodePtr childNode; + if (projectRel.has_input()) { + childNode = toVeloxPlan(projectRel.input(), pool); + } else { + VELOX_FAIL("Child Rel is expected in ProjectRel."); + } - return std::make_shared( - nextPlanNodeId(), - std::move(projectNames), - std::move(expressions), - childNode); -} + // Construct Velox Expressions. + const auto& projectExprs = sProject.expressions(); + std::vector projectNames; + std::vector expressions; + projectNames.reserve(projectExprs.size()); + expressions.reserve(projectExprs.size()); + + const auto& inputType = childNode->outputType(); + int colIdx = 0; + for (const auto& expr : projectExprs) { + expressions.emplace_back(exprConverter_->toVeloxExpr(expr, inputType)); + projectNames.emplace_back( + substraitParser_->makeNodeName(planNodeId_, colIdx)); + colIdx += 1; + } -core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan( - const ::substrait::FilterRel& filterRel, - memory::MemoryPool* pool) { - core::PlanNodePtr childNode; - if (filterRel.has_input()) { - childNode = toVeloxPlan(filterRel.input(), pool); - } else { - VELOX_FAIL("Child Rel is expected in FilterRel."); + return std::make_shared( + nextPlanNodeId(), + std::move(projectNames), + std::move(expressions), + childNode); } - const auto& inputType = childNode->outputType(); - const auto& sExpr = filterRel.condition(); + core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan( + const ::substrait::FilterRel& filterRel, memory::MemoryPool* pool) { + core::PlanNodePtr childNode; + if (filterRel.has_input()) { + childNode = toVeloxPlan(filterRel.input(), pool); + } else { + VELOX_FAIL("Child Rel is expected in FilterRel."); + } - return std::make_shared( - nextPlanNodeId(), - exprConverter_->toVeloxExpr(sExpr, inputType), - childNode); -} + const auto& inputType = childNode->outputType(); + const auto& sExpr = filterRel.condition(); -core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan( - const ::substrait::ReadRel& readRel, - memory::MemoryPool* pool, - std::shared_ptr& splitInfo) { - // Get output names and types. - std::vector colNameList; - std::vector veloxTypeList; - if (readRel.has_base_schema()) { - const auto& baseSchema = readRel.base_schema(); - colNameList.reserve(baseSchema.names().size()); - for (const auto& name : baseSchema.names()) { - colNameList.emplace_back(name); - } - auto substraitTypeList = substraitParser_->parseNamedStruct(baseSchema); - veloxTypeList.reserve(substraitTypeList.size()); - for (const auto& substraitType : substraitTypeList) { - veloxTypeList.emplace_back(toVeloxType(substraitType->type)); - } + return std::make_shared( + nextPlanNodeId(), + exprConverter_->toVeloxExpr(sExpr, inputType), + childNode); } - // Parse local files - if (readRel.has_local_files()) { - using SubstraitFileFormatCase = - ::substrait::ReadRel_LocalFiles_FileOrFiles::FileFormatCase; - const auto& fileList = readRel.local_files().items(); - splitInfo->paths.reserve(fileList.size()); - splitInfo->starts.reserve(fileList.size()); - splitInfo->lengths.reserve(fileList.size()); - for (const auto& file : fileList) { - // Expect all files to share the same index. - splitInfo->partitionIndex = file.partition_index(); - splitInfo->paths.emplace_back(file.uri_file()); - splitInfo->starts.emplace_back(file.start()); - splitInfo->lengths.emplace_back(file.length()); - switch (file.file_format_case()) { - case SubstraitFileFormatCase::kOrc: - splitInfo->format = dwio::common::FileFormat::DWRF; - break; - case SubstraitFileFormatCase::kParquet: - splitInfo->format = dwio::common::FileFormat::PARQUET; - break; - default: - splitInfo->format = dwio::common::FileFormat::UNKNOWN; + std::shared_ptr + SubstraitVeloxPlanConverter::toVeloxPlan( + const ::substrait::ReadRel& sRead, + u_int32_t& index, + std::vector& paths, + std::vector& starts, + std::vector& lengths) { + // Check if the ReadRel specifies an input of stream. If yes, the pre-built + // input node will be used as the data source. + auto streamIdx = streamIsInput(sRead); + if (streamIdx >= 0) { + if (inputNodesMap_.find(streamIdx) == inputNodesMap_.end()) { + VELOX_FAIL( + "Could not find source index {} in input nodes map.", streamIdx); } + return inputNodesMap_[streamIdx]; } - } - // Do not hard-code connector ID and allow for connectors other than Hive. - static const std::string kHiveConnectorId = "test-hive"; - - // Velox requires Filter Pushdown must being enabled. - bool filterPushdownEnabled = true; - std::shared_ptr tableHandle; - if (!readRel.has_filter()) { - tableHandle = std::make_shared( - kHiveConnectorId, - "hive_table", - filterPushdownEnabled, - connector::hive::SubfieldFilters{}, - nullptr); - } else { - connector::hive::SubfieldFilters filters = - toVeloxFilter(colNameList, veloxTypeList, readRel.filter()); - tableHandle = std::make_shared( - kHiveConnectorId, - "hive_table", - filterPushdownEnabled, - std::move(filters), - nullptr); - } + // Otherwise, will create TableScan node for ReadRel. + // Get output names and types. + std::vector colNameList; + std::vector veloxTypeList; + if (readRel.has_base_schema()) { + const auto& baseSchema = readRel.base_schema(); + colNameList.reserve(baseSchema.names().size()); + for (const auto& name : baseSchema.names()) { + colNameList.emplace_back(name); + } + auto substraitTypeList = substraitParser_->parseNamedStruct(baseSchema); + veloxTypeList.reserve(substraitTypeList.size()); + for (const auto& substraitType : substraitTypeList) { + veloxTypeList.emplace_back(toVeloxType(substraitType->type)); + } + } - // Get assignments and out names. - std::vector outNames; - outNames.reserve(colNameList.size()); - std::unordered_map> - assignments; - for (int idx = 0; idx < colNameList.size(); idx++) { - auto outName = substraitParser_->makeNodeName(planNodeId_, idx); - assignments[outName] = std::make_shared( - colNameList[idx], - connector::hive::HiveColumnHandle::ColumnType::kRegular, - veloxTypeList[idx]); - outNames.emplace_back(outName); - } - auto outputType = ROW(std::move(outNames), std::move(veloxTypeList)); + // Parse local files + if (readRel.has_local_files()) { + const auto& fileList = readRel.local_files().items(); + splitInfo->paths.reserve(fileList.size()); + splitInfo->starts.reserve(fileList.size()); + splitInfo->lengths.reserve(fileList.size()); + for (const auto& file : fileList) { + // Expect all files to share the same index. + splitInfo->partitionIndex = file.partition_index(); + splitInfo->paths.emplace_back(file.uri_file()); + splitInfo->starts.emplace_back(file.start()); + splitInfo->lengths.emplace_back(file.length()); + switch (file.format()) { + case 0: + splitInfo->format = dwio::common::FileFormat::DWRF; + break; + case 1: + splitInfo->format = dwio::common::FileFormat::PARQUET; + break; + default: + splitInfo->format = dwio::common::FileFormat::UNKNOWN; + } + } + } - if (readRel.has_virtual_table()) { - return toVeloxPlan(readRel, pool, outputType); + // Do not hard-code connector ID and allow for connectors other than Hive. + static const std::string kHiveConnectorId = "test-hive"; + + // Velox requires Filter Pushdown must being enabled. + bool filterPushdownEnabled = true; + std::shared_ptr tableHandle; + if (!readRel.has_filter()) { + tableHandle = std::make_shared( + kHiveConnectorId, + "hive_table", + filterPushdownEnabled, + connector::hive::SubfieldFilters{}, + nullptr); + } else { + connector::hive::SubfieldFilters filters = + toVeloxFilter(colNameList, veloxTypeList, readRel.filter()); + tableHandle = std::make_shared( + kHiveConnectorId, + "hive_table", + filterPushdownEnabled, + std::move(filters), + nullptr); + } - } else { - auto tableScanNode = std::make_shared( - nextPlanNodeId(), outputType, tableHandle, assignments); - return tableScanNode; - } -} + // Get assignments and out names. + std::vector outNames; + outNames.reserve(colNameList.size()); + std::unordered_map> + assignments; + for (int idx = 0; idx < colNameList.size(); idx++) { + auto outName = substraitParser_->makeNodeName(planNodeId_, idx); + assignments[outName] = + std::make_shared( + colNameList[idx], + connector::hive::HiveColumnHandle::ColumnType::kRegular, + veloxTypeList[idx]); + outNames.emplace_back(outName); + } + auto outputType = ROW(std::move(outNames), std::move(veloxTypeList)); -core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan( - const ::substrait::ReadRel& readRel, - memory::MemoryPool* pool, - const RowTypePtr& type) { - ::substrait::ReadRel_VirtualTable readVirtualTable = readRel.virtual_table(); - int64_t numVectors = readVirtualTable.values_size(); - int64_t numColumns = type->size(); - int64_t valueFieldNums = - readVirtualTable.values(numVectors - 1).fields_size(); - std::vector vectors; - vectors.reserve(numVectors); - - int64_t batchSize; - // For the empty vectors, eg,vectors = makeRowVector(ROW({}, {}), 1). - if (numColumns == 0) { - batchSize = 1; - } else { - batchSize = valueFieldNums / numColumns; + if (readRel.has_virtual_table()) { + return toVeloxPlan(readRel, pool, outputType); + + } else { + auto tableScanNode = std::make_shared( + nextPlanNodeId(), outputType, tableHandle, assignments); + return tableScanNode; + } } - for (int64_t index = 0; index < numVectors; ++index) { - std::vector children; - ::substrait::Expression_Literal_Struct rowValue = - readRel.virtual_table().values(index); - auto fieldSize = rowValue.fields_size(); - VELOX_CHECK_EQ(fieldSize, batchSize * numColumns); - - for (int64_t col = 0; col < numColumns; ++col) { - const TypePtr& outputChildType = type->childAt(col); - std::vector batchChild; - batchChild.reserve(batchSize); - for (int64_t batchId = 0; batchId < batchSize; batchId++) { - // each value in the batch - auto fieldIdx = col * batchSize + batchId; - ::substrait::Expression_Literal field = rowValue.fields(fieldIdx); - - auto expr = exprConverter_->toVeloxExpr(field); - if (auto constantExpr = - std::dynamic_pointer_cast( - expr)) { - if (!constantExpr->hasValueVector()) { - batchChild.emplace_back(constantExpr->value()); + core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan( + const ::substrait::ReadRel& readRel, + memory::MemoryPool* pool, + const RowTypePtr& type) { + ::substrait::ReadRel_VirtualTable readVirtualTable = + readRel.virtual_table(); + int64_t numVectors = readVirtualTable.values_size(); + int64_t numColumns = type->size(); + int64_t valueFieldNums = + readVirtualTable.values(numVectors - 1).fields_size(); + std::vector vectors; + vectors.reserve(numVectors); + + int64_t batchSize = valueFieldNums / numColumns; + + for (int64_t index = 0; index < numVectors; ++index) { + std::vector children; + ::substrait::Expression_Literal_Struct rowValue = + readRel.virtual_table().values(index); + auto fieldSize = rowValue.fields_size(); + VELOX_CHECK_EQ(fieldSize, batchSize * numColumns); + + for (int64_t col = 0; col < numColumns; ++col) { + const TypePtr& outputChildType = type->childAt(col); + std::vector batchChild; + batchChild.reserve(batchSize); + for (int64_t batchId = 0; batchId < batchSize; batchId++) { + // each value in the batch + auto fieldIdx = col * batchSize + batchId; + ::substrait::Expression_Literal field = rowValue.fields(fieldIdx); + + auto expr = exprConverter_->toVeloxExpr(field); + if (auto constantExpr = + std::dynamic_pointer_cast( + expr)) { + if (!constantExpr->hasValueVector()) { + batchChild.emplace_back(constantExpr->value()); + } else { + VELOX_UNSUPPORTED( + "Values node with complex type values is not supported yet"); + } } else { - VELOX_UNSUPPORTED( - "Values node with complex type values is not supported yet"); + VELOX_FAIL("Expected constant expression"); } - } else { - VELOX_FAIL("Expected constant expression"); } + children.emplace_back( + setVectorFromVariants(outputChildType, batchChild, pool)); } - children.emplace_back( - setVectorFromVariants(outputChildType, batchChild, pool)); + + vectors.emplace_back(std::make_shared( + pool, type, nullptr, batchSize, children)); } - vectors.emplace_back( - std::make_shared(pool, type, nullptr, batchSize, children)); + return std::make_shared(nextPlanNodeId(), vectors); } - return std::make_shared(nextPlanNodeId(), vectors); -} + core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan( + const ::substrait::Rel& rel, memory::MemoryPool* pool) { + if (rel.has_aggregate()) { + return toVeloxPlan(rel.aggregate(), pool); + } + if (rel.has_project()) { + return toVeloxPlan(rel.project(), pool); + } + if (rel.has_filter()) { + return toVeloxPlan(rel.filter(), pool); + } + if (rel.has_read()) { + auto splitInfo = std::make_shared(); -core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan( - const ::substrait::Rel& rel, - memory::MemoryPool* pool) { - if (rel.has_aggregate()) { - return toVeloxPlan(rel.aggregate(), pool); + auto planNode = toVeloxPlan(rel.read(), pool, splitInfo); + splitInfoMap_[planNode->id()] = splitInfo; + return planNode; + } + VELOX_NYI("Substrait conversion not supported for Rel."); } - if (rel.has_project()) { - return toVeloxPlan(rel.project(), pool); + + core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan( + const ::substrait::RelRoot& root, memory::MemoryPool* pool) { + // TODO: Use the names as the output names for the whole computing. + const auto& names = root.names(); + if (root.has_input()) { + const auto& rel = root.input(); + return toVeloxPlan(rel, pool); + } + VELOX_FAIL("Input is expected in RelRoot."); } - if (rel.has_filter()) { - return toVeloxPlan(rel.filter(), pool); + + core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan( + const ::substrait::Plan& substraitPlan, memory::MemoryPool* pool) { + // Construct the function map based on the Substrait representation. + constructFuncMap(sPlan); + + // Create the expression converter. + exprConverter_ = + std::make_shared(functionMap_); + + // In fact, only one RelRoot or Rel is expected here. + for (const auto& rel : substraitPlan.relations()) { + if (rel.has_root()) { + return toVeloxPlan(rel.root(), pool); + } + if (rel.has_rel()) { + return toVeloxPlan(rel.rel(), pool); + } + } + VELOX_FAIL("RelRoot or Rel is expected in Plan."); } - if (rel.has_read()) { - auto splitInfo = std::make_shared(); - auto planNode = toVeloxPlan(rel.read(), pool, splitInfo); - splitInfoMap_[planNode->id()] = splitInfo; - return planNode; + void SubstraitVeloxPlanConverter::constructFuncMap( + const ::substrait::Plan& sPlan) { + // Construct the function map based on the Substrait representation. + for (const auto& sExtension : sPlan.extensions()) { + if (!sExtension.has_extension_function()) { + continue; + } + const auto& sFmap = sExtension.extension_function(); + auto id = sFmap.function_anchor(); + auto name = sFmap.name(); + functionMap_[id] = name; + } } - VELOX_NYI("Substrait conversion not supported for Rel."); -} -core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan( - const ::substrait::RelRoot& root, - memory::MemoryPool* pool) { - // TODO: Use the names as the output names for the whole computing. - const auto& names = root.names(); - if (root.has_input()) { - const auto& rel = root.input(); - return toVeloxPlan(rel, pool); + std::string SubstraitVeloxPlanConverter::nextPlanNodeId() { + auto id = fmt::format("{}", planNodeId_); + planNodeId_++; + return id; } - VELOX_FAIL("Input is expected in RelRoot."); -} -core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan( - const ::substrait::Plan& substraitPlan, - memory::MemoryPool* pool) { - VELOX_CHECK( - checkTypeExtension(substraitPlan), - "The type extension only have unknown type.") - // Construct the function map based on the Substrait representation. - constructFunctionMap(substraitPlan); + core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan( + const ::substrait::Plan& substraitPlan, memory::MemoryPool* pool) { + VELOX_CHECK( + checkTypeExtension(substraitPlan), + "The type extension only have unknown type.") + // Construct the function map based on the Substrait representation. + constructFunctionMap(substraitPlan); - // Construct the expression converter. - exprConverter_ = std::make_shared(functionMap_); + // In fact, only one RelRoot or Rel is expected here. + VELOX_CHECK_EQ(substraitPlan.relations_size(), 1); + const auto& rel = substraitPlan.relations(0); + if (rel.has_root()) { + return toVeloxPlan(rel.root(), pool); + } + if (rel.has_rel()) { + return toVeloxPlan(rel.rel(), pool); + } - // In fact, only one RelRoot or Rel is expected here. - VELOX_CHECK_EQ(substraitPlan.relations_size(), 1); - const auto& rel = substraitPlan.relations(0); - if (rel.has_root()) { - return toVeloxPlan(rel.root(), pool); - } - if (rel.has_rel()) { - return toVeloxPlan(rel.rel(), pool); + VELOX_FAIL("RelRoot or Rel is expected in Plan."); } - VELOX_FAIL("RelRoot or Rel is expected in Plan."); -} + // This class contains the needed infos for Filter Pushdown. + // TODO: Support different types here. + class FilterInfo { + public: + // Used to set the left bound. + void setLeft(double left, bool isExclusive) { + left_ = left; + leftExclusive_ = isExclusive; + if (!isInitialized_) { + isInitialized_ = true; + } + } -std::string SubstraitVeloxPlanConverter::nextPlanNodeId() { - auto id = fmt::format("{}", planNodeId_); - planNodeId_++; - return id; -} + // Used to set the right bound. + void setRight(double right, bool isExclusive) { + right_ = right; + rightExclusive_ = isExclusive; + if (!isInitialized_) { + isInitialized_ = true; + } + } -// This class contains the needed infos for Filter Pushdown. -// TODO: Support different types here. -class FilterInfo { - public: - // Used to set the left bound. - void setLeft(double left, bool isExclusive) { - left_ = left; - leftExclusive_ = isExclusive; - if (!isInitialized_) { - isInitialized_ = true; + // Will fordis Null value if called once. + void forbidsNull() { + nullAllowed_ = false; + if (!isInitialized_) { + isInitialized_ = true; + } } - } - // Used to set the right bound. - void setRight(double right, bool isExclusive) { - right_ = right; - rightExclusive_ = isExclusive; - if (!isInitialized_) { - isInitialized_ = true; + // Return the initialization status. + bool isInitialized() { + return isInitialized_ ? true : false; } - } - // Will fordis Null value if called once. - void forbidsNull() { - nullAllowed_ = false; - if (!isInitialized_) { - isInitialized_ = true; + // The left bound. + std::optional left_ = std::nullopt; + // The right bound. + std::optional right_ = std::nullopt; + // The Null allowing. + bool nullAllowed_ = true; + // If true, left bound will be exclusive. + bool leftExclusive_ = false; + // If true, right bound will be exclusive. + bool rightExclusive_ = false; + + private: + bool isInitialized_ = false; + }; + + connector::hive::SubfieldFilters SubstraitVeloxPlanConverter::toVeloxFilter( + const std::vector& inputNameList, + const std::vector& inputTypeList, + const ::substrait::Expression& substraitFilter) { + connector::hive::SubfieldFilters filters; + // A map between the column index and the FilterInfo for that column. + std::unordered_map> colInfoMap; + for (int idx = 0; idx < inputNameList.size(); idx++) { + colInfoMap[idx] = std::make_shared(); } - } - // Return the initialization status. - bool isInitialized() { - return isInitialized_ ? true : false; + std::vector<::substrait::Expression_ScalarFunction> scalarFunctions; + flattenConditions(substraitFilter, scalarFunctions); + // Construct the FilterInfo for the related column. + for (const auto& scalarFunction : scalarFunctions) { + auto filterNameSpec = substraitParser_->findFunctionSpec( + functionMap_, scalarFunction.function_reference()); + auto filterName = substraitParser_->getFunctionName(filterNameSpec); + int32_t colIdx; + // TODO: Add different types' support here. + double val; + for (auto& param : scalarFunction.args()) { + auto typeCase = param.rex_type_case(); + switch (typeCase) { + case ::substrait::Expression::RexTypeCase::kSelection: { + auto sel = param.selection(); + // TODO: Only direct reference is considered here. + auto dRef = sel.direct_reference(); + colIdx = substraitParser_->parseReferenceSegment(dRef); + break; + } + case ::substrait::Expression::RexTypeCase::kLiteral: { + auto sLit = param.literal(); + // TODO: Only double is considered here. + val = sLit.fp64(); + break; + } + default: + VELOX_NYI( + "Substrait conversion not supported for arg type '{}'", + typeCase); + } + } + if (filterName == "is_not_null") { + colInfoMap[colIdx]->forbidsNull(); + } else if (filterName == "gte") { + colInfoMap[colIdx]->setLeft(val, false); + } else if (filterName == "gt") { + colInfoMap[colIdx]->setLeft(val, true); + } else if (filterName == "lte") { + colInfoMap[colIdx]->setRight(val, false); + } else if (filterName == "lt") { + colInfoMap[colIdx]->setRight(val, true); + } else { + VELOX_NYI( + "Substrait conversion not supported for filter name '{}'", + filterName); + } + } + + // Construct the Filters. + for (int idx = 0; idx < inputNameList.size(); idx++) { + auto filterInfo = colInfoMap[idx]; + // Set the left bound to be negative infinity. + double leftBound = -1.0 / 0.0; + // Set the right bound to be positive infinity. + double rightBound = 1.0 / 0.0; + bool leftUnbounded = true; + bool rightUnbounded = true; + bool leftExclusive = false; + bool rightExclusive = false; + if (filterInfo->isInitialized()) { + if (filterInfo->left_) { + leftUnbounded = false; + leftBound = filterInfo->left_.value(); + leftExclusive = filterInfo->leftExclusive_; + } + if (filterInfo->right_) { + rightUnbounded = false; + rightBound = filterInfo->right_.value(); + rightExclusive = filterInfo->rightExclusive_; + } + bool nullAllowed = filterInfo->nullAllowed_; + filters[common::Subfield(inputNameList[idx])] = + std::make_unique( + leftBound, + leftUnbounded, + leftExclusive, + rightBound, + rightUnbounded, + rightExclusive, + nullAllowed); + } + } + return filters; + } + + void SubstraitVeloxPlanConverter::flattenConditions( + const ::substrait::Expression& substraitFilter, + std::vector<::substrait::Expression_ScalarFunction>& scalarFunctions) { + auto typeCase = substraitFilter.rex_type_case(); + switch (typeCase) { + case ::substrait::Expression::RexTypeCase::kScalarFunction: { + auto sFunc = substraitFilter.scalar_function(); + auto filterNameSpec = substraitParser_->findFunctionSpec( + functionMap_, sFunc.function_reference()); + // TODO: Only and relation is supported here. + if (substraitParser_->getFunctionName(filterNameSpec) == "and") { + for (const auto& sCondition : sFunc.args()) { + flattenConditions(sCondition, scalarFunctions); + } + } else { + scalarFunctions.emplace_back(sFunc); + } + break; + } + default: + VELOX_NYI("GetFlatConditions not supported for type '{}'", typeCase); + } } - // The left bound. - std::optional left_ = std::nullopt; - // The right bound. - std::optional right_ = std::nullopt; - // The Null allowing. - bool nullAllowed_ = true; - // If true, left bound will be exclusive. - bool leftExclusive_ = false; - // If true, right bound will be exclusive. - bool rightExclusive_ = false; - - private: - bool isInitialized_ = false; -}; - -connector::hive::SubfieldFilters SubstraitVeloxPlanConverter::toVeloxFilter( - const std::vector& inputNameList, - const std::vector& inputTypeList, - const ::substrait::Expression& substraitFilter) { - connector::hive::SubfieldFilters filters; - // A map between the column index and the FilterInfo for that column. - std::unordered_map> colInfoMap; - for (int idx = 0; idx < inputNameList.size(); idx++) { - colInfoMap[idx] = std::make_shared(); + std::string SubstraitVeloxPlanConverter::findFuncSpec(uint64_t id) { + return subParser_->findSubstraitFuncSpec(functionMap_, id); } - std::vector<::substrait::Expression_ScalarFunction> scalarFunctions; - flattenConditions(substraitFilter, scalarFunctions); - // Construct the FilterInfo for the related column. - for (const auto& scalarFunction : scalarFunctions) { - auto filterNameSpec = substraitParser_->findFunctionSpec( - functionMap_, scalarFunction.function_reference()); - auto filterName = substraitParser_->getFunctionName(filterNameSpec); - int32_t colIdx; - // TODO: Add different types' support here. - double val; - for (auto& param : scalarFunction.args()) { - auto typeCase = param.rex_type_case(); - switch (typeCase) { - case ::substrait::Expression::RexTypeCase::kSelection: { - auto sel = param.selection(); - // TODO: Only direct reference is considered here. - auto dRef = sel.direct_reference(); - colIdx = substraitParser_->parseReferenceSegment(dRef); + bool SubstraitVeloxPlanConverter::needsRowConstruct( + const ::substrait::AggregateRel& sAgg, + core::AggregationNode::Step& aggStep) { + for (const auto& smea : sAgg.measures()) { + auto aggFunction = smea.measure(); + std::string funcName = subParser_->findVeloxFunction( + functionMap_, aggFunction.function_reference()); + // Set the aggregation phase. + switch (aggFunction.phase()) { + case ::substrait::AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE: + aggStep = core::AggregationNode::Step::kPartial; break; - } - case ::substrait::Expression::RexTypeCase::kLiteral: { - auto sLit = param.literal(); - // TODO: Only double is considered here. - val = sLit.fp64(); + case ::substrait::AGGREGATION_PHASE_INTERMEDIATE_TO_INTERMEDIATE: + aggStep = core::AggregationNode::Step::kIntermediate; + break; + case ::substrait::AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT: + aggStep = core::AggregationNode::Step::kFinal; + // Only Final Average needs row construct currently. + if (funcName == "avg") { + return true; + } + break; + case ::substrait::AGGREGATION_PHASE_INITIAL_TO_RESULT: + aggStep = core::AggregationNode::Step::kSingle; break; - } default: - VELOX_NYI( - "Substrait conversion not supported for arg type '{}'", typeCase); + throw std::runtime_error("Aggregate phase is not supported."); } } - if (filterName == "is_not_null") { - colInfoMap[colIdx]->forbidsNull(); - } else if (filterName == "gte") { - colInfoMap[colIdx]->setLeft(val, false); - } else if (filterName == "gt") { - colInfoMap[colIdx]->setLeft(val, true); - } else if (filterName == "lte") { - colInfoMap[colIdx]->setRight(val, false); - } else if (filterName == "lt") { - colInfoMap[colIdx]->setRight(val, true); - } else { - VELOX_NYI( - "Substrait conversion not supported for filter name '{}'", - filterName); - } + return false; } - // Construct the Filters. - for (int idx = 0; idx < inputNameList.size(); idx++) { - auto filterInfo = colInfoMap[idx]; - double leftBound; - double rightBound; - bool leftUnbounded = true; - bool rightUnbounded = true; - bool leftExclusive = false; - bool rightExclusive = false; - if (filterInfo->isInitialized()) { - if (filterInfo->left_) { - leftUnbounded = false; - leftBound = filterInfo->left_.value(); - leftExclusive = filterInfo->leftExclusive_; + int32_t SubstraitVeloxPlanConverter::streamIsInput( + const ::substrait::ReadRel& sRead) { + if (sRead.has_local_files()) { + const auto& fileList = sRead.local_files().items(); + if (fileList.size() == 0) { + VELOX_FAIL("At least one file path is expected."); } - if (filterInfo->right_) { - rightUnbounded = false; - rightBound = filterInfo->right_.value(); - rightExclusive = filterInfo->rightExclusive_; + + // The stream input will be specified with the format of + // "iterator:${index}". + std::string filePath = fileList[0].uri_file(); + std::string prefix = "iterator:"; + std::size_t pos = filePath.find(prefix); + if (pos == std::string::npos) { + return -1; } - bool nullAllowed = filterInfo->nullAllowed_; - filters[common::Subfield(inputNameList[idx])] = - std::make_unique( - leftBound, - leftUnbounded, - leftExclusive, - rightBound, - rightUnbounded, - rightExclusive, - nullAllowed); - } - } - return filters; -} -void SubstraitVeloxPlanConverter::flattenConditions( - const ::substrait::Expression& substraitFilter, - std::vector<::substrait::Expression_ScalarFunction>& scalarFunctions) { - auto typeCase = substraitFilter.rex_type_case(); - switch (typeCase) { - case ::substrait::Expression::RexTypeCase::kScalarFunction: { - auto sFunc = substraitFilter.scalar_function(); - auto filterNameSpec = substraitParser_->findFunctionSpec( - functionMap_, sFunc.function_reference()); - // TODO: Only and relation is supported here. - if (substraitParser_->getFunctionName(filterNameSpec) == "and") { - for (const auto& sCondition : sFunc.args()) { - flattenConditions(sCondition, scalarFunctions); - } - } else { - scalarFunctions.emplace_back(sFunc); + // Get the index. + std::string idxStr = + filePath.substr(pos + prefix.size(), filePath.size()); + try { + return stoi(idxStr); + } catch (const std::exception& err) { + VELOX_FAIL(err.what()); } - break; } - default: - VELOX_NYI("GetFlatConditions not supported for type '{}'", typeCase); + VELOX_FAIL("Local file is expected."); } } diff --git a/velox/substrait/SubstraitToVeloxPlan.h b/velox/substrait/SubstraitToVeloxPlan.h index 739fe1d1a240..b5e2c2687903 100644 --- a/velox/substrait/SubstraitToVeloxPlan.h +++ b/velox/substrait/SubstraitToVeloxPlan.h @@ -57,48 +57,51 @@ class SubstraitVeloxPlanConverter { const ::substrait::FilterRel& filterRel, memory::MemoryPool* pool); - /// Convert Substrait ReadRel into Velox PlanNode. - /// Index: the index of the partition this item belongs to. - /// Starts: the start positions in byte to read from the items. - /// Lengths: the lengths in byte to read from the items. - core::PlanNodePtr toVeloxPlan( - const ::substrait::ReadRel& readRel, - memory::MemoryPool* pool, - std::shared_ptr& splitInfo); - /// Convert Substrait ReadRel into Velox Values Node. core::PlanNodePtr toVeloxPlan( const ::substrait::ReadRel& readRel, - memory::MemoryPool* pool, const RowTypePtr& type); - /// Convert Substrait Rel into Velox PlanNode. - core::PlanNodePtr toVeloxPlan( - const ::substrait::Rel& rel, - memory::MemoryPool* pool); - - /// Convert Substrait RelRoot into Velox PlanNode. - core::PlanNodePtr toVeloxPlan( - const ::substrait::RelRoot& root, - memory::MemoryPool* pool); - - /// Convert Substrait Plan into Velox PlanNode. - core::PlanNodePtr toVeloxPlan( - const ::substrait::Plan& substraitPlan, - memory::MemoryPool* pool); - /// Check the Substrait type extension only has one unknown extension. bool checkTypeExtension(const ::substrait::Plan& substraitPlan); - /// Construct the function map between the index and the Substrait function - /// name. - void constructFunctionMap(const ::substrait::Plan& substraitPlan); - - /// Return the function map used by this plan converter. - const std::unordered_map& getFunctionMap() const { + /// Convert Substrait ReadRel into Velox PlanNode. + /// Index: the index of the partition this item belongs to. + /// Starts: the start positions in byte to read from the items. + /// Lengths: the lengths in byte to read from the items. + std::shared_ptr toVeloxPlan( + const ::substrait::ReadRel& sRead, + u_int32_t& index, + std::vector& paths, + std::vector& starts, + std::vector& lengths); + + /// Used to convert Substrait Rel into Velox PlanNode. + std::shared_ptr toVeloxPlan( + const ::substrait::Rel& sRel); + + /// Used to convert Substrait RelRoot into Velox PlanNode. + std::shared_ptr toVeloxPlan( + const ::substrait::RelRoot& sRoot); + + /// Used to convert Substrait Plan into Velox PlanNode. + std::shared_ptr toVeloxPlan( + const ::substrait::Plan& sPlan); + + /// Used to construct the function map between the index + /// and the Substrait function name. + void constructFuncMap(const ::substrait::Plan& sPlan); + + /// Will return the function map used by this plan converter. + const std::unordered_map& getFunctionMap() { return functionMap_; } + /// Will return the index of Partition to be scanned. + u_int32_t getPartitionIndex() { + return partitionIndex_; + } + /// Return the splitInfo map used by this plan converter. const std::unordered_map>& splitInfos() const { @@ -112,45 +115,101 @@ class SubstraitVeloxPlanConverter { /// name>:__..._ const std::string& findFunction(uint64_t id) const; - private: - /// Returns unique ID to use for plan node. Produces sequential numbers - /// starting from zero. - std::string nextPlanNodeId(); + /// Used to insert certain plan node as input. The plan node + /// id will start from the setted one. + void insertInputNode( + uint64_t inputIdx, + const std::shared_ptr& inputNode, + int planNodeId) { + inputNodesMap_[inputIdx] = inputNode; + planNodeId_ = planNodeId; + } - /// Used to convert Substrait Filter into Velox SubfieldFilters which will - /// be used in TableScan. - connector::hive::SubfieldFilters toVeloxFilter( - const std::vector& inputNameList, - const std::vector& inputTypeList, - const ::substrait::Expression& substraitFilter); + /// Used to check if ReadRel specifies an input of stream. + /// If yes, the index of input stream will be returned. + /// If not, -1 will be returned. + int32_t streamIsInput(const ::substrait::ReadRel& sRel); /// Multiple conditions are connected to a binary tree structure with /// the relation key words, including AND, OR, and etc. Currently, only /// AND is supported. This function is used to extract all the Substrait /// conditions in the binary tree structure into a vector. void flattenConditions( - const ::substrait::Expression& substraitFilter, + const ::substrait::Expression& sFilter, std::vector<::substrait::Expression_ScalarFunction>& scalarFunctions); + /// Used to find the function specification in the constructed function map. + std::string findFuncSpec(uint64_t id); + + private: + /// The Partition index. + u_int32_t partitionIndex_; + + /// The file paths to be scanned. + std::vector paths_; + + /// The file starts in the scan. + std::vector starts_; + + /// The lengths to be scanned. + std::vector lengths_; + + /// The unique identification for each PlanNode. + int planNodeId_ = 0; + + /// The map storing the relations between the function id and the function + /// name. Will be constructed based on the Substrait representation. + std::unordered_map functionMap_; + + /// The map storing the pre-built plan nodes which can be accessed through + /// index. This map is only used when the computation of a Substrait plan + /// depends on other input nodes. + std::unordered_map> + inputNodesMap_; + /// The Substrait parser used to convert Substrait representations into /// recognizable representations. - std::shared_ptr substraitParser_{ + std::shared_ptr subParser_{ std::make_shared()}; /// The Expression converter used to convert Substrait representations into /// Velox expressions. std::shared_ptr exprConverter_; - /// The unique identification for each PlanNode. - int planNodeId_ = 0; + /// A function returning current function id and adding the plan node id by + /// one once called. + std::string nextPlanNodeId(); - /// The map storing the relations between the function id and the function - /// name. Will be constructed based on the Substrait representation. - std::unordered_map functionMap_; + /// Used to convert Substrait Filter into Velox SubfieldFilters which will + /// be used in TableScan. + connector::hive::SubfieldFilters toVeloxFilter( + const std::vector& inputNameList, + const std::vector& inputTypeList, + const ::substrait::Expression& substraitFilter); /// Mapping from leaf plan node ID to splits. std::unordered_map> splitInfoMap_; + /// Used to check if some of the input columns of Aggregation + /// should be combined into a single column. Currently, this case occurs in + /// final Average. The phase of Aggregation will also be set. + bool needsRowConstruct( + const ::substrait::AggregateRel& sAgg, + core::AggregationNode::Step& aggStep); + + /// Used to convert AggregateRel into Velox plan node. + /// This method will add a Project node before Aggregation to combine columns. + std::shared_ptr toVeloxAggWithRowConstruct( + const ::substrait::AggregateRel& sAgg, + const std::shared_ptr& childNode, + const core::AggregationNode::Step& aggStep); + + /// Used to convert AggregateRel into Velox plan node. + /// The output of child node will be used as the input of Aggregation. + std::shared_ptr toVeloxAgg( + const ::substrait::AggregateRel& sAgg, + const std::shared_ptr& childNode, + const core::AggregationNode::Step& aggStep); }; } // namespace facebook::velox::substrait diff --git a/velox/substrait/SubstraitToVeloxPlanValidator.cpp b/velox/substrait/SubstraitToVeloxPlanValidator.cpp new file mode 100644 index 000000000000..977b43a88402 --- /dev/null +++ b/velox/substrait/SubstraitToVeloxPlanValidator.cpp @@ -0,0 +1,340 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/substrait/SubstraitToVeloxPlanValidator.h" +#include "TypeUtils.h" + +#include "velox/functions/prestosql/registration/RegistrationFunctions.h" + +namespace facebook::velox::substrait { + +bool SubstraitToVeloxPlanValidator::validate(const ::substrait::Type& sType) { + switch (sType.kind_case()) { + case ::substrait::Type::KindCase::kBool: + case ::substrait::Type::KindCase::kI32: + case ::substrait::Type::KindCase::kI64: + case ::substrait::Type::KindCase::kFp64: + case ::substrait::Type::KindCase::kString: + return true; + default: + return false; + } +} + +bool SubstraitToVeloxPlanValidator::validateInputTypes( + const ::substrait::extensions::AdvancedExtension& extension, + std::vector& types) { + // The input type is wrapped in enhancement. + if (!extension.has_enhancement()) { + return false; + } + const auto& enhancement = extension.enhancement(); + ::substrait::Type inputType; + if (!enhancement.UnpackTo(&inputType)) { + return false; + } + if (!inputType.has_struct_()) { + return false; + } + + // Get the input types. + const auto& sTypes = inputType.struct_().types(); + for (const auto& sType : sTypes) { + try { + types.emplace_back(toVeloxType(subParser_->parseType(sType)->type)); + } catch (const VeloxException& err) { + std::cout << "Type is not supported in ProjectRel due to:" + << err.message() << std::endl; + return false; + } + } + return true; +} + +bool SubstraitToVeloxPlanValidator::validate( + const ::substrait::ProjectRel& sProject) { + if (sProject.has_input() && !validate(sProject.input())) { + return false; + } + + // Get and validate the input types from extension. + if (!sProject.has_advanced_extension()) { + std::cout << "Input types are expected in ProjectRel." << std::endl; + return false; + } + const auto& extension = sProject.advanced_extension(); + std::vector types; + if (!validateInputTypes(extension, types)) { + std::cout << "Validation failed for input types in ProjectRel" << std::endl; + return false; + } + + int32_t inputPlanNodeId = 0; + // Create the fake input names to be used in row type. + std::vector names; + names.reserve(types.size()); + for (uint32_t colIdx = 0; colIdx < types.size(); colIdx++) { + names.emplace_back(subParser_->makeNodeName(inputPlanNodeId, colIdx)); + } + auto rowType = std::make_shared(std::move(names), std::move(types)); + + // Validate the project expressions. + const auto& projectExprs = sProject.expressions(); + std::vector> expressions; + expressions.reserve(projectExprs.size()); + try { + for (const auto& expr : projectExprs) { + expressions.emplace_back(exprConverter_->toVeloxExpr(expr, rowType)); + } + // Try to compile the expressions. If there is any unregistred funciton or + // mismatched type, exception will be thrown. + exec::ExprSet exprSet(std::move(expressions), &execCtx_); + } catch (const VeloxException& err) { + std::cout << "Validation failed for expression in ProjectRel due to:" + << err.message() << std::endl; + return false; + } + return true; +} + +bool SubstraitToVeloxPlanValidator::validate( + const ::substrait::FilterRel& sFilter) { + return false; +} + +bool SubstraitToVeloxPlanValidator::validate( + const ::substrait::AggregateRel& sAgg) { + if (sAgg.has_input() && !validate(sAgg.input())) { + return false; + } + + // Validate input types. + if (sAgg.has_advanced_extension()) { + const auto& extension = sAgg.advanced_extension(); + std::vector types; + if (!validateInputTypes(extension, types)) { + std::cout << "Validation failed for input types in AggregateRel" + << std::endl; + return false; + } + } + + // Validate groupings. + for (const auto& grouping : sAgg.groupings()) { + for (const auto& groupingExpr : grouping.grouping_expressions()) { + const auto& typeCase = groupingExpr.rex_type_case(); + switch (typeCase) { + case ::substrait::Expression::RexTypeCase::kSelection: + break; + default: + std::cout << "Only field is supported in groupings." << std::endl; + return false; + } + } + } + + // Validate aggregate functions. + std::vector funcSpecs; + funcSpecs.reserve(sAgg.measures().size()); + for (const auto& smea : sAgg.measures()) { + try { + const auto& aggFunction = smea.measure(); + funcSpecs.emplace_back( + planConverter_->findFuncSpec(aggFunction.function_reference())); + toVeloxType(subParser_->parseType(aggFunction.output_type())->type); + for (const auto& arg : aggFunction.args()) { + auto typeCase = arg.rex_type_case(); + switch (typeCase) { + case ::substrait::Expression::RexTypeCase::kSelection: + break; + default: + std::cout << "Only field is supported in aggregate functions." + << std::endl; + return false; + } + } + } catch (const VeloxException& err) { + std::cout << "Validation failed for aggregate function due to: " + << err.message() << std::endl; + return false; + } + } + + std::unordered_set supportedFuncs = {"sum", "count", "avg"}; + for (const auto& funcSpec : funcSpecs) { + auto funcName = subParser_->getSubFunctionName(funcSpec); + if (supportedFuncs.find(funcName) == supportedFuncs.end()) { + std::cout << "Validation failed due to " << funcName + << " was not supported in AggregateRel." << std::endl; + return false; + } + } + return true; +} + +bool SubstraitToVeloxPlanValidator::validate( + const ::substrait::ReadRel& sRead) { + if (!sRead.has_base_schema()) { + std::cout << "Validation failed due to schema was not found in ReadRel." + << std::endl; + return false; + } + const auto& sTypes = sRead.base_schema().struct_().types(); + for (const auto& sType : sTypes) { + if (!validate(sType)) { + std::cout << "Validation failed due to type was not supported in ReadRel." + << std::endl; + return false; + } + } + std::vector<::substrait::Expression_ScalarFunction> scalarFunctions; + if (sRead.has_filter()) { + try { + planConverter_->flattenConditions(sRead.filter(), scalarFunctions); + } catch (const VeloxException& err) { + std::cout + << "Validation failed due to flattening conditions failed in ReadRel due to:" + << err.message() << std::endl; + return false; + } + } + // Get and validate the filter functions. + std::vector funcSpecs; + funcSpecs.reserve(scalarFunctions.size()); + for (const auto& scalarFunction : scalarFunctions) { + try { + funcSpecs.emplace_back( + planConverter_->findFuncSpec(scalarFunction.function_reference())); + } catch (const VeloxException& err) { + std::cout << "Validation failed in ReadRel due to:" << err.message() + << std::endl; + return false; + } + + if (scalarFunction.args().size() == 1) { + // Field is expected. + for (const auto& param : scalarFunction.args()) { + auto typeCase = param.rex_type_case(); + switch (typeCase) { + case ::substrait::Expression::RexTypeCase::kSelection: + break; + default: + std::cout << "Field is Expected." << std::endl; + return false; + } + } + } else if (scalarFunction.args().size() == 2) { + // Expect there being two args. One is field and the other is literal. + bool fieldExists = false; + bool litExists = false; + for (const auto& param : scalarFunction.args()) { + auto typeCase = param.rex_type_case(); + switch (typeCase) { + case ::substrait::Expression::RexTypeCase::kSelection: { + fieldExists = true; + break; + } + case ::substrait::Expression::RexTypeCase::kLiteral: { + litExists = true; + break; + } + default: + std::cout << "Type case: " << typeCase + << " is not supported in ReadRel." << std::endl; + return false; + } + } + if (!fieldExists || !litExists) { + std::cout << "Only the case of Field and Literal is supported." + << std::endl; + return false; + } + } else { + std::cout << "More than two args is not supported in ReadRel." + << std::endl; + return false; + } + } + std::unordered_set supportedFilters = { + "is_not_null", "gte", "gt", "lte", "lt"}; + std::unordered_set supportedTypes = {"opt", "req", "fp64"}; + for (const auto& funcSpec : funcSpecs) { + // Validate the functions. + auto funcName = subParser_->getSubFunctionName(funcSpec); + if (supportedFilters.find(funcName) == supportedFilters.end()) { + std::cout << "Validation failed due to " << funcName + << " was not supported in ReadRel." << std::endl; + return false; + } + + // Validate the types. + std::vector funcTypes; + subParser_->getSubFunctionTypes(funcSpec, funcTypes); + for (const auto& funcType : funcTypes) { + if (supportedTypes.find(funcType) == supportedTypes.end()) { + std::cout << "Validation failed due to " << funcType + << " was not supported in ReadRel." << std::endl; + return false; + } + } + } + return true; +} + +bool SubstraitToVeloxPlanValidator::validate(const ::substrait::Rel& sRel) { + if (sRel.has_aggregate()) { + return validate(sRel.aggregate()); + } + if (sRel.has_project()) { + return validate(sRel.project()); + } + if (sRel.has_filter()) { + return validate(sRel.filter()); + } + if (sRel.has_read()) { + return validate(sRel.read()); + } + return false; +} + +bool SubstraitToVeloxPlanValidator::validate( + const ::substrait::RelRoot& sRoot) { + if (sRoot.has_input()) { + const auto& sRel = sRoot.input(); + return validate(sRel); + } + return false; +} + +bool SubstraitToVeloxPlanValidator::validate(const ::substrait::Plan& sPlan) { + functions::prestosql::registerAllScalarFunctions(); + // Create plan converter and expression converter to help the validation. + planConverter_->constructFuncMap(sPlan); + exprConverter_ = std::make_shared( + planConverter_->getFunctionMap()); + + for (const auto& sRel : sPlan.relations()) { + if (sRel.has_root()) { + return validate(sRel.root()); + } + if (sRel.has_rel()) { + return validate(sRel.rel()); + } + } + return false; +} + +} // namespace facebook::velox::substrait diff --git a/velox/substrait/SubstraitToVeloxPlanValidator.h b/velox/substrait/SubstraitToVeloxPlanValidator.h new file mode 100644 index 000000000000..705b8ebec3af --- /dev/null +++ b/velox/substrait/SubstraitToVeloxPlanValidator.h @@ -0,0 +1,80 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/substrait/SubstraitToVeloxPlan.h" + +namespace facebook::velox::substrait { + +/// This class is used to validate whether the computing of +/// a Substrait plan is supported in Velox. +class SubstraitToVeloxPlanValidator { + public: + /// Used to validate whether the computing of this type is supported. + bool validate(const ::substrait::Type& sType); + + /// Used to validate whether the computing of this Aggregation is supported. + bool validate(const ::substrait::AggregateRel& sAgg); + + /// Used to validate whether the computing of this Project is supported. + bool validate(const ::substrait::ProjectRel& sProject); + + /// Used to validate whether the computing of this Filter is supported. + bool validate(const ::substrait::FilterRel& sFilter); + + /// Used to validate whether the computing of this Read is supported. + bool validate(const ::substrait::ReadRel& sRead); + + /// Used to validate whether the computing of this Rel is supported. + bool validate(const ::substrait::Rel& sRel); + + /// Used to validate whether the computing of this RelRoot is supported. + bool validate(const ::substrait::RelRoot& sRoot); + + /// Used to validate whether the computing of this Plan is supported. + bool validate(const ::substrait::Plan& sPlan); + + private: + /// A query context used for function validation. + std::shared_ptr queryCtx_{core::QueryCtx::createForTest()}; + + /// A memory pool used for function validation. + std::unique_ptr pool_{ + memory::getDefaultScopedMemoryPool()}; + + /// An execution context used for function validation. + core::ExecCtx execCtx_{pool_.get(), queryCtx_.get()}; + + /// A converter used to convert Substrait plan into Velox's plan node. + std::shared_ptr planConverter_ = + std::make_shared(); + + /// A parser used to convert Substrait plan into recognizable representations. + std::shared_ptr subParser_ = + std::make_shared(); + + /// An expression converter used to convert Substrait representations into + /// Velox expressions. + std::shared_ptr exprConverter_; + + /// Used to get types from advanced extension and validate them. + bool validateInputTypes( + const ::substrait::extensions::AdvancedExtension& extension, + std::vector& types); +}; + +} // namespace facebook::velox::substrait diff --git a/velox/substrait/tests/PlanConversionTest.cpp b/velox/substrait/tests/PlanConversionTest.cpp new file mode 100644 index 000000000000..7f7d0fe56d45 --- /dev/null +++ b/velox/substrait/tests/PlanConversionTest.cpp @@ -0,0 +1,569 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include "velox/common/base/tests/Fs.h" +#include "velox/common/base/tests/GTestUtils.h" +#include "velox/connectors/hive/HiveConnector.h" +#include "velox/connectors/hive/HiveConnectorSplit.h" +#include "velox/dwio/common/Options.h" +#include "velox/dwio/dwrf/test/utils/DataFiles.h" +#include "velox/exec/PartitionedOutputBufferManager.h" +#include "velox/exec/tests/utils/Cursor.h" +#include "velox/exec/tests/utils/HiveConnectorTestBase.h" +#include "velox/exec/tests/utils/PlanBuilder.h" +#include "velox/exec/tests/utils/TempDirectoryPath.h" +#include "velox/substrait/SubstraitToVeloxPlan.h" +#include "velox/type/Type.h" +#include "velox/type/tests/FilterBuilder.h" +#include "velox/type/tests/SubfieldFiltersBuilder.h" + +using namespace facebook::velox; +using namespace facebook::velox::connector::hive; +using namespace facebook::velox::exec; +using namespace facebook::velox::common::test; +using namespace facebook::velox::exec::test; + +class PlanConversionTest : public virtual HiveConnectorTestBase, + public testing::WithParamInterface { + protected: + void SetUp() override { + useAsyncCache_ = GetParam(); + HiveConnectorTestBase::SetUp(); + } + + static void SetUpTestCase() { + HiveConnectorTestBase::SetUpTestCase(); + } + + std::vector makeVectors( + int32_t count, + int32_t rowsPerVector, + const std::shared_ptr& rowType) { + return HiveConnectorTestBase::makeVectors(rowType, count, rowsPerVector); + } + + class VeloxConverter { + public: + // This class is an iterator for Velox computing. + class WholeComputeResultIterator { + public: + WholeComputeResultIterator( + const std::shared_ptr& planNode, + u_int32_t index, + const std::vector& paths, + const std::vector& starts, + const std::vector& lengths) + : planNode_(planNode), + index_(index), + paths_(paths), + starts_(starts), + lengths_(lengths) { + // Construct the splits. + std::vector> + connectorSplits; + connectorSplits.reserve(paths.size()); + for (int idx = 0; idx < paths.size(); idx++) { + auto path = paths[idx]; + auto start = starts[idx]; + auto length = lengths[idx]; + auto split = std::make_shared< + facebook::velox::connector::hive::HiveConnectorSplit>( + facebook::velox::exec::test::kHiveConnectorId, + path, + facebook::velox::dwio::common::FileFormat::ORC, + start, + length); + connectorSplits.emplace_back(split); + } + splits_.reserve(connectorSplits.size()); + for (const auto& connectorSplit : connectorSplits) { + splits_.emplace_back(exec::Split(folly::copy(connectorSplit), -1)); + } + + params_.planNode = planNode; + cursor_ = std::make_unique(params_); + addSplits_ = [&](Task* task) { + if (noMoreSplits_) { + return; + } + for (auto& split : splits_) { + task->addSplit("0", std::move(split)); + } + task->noMoreSplits("0"); + noMoreSplits_ = true; + }; + } + + bool HasNext() { + if (!mayHasNext_) { + return false; + } + if (numRows_ > 0) { + return true; + } else { + addSplits_(cursor_->task().get()); + if (cursor_->moveNext()) { + result_ = cursor_->current(); + numRows_ += result_->size(); + return true; + } else { + mayHasNext_ = false; + return false; + } + } + } + + RowVectorPtr Next() { + numRows_ = 0; + return result_; + } + + private: + const std::shared_ptr planNode_; + std::unique_ptr cursor_; + exec::test::CursorParameters params_; + std::vector splits_; + bool noMoreSplits_ = false; + std::function addSplits_; + u_int32_t index_; + std::vector paths_; + std::vector starts_; + std::vector lengths_; + uint64_t numRows_ = 0; + bool mayHasNext_ = true; + RowVectorPtr result_; + }; + + // This method will resume the Substrait plan from Json file, + // and convert it into Velox PlanNode. A result iterator for + // Velox computing will be returned. + std::shared_ptr getResIter( + const std::string& subPlanPath) { + // Read json file and resume the Substrait plan. + std::ifstream subJson(subPlanPath); + std::stringstream buffer; + buffer << subJson.rdbuf(); + std::string subData = buffer.str(); + ::substrait::Plan subPlan; + google::protobuf::util::JsonStringToMessage(subData, &subPlan); + + auto planConverter = std::make_shared< + facebook::velox::substrait::SubstraitVeloxPlanConverter>(); + // Convert to Velox PlanNode. + auto planNode = planConverter->toVeloxPlan(subPlan); + + // Get the information for TableScan. + u_int32_t partitionIndex = planConverter->getPartitionIndex(); + std::vector paths = planConverter->getPaths(); + + // In test, need to get the absolute path of the generated ORC file. + auto tempPath = getTmpDirPath(); + std::vector absolutePaths; + absolutePaths.reserve(paths.size()); + + for (const auto& path : paths) { + absolutePaths.emplace_back(fmt::format("file://{}{}", tempPath, path)); + } + + std::vector starts = planConverter->getStarts(); + std::vector lengths = planConverter->getLengths(); + // Construct the result iterator. + auto resIter = std::make_shared( + planNode, partitionIndex, absolutePaths, starts, lengths); + return resIter; + } + + std::string getTmpDirPath() const { + return tmpDir_->path; + } + + std::shared_ptr tmpDir_{ + exec::test::TempDirectoryPath::create()}; + }; + + // This method can be used to create a Fixed-width type of Vector without Null + // values. + template + VectorPtr createSpecificScalar( + size_t size, + std::vector vals, + facebook::velox::memory::MemoryPool& pool) { + facebook::velox::BufferPtr values = AlignedBuffer::allocate(size, &pool); + auto valuesPtr = values->asMutableRange(); + facebook::velox::BufferPtr nulls = nullptr; + for (size_t i = 0; i < size; ++i) { + valuesPtr[i] = vals[i]; + } + return std::make_shared>( + &pool, nulls, size, values, std::vector{}); + } + + // This method can be used to create a String type of Vector without Null + // values. + VectorPtr createSpecificStringVector( + size_t size, + std::vector vals, + facebook::velox::memory::MemoryPool& pool) { + auto vector = BaseVector::create(VARCHAR(), size, &pool); + auto flatVector = vector->asFlatVector(); + + size_t childSize = 0; + std::vector lengths(size); + size_t nullCount = 0; + for (size_t i = 0; i < size; ++i) { + auto notNull = true; + vector->setNull(i, !notNull); + auto len = vals[i].size(); + lengths[i] = len; + childSize += len; + } + vector->setNullCount(0); + + BufferPtr buf = AlignedBuffer::allocate(childSize, &pool); + char* bufPtr = buf->asMutable(); + char* dest = bufPtr; + for (size_t i = 0; i < size; ++i) { + std::string str = vals[i]; + const char* chr = str.c_str(); + auto length = str.size(); + memcpy(dest, chr, length); + dest = dest + length; + } + size_t offset = 0; + for (size_t i = 0; i < size; ++i) { + if (!vector->isNullAt(i)) { + flatVector->set( + i, facebook::velox::StringView(bufPtr + offset, lengths[i])); + offset += lengths[i]; + } + } + return vector; + } + + void genLineitemORC(const std::shared_ptr& veloxConverter) { + auto type = + ROW({"l_orderkey", + "l_partkey", + "l_suppkey", + "l_linenumber", + "l_quantity", + "l_extendedprice", + "l_discount", + "l_tax", + "l_returnflag", + "l_linestatus", + "l_shipdate", + "l_commitdate", + "l_receiptdate", + "l_shipinstruct", + "l_shipmode", + "l_comment"}, + {BIGINT(), + BIGINT(), + BIGINT(), + INTEGER(), + DOUBLE(), + DOUBLE(), + DOUBLE(), + DOUBLE(), + VARCHAR(), + VARCHAR(), + DOUBLE(), + DOUBLE(), + DOUBLE(), + VARCHAR(), + VARCHAR(), + VARCHAR()}); + std::unique_ptr pool{ + facebook::velox::memory::getDefaultScopedMemoryPool()}; + std::vector vectors; + // TPC-H lineitem table has 16 columns. + int colNum = 16; + vectors.reserve(colNum); + std::vector lOrderkeyData = { + 4636438147, + 2012485446, + 1635327427, + 8374290148, + 2972204230, + 8001568994, + 989963396, + 2142695974, + 6354246853, + 4141748419}; + vectors.emplace_back( + createSpecificScalar(10, lOrderkeyData, *pool)); + std::vector lPartkeyData = { + 263222018, + 255918298, + 143549509, + 96877642, + 201976875, + 196938305, + 100260625, + 273511608, + 112999357, + 299103530}; + vectors.emplace_back( + createSpecificScalar(10, lPartkeyData, *pool)); + std::vector lSuppkeyData = { + 2102019, + 13998315, + 12989528, + 4717643, + 9976902, + 12618306, + 11940632, + 871626, + 1639379, + 3423588}; + vectors.emplace_back( + createSpecificScalar(10, lSuppkeyData, *pool)); + std::vector lLinenumberData = {4, 6, 1, 5, 1, 2, 1, 5, 2, 6}; + vectors.emplace_back( + createSpecificScalar(10, lLinenumberData, *pool)); + std::vector lQuantityData = { + 6.0, 1.0, 19.0, 4.0, 6.0, 12.0, 23.0, 11.0, 16.0, 19.0}; + vectors.emplace_back( + createSpecificScalar(10, lQuantityData, *pool)); + std::vector lExtendedpriceData = { + 30586.05, + 7821.0, + 1551.33, + 30681.2, + 1941.78, + 66673.0, + 6322.44, + 41754.18, + 8704.26, + 63780.36}; + vectors.emplace_back( + createSpecificScalar(10, lExtendedpriceData, *pool)); + std::vector lDiscountData = { + 0.05, 0.06, 0.01, 0.07, 0.05, 0.06, 0.07, 0.05, 0.06, 0.07}; + vectors.emplace_back( + createSpecificScalar(10, lDiscountData, *pool)); + std::vector lTaxData = { + 0.02, 0.03, 0.01, 0.0, 0.01, 0.01, 0.03, 0.07, 0.01, 0.04}; + vectors.emplace_back(createSpecificScalar(10, lTaxData, *pool)); + std::vector lReturnflagData = { + "N", "A", "A", "R", "A", "N", "A", "A", "N", "R"}; + vectors.emplace_back( + createSpecificStringVector(10, lReturnflagData, *pool)); + std::vector lLinestatusData = { + "O", "F", "F", "F", "F", "O", "F", "F", "O", "F"}; + vectors.emplace_back( + createSpecificStringVector(10, lLinestatusData, *pool)); + std::vector lShipdateNewData = { + 8953.666666666666, + 8773.666666666666, + 9034.666666666666, + 8558.666666666666, + 9072.666666666666, + 8864.666666666666, + 9004.666666666666, + 8778.666666666666, + 9013.666666666666, + 8832.666666666666}; + vectors.emplace_back( + createSpecificScalar(10, lShipdateNewData, *pool)); + std::vector lCommitdateNewData = { + 10447.666666666666, + 8953.666666666666, + 8325.666666666666, + 8527.666666666666, + 8438.666666666666, + 10049.666666666666, + 9036.666666666666, + 8666.666666666666, + 9519.666666666666, + 9138.666666666666}; + vectors.emplace_back( + createSpecificScalar(10, lCommitdateNewData, *pool)); + std::vector lReceiptdateNewData = { + 10456.666666666666, + 8979.666666666666, + 8299.666666666666, + 8474.666666666666, + 8525.666666666666, + 9996.666666666666, + 9103.666666666666, + 8726.666666666666, + 9593.666666666666, + 9178.666666666666}; + vectors.emplace_back( + createSpecificScalar(10, lReceiptdateNewData, *pool)); + std::vector lShipinstructData = { + "COLLECT COD", + "NONE", + "TAKE BACK RETURN", + "NONE", + "TAKE BACK RETURN", + "NONE", + "DELIVER IN PERSON", + "DELIVER IN PERSON", + "TAKE BACK RETURN", + "NONE"}; + vectors.emplace_back( + createSpecificStringVector(10, lShipinstructData, *pool)); + std::vector lShipmodeData = { + "FOB", + "REG AIR", + "MAIL", + "FOB", + "RAIL", + "SHIP", + "REG AIR", + "REG AIR", + "TRUCK", + "AIR"}; + vectors.emplace_back(createSpecificStringVector(10, lShipmodeData, *pool)); + std::vector lCommentData = { + " the furiously final foxes. quickly final p", + "thely ironic", + "ate furiously. even, pending pinto bean", + "ackages af", + "odolites. slyl", + "ng the regular requests sleep above", + "lets above the slyly ironic theodolites sl", + "lyly regular excuses affi", + "lly unusual theodolites grow slyly above", + " the quickly ironic pains lose car"}; + vectors.emplace_back(createSpecificStringVector(10, lCommentData, *pool)); + + // Batches has only one RowVector here. + uint64_t nullCount = 0; + std::vector batches{std::make_shared( + pool.get(), type, nullptr, 10, vectors, nullCount)}; + + // Writes data into an ORC file. + auto sink = std::make_unique( + veloxConverter->getTmpDirPath() + "/mock_lineitem.orc"); + auto config = std::make_shared(); + const int64_t writerMemoryCap = std::numeric_limits::max(); + facebook::velox::dwrf::WriterOptions options; + options.config = config; + options.schema = type; + options.memoryBudget = writerMemoryCap; + options.flushPolicyFactory = nullptr; + options.layoutPlannerFactory = nullptr; + auto writer = std::make_unique( + options, + std::move(sink), + facebook::velox::memory::getProcessDefaultMemoryManager().getRoot()); + for (size_t i = 0; i < batches.size(); ++i) { + writer->write(batches[i]); + } + writer->close(); + } + + // Used to find the Velox path according current path. + std::string getVeloxPath() { + std::string veloxPath; + std::string currentPath = fs::current_path().c_str(); + size_t pos = 0; + + if ((pos = currentPath.find("project")) != std::string::npos) { + // In Github test, the Velox home is /root/project. + veloxPath = currentPath.substr(0, pos) + "project"; + } else if ((pos = currentPath.find("velox")) != std::string::npos) { + veloxPath = currentPath.substr(0, pos) + "velox"; + } else if ((pos = currentPath.find("fbcode")) != std::string::npos) { + veloxPath = currentPath; + } else { + throw std::runtime_error("Current path is not a valid Velox path."); + } + return veloxPath; + } +}; + +// This test will firstly generate mock TPC-H lineitem ORC file. Then, Velox's +// computing will be tested based on the generated ORC file. +// Input: Json file of the Substrait plan for the first stage of below modified +// TPC-H Q6 query: +// +// select sum(l_extendedprice*l_discount) as revenue from lineitem where +// l_shipdate >= 8766 and l_shipdate < 9131 and l_discount between .06 +// - 0.01 and .06 + 0.01 and l_quantity < 24 +// +// Tested Velox computings include: TableScan (Filter Pushdown) + Project + +// Aggregate +// Output: the Velox computed Aggregation result + +TEST_P(PlanConversionTest, q6FirstStageTest) { + auto veloxConverter = std::make_shared(); + std::string veloxPath = getVeloxPath(); + genLineitemORC(veloxConverter); + // Find and deserialize Substrait plan json file. + std::string subPlanPath = veloxPath + "/velox/substrait/tests/q6_first.json"; + auto resIter = veloxConverter->getResIter(subPlanPath); + while (resIter->HasNext()) { + auto rv = resIter->Next(); + auto size = rv->size(); + ASSERT_EQ(size, 1); + std::string res = rv->toString(0); + ASSERT_EQ(res, "{ [child at 0]: 13613.1921}"); + } +} + +// This test will firstly generate mock TPC-H lineitem ORC file. Then, Velox's +// computing will be tested based on the generated ORC file. +// Input: Json file of the Substrait plan for the first stage of the below +// modified TPC-H Q1 query: +// +// select l_returnflag, l_linestatus, sum(l_quantity) as sum_qty, +// sum(l_extendedprice) as sum_base_price, sum(l_extendedprice * (1 - +// l_discount)) as sum_disc_price, sum(l_extendedprice * (1 - l_discount) * (1 + +// l_tax)) as sum_charge, avg(l_quantity) as avg_qty, avg(l_extendedprice) as +// avg_price, avg(l_discount) as avg_disc, count(*) as count_order from lineitem +// where l_shipdate <= 10471 group by l_returnflag, l_linestatus order by +// l_returnflag, l_linestatus +// +// Tested Velox computings include: TableScan (Filter Pushdown) + Project + +// Aggregate +// Output: the Velox computed Aggregation result + +TEST_P(PlanConversionTest, q1FirstStageTest) { + auto veloxConverter = std::make_shared(); + std::string veloxPath = getVeloxPath(); + genLineitemORC(veloxConverter); + // Find and deserialize Substrait plan json file. + std::string subPlanPath = veloxPath + "/velox/substrait/tests/q1_first.json"; + auto resIter = veloxConverter->getResIter(subPlanPath); + while (resIter->HasNext()) { + auto rv = resIter->Next(); + auto size = rv->size(); + ASSERT_EQ(size, 3); + ASSERT_EQ( + rv->toString(0), + "{ [child at 0]: N, O, 34, 105963.31, 99911.3719, 101201.05309399999, 34, 3, 105963.31, 3, 0.16999999999999998, 3, 3}"); + ASSERT_EQ( + rv->toString(1), + "{ [child at 1]: A, F, 60, 59390.729999999996, 56278.5879, 59485.994223, 60, 5, 59390.729999999996, 5, 0.24, 5, 5}"); + ASSERT_EQ( + rv->toString(2), + "{ [child at 2]: R, F, 23, 94461.56, 87849.2508, 90221.880192, 23, 2, 94461.56, 2, 0.14, 2, 2}"); + } +} + +VELOX_INSTANTIATE_TEST_SUITE_P( + PlanConversionTests, + PlanConversionTest, + testing::Values(true, false)); diff --git a/velox/substrait/tests/q1_first.json b/velox/substrait/tests/q1_first.json new file mode 100644 index 000000000000..e11d8a7f05a1 --- /dev/null +++ b/velox/substrait/tests/q1_first.json @@ -0,0 +1,720 @@ +{ + "extension_uris": [], + "extensions": [ + { + "extension_function": { + "extension_uri_reference": 0, + "function_anchor": 1, + "name": "lte:fp64_fp64" + } + }, + { + "extension_function": { + "extension_uri_reference": 0, + "function_anchor": 6, + "name": "sum:opt_fp64" + } + }, + { + "extension_function": { + "extension_uri_reference": 0, + "function_anchor": 3, + "name": "subtract:opt_fp64_fp64" + } + }, + { + "extension_function": { + "extension_uri_reference": 0, + "function_anchor": 0, + "name": "is_not_null:fp64" + } + }, + { + "extension_function": { + "extension_uri_reference": 0, + "function_anchor": 2, + "name": "and:bool_bool" + } + }, + { + "extension_function": { + "extension_uri_reference": 0, + "function_anchor": 5, + "name": "add:opt_fp64_fp64" + } + }, + { + "extension_function": { + "extension_uri_reference": 0, + "function_anchor": 7, + "name": "avg:opt_fp64" + } + }, + { + "extension_function": { + "extension_uri_reference": 0, + "function_anchor": 4, + "name": "multiply:opt_fp64_fp64" + } + }, + { + "extension_function": { + "extension_uri_reference": 0, + "function_anchor": 8, + "name": "count:opt_i32" + } + } + ], + "relations": [ + { + "root": { + "input": { + "aggregate": { + "common": { + "direct": {} + }, + "input": { + "project": { + "common": { + "direct": {} + }, + "input": { + "project": { + "common": { + "direct": {} + }, + "input": { + "read": { + "common": { + "direct": {} + }, + "base_schema": { + "names": [ + "l_quantity", + "l_extendedprice", + "l_discount", + "l_tax", + "l_returnflag", + "l_linestatus", + "l_shipdate" + ], + "struct": { + "types": [ + { + "fp64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + ], + "type_variation_reference": 0, + "nullability": "NULLABILITY_UNSPECIFIED" + } + }, + "filter": { + "scalar_function": { + "function_reference": 2, + "args": [ + { + "scalar_function": { + "function_reference": 0, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 6 + } + } + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + { + "scalar_function": { + "function_reference": 1, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 6 + } + } + } + }, + { + "literal": { + "nullable": false, + "fp64": 10471 + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + "local_files": { + "items": [ + { + "format": "FILE_FORMAT_UNSPECIFIED", + "partition_index": "0", + "start": "0", + "length": "3719", + "uri_file": "/mock_lineitem.orc" + } + ] + } + } + }, + "expressions": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 0 + } + } + } + }, + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 1 + } + } + } + }, + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 2 + } + } + } + }, + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 3 + } + } + } + }, + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 4 + } + } + } + }, + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 5 + } + } + } + } + ] + } + }, + "expressions": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 4 + } + } + } + }, + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 5 + } + } + } + }, + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 0 + } + } + } + }, + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 1 + } + } + } + }, + { + "scalar_function": { + "function_reference": 4, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 1 + } + } + } + }, + { + "scalar_function": { + "function_reference": 3, + "args": [ + { + "literal": { + "nullable": false, + "fp64": 1 + } + }, + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 2 + } + } + } + } + ], + "output_type": { + "fp64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + } + ], + "output_type": { + "fp64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + { + "scalar_function": { + "function_reference": 4, + "args": [ + { + "scalar_function": { + "function_reference": 4, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 1 + } + } + } + }, + { + "scalar_function": { + "function_reference": 3, + "args": [ + { + "literal": { + "nullable": false, + "fp64": 1 + } + }, + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 2 + } + } + } + } + ], + "output_type": { + "fp64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + } + ], + "output_type": { + "fp64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + { + "scalar_function": { + "function_reference": 5, + "args": [ + { + "literal": { + "nullable": false, + "fp64": 1 + } + }, + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 3 + } + } + } + } + ], + "output_type": { + "fp64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + } + ], + "output_type": { + "fp64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 2 + } + } + } + }, + { + "literal": { + "nullable": false, + "i32": 1 + } + } + ] + } + }, + "groupings": [ + { + "grouping_expressions": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 0 + } + } + } + }, + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 1 + } + } + } + } + ] + } + ], + "measures": [ + { + "measure": { + "function_reference": 6, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 2 + } + } + } + } + ], + "sorts": [], + "phase": "AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE", + "output_type": { + "fp64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + { + "measure": { + "function_reference": 6, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 3 + } + } + } + } + ], + "sorts": [], + "phase": "AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE", + "output_type": { + "fp64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + { + "measure": { + "function_reference": 6, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 4 + } + } + } + } + ], + "sorts": [], + "phase": "AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE", + "output_type": { + "fp64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + { + "measure": { + "function_reference": 6, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 5 + } + } + } + } + ], + "sorts": [], + "phase": "AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE", + "output_type": { + "fp64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + { + "measure": { + "function_reference": 7, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 2 + } + } + } + } + ], + "sorts": [], + "phase": "AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE", + "output_type": { + "fp64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + { + "measure": { + "function_reference": 7, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 3 + } + } + } + } + ], + "sorts": [], + "phase": "AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE", + "output_type": { + "fp64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + { + "measure": { + "function_reference": 7, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 6 + } + } + } + } + ], + "sorts": [], + "phase": "AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE", + "output_type": { + "fp64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + { + "measure": { + "function_reference": 8, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 7 + } + } + } + } + ], + "sorts": [], + "phase": "AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE", + "output_type": { + "i64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + } + } + } + ] + } + }, + "names": [ + "real_arrow_output", + "l_returnflag", + "l_linestatus", + "sum", + "sum", + "sum", + "sum", + "sum", + "count", + "sum", + "count", + "sum", + "count", + "count" + ] + } + } + ], + "expected_type_urls": [] +} diff --git a/velox/substrait/tests/q6_first.json b/velox/substrait/tests/q6_first.json new file mode 100644 index 000000000000..a48bf5da285e --- /dev/null +++ b/velox/substrait/tests/q6_first.json @@ -0,0 +1,523 @@ +{ + "extension_uris": [], + "extensions": [ + { + "extension_function": { + "extension_uri_reference": 0, + "function_anchor": 4, + "name": "lte:fp64_fp64" + } + }, + { + "extension_function": { + "extension_uri_reference": 0, + "function_anchor": 6, + "name": "sum:opt_fp64" + } + }, + { + "extension_function": { + "extension_uri_reference": 0, + "function_anchor": 3, + "name": "lt:fp64_fp64" + } + }, + { + "extension_function": { + "extension_uri_reference": 0, + "function_anchor": 0, + "name": "is_not_null:fp64" + } + }, + { + "extension_function": { + "extension_uri_reference": 0, + "function_anchor": 1, + "name": "and:bool_bool" + } + }, + { + "extension_function": { + "extension_uri_reference": 0, + "function_anchor": 2, + "name": "gte:fp64_fp64" + } + }, + { + "extension_function": { + "extension_uri_reference": 0, + "function_anchor": 5, + "name": "multiply:opt_fp64_fp64" + } + } + ], + "relations": [ + { + "root": { + "input": { + "aggregate": { + "common": { + "direct": {} + }, + "input": { + "project": { + "common": { + "direct": {} + }, + "input": { + "project": { + "common": { + "direct": {} + }, + "input": { + "read": { + "common": { + "direct": {} + }, + "base_schema": { + "names": [ + "l_quantity", + "l_extendedprice", + "l_discount", + "l_shipdate" + ], + "struct": { + "types": [ + { + "fp64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + ], + "type_variation_reference": 0, + "nullability": "NULLABILITY_UNSPECIFIED" + } + }, + "filter": { + "scalar_function": { + "function_reference": 1, + "args": [ + { + "scalar_function": { + "function_reference": 1, + "args": [ + { + "scalar_function": { + "function_reference": 1, + "args": [ + { + "scalar_function": { + "function_reference": 1, + "args": [ + { + "scalar_function": { + "function_reference": 1, + "args": [ + { + "scalar_function": { + "function_reference": 1, + "args": [ + { + "scalar_function": { + "function_reference": 1, + "args": [ + { + "scalar_function": { + "function_reference": 0, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 3 + } + } + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + { + "scalar_function": { + "function_reference": 0, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 2 + } + } + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + { + "scalar_function": { + "function_reference": 0, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 0 + } + } + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + { + "scalar_function": { + "function_reference": 2, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 3 + } + } + } + }, + { + "literal": { + "nullable": false, + "fp64": 8766 + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + { + "scalar_function": { + "function_reference": 3, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 3 + } + } + } + }, + { + "literal": { + "nullable": false, + "fp64": 9131 + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + { + "scalar_function": { + "function_reference": 2, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 2 + } + } + } + }, + { + "literal": { + "nullable": false, + "fp64": 0.05 + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + { + "scalar_function": { + "function_reference": 4, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 2 + } + } + } + }, + { + "literal": { + "nullable": false, + "fp64": 0.07 + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + { + "scalar_function": { + "function_reference": 3, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 0 + } + } + } + }, + { + "literal": { + "nullable": false, + "fp64": 24 + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + "local_files": { + "items": [ + { + "format": "FILE_FORMAT_UNSPECIFIED", + "partition_index": "0", + "start": "0", + "length": "3719", + "uri_file": "/mock_lineitem.orc" + } + ] + } + } + }, + "expressions": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 1 + } + } + } + }, + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 2 + } + } + } + } + ] + } + }, + "expressions": [ + { + "scalar_function": { + "function_reference": 5, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 0 + } + } + } + }, + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 1 + } + } + } + } + ], + "output_type": { + "fp64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + } + ] + } + }, + "groupings": [ + { + "grouping_expressions": [] + } + ], + "measures": [ + { + "measure": { + "function_reference": 6, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 0 + } + } + } + } + ], + "sorts": [], + "phase": "AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE", + "output_type": { + "fp64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + } + ] + } + }, + "names": [ + "real_arrow_output", + "sum" + ] + } + } + ], + "expected_type_urls": [] +}