From b73b95e5e460c536b1bbd9292953c74d245b1f00 Mon Sep 17 00:00:00 2001 From: Rui Mo Date: Fri, 6 May 2022 17:43:09 +0800 Subject: [PATCH] Support more Substrait-to-Velox conversions and added validations (#7) Always compile Substrait (#8) --- velox/CMakeLists.txt | 4 +- velox/substrait/CMakeLists.txt | 3 +- velox/substrait/SubstraitParser.cpp | 67 +- velox/substrait/SubstraitParser.h | 21 +- velox/substrait/SubstraitToVeloxExpr.cpp | 24 +- velox/substrait/SubstraitToVeloxExpr.h | 3 +- velox/substrait/SubstraitToVeloxPlan.cpp | 290 +++++-- velox/substrait/SubstraitToVeloxPlan.h | 120 ++- .../SubstraitToVeloxPlanValidator.cpp | 340 +++++++++ .../substrait/SubstraitToVeloxPlanValidator.h | 80 ++ velox/substrait/tests/PlanConversionTest.cpp | 569 ++++++++++++++ velox/substrait/tests/q1_first.json | 720 ++++++++++++++++++ velox/substrait/tests/q6_first.json | 523 +++++++++++++ 13 files changed, 2649 insertions(+), 115 deletions(-) create mode 100644 velox/substrait/SubstraitToVeloxPlanValidator.cpp create mode 100644 velox/substrait/SubstraitToVeloxPlanValidator.h create mode 100644 velox/substrait/tests/PlanConversionTest.cpp create mode 100644 velox/substrait/tests/q1_first.json create mode 100644 velox/substrait/tests/q6_first.json diff --git a/velox/CMakeLists.txt b/velox/CMakeLists.txt index 3f24ef512fad..2d61264980e7 100644 --- a/velox/CMakeLists.txt +++ b/velox/CMakeLists.txt @@ -71,6 +71,6 @@ if(${VELOX_CODEGEN_SUPPORT}) endif() # substrait converter -if(${VELOX_ENABLE_SUBSTRAIT}) +# if(${VELOX_ENABLE_SUBSTRAIT}) add_subdirectory(substrait) -endif() +# endif() diff --git a/velox/substrait/CMakeLists.txt b/velox/substrait/CMakeLists.txt index 007162207951..d5e5d28506ae 100644 --- a/velox/substrait/CMakeLists.txt +++ b/velox/substrait/CMakeLists.txt @@ -54,7 +54,8 @@ set(SRCS VeloxToSubstraitPlan.cpp VeloxToSubstraitType.cpp VeloxSubstraitSignature.cpp - VariantToVectorConverter.cpp) + VariantToVectorConverter.cpp + SubstraitToVeloxPlanValidator.cpp) add_library(velox_substrait_plan_converter ${SRCS}) target_include_directories(velox_substrait_plan_converter diff --git a/velox/substrait/SubstraitParser.cpp b/velox/substrait/SubstraitParser.cpp index 8ffecbefa2a5..51aa7e62e0ce 100644 --- a/velox/substrait/SubstraitParser.cpp +++ b/velox/substrait/SubstraitParser.cpp @@ -28,7 +28,7 @@ std::shared_ptr SubstraitParser::parseType( switch (substraitType.kind_case()) { case ::substrait::Type::KindCase::kBool: { typeName = "BOOLEAN"; - nullability = substraitType.bool_().nullability(); + nullability = sType.bool_().nullability(); break; } case ::substrait::Type::KindCase::kI8: { @@ -203,7 +203,7 @@ int SubstraitParser::getIdxFromNodeName(const std::string& nodeName) { } } -const std::string& SubstraitParser::findFunctionSpec( +std::string SubstraitParser::findSubstraitFuncSpec( const std::unordered_map& functionMap, uint64_t id) const { if (functionMap.find(id) == functionMap.end()) { @@ -214,6 +214,65 @@ const std::string& SubstraitParser::findFunctionSpec( return map[id]; } +std::string SubstraitParser::getFunctionName( + const std::string& functionSpec) const { + // Get the position of ":" in the function name. + std::size_t pos = functionSpec.find(":"); + if (pos == std::string::npos) { + return functionSpec; + } + return functionSpec.substr(0, pos); +} + +void SubstraitParser::getFunctionTypes( + const std::string& functionSpec, + std::vector& types) const { + types.clear(); + // Get the position of ":" in the function name. + std::size_t pos = functionSpec.find(":"); + // Get the parameter types. + std::string funcTypes; + if (pos == std::string::npos) { + return; + } else { + if (pos == functionSpec.size() - 1) { + return; + } + funcTypes = functionSpec.substr(pos + 1); + } + // Split the types with delimiter. + std::string delimiter = "_"; + while ((pos = funcTypes.find(delimiter)) != std::string::npos) { + types.emplace_back(funcTypes.substr(0, pos)); + funcTypes.erase(0, pos + delimiter.length()); + } + types.emplace_back(funcTypes); +} + +void SubstraitParser::getSubFunctionTypes( + const std::string& subFuncSpec, + std::vector& types) const { + // Get the position of ":" in the function name. + std::size_t pos = subFuncSpec.find(":"); + // Get the parameter types. + std::string funcTypes; + if (pos == std::string::npos) { + funcTypes = subFuncSpec; + } else { + if (pos == subFuncSpec.size() - 1) { + return; + } + funcTypes = subFuncSpec.substr(pos + 1); + } + // Split the types with delimiter. + std::string delimiter = "_"; + while ((pos = funcTypes.find(delimiter)) != std::string::npos) { + types.emplace_back(funcTypes.substr(0, pos)); + funcTypes.erase(0, pos + delimiter.length()); + } + types.emplace_back(funcTypes); +} + std::string SubstraitParser::findVeloxFunction( const std::unordered_map& functionMap, uint64_t id) const { @@ -223,8 +282,8 @@ std::string SubstraitParser::findVeloxFunction( } std::string SubstraitParser::mapToVeloxFunction( - const std::string& substraitFunction) const { - auto it = substraitVeloxFunctionMap_.find(substraitFunction); + const std::string& subFunc) const { + auto it = substraitVeloxFunctionMap_.find(subFunc); if (it != substraitVeloxFunctionMap_.end()) { return it->second; } diff --git a/velox/substrait/SubstraitParser.h b/velox/substrait/SubstraitParser.h index a76db95efc1d..c0ff587d1a9f 100644 --- a/velox/substrait/SubstraitParser.h +++ b/velox/substrait/SubstraitParser.h @@ -55,11 +55,10 @@ class SubstraitParser { /// Make node name in the format of n{nodeId}_{colIdx}. std::string makeNodeName(int nodeId, int colIdx); - /// Get the column index from a node name in the format of - /// n{nodeId}_{colIdx}. + /// Used to get the column index from node name. int getIdxFromNodeName(const std::string& nodeName); - /// Find the Substrait function name according to the function id + /// Used to find the Substrait function name according to the function id /// from a pre-constructed function map. The function specification can be /// a simple name or a compound name. The compound name format is: /// :__..._. @@ -70,7 +69,21 @@ class SubstraitParser { const std::unordered_map& functionMap, uint64_t id) const; - /// Find the Velox function name according to the function id + /// Extracts the function name for a function from specified compound name. + /// When the input is a simple name, it will be returned. + std::string getFunctionName(const std::string& functionSpec) const; + + /// Extracts argument types for a function from specified compound name. + void getFunctionTypes( + const std::string& functionSpec, + std::vector& types) const; + + /// This function is used get the types from the compound name. + void getSubFunctionTypes( + const std::string& subFuncSpec, + std::vector& types) const; + + /// Used to find the Velox function name according to the function id /// from a pre-constructed function map. std::string findVeloxFunction( const std::unordered_map& functionMap, diff --git a/velox/substrait/SubstraitToVeloxExpr.cpp b/velox/substrait/SubstraitToVeloxExpr.cpp index 347b65713cbe..33e9ff04910b 100644 --- a/velox/substrait/SubstraitToVeloxExpr.cpp +++ b/velox/substrait/SubstraitToVeloxExpr.cpp @@ -27,8 +27,8 @@ SubstraitVeloxExprConverter::toVeloxExpr( switch (typeCase) { case ::substrait::Expression::FieldReference::ReferenceTypeCase:: kDirectReference: { - const auto& directRef = substraitField.direct_reference(); - int32_t colIdx = substraitParser_.parseReferenceSegment(directRef); + const auto& dRef = sField.direct_reference(); + int32_t colIdx = subParser_->parseReferenceSegment(dRef); const auto& inputNames = inputType->names(); const int64_t inputSize = inputNames.size(); if (colIdx <= inputSize) { @@ -57,10 +57,19 @@ SubstraitVeloxExprConverter::toVeloxExpr( for (const auto& sArg : substraitFunc.arguments()) { params.emplace_back(toVeloxExpr(sArg.value(), inputType)); } - const auto& veloxFunction = substraitParser_.findVeloxFunction( - functionMap_, substraitFunc.function_reference()); - std::string typeName = - substraitParser_.parseType(substraitFunc.output_type())->type; + const auto& veloxFunction = + subParser_->findVeloxFunction(functionMap_, sFunc.function_reference()); + const auto& veloxType = + toVeloxType(subParser_->parseType(sFunc.output_type())->type); + + // Omit alias because because name change is not needed. + if (veloxFunction == "alias") { + if (params.size() != 1) { + VELOX_FAIL("Alias expects one parameter."); + } + return params[0]; + } + return std::make_shared( toVeloxType(typeName), std::move(params), veloxFunction); } @@ -88,8 +97,7 @@ SubstraitVeloxExprConverter::toVeloxExpr( return std::make_shared( variant(substraitLit.fp32())); case ::substrait::Expression_Literal::LiteralTypeCase::kI64: - return std::make_shared( - variant(substraitLit.i64())); + return std::make_shared(variant(sLit.i64())); case ::substrait::Expression_Literal::LiteralTypeCase::kFp64: return std::make_shared( variant(substraitLit.fp64())); diff --git a/velox/substrait/SubstraitToVeloxExpr.h b/velox/substrait/SubstraitToVeloxExpr.h index 5af58fd441f0..51ad406d3981 100644 --- a/velox/substrait/SubstraitToVeloxExpr.h +++ b/velox/substrait/SubstraitToVeloxExpr.h @@ -68,7 +68,8 @@ class SubstraitVeloxExprConverter { /// The Substrait parser used to convert Substrait representations into /// recognizable representations. - SubstraitParser substraitParser_; + std::shared_ptr subParser_ = + std::make_shared(); /// The map storing the relations between the function id and the function /// name. diff --git a/velox/substrait/SubstraitToVeloxPlan.cpp b/velox/substrait/SubstraitToVeloxPlan.cpp index 69d560c85992..f16fa7dff0b3 100644 --- a/velox/substrait/SubstraitToVeloxPlan.cpp +++ b/velox/substrait/SubstraitToVeloxPlan.cpp @@ -55,26 +55,33 @@ core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan( VELOX_FAIL("Child Rel is expected in AggregateRel."); } core::AggregationNode::Step aggStep = toAggregationStep(aggRel); + return toVeloxAgg(sAgg, childNode, aggStep); +} + +std::shared_ptr SubstraitVeloxPlanConverter::toVeloxAgg( + const ::substrait::AggregateRel& sAgg, + const std::shared_ptr& childNode, + const core::AggregationNode::Step& aggStep) { const auto& inputType = childNode->outputType(); - std::vector veloxGroupingExprs; + std::vector> + veloxGroupingExprs; // Get the grouping expressions. - for (const auto& grouping : aggRel.groupings()) { + uint32_t groupingOutIdx = 0; + for (const auto& grouping : sAgg.groupings()) { for (const auto& groupingExpr : grouping.grouping_expressions()) { // Velox's groupings are limited to be Field. veloxGroupingExprs.emplace_back( exprConverter_->toVeloxExpr(groupingExpr.selection(), inputType)); + groupingOutIdx += 1; } } // Parse measures and get the aggregate expressions. // Each measure represents one aggregate expression. - std::vector aggExprs; - aggExprs.reserve(aggRel.measures().size()); - std::vector aggregateMasks; - aggregateMasks.reserve(aggRel.measures().size()); - - for (const auto& measure : aggRel.measures()) { + std::vector> aggExprs; + aggExprs.reserve(sAgg.measures().size()); + for (const auto& smea : sAgg.measures()) { core::FieldAccessTypedExprPtr aggregateMask; ::substrait::Expression substraitAggMask = measure.filter(); // Get Aggregation Masks. @@ -88,37 +95,38 @@ core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan( } aggregateMasks.push_back(aggregateMask); } - - const auto& aggFunction = measure.measure(); - auto funcName = substraitParser_->findVeloxFunction( + const auto& aggFunction = smea.measure(); + std::string funcName = subParser_->findVeloxFunction( functionMap_, aggFunction.function_reference()); - std::vector aggParams; + std::vector> aggParams; aggParams.reserve(aggFunction.arguments().size()); for (const auto& arg : aggFunction.arguments()) { - aggParams.emplace_back( - exprConverter_->toVeloxExpr(arg.value(), inputType)); + aggParams.emplace_back(exprConverter_->toVeloxExpr( + getExprFromFunctionArgument(arg), inputType)); } - auto aggVeloxType = toVeloxType( - substraitParser_->parseType(aggFunction.output_type())->type); + auto aggVeloxType = + toVeloxType(subParser_->parseType(aggFunction.output_type())->type); auto aggExpr = std::make_shared( aggVeloxType, std::move(aggParams), funcName); aggExprs.emplace_back(aggExpr); } bool ignoreNullKeys = false; - std::vector preGroupingExprs; + std::vector> aggregateMasks( + sAgg.measures().size()); + std::vector> + preGroupingExprs = {}; // Get the output names of Aggregation. std::vector aggOutNames; - aggOutNames.reserve(aggRel.measures().size()); - for (int idx = veloxGroupingExprs.size(); - idx < veloxGroupingExprs.size() + aggRel.measures().size(); + aggOutNames.reserve(sAgg.measures().size()); + for (int idx = groupingOutIdx; idx < groupingOutIdx + sAgg.measures().size(); idx++) { - aggOutNames.emplace_back(substraitParser_->makeNodeName(planNodeId_, idx)); + aggOutNames.emplace_back(subParser_->makeNodeName(planNodeId_, idx)); } // Create Aggregate node. - return std::make_shared( + auto aggNode = std::make_shared( nextPlanNodeId(), aggStep, veloxGroupingExprs, @@ -128,6 +136,7 @@ core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan( aggregateMasks, ignoreNullKeys, childNode); + return aggNode; } core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan( @@ -138,9 +147,19 @@ core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan( } else { VELOX_FAIL("Child Rel is expected in ProjectRel."); } +} +core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan( + const ::substrait::ProjectRel& projectRel, + memory::MemoryPool* pool) { + core::PlanNodePtr childNode; + if (projectRel.has_input()) { + childNode = toVeloxPlan(projectRel.input(), pool); + } else { + VELOX_FAIL("Child Rel is expected in ProjectRel."); + } // Construct Velox Expressions. - auto projectExprs = projectRel.expressions(); + const auto& projectExprs = sProject.expressions(); std::vector projectNames; std::vector expressions; projectNames.reserve(projectExprs.size()); @@ -181,40 +200,56 @@ core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan( } core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan( - const ::substrait::ReadRel& readRel, - std::shared_ptr& splitInfo) { + const ::substrait::ReadRel& sRead) { + // Check if the ReadRel specifies an input of stream. If yes, the pre-built + // input node will be used as the data source. + auto splitInfo = std::make_shared(); + auto streamIdx = streamIsInput(sRead); + if (streamIdx >= 0) { + if (inputNodesMap_.find(streamIdx) == inputNodesMap_.end()) { + VELOX_FAIL( + "Could not find source index {} in input nodes map.", streamIdx); + } + auto streamNode = inputNodesMap_[streamIdx]; + splitInfo->isStream = true; + splitInfoMap_[streamNode->id()] = splitInfo; + return streamNode; + } + + // Otherwise, will create TableScan node for ReadRel. // Get output names and types. std::vector colNameList; std::vector veloxTypeList; - if (readRel.has_base_schema()) { - const auto& baseSchema = readRel.base_schema(); + if (sRead.has_base_schema()) { + const auto& baseSchema = sRead.base_schema(); colNameList.reserve(baseSchema.names().size()); for (const auto& name : baseSchema.names()) { colNameList.emplace_back(name); } - auto substraitTypeList = substraitParser_->parseNamedStruct(baseSchema); + auto substraitTypeList = subParser_->parseNamedStruct(baseSchema); veloxTypeList.reserve(substraitTypeList.size()); for (const auto& substraitType : substraitTypeList) { veloxTypeList.emplace_back(toVeloxType(substraitType->type)); } } - // Parse local files - if (readRel.has_local_files()) { + // Parse local files and construct split info. + if (sRead.has_local_files()) { using SubstraitFileFormatCase = ::substrait::ReadRel_LocalFiles_FileOrFiles::FileFormatCase; - const auto& fileList = readRel.local_files().items(); + const auto& fileList = sRead.local_files().items(); splitInfo->paths.reserve(fileList.size()); splitInfo->starts.reserve(fileList.size()); splitInfo->lengths.reserve(fileList.size()); for (const auto& file : fileList) { - // Expect all files to share the same index. + // Expect all Partitions share the same index. splitInfo->partitionIndex = file.partition_index(); splitInfo->paths.emplace_back(file.uri_file()); splitInfo->starts.emplace_back(file.start()); splitInfo->lengths.emplace_back(file.length()); switch (file.file_format_case()) { case SubstraitFileFormatCase::kOrc: + case SubstraitFileFormatCase::kDwrf: splitInfo->format = dwio::common::FileFormat::DWRF; break; case SubstraitFileFormatCase::kParquet: @@ -225,14 +260,13 @@ core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan( } } } - // Do not hard-code connector ID and allow for connectors other than Hive. static const std::string kHiveConnectorId = "test-hive"; // Velox requires Filter Pushdown must being enabled. bool filterPushdownEnabled = true; std::shared_ptr tableHandle; - if (!readRel.has_filter()) { + if (!sRead.has_filter()) { tableHandle = std::make_shared( kHiveConnectorId, "hive_table", @@ -240,14 +274,56 @@ core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan( connector::hive::SubfieldFilters{}, nullptr); } else { - connector::hive::SubfieldFilters filters = - toVeloxFilter(colNameList, veloxTypeList, readRel.filter()); + // Flatten the conditions connected with 'and'. + std::vector<::substrait::Expression_ScalarFunction> scalarFunctions; + std::vector<::substrait::Expression_SingularOrList> singularOrLists; + flattenConditions(sRead.filter(), scalarFunctions, singularOrLists); + + std::unordered_map> rangeRecorders; + for (uint32_t idx = 0; idx < veloxTypeList.size(); idx++) { + rangeRecorders[idx] = std::make_shared(); + } + + // Separate the filters to be two parts. The subfield part can be + // pushed down. + std::vector<::substrait::Expression_ScalarFunction> subfieldFunctions; + std::vector<::substrait::Expression_SingularOrList> subfieldrOrLists; + + std::vector<::substrait::Expression_ScalarFunction> remainingFunctions; + std::vector<::substrait::Expression_SingularOrList> remainingrOrLists; + + separateFilters( + rangeRecorders, + scalarFunctions, + subfieldFunctions, + remainingFunctions, + singularOrLists, + subfieldrOrLists, + remainingrOrLists); + + // Create subfield filters based on the constructed filter info map. + connector::hive::SubfieldFilters subfieldFilters = toSubfieldFilters( + colNameList, veloxTypeList, subfieldFunctions, subfieldrOrLists); + // Connect the remaining filters with 'and'. + std::shared_ptr remainingFilter; + + if (!isPushDownSupportedByFormat(splitInfo->format, subfieldFilters)) { + // A subfieldFilter is not supported by the format, + // mark all filter as remaining filters. + subfieldFilters.clear(); + remainingFilter = connectWithAnd( + colNameList, veloxTypeList, scalarFunctions, singularOrLists); + } else { + remainingFilter = connectWithAnd( + colNameList, veloxTypeList, remainingFunctions, remainingrOrLists); + } + tableHandle = std::make_shared( kHiveConnectorId, "hive_table", filterPushdownEnabled, - std::move(filters), - nullptr); + std::move(subfieldFilters), + remainingFilter); } // Get assignments and out names. @@ -256,7 +332,7 @@ core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan( std::unordered_map> assignments; for (int idx = 0; idx < colNameList.size(); idx++) { - auto outName = substraitParser_->makeNodeName(planNodeId_, idx); + auto outName = subParser_->makeNodeName(planNodeId_, idx); assignments[outName] = std::make_shared( colNameList[idx], connector::hive::HiveColumnHandle::ColumnType::kRegular, @@ -265,12 +341,13 @@ core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan( } auto outputType = ROW(std::move(outNames), std::move(veloxTypeList)); - if (readRel.has_virtual_table()) { - return toVeloxPlan(readRel, outputType); - + if (sRead.has_virtual_table()) { + return toVeloxPlan(sRead, outputType); } else { auto tableScanNode = std::make_shared( nextPlanNodeId(), outputType, tableHandle, assignments); + // Set split info map. + splitInfoMap_[tableScanNode->id()] = splitInfo; return tableScanNode; } } @@ -286,13 +363,7 @@ core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan( std::vector vectors; vectors.reserve(numVectors); - int64_t batchSize; - // For the empty vectors, eg,vectors = makeRowVector(ROW({}, {}), 1). - if (numColumns == 0) { - batchSize = 1; - } else { - batchSize = valueFieldNums / numColumns; - } + int64_t batchSize = valueFieldNums / numColumns; for (int64_t index = 0; index < numVectors; ++index) { std::vector children; @@ -372,26 +443,39 @@ core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan( VELOX_CHECK( checkTypeExtension(substraitPlan), "The type extension only have unknown type.") - // Construct the function map based on the Substrait representation. + // Construct the function map based on the Substrait representation, + // and initialize the expression converter with it. constructFunctionMap(substraitPlan); - // Construct the expression converter. - exprConverter_ = - std::make_shared(pool_, functionMap_); - // In fact, only one RelRoot or Rel is expected here. VELOX_CHECK_EQ(substraitPlan.relations_size(), 1); - const auto& rel = substraitPlan.relations(0); - if (rel.has_root()) { - return toVeloxPlan(rel.root()); + const auto& sRel = substraitPlan.relations(0); + if (sRel.has_root()) { + return toVeloxPlan(sRel.root()); } - if (rel.has_rel()) { - return toVeloxPlan(rel.rel()); + if (sRel.has_rel()) { + return toVeloxPlan(sRel.rel()); } VELOX_FAIL("RelRoot or Rel is expected in Plan."); } +void SubstraitVeloxPlanConverter::constructFunctionMap( + const ::substrait::Plan& substraitPlan) { + // Construct the function map based on the Substrait representation. + for (const auto& sExtension : substraitPlan.extensions()) { + if (!sExtension.has_extension_function()) { + continue; + } + const auto& sFmap = sExtension.extension_function(); + auto id = sFmap.function_anchor(); + auto name = sFmap.name(); + functionMap_[id] = name; + } + exprConverter_ = + std::make_shared(pool_, functionMap_); +} + std::string SubstraitVeloxPlanConverter::nextPlanNodeId() { auto id = fmt::format("{}", planNodeId_); planNodeId_++; @@ -465,23 +549,22 @@ connector::hive::SubfieldFilters SubstraitVeloxPlanConverter::toVeloxFilter( for (const auto& scalarFunction : scalarFunctions) { auto filterNameSpec = substraitParser_->findFunctionSpec( functionMap_, scalarFunction.function_reference()); - auto filterName = getNameBeforeDelimiter(filterNameSpec, ":"); + auto filterName = substraitParser_->getFunctionName(filterNameSpec); int32_t colIdx; // TODO: Add different types' support here. double val; - for (auto& arg : scalarFunction.arguments()) { - auto argExpr = arg.value(); - auto typeCase = argExpr.rex_type_case(); + for (auto& param : scalarFunction.args()) { + auto typeCase = param.rex_type_case(); switch (typeCase) { case ::substrait::Expression::RexTypeCase::kSelection: { - auto sel = argExpr.selection(); + auto sel = param.selection(); // TODO: Only direct reference is considered here. auto dRef = sel.direct_reference(); colIdx = substraitParser_->parseReferenceSegment(dRef); break; } case ::substrait::Expression::RexTypeCase::kLiteral: { - auto sLit = argExpr.literal(); + auto sLit = param.literal(); // TODO: Only double is considered here. val = sLit.fp64(); break; @@ -511,8 +594,10 @@ connector::hive::SubfieldFilters SubstraitVeloxPlanConverter::toVeloxFilter( // Construct the Filters. for (int idx = 0; idx < inputNameList.size(); idx++) { auto filterInfo = colInfoMap[idx]; - double leftBound; - double rightBound; + // Set the left bound to be negative infinity. + double leftBound = -1.0 / 0.0; + // Set the right bound to be positive infinity. + double rightBound = 1.0 / 0.0; bool leftUnbounded = true; bool rightUnbounded = true; bool leftExclusive = false; @@ -553,15 +638,82 @@ void SubstraitVeloxPlanConverter::flattenConditions( auto filterNameSpec = substraitParser_->findFunctionSpec( functionMap_, sFunc.function_reference()); // TODO: Only and relation is supported here. - if (getNameBeforeDelimiter(filterNameSpec, ":") == "and") { + if (substraitParser_->getFunctionName(filterNameSpec) == "and") { + for (const auto& sCondition : sFunc.args()) { + flattenConditions(sCondition, scalarFunctions); + } + } else { + scalarFunctions.emplace_back(sFunc); + } + break; + } + default: + VELOX_NYI("GetFlatConditions not supported for type '{}'", typeCase); + } +} + +std::string SubstraitVeloxPlanConverter::findFuncSpec(uint64_t id) { + return subParser_->findSubstraitFuncSpec(functionMap_, id); +} + +int32_t SubstraitVeloxPlanConverter::streamIsInput( + const ::substrait::ReadRel& sRead) { + if (sRead.has_local_files()) { + const auto& fileList = sRead.local_files().items(); + if (fileList.size() == 0) { + VELOX_FAIL("At least one file path is expected."); + } + + // The stream input will be specified with the format of + // "iterator:${index}". + std::string filePath = fileList[0].uri_file(); + std::string prefix = "iterator:"; + std::size_t pos = filePath.find(prefix); + if (pos == std::string::npos) { + return -1; + } + + // Get the index. + std::string idxStr = filePath.substr(pos + prefix.size(), filePath.size()); + try { + return stoi(idxStr); + } catch (const std::exception& err) { + VELOX_FAIL(err.what()); + } + } + if (validationMode_) { + return -1; + } + VELOX_FAIL("Local file is expected."); +} + +void SubstraitVeloxPlanConverter::flattenConditions( + const ::substrait::Expression& substraitFilter, + std::vector<::substrait::Expression_ScalarFunction>& scalarFunctions, + std::vector<::substrait::Expression_SingularOrList>& singularOrLists) { + auto typeCase = substraitFilter.rex_type_case(); + switch (typeCase) { + case ::substrait::Expression::RexTypeCase::kScalarFunction: { + auto sFunc = substraitFilter.scalar_function(); + auto filterNameSpec = subParser_->findSubstraitFuncSpec( + functionMap_, sFunc.function_reference()); + // TODO: Only and relation is supported here. + if (subParser_->getSubFunctionName(filterNameSpec) == "and") { for (const auto& sCondition : sFunc.arguments()) { - flattenConditions(sCondition.value(), scalarFunctions); + flattenConditions( + getExprFromFunctionArgument(sCondition), + scalarFunctions, + singularOrLists); } } else { scalarFunctions.emplace_back(sFunc); } break; } + case ::substrait::Expression::RexTypeCase::kSingularOrList: { + singularOrLists.emplace_back(substraitFilter.singular_or_list()); + break; + } default: VELOX_NYI("GetFlatConditions not supported for type '{}'", typeCase); } diff --git a/velox/substrait/SubstraitToVeloxPlan.h b/velox/substrait/SubstraitToVeloxPlan.h index 533a2f37963c..cb09da7ee241 100644 --- a/velox/substrait/SubstraitToVeloxPlan.h +++ b/velox/substrait/SubstraitToVeloxPlan.h @@ -57,9 +57,7 @@ class SubstraitVeloxPlanConverter { /// Index: the index of the partition this item belongs to. /// Starts: the start positions in byte to read from the items. /// Lengths: the lengths in byte to read from the items. - core::PlanNodePtr toVeloxPlan( - const ::substrait::ReadRel& readRel, - std::shared_ptr& splitInfo); + core::PlanNodePtr toVeloxPlan(const ::substrait::ReadRel& sRead); /// Convert Substrait ReadRel into Velox Values Node. core::PlanNodePtr toVeloxPlan( @@ -78,15 +76,43 @@ class SubstraitVeloxPlanConverter { /// Check the Substrait type extension only has one unknown extension. bool checkTypeExtension(const ::substrait::Plan& substraitPlan); - /// Construct the function map between the index and the Substrait function - /// name. - void constructFunctionMap(const ::substrait::Plan& substraitPlan); - - /// Return the function map used by this plan converter. - const std::unordered_map& getFunctionMap() const { + /// Convert Substrait ReadRel into Velox PlanNode. + /// Index: the index of the partition this item belongs to. + /// Starts: the start positions in byte to read from the items. + /// Lengths: the lengths in byte to read from the items. + std::shared_ptr toVeloxPlan( + const ::substrait::ReadRel& sRead, + u_int32_t& index, + std::vector& paths, + std::vector& starts, + std::vector& lengths); + + /// Used to convert Substrait Rel into Velox PlanNode. + std::shared_ptr toVeloxPlan( + const ::substrait::Rel& sRel); + + /// Used to convert Substrait RelRoot into Velox PlanNode. + std::shared_ptr toVeloxPlan( + const ::substrait::RelRoot& sRoot); + + /// Used to convert Substrait Plan into Velox PlanNode. + std::shared_ptr toVeloxPlan( + const ::substrait::Plan& sPlan); + + /// Used to construct the function map between the index + /// and the Substrait function name. + void constructFuncMap(const ::substrait::Plan& sPlan); + + /// Will return the function map used by this plan converter. + const std::unordered_map& getFunctionMap() { return functionMap_; } + /// Will return the index of Partition to be scanned. + u_int32_t getPartitionIndex() { + return partitionIndex_; + } + /// Return the splitInfo map used by this plan converter. const std::unordered_map>& splitInfos() const { @@ -100,41 +126,77 @@ class SubstraitVeloxPlanConverter { /// name>:__..._ const std::string& findFunction(uint64_t id) const; - private: - /// Returns unique ID to use for plan node. Produces sequential numbers - /// starting from zero. - std::string nextPlanNodeId(); + /// Used to insert certain plan node as input. The plan node + /// id will start from the setted one. + void insertInputNode( + uint64_t inputIdx, + const std::shared_ptr& inputNode, + int planNodeId) { + inputNodesMap_[inputIdx] = inputNode; + planNodeId_ = planNodeId; + } - /// Used to convert Substrait Filter into Velox SubfieldFilters which will - /// be used in TableScan. - connector::hive::SubfieldFilters toVeloxFilter( - const std::vector& inputNameList, - const std::vector& inputTypeList, - const ::substrait::Expression& substraitFilter); + /// Used to check if ReadRel specifies an input of stream. + /// If yes, the index of input stream will be returned. + /// If not, -1 will be returned. + int32_t streamIsInput(const ::substrait::ReadRel& sRel); /// Multiple conditions are connected to a binary tree structure with /// the relation key words, including AND, OR, and etc. Currently, only /// AND is supported. This function is used to extract all the Substrait /// conditions in the binary tree structure into a vector. void flattenConditions( - const ::substrait::Expression& substraitFilter, + const ::substrait::Expression& sFilter, std::vector<::substrait::Expression_ScalarFunction>& scalarFunctions); + /// Used to find the function specification in the constructed function map. + std::string findFuncSpec(uint64_t id); + + private: + /// The Partition index. + u_int32_t partitionIndex_; + + /// The file paths to be scanned. + std::vector paths_; + + /// The file starts in the scan. + std::vector starts_; + + /// The lengths to be scanned. + std::vector lengths_; + + /// The unique identification for each PlanNode. + int planNodeId_ = 0; + + /// The map storing the relations between the function id and the function + /// name. Will be constructed based on the Substrait representation. + std::unordered_map functionMap_; + + /// The map storing the pre-built plan nodes which can be accessed through + /// index. This map is only used when the computation of a Substrait plan + /// depends on other input nodes. + std::unordered_map> + inputNodesMap_; + /// The Substrait parser used to convert Substrait representations into /// recognizable representations. - std::shared_ptr substraitParser_{ + std::shared_ptr subParser_{ std::make_shared()}; /// The Expression converter used to convert Substrait representations into /// Velox expressions. std::shared_ptr exprConverter_; - /// The unique identification for each PlanNode. - int planNodeId_ = 0; + /// A function returning current function id and adding the plan node id by + /// one once called. + std::string nextPlanNodeId(); - /// The map storing the relations between the function id and the function - /// name. Will be constructed based on the Substrait representation. - std::unordered_map functionMap_; + /// Used to convert Substrait Filter into Velox SubfieldFilters which will + /// be used in TableScan. + connector::hive::SubfieldFilters toVeloxFilter( + const std::vector& inputNameList, + const std::vector& inputTypeList, + const ::substrait::Expression& substraitFilter); /// Mapping from leaf plan node ID to splits. std::unordered_map> @@ -142,6 +204,12 @@ class SubstraitVeloxPlanConverter { /// Memory pool. memory::MemoryPool* pool_; + /// Used to convert AggregateRel into Velox plan node. + /// The output of child node will be used as the input of Aggregation. + std::shared_ptr toVeloxAgg( + const ::substrait::AggregateRel& sAgg, + const std::shared_ptr& childNode, + const core::AggregationNode::Step& aggStep); }; } // namespace facebook::velox::substrait diff --git a/velox/substrait/SubstraitToVeloxPlanValidator.cpp b/velox/substrait/SubstraitToVeloxPlanValidator.cpp new file mode 100644 index 000000000000..977b43a88402 --- /dev/null +++ b/velox/substrait/SubstraitToVeloxPlanValidator.cpp @@ -0,0 +1,340 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/substrait/SubstraitToVeloxPlanValidator.h" +#include "TypeUtils.h" + +#include "velox/functions/prestosql/registration/RegistrationFunctions.h" + +namespace facebook::velox::substrait { + +bool SubstraitToVeloxPlanValidator::validate(const ::substrait::Type& sType) { + switch (sType.kind_case()) { + case ::substrait::Type::KindCase::kBool: + case ::substrait::Type::KindCase::kI32: + case ::substrait::Type::KindCase::kI64: + case ::substrait::Type::KindCase::kFp64: + case ::substrait::Type::KindCase::kString: + return true; + default: + return false; + } +} + +bool SubstraitToVeloxPlanValidator::validateInputTypes( + const ::substrait::extensions::AdvancedExtension& extension, + std::vector& types) { + // The input type is wrapped in enhancement. + if (!extension.has_enhancement()) { + return false; + } + const auto& enhancement = extension.enhancement(); + ::substrait::Type inputType; + if (!enhancement.UnpackTo(&inputType)) { + return false; + } + if (!inputType.has_struct_()) { + return false; + } + + // Get the input types. + const auto& sTypes = inputType.struct_().types(); + for (const auto& sType : sTypes) { + try { + types.emplace_back(toVeloxType(subParser_->parseType(sType)->type)); + } catch (const VeloxException& err) { + std::cout << "Type is not supported in ProjectRel due to:" + << err.message() << std::endl; + return false; + } + } + return true; +} + +bool SubstraitToVeloxPlanValidator::validate( + const ::substrait::ProjectRel& sProject) { + if (sProject.has_input() && !validate(sProject.input())) { + return false; + } + + // Get and validate the input types from extension. + if (!sProject.has_advanced_extension()) { + std::cout << "Input types are expected in ProjectRel." << std::endl; + return false; + } + const auto& extension = sProject.advanced_extension(); + std::vector types; + if (!validateInputTypes(extension, types)) { + std::cout << "Validation failed for input types in ProjectRel" << std::endl; + return false; + } + + int32_t inputPlanNodeId = 0; + // Create the fake input names to be used in row type. + std::vector names; + names.reserve(types.size()); + for (uint32_t colIdx = 0; colIdx < types.size(); colIdx++) { + names.emplace_back(subParser_->makeNodeName(inputPlanNodeId, colIdx)); + } + auto rowType = std::make_shared(std::move(names), std::move(types)); + + // Validate the project expressions. + const auto& projectExprs = sProject.expressions(); + std::vector> expressions; + expressions.reserve(projectExprs.size()); + try { + for (const auto& expr : projectExprs) { + expressions.emplace_back(exprConverter_->toVeloxExpr(expr, rowType)); + } + // Try to compile the expressions. If there is any unregistred funciton or + // mismatched type, exception will be thrown. + exec::ExprSet exprSet(std::move(expressions), &execCtx_); + } catch (const VeloxException& err) { + std::cout << "Validation failed for expression in ProjectRel due to:" + << err.message() << std::endl; + return false; + } + return true; +} + +bool SubstraitToVeloxPlanValidator::validate( + const ::substrait::FilterRel& sFilter) { + return false; +} + +bool SubstraitToVeloxPlanValidator::validate( + const ::substrait::AggregateRel& sAgg) { + if (sAgg.has_input() && !validate(sAgg.input())) { + return false; + } + + // Validate input types. + if (sAgg.has_advanced_extension()) { + const auto& extension = sAgg.advanced_extension(); + std::vector types; + if (!validateInputTypes(extension, types)) { + std::cout << "Validation failed for input types in AggregateRel" + << std::endl; + return false; + } + } + + // Validate groupings. + for (const auto& grouping : sAgg.groupings()) { + for (const auto& groupingExpr : grouping.grouping_expressions()) { + const auto& typeCase = groupingExpr.rex_type_case(); + switch (typeCase) { + case ::substrait::Expression::RexTypeCase::kSelection: + break; + default: + std::cout << "Only field is supported in groupings." << std::endl; + return false; + } + } + } + + // Validate aggregate functions. + std::vector funcSpecs; + funcSpecs.reserve(sAgg.measures().size()); + for (const auto& smea : sAgg.measures()) { + try { + const auto& aggFunction = smea.measure(); + funcSpecs.emplace_back( + planConverter_->findFuncSpec(aggFunction.function_reference())); + toVeloxType(subParser_->parseType(aggFunction.output_type())->type); + for (const auto& arg : aggFunction.args()) { + auto typeCase = arg.rex_type_case(); + switch (typeCase) { + case ::substrait::Expression::RexTypeCase::kSelection: + break; + default: + std::cout << "Only field is supported in aggregate functions." + << std::endl; + return false; + } + } + } catch (const VeloxException& err) { + std::cout << "Validation failed for aggregate function due to: " + << err.message() << std::endl; + return false; + } + } + + std::unordered_set supportedFuncs = {"sum", "count", "avg"}; + for (const auto& funcSpec : funcSpecs) { + auto funcName = subParser_->getSubFunctionName(funcSpec); + if (supportedFuncs.find(funcName) == supportedFuncs.end()) { + std::cout << "Validation failed due to " << funcName + << " was not supported in AggregateRel." << std::endl; + return false; + } + } + return true; +} + +bool SubstraitToVeloxPlanValidator::validate( + const ::substrait::ReadRel& sRead) { + if (!sRead.has_base_schema()) { + std::cout << "Validation failed due to schema was not found in ReadRel." + << std::endl; + return false; + } + const auto& sTypes = sRead.base_schema().struct_().types(); + for (const auto& sType : sTypes) { + if (!validate(sType)) { + std::cout << "Validation failed due to type was not supported in ReadRel." + << std::endl; + return false; + } + } + std::vector<::substrait::Expression_ScalarFunction> scalarFunctions; + if (sRead.has_filter()) { + try { + planConverter_->flattenConditions(sRead.filter(), scalarFunctions); + } catch (const VeloxException& err) { + std::cout + << "Validation failed due to flattening conditions failed in ReadRel due to:" + << err.message() << std::endl; + return false; + } + } + // Get and validate the filter functions. + std::vector funcSpecs; + funcSpecs.reserve(scalarFunctions.size()); + for (const auto& scalarFunction : scalarFunctions) { + try { + funcSpecs.emplace_back( + planConverter_->findFuncSpec(scalarFunction.function_reference())); + } catch (const VeloxException& err) { + std::cout << "Validation failed in ReadRel due to:" << err.message() + << std::endl; + return false; + } + + if (scalarFunction.args().size() == 1) { + // Field is expected. + for (const auto& param : scalarFunction.args()) { + auto typeCase = param.rex_type_case(); + switch (typeCase) { + case ::substrait::Expression::RexTypeCase::kSelection: + break; + default: + std::cout << "Field is Expected." << std::endl; + return false; + } + } + } else if (scalarFunction.args().size() == 2) { + // Expect there being two args. One is field and the other is literal. + bool fieldExists = false; + bool litExists = false; + for (const auto& param : scalarFunction.args()) { + auto typeCase = param.rex_type_case(); + switch (typeCase) { + case ::substrait::Expression::RexTypeCase::kSelection: { + fieldExists = true; + break; + } + case ::substrait::Expression::RexTypeCase::kLiteral: { + litExists = true; + break; + } + default: + std::cout << "Type case: " << typeCase + << " is not supported in ReadRel." << std::endl; + return false; + } + } + if (!fieldExists || !litExists) { + std::cout << "Only the case of Field and Literal is supported." + << std::endl; + return false; + } + } else { + std::cout << "More than two args is not supported in ReadRel." + << std::endl; + return false; + } + } + std::unordered_set supportedFilters = { + "is_not_null", "gte", "gt", "lte", "lt"}; + std::unordered_set supportedTypes = {"opt", "req", "fp64"}; + for (const auto& funcSpec : funcSpecs) { + // Validate the functions. + auto funcName = subParser_->getSubFunctionName(funcSpec); + if (supportedFilters.find(funcName) == supportedFilters.end()) { + std::cout << "Validation failed due to " << funcName + << " was not supported in ReadRel." << std::endl; + return false; + } + + // Validate the types. + std::vector funcTypes; + subParser_->getSubFunctionTypes(funcSpec, funcTypes); + for (const auto& funcType : funcTypes) { + if (supportedTypes.find(funcType) == supportedTypes.end()) { + std::cout << "Validation failed due to " << funcType + << " was not supported in ReadRel." << std::endl; + return false; + } + } + } + return true; +} + +bool SubstraitToVeloxPlanValidator::validate(const ::substrait::Rel& sRel) { + if (sRel.has_aggregate()) { + return validate(sRel.aggregate()); + } + if (sRel.has_project()) { + return validate(sRel.project()); + } + if (sRel.has_filter()) { + return validate(sRel.filter()); + } + if (sRel.has_read()) { + return validate(sRel.read()); + } + return false; +} + +bool SubstraitToVeloxPlanValidator::validate( + const ::substrait::RelRoot& sRoot) { + if (sRoot.has_input()) { + const auto& sRel = sRoot.input(); + return validate(sRel); + } + return false; +} + +bool SubstraitToVeloxPlanValidator::validate(const ::substrait::Plan& sPlan) { + functions::prestosql::registerAllScalarFunctions(); + // Create plan converter and expression converter to help the validation. + planConverter_->constructFuncMap(sPlan); + exprConverter_ = std::make_shared( + planConverter_->getFunctionMap()); + + for (const auto& sRel : sPlan.relations()) { + if (sRel.has_root()) { + return validate(sRel.root()); + } + if (sRel.has_rel()) { + return validate(sRel.rel()); + } + } + return false; +} + +} // namespace facebook::velox::substrait diff --git a/velox/substrait/SubstraitToVeloxPlanValidator.h b/velox/substrait/SubstraitToVeloxPlanValidator.h new file mode 100644 index 000000000000..705b8ebec3af --- /dev/null +++ b/velox/substrait/SubstraitToVeloxPlanValidator.h @@ -0,0 +1,80 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/substrait/SubstraitToVeloxPlan.h" + +namespace facebook::velox::substrait { + +/// This class is used to validate whether the computing of +/// a Substrait plan is supported in Velox. +class SubstraitToVeloxPlanValidator { + public: + /// Used to validate whether the computing of this type is supported. + bool validate(const ::substrait::Type& sType); + + /// Used to validate whether the computing of this Aggregation is supported. + bool validate(const ::substrait::AggregateRel& sAgg); + + /// Used to validate whether the computing of this Project is supported. + bool validate(const ::substrait::ProjectRel& sProject); + + /// Used to validate whether the computing of this Filter is supported. + bool validate(const ::substrait::FilterRel& sFilter); + + /// Used to validate whether the computing of this Read is supported. + bool validate(const ::substrait::ReadRel& sRead); + + /// Used to validate whether the computing of this Rel is supported. + bool validate(const ::substrait::Rel& sRel); + + /// Used to validate whether the computing of this RelRoot is supported. + bool validate(const ::substrait::RelRoot& sRoot); + + /// Used to validate whether the computing of this Plan is supported. + bool validate(const ::substrait::Plan& sPlan); + + private: + /// A query context used for function validation. + std::shared_ptr queryCtx_{core::QueryCtx::createForTest()}; + + /// A memory pool used for function validation. + std::unique_ptr pool_{ + memory::getDefaultScopedMemoryPool()}; + + /// An execution context used for function validation. + core::ExecCtx execCtx_{pool_.get(), queryCtx_.get()}; + + /// A converter used to convert Substrait plan into Velox's plan node. + std::shared_ptr planConverter_ = + std::make_shared(); + + /// A parser used to convert Substrait plan into recognizable representations. + std::shared_ptr subParser_ = + std::make_shared(); + + /// An expression converter used to convert Substrait representations into + /// Velox expressions. + std::shared_ptr exprConverter_; + + /// Used to get types from advanced extension and validate them. + bool validateInputTypes( + const ::substrait::extensions::AdvancedExtension& extension, + std::vector& types); +}; + +} // namespace facebook::velox::substrait diff --git a/velox/substrait/tests/PlanConversionTest.cpp b/velox/substrait/tests/PlanConversionTest.cpp new file mode 100644 index 000000000000..7f7d0fe56d45 --- /dev/null +++ b/velox/substrait/tests/PlanConversionTest.cpp @@ -0,0 +1,569 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include "velox/common/base/tests/Fs.h" +#include "velox/common/base/tests/GTestUtils.h" +#include "velox/connectors/hive/HiveConnector.h" +#include "velox/connectors/hive/HiveConnectorSplit.h" +#include "velox/dwio/common/Options.h" +#include "velox/dwio/dwrf/test/utils/DataFiles.h" +#include "velox/exec/PartitionedOutputBufferManager.h" +#include "velox/exec/tests/utils/Cursor.h" +#include "velox/exec/tests/utils/HiveConnectorTestBase.h" +#include "velox/exec/tests/utils/PlanBuilder.h" +#include "velox/exec/tests/utils/TempDirectoryPath.h" +#include "velox/substrait/SubstraitToVeloxPlan.h" +#include "velox/type/Type.h" +#include "velox/type/tests/FilterBuilder.h" +#include "velox/type/tests/SubfieldFiltersBuilder.h" + +using namespace facebook::velox; +using namespace facebook::velox::connector::hive; +using namespace facebook::velox::exec; +using namespace facebook::velox::common::test; +using namespace facebook::velox::exec::test; + +class PlanConversionTest : public virtual HiveConnectorTestBase, + public testing::WithParamInterface { + protected: + void SetUp() override { + useAsyncCache_ = GetParam(); + HiveConnectorTestBase::SetUp(); + } + + static void SetUpTestCase() { + HiveConnectorTestBase::SetUpTestCase(); + } + + std::vector makeVectors( + int32_t count, + int32_t rowsPerVector, + const std::shared_ptr& rowType) { + return HiveConnectorTestBase::makeVectors(rowType, count, rowsPerVector); + } + + class VeloxConverter { + public: + // This class is an iterator for Velox computing. + class WholeComputeResultIterator { + public: + WholeComputeResultIterator( + const std::shared_ptr& planNode, + u_int32_t index, + const std::vector& paths, + const std::vector& starts, + const std::vector& lengths) + : planNode_(planNode), + index_(index), + paths_(paths), + starts_(starts), + lengths_(lengths) { + // Construct the splits. + std::vector> + connectorSplits; + connectorSplits.reserve(paths.size()); + for (int idx = 0; idx < paths.size(); idx++) { + auto path = paths[idx]; + auto start = starts[idx]; + auto length = lengths[idx]; + auto split = std::make_shared< + facebook::velox::connector::hive::HiveConnectorSplit>( + facebook::velox::exec::test::kHiveConnectorId, + path, + facebook::velox::dwio::common::FileFormat::ORC, + start, + length); + connectorSplits.emplace_back(split); + } + splits_.reserve(connectorSplits.size()); + for (const auto& connectorSplit : connectorSplits) { + splits_.emplace_back(exec::Split(folly::copy(connectorSplit), -1)); + } + + params_.planNode = planNode; + cursor_ = std::make_unique(params_); + addSplits_ = [&](Task* task) { + if (noMoreSplits_) { + return; + } + for (auto& split : splits_) { + task->addSplit("0", std::move(split)); + } + task->noMoreSplits("0"); + noMoreSplits_ = true; + }; + } + + bool HasNext() { + if (!mayHasNext_) { + return false; + } + if (numRows_ > 0) { + return true; + } else { + addSplits_(cursor_->task().get()); + if (cursor_->moveNext()) { + result_ = cursor_->current(); + numRows_ += result_->size(); + return true; + } else { + mayHasNext_ = false; + return false; + } + } + } + + RowVectorPtr Next() { + numRows_ = 0; + return result_; + } + + private: + const std::shared_ptr planNode_; + std::unique_ptr cursor_; + exec::test::CursorParameters params_; + std::vector splits_; + bool noMoreSplits_ = false; + std::function addSplits_; + u_int32_t index_; + std::vector paths_; + std::vector starts_; + std::vector lengths_; + uint64_t numRows_ = 0; + bool mayHasNext_ = true; + RowVectorPtr result_; + }; + + // This method will resume the Substrait plan from Json file, + // and convert it into Velox PlanNode. A result iterator for + // Velox computing will be returned. + std::shared_ptr getResIter( + const std::string& subPlanPath) { + // Read json file and resume the Substrait plan. + std::ifstream subJson(subPlanPath); + std::stringstream buffer; + buffer << subJson.rdbuf(); + std::string subData = buffer.str(); + ::substrait::Plan subPlan; + google::protobuf::util::JsonStringToMessage(subData, &subPlan); + + auto planConverter = std::make_shared< + facebook::velox::substrait::SubstraitVeloxPlanConverter>(); + // Convert to Velox PlanNode. + auto planNode = planConverter->toVeloxPlan(subPlan); + + // Get the information for TableScan. + u_int32_t partitionIndex = planConverter->getPartitionIndex(); + std::vector paths = planConverter->getPaths(); + + // In test, need to get the absolute path of the generated ORC file. + auto tempPath = getTmpDirPath(); + std::vector absolutePaths; + absolutePaths.reserve(paths.size()); + + for (const auto& path : paths) { + absolutePaths.emplace_back(fmt::format("file://{}{}", tempPath, path)); + } + + std::vector starts = planConverter->getStarts(); + std::vector lengths = planConverter->getLengths(); + // Construct the result iterator. + auto resIter = std::make_shared( + planNode, partitionIndex, absolutePaths, starts, lengths); + return resIter; + } + + std::string getTmpDirPath() const { + return tmpDir_->path; + } + + std::shared_ptr tmpDir_{ + exec::test::TempDirectoryPath::create()}; + }; + + // This method can be used to create a Fixed-width type of Vector without Null + // values. + template + VectorPtr createSpecificScalar( + size_t size, + std::vector vals, + facebook::velox::memory::MemoryPool& pool) { + facebook::velox::BufferPtr values = AlignedBuffer::allocate(size, &pool); + auto valuesPtr = values->asMutableRange(); + facebook::velox::BufferPtr nulls = nullptr; + for (size_t i = 0; i < size; ++i) { + valuesPtr[i] = vals[i]; + } + return std::make_shared>( + &pool, nulls, size, values, std::vector{}); + } + + // This method can be used to create a String type of Vector without Null + // values. + VectorPtr createSpecificStringVector( + size_t size, + std::vector vals, + facebook::velox::memory::MemoryPool& pool) { + auto vector = BaseVector::create(VARCHAR(), size, &pool); + auto flatVector = vector->asFlatVector(); + + size_t childSize = 0; + std::vector lengths(size); + size_t nullCount = 0; + for (size_t i = 0; i < size; ++i) { + auto notNull = true; + vector->setNull(i, !notNull); + auto len = vals[i].size(); + lengths[i] = len; + childSize += len; + } + vector->setNullCount(0); + + BufferPtr buf = AlignedBuffer::allocate(childSize, &pool); + char* bufPtr = buf->asMutable(); + char* dest = bufPtr; + for (size_t i = 0; i < size; ++i) { + std::string str = vals[i]; + const char* chr = str.c_str(); + auto length = str.size(); + memcpy(dest, chr, length); + dest = dest + length; + } + size_t offset = 0; + for (size_t i = 0; i < size; ++i) { + if (!vector->isNullAt(i)) { + flatVector->set( + i, facebook::velox::StringView(bufPtr + offset, lengths[i])); + offset += lengths[i]; + } + } + return vector; + } + + void genLineitemORC(const std::shared_ptr& veloxConverter) { + auto type = + ROW({"l_orderkey", + "l_partkey", + "l_suppkey", + "l_linenumber", + "l_quantity", + "l_extendedprice", + "l_discount", + "l_tax", + "l_returnflag", + "l_linestatus", + "l_shipdate", + "l_commitdate", + "l_receiptdate", + "l_shipinstruct", + "l_shipmode", + "l_comment"}, + {BIGINT(), + BIGINT(), + BIGINT(), + INTEGER(), + DOUBLE(), + DOUBLE(), + DOUBLE(), + DOUBLE(), + VARCHAR(), + VARCHAR(), + DOUBLE(), + DOUBLE(), + DOUBLE(), + VARCHAR(), + VARCHAR(), + VARCHAR()}); + std::unique_ptr pool{ + facebook::velox::memory::getDefaultScopedMemoryPool()}; + std::vector vectors; + // TPC-H lineitem table has 16 columns. + int colNum = 16; + vectors.reserve(colNum); + std::vector lOrderkeyData = { + 4636438147, + 2012485446, + 1635327427, + 8374290148, + 2972204230, + 8001568994, + 989963396, + 2142695974, + 6354246853, + 4141748419}; + vectors.emplace_back( + createSpecificScalar(10, lOrderkeyData, *pool)); + std::vector lPartkeyData = { + 263222018, + 255918298, + 143549509, + 96877642, + 201976875, + 196938305, + 100260625, + 273511608, + 112999357, + 299103530}; + vectors.emplace_back( + createSpecificScalar(10, lPartkeyData, *pool)); + std::vector lSuppkeyData = { + 2102019, + 13998315, + 12989528, + 4717643, + 9976902, + 12618306, + 11940632, + 871626, + 1639379, + 3423588}; + vectors.emplace_back( + createSpecificScalar(10, lSuppkeyData, *pool)); + std::vector lLinenumberData = {4, 6, 1, 5, 1, 2, 1, 5, 2, 6}; + vectors.emplace_back( + createSpecificScalar(10, lLinenumberData, *pool)); + std::vector lQuantityData = { + 6.0, 1.0, 19.0, 4.0, 6.0, 12.0, 23.0, 11.0, 16.0, 19.0}; + vectors.emplace_back( + createSpecificScalar(10, lQuantityData, *pool)); + std::vector lExtendedpriceData = { + 30586.05, + 7821.0, + 1551.33, + 30681.2, + 1941.78, + 66673.0, + 6322.44, + 41754.18, + 8704.26, + 63780.36}; + vectors.emplace_back( + createSpecificScalar(10, lExtendedpriceData, *pool)); + std::vector lDiscountData = { + 0.05, 0.06, 0.01, 0.07, 0.05, 0.06, 0.07, 0.05, 0.06, 0.07}; + vectors.emplace_back( + createSpecificScalar(10, lDiscountData, *pool)); + std::vector lTaxData = { + 0.02, 0.03, 0.01, 0.0, 0.01, 0.01, 0.03, 0.07, 0.01, 0.04}; + vectors.emplace_back(createSpecificScalar(10, lTaxData, *pool)); + std::vector lReturnflagData = { + "N", "A", "A", "R", "A", "N", "A", "A", "N", "R"}; + vectors.emplace_back( + createSpecificStringVector(10, lReturnflagData, *pool)); + std::vector lLinestatusData = { + "O", "F", "F", "F", "F", "O", "F", "F", "O", "F"}; + vectors.emplace_back( + createSpecificStringVector(10, lLinestatusData, *pool)); + std::vector lShipdateNewData = { + 8953.666666666666, + 8773.666666666666, + 9034.666666666666, + 8558.666666666666, + 9072.666666666666, + 8864.666666666666, + 9004.666666666666, + 8778.666666666666, + 9013.666666666666, + 8832.666666666666}; + vectors.emplace_back( + createSpecificScalar(10, lShipdateNewData, *pool)); + std::vector lCommitdateNewData = { + 10447.666666666666, + 8953.666666666666, + 8325.666666666666, + 8527.666666666666, + 8438.666666666666, + 10049.666666666666, + 9036.666666666666, + 8666.666666666666, + 9519.666666666666, + 9138.666666666666}; + vectors.emplace_back( + createSpecificScalar(10, lCommitdateNewData, *pool)); + std::vector lReceiptdateNewData = { + 10456.666666666666, + 8979.666666666666, + 8299.666666666666, + 8474.666666666666, + 8525.666666666666, + 9996.666666666666, + 9103.666666666666, + 8726.666666666666, + 9593.666666666666, + 9178.666666666666}; + vectors.emplace_back( + createSpecificScalar(10, lReceiptdateNewData, *pool)); + std::vector lShipinstructData = { + "COLLECT COD", + "NONE", + "TAKE BACK RETURN", + "NONE", + "TAKE BACK RETURN", + "NONE", + "DELIVER IN PERSON", + "DELIVER IN PERSON", + "TAKE BACK RETURN", + "NONE"}; + vectors.emplace_back( + createSpecificStringVector(10, lShipinstructData, *pool)); + std::vector lShipmodeData = { + "FOB", + "REG AIR", + "MAIL", + "FOB", + "RAIL", + "SHIP", + "REG AIR", + "REG AIR", + "TRUCK", + "AIR"}; + vectors.emplace_back(createSpecificStringVector(10, lShipmodeData, *pool)); + std::vector lCommentData = { + " the furiously final foxes. quickly final p", + "thely ironic", + "ate furiously. even, pending pinto bean", + "ackages af", + "odolites. slyl", + "ng the regular requests sleep above", + "lets above the slyly ironic theodolites sl", + "lyly regular excuses affi", + "lly unusual theodolites grow slyly above", + " the quickly ironic pains lose car"}; + vectors.emplace_back(createSpecificStringVector(10, lCommentData, *pool)); + + // Batches has only one RowVector here. + uint64_t nullCount = 0; + std::vector batches{std::make_shared( + pool.get(), type, nullptr, 10, vectors, nullCount)}; + + // Writes data into an ORC file. + auto sink = std::make_unique( + veloxConverter->getTmpDirPath() + "/mock_lineitem.orc"); + auto config = std::make_shared(); + const int64_t writerMemoryCap = std::numeric_limits::max(); + facebook::velox::dwrf::WriterOptions options; + options.config = config; + options.schema = type; + options.memoryBudget = writerMemoryCap; + options.flushPolicyFactory = nullptr; + options.layoutPlannerFactory = nullptr; + auto writer = std::make_unique( + options, + std::move(sink), + facebook::velox::memory::getProcessDefaultMemoryManager().getRoot()); + for (size_t i = 0; i < batches.size(); ++i) { + writer->write(batches[i]); + } + writer->close(); + } + + // Used to find the Velox path according current path. + std::string getVeloxPath() { + std::string veloxPath; + std::string currentPath = fs::current_path().c_str(); + size_t pos = 0; + + if ((pos = currentPath.find("project")) != std::string::npos) { + // In Github test, the Velox home is /root/project. + veloxPath = currentPath.substr(0, pos) + "project"; + } else if ((pos = currentPath.find("velox")) != std::string::npos) { + veloxPath = currentPath.substr(0, pos) + "velox"; + } else if ((pos = currentPath.find("fbcode")) != std::string::npos) { + veloxPath = currentPath; + } else { + throw std::runtime_error("Current path is not a valid Velox path."); + } + return veloxPath; + } +}; + +// This test will firstly generate mock TPC-H lineitem ORC file. Then, Velox's +// computing will be tested based on the generated ORC file. +// Input: Json file of the Substrait plan for the first stage of below modified +// TPC-H Q6 query: +// +// select sum(l_extendedprice*l_discount) as revenue from lineitem where +// l_shipdate >= 8766 and l_shipdate < 9131 and l_discount between .06 +// - 0.01 and .06 + 0.01 and l_quantity < 24 +// +// Tested Velox computings include: TableScan (Filter Pushdown) + Project + +// Aggregate +// Output: the Velox computed Aggregation result + +TEST_P(PlanConversionTest, q6FirstStageTest) { + auto veloxConverter = std::make_shared(); + std::string veloxPath = getVeloxPath(); + genLineitemORC(veloxConverter); + // Find and deserialize Substrait plan json file. + std::string subPlanPath = veloxPath + "/velox/substrait/tests/q6_first.json"; + auto resIter = veloxConverter->getResIter(subPlanPath); + while (resIter->HasNext()) { + auto rv = resIter->Next(); + auto size = rv->size(); + ASSERT_EQ(size, 1); + std::string res = rv->toString(0); + ASSERT_EQ(res, "{ [child at 0]: 13613.1921}"); + } +} + +// This test will firstly generate mock TPC-H lineitem ORC file. Then, Velox's +// computing will be tested based on the generated ORC file. +// Input: Json file of the Substrait plan for the first stage of the below +// modified TPC-H Q1 query: +// +// select l_returnflag, l_linestatus, sum(l_quantity) as sum_qty, +// sum(l_extendedprice) as sum_base_price, sum(l_extendedprice * (1 - +// l_discount)) as sum_disc_price, sum(l_extendedprice * (1 - l_discount) * (1 + +// l_tax)) as sum_charge, avg(l_quantity) as avg_qty, avg(l_extendedprice) as +// avg_price, avg(l_discount) as avg_disc, count(*) as count_order from lineitem +// where l_shipdate <= 10471 group by l_returnflag, l_linestatus order by +// l_returnflag, l_linestatus +// +// Tested Velox computings include: TableScan (Filter Pushdown) + Project + +// Aggregate +// Output: the Velox computed Aggregation result + +TEST_P(PlanConversionTest, q1FirstStageTest) { + auto veloxConverter = std::make_shared(); + std::string veloxPath = getVeloxPath(); + genLineitemORC(veloxConverter); + // Find and deserialize Substrait plan json file. + std::string subPlanPath = veloxPath + "/velox/substrait/tests/q1_first.json"; + auto resIter = veloxConverter->getResIter(subPlanPath); + while (resIter->HasNext()) { + auto rv = resIter->Next(); + auto size = rv->size(); + ASSERT_EQ(size, 3); + ASSERT_EQ( + rv->toString(0), + "{ [child at 0]: N, O, 34, 105963.31, 99911.3719, 101201.05309399999, 34, 3, 105963.31, 3, 0.16999999999999998, 3, 3}"); + ASSERT_EQ( + rv->toString(1), + "{ [child at 1]: A, F, 60, 59390.729999999996, 56278.5879, 59485.994223, 60, 5, 59390.729999999996, 5, 0.24, 5, 5}"); + ASSERT_EQ( + rv->toString(2), + "{ [child at 2]: R, F, 23, 94461.56, 87849.2508, 90221.880192, 23, 2, 94461.56, 2, 0.14, 2, 2}"); + } +} + +VELOX_INSTANTIATE_TEST_SUITE_P( + PlanConversionTests, + PlanConversionTest, + testing::Values(true, false)); diff --git a/velox/substrait/tests/q1_first.json b/velox/substrait/tests/q1_first.json new file mode 100644 index 000000000000..e11d8a7f05a1 --- /dev/null +++ b/velox/substrait/tests/q1_first.json @@ -0,0 +1,720 @@ +{ + "extension_uris": [], + "extensions": [ + { + "extension_function": { + "extension_uri_reference": 0, + "function_anchor": 1, + "name": "lte:fp64_fp64" + } + }, + { + "extension_function": { + "extension_uri_reference": 0, + "function_anchor": 6, + "name": "sum:opt_fp64" + } + }, + { + "extension_function": { + "extension_uri_reference": 0, + "function_anchor": 3, + "name": "subtract:opt_fp64_fp64" + } + }, + { + "extension_function": { + "extension_uri_reference": 0, + "function_anchor": 0, + "name": "is_not_null:fp64" + } + }, + { + "extension_function": { + "extension_uri_reference": 0, + "function_anchor": 2, + "name": "and:bool_bool" + } + }, + { + "extension_function": { + "extension_uri_reference": 0, + "function_anchor": 5, + "name": "add:opt_fp64_fp64" + } + }, + { + "extension_function": { + "extension_uri_reference": 0, + "function_anchor": 7, + "name": "avg:opt_fp64" + } + }, + { + "extension_function": { + "extension_uri_reference": 0, + "function_anchor": 4, + "name": "multiply:opt_fp64_fp64" + } + }, + { + "extension_function": { + "extension_uri_reference": 0, + "function_anchor": 8, + "name": "count:opt_i32" + } + } + ], + "relations": [ + { + "root": { + "input": { + "aggregate": { + "common": { + "direct": {} + }, + "input": { + "project": { + "common": { + "direct": {} + }, + "input": { + "project": { + "common": { + "direct": {} + }, + "input": { + "read": { + "common": { + "direct": {} + }, + "base_schema": { + "names": [ + "l_quantity", + "l_extendedprice", + "l_discount", + "l_tax", + "l_returnflag", + "l_linestatus", + "l_shipdate" + ], + "struct": { + "types": [ + { + "fp64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + ], + "type_variation_reference": 0, + "nullability": "NULLABILITY_UNSPECIFIED" + } + }, + "filter": { + "scalar_function": { + "function_reference": 2, + "args": [ + { + "scalar_function": { + "function_reference": 0, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 6 + } + } + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + { + "scalar_function": { + "function_reference": 1, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 6 + } + } + } + }, + { + "literal": { + "nullable": false, + "fp64": 10471 + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + "local_files": { + "items": [ + { + "format": "FILE_FORMAT_UNSPECIFIED", + "partition_index": "0", + "start": "0", + "length": "3719", + "uri_file": "/mock_lineitem.orc" + } + ] + } + } + }, + "expressions": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 0 + } + } + } + }, + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 1 + } + } + } + }, + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 2 + } + } + } + }, + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 3 + } + } + } + }, + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 4 + } + } + } + }, + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 5 + } + } + } + } + ] + } + }, + "expressions": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 4 + } + } + } + }, + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 5 + } + } + } + }, + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 0 + } + } + } + }, + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 1 + } + } + } + }, + { + "scalar_function": { + "function_reference": 4, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 1 + } + } + } + }, + { + "scalar_function": { + "function_reference": 3, + "args": [ + { + "literal": { + "nullable": false, + "fp64": 1 + } + }, + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 2 + } + } + } + } + ], + "output_type": { + "fp64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + } + ], + "output_type": { + "fp64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + { + "scalar_function": { + "function_reference": 4, + "args": [ + { + "scalar_function": { + "function_reference": 4, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 1 + } + } + } + }, + { + "scalar_function": { + "function_reference": 3, + "args": [ + { + "literal": { + "nullable": false, + "fp64": 1 + } + }, + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 2 + } + } + } + } + ], + "output_type": { + "fp64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + } + ], + "output_type": { + "fp64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + { + "scalar_function": { + "function_reference": 5, + "args": [ + { + "literal": { + "nullable": false, + "fp64": 1 + } + }, + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 3 + } + } + } + } + ], + "output_type": { + "fp64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + } + ], + "output_type": { + "fp64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 2 + } + } + } + }, + { + "literal": { + "nullable": false, + "i32": 1 + } + } + ] + } + }, + "groupings": [ + { + "grouping_expressions": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 0 + } + } + } + }, + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 1 + } + } + } + } + ] + } + ], + "measures": [ + { + "measure": { + "function_reference": 6, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 2 + } + } + } + } + ], + "sorts": [], + "phase": "AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE", + "output_type": { + "fp64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + { + "measure": { + "function_reference": 6, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 3 + } + } + } + } + ], + "sorts": [], + "phase": "AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE", + "output_type": { + "fp64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + { + "measure": { + "function_reference": 6, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 4 + } + } + } + } + ], + "sorts": [], + "phase": "AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE", + "output_type": { + "fp64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + { + "measure": { + "function_reference": 6, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 5 + } + } + } + } + ], + "sorts": [], + "phase": "AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE", + "output_type": { + "fp64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + { + "measure": { + "function_reference": 7, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 2 + } + } + } + } + ], + "sorts": [], + "phase": "AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE", + "output_type": { + "fp64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + { + "measure": { + "function_reference": 7, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 3 + } + } + } + } + ], + "sorts": [], + "phase": "AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE", + "output_type": { + "fp64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + { + "measure": { + "function_reference": 7, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 6 + } + } + } + } + ], + "sorts": [], + "phase": "AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE", + "output_type": { + "fp64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + { + "measure": { + "function_reference": 8, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 7 + } + } + } + } + ], + "sorts": [], + "phase": "AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE", + "output_type": { + "i64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + } + } + } + ] + } + }, + "names": [ + "real_arrow_output", + "l_returnflag", + "l_linestatus", + "sum", + "sum", + "sum", + "sum", + "sum", + "count", + "sum", + "count", + "sum", + "count", + "count" + ] + } + } + ], + "expected_type_urls": [] +} diff --git a/velox/substrait/tests/q6_first.json b/velox/substrait/tests/q6_first.json new file mode 100644 index 000000000000..a48bf5da285e --- /dev/null +++ b/velox/substrait/tests/q6_first.json @@ -0,0 +1,523 @@ +{ + "extension_uris": [], + "extensions": [ + { + "extension_function": { + "extension_uri_reference": 0, + "function_anchor": 4, + "name": "lte:fp64_fp64" + } + }, + { + "extension_function": { + "extension_uri_reference": 0, + "function_anchor": 6, + "name": "sum:opt_fp64" + } + }, + { + "extension_function": { + "extension_uri_reference": 0, + "function_anchor": 3, + "name": "lt:fp64_fp64" + } + }, + { + "extension_function": { + "extension_uri_reference": 0, + "function_anchor": 0, + "name": "is_not_null:fp64" + } + }, + { + "extension_function": { + "extension_uri_reference": 0, + "function_anchor": 1, + "name": "and:bool_bool" + } + }, + { + "extension_function": { + "extension_uri_reference": 0, + "function_anchor": 2, + "name": "gte:fp64_fp64" + } + }, + { + "extension_function": { + "extension_uri_reference": 0, + "function_anchor": 5, + "name": "multiply:opt_fp64_fp64" + } + } + ], + "relations": [ + { + "root": { + "input": { + "aggregate": { + "common": { + "direct": {} + }, + "input": { + "project": { + "common": { + "direct": {} + }, + "input": { + "project": { + "common": { + "direct": {} + }, + "input": { + "read": { + "common": { + "direct": {} + }, + "base_schema": { + "names": [ + "l_quantity", + "l_extendedprice", + "l_discount", + "l_shipdate" + ], + "struct": { + "types": [ + { + "fp64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + ], + "type_variation_reference": 0, + "nullability": "NULLABILITY_UNSPECIFIED" + } + }, + "filter": { + "scalar_function": { + "function_reference": 1, + "args": [ + { + "scalar_function": { + "function_reference": 1, + "args": [ + { + "scalar_function": { + "function_reference": 1, + "args": [ + { + "scalar_function": { + "function_reference": 1, + "args": [ + { + "scalar_function": { + "function_reference": 1, + "args": [ + { + "scalar_function": { + "function_reference": 1, + "args": [ + { + "scalar_function": { + "function_reference": 1, + "args": [ + { + "scalar_function": { + "function_reference": 0, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 3 + } + } + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + { + "scalar_function": { + "function_reference": 0, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 2 + } + } + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + { + "scalar_function": { + "function_reference": 0, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 0 + } + } + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + { + "scalar_function": { + "function_reference": 2, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 3 + } + } + } + }, + { + "literal": { + "nullable": false, + "fp64": 8766 + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + { + "scalar_function": { + "function_reference": 3, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 3 + } + } + } + }, + { + "literal": { + "nullable": false, + "fp64": 9131 + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + { + "scalar_function": { + "function_reference": 2, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 2 + } + } + } + }, + { + "literal": { + "nullable": false, + "fp64": 0.05 + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + { + "scalar_function": { + "function_reference": 4, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 2 + } + } + } + }, + { + "literal": { + "nullable": false, + "fp64": 0.07 + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + { + "scalar_function": { + "function_reference": 3, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 0 + } + } + } + }, + { + "literal": { + "nullable": false, + "fp64": 24 + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + "local_files": { + "items": [ + { + "format": "FILE_FORMAT_UNSPECIFIED", + "partition_index": "0", + "start": "0", + "length": "3719", + "uri_file": "/mock_lineitem.orc" + } + ] + } + } + }, + "expressions": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 1 + } + } + } + }, + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 2 + } + } + } + } + ] + } + }, + "expressions": [ + { + "scalar_function": { + "function_reference": 5, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 0 + } + } + } + }, + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 1 + } + } + } + } + ], + "output_type": { + "fp64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + } + ] + } + }, + "groupings": [ + { + "grouping_expressions": [] + } + ], + "measures": [ + { + "measure": { + "function_reference": 6, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 0 + } + } + } + } + ], + "sorts": [], + "phase": "AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE", + "output_type": { + "fp64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + } + ] + } + }, + "names": [ + "real_arrow_output", + "sum" + ] + } + } + ], + "expected_type_urls": [] +}