diff --git a/velox/substrait/SubstraitToVeloxPlan.cpp b/velox/substrait/SubstraitToVeloxPlan.cpp index 57d1ae16de71..c391b2f09ffe 100644 --- a/velox/substrait/SubstraitToVeloxPlan.cpp +++ b/velox/substrait/SubstraitToVeloxPlan.cpp @@ -496,18 +496,20 @@ SubstraitVeloxPlanConverter::toVeloxAggWithRowConstruct( } std::shared_ptr - SubstraitVeloxPlanConverter::toVeloxPlan( - const ::substrait::ReadRel& sRead, - std::shared_ptr& splitInfo) { - // Check if the ReadRel specifies an input of stream. If yes, the - // pre-built input node will be used as the data source. + SubstraitVeloxPlanConverter::toVeloxPlan(const ::substrait::ReadRel& sRead) { + // Check if the ReadRel specifies an input of stream. If yes, the pre-built + // input node will be used as the data source. + auto splitInfo = std::make_shared(); auto streamIdx = streamIsInput(sRead); if (streamIdx >= 0) { if (inputNodesMap_.find(streamIdx) == inputNodesMap_.end()) { VELOX_FAIL( "Could not find source index {} in input nodes map.", streamIdx); } - return inputNodesMap_[streamIdx]; + auto streamNode = inputNodesMap_[streamIdx]; + splitInfo->isStream = true; + splitInfoMap_[streamNode->id()] = splitInfo; + return streamNode; } // Otherwise, will create TableScan node for ReadRel. @@ -527,7 +529,7 @@ SubstraitVeloxPlanConverter::toVeloxAggWithRowConstruct( } } - // Parse local files + // Parse local files and construct split info. if (sRead.has_local_files()) { const auto& fileList = sRead.local_files().items(); splitInfo->paths.reserve(fileList.size()); @@ -609,6 +611,8 @@ SubstraitVeloxPlanConverter::toVeloxAggWithRowConstruct( } else { auto tableScanNode = std::make_shared( nextPlanNodeId(), outputType, tableHandle, assignments); + // Set split info map. + splitInfoMap_[tableScanNode->id()] = splitInfo; return tableScanNode; } } @@ -684,11 +688,7 @@ SubstraitVeloxPlanConverter::toVeloxAggWithRowConstruct( return toVeloxPlan(sRel.join()); } if (sRel.has_read()) { - auto splitInfo = std::make_shared(); - - auto planNode = toVeloxPlan(sRel.read(), splitInfo); - splitInfoMap_[planNode->id()] = splitInfo; - return planNode; + return toVeloxPlan(sRel.read()); } VELOX_NYI("Substrait conversion not supported for Rel."); } diff --git a/velox/substrait/SubstraitToVeloxPlan.h b/velox/substrait/SubstraitToVeloxPlan.h index 9b5eb86f71b5..12da623ace15 100644 --- a/velox/substrait/SubstraitToVeloxPlan.h +++ b/velox/substrait/SubstraitToVeloxPlan.h @@ -82,8 +82,7 @@ class SubstraitVeloxPlanConverter { /// Starts: the start positions in byte to read from the items. /// Lengths: the lengths in byte to read from the items. std::shared_ptr toVeloxPlan( - const ::substrait::ReadRel& sRead, - std::shared_ptr& splitInfo); + const ::substrait::ReadRel& sRead); /// Used to convert Substrait Rel into Velox PlanNode. std::shared_ptr toVeloxPlan( diff --git a/velox/substrait/SubstraitToVeloxPlanValidator.cpp b/velox/substrait/SubstraitToVeloxPlanValidator.cpp index 2b4cfda55b62..000e01177984 100644 --- a/velox/substrait/SubstraitToVeloxPlanValidator.cpp +++ b/velox/substrait/SubstraitToVeloxPlanValidator.cpp @@ -299,9 +299,7 @@ bool SubstraitToVeloxPlanValidator::validate( bool SubstraitToVeloxPlanValidator::validate( const ::substrait::ReadRel& sRead) { try { - auto splitInfo = std::make_shared(); - - planConverter_->toVeloxPlan(sRead, splitInfo); + planConverter_->toVeloxPlan(sRead); } catch (const VeloxException& err) { std::cout << "ReadRel validation failed due to:" << err.message() << std::endl;