diff --git a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json index 724e69d859a0..6fbc56da74a1 100644 --- a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json +++ b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json @@ -2443,6 +2443,11 @@ "Base": "TCoInputBase", "Match": {"Type": "Callable", "Name": "WideFromBlocks"} }, + { + "Name": "TCoWideToBlocks", + "Base": "TCoInputBase", + "Match": {"Type": "Callable", "Name": "WideToBlocks"} + }, { "Name": "TCoPgSelect", "Base": "TCallable", diff --git a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp index 9166dd97b87d..5903a8849715 100644 --- a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp +++ b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp @@ -5488,6 +5488,19 @@ bool CollectBlockRewrites(const TMultiExprType* multiInputType, bool keepInputCo return true; } +bool CanRewriteToBlocksWithInput(const TExprNode& input, const TTypeAnnotationContext& types) { + EBlockEngineMode effectiveMode = types.UseBlocks ? EBlockEngineMode::Force : types.BlockEngineMode; + switch (effectiveMode) { + case NYql::EBlockEngineMode::Disable: + return false; + case NYql::EBlockEngineMode::Auto: + return input.IsCallable("WideFromBlocks"); + case NYql::EBlockEngineMode::Force: + return true; + } + Y_UNREACHABLE(); +} + TExprNode::TPtr OptimizeWideMapBlocks(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) { const auto lambda = node->TailPtr(); if (node->Head().IsCallable("WideFromBlocks")) { @@ -5504,6 +5517,10 @@ TExprNode::TPtr OptimizeWideMapBlocks(const TExprNode::TPtr& node, TExprContext& } } + if (!CanRewriteToBlocksWithInput(node->Head(), types)) { + return node; + } + auto multiInputType = node->Head().GetTypeAnn()->Cast()->GetItemType()->Cast(); ui32 newNodes; TNodeMap rewritePositions; @@ -5541,6 +5558,10 @@ TExprNode::TPtr OptimizeWideMapBlocks(const TExprNode::TPtr& node, TExprContext& } TExprNode::TPtr OptimizeWideFilterBlocks(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) { + if (!CanRewriteToBlocksWithInput(node->Head(), types)) { + return node; + } + auto multiInputType = node->Head().GetTypeAnn()->Cast()->GetItemType()->Cast(); auto lambda = node->ChildPtr(1); YQL_ENSURE(lambda->ChildrenSize() == 2); // filter lambda should have single output @@ -5661,6 +5682,10 @@ TExprNode::TPtr OptimizeSkipTakeToBlocks(const TExprNode::TPtr& node, TExprConte return node; } + if (!CanRewriteToBlocksWithInput(node->Head(), types)) { + return node; + } + TStringBuf newName = node->Content() == "Skip" ? "WideSkipBlocks" : "WideTakeBlocks"; YQL_CLOG(DEBUG, CorePeepHole) << "Convert " << node->Content() << " to " << newName; return ctx.Builder(node->Pos()) @@ -5701,6 +5726,10 @@ TExprNode::TPtr OptimizeTopOrSortBlocks(const TExprNode::TPtr& node, TExprContex return node; } + if (!CanRewriteToBlocksWithInput(node->Head(), types)) { + return node; + } + TString newName = node->Content() + TString("Blocks"); YQL_CLOG(DEBUG, CorePeepHole) << "Convert " << node->Content() << " to " << newName; auto children = node->ChildrenList(); @@ -7445,6 +7474,14 @@ TExprNode::TPtr DropToFlowDeps(const TExprNode::TPtr& node, TExprContext& ctx) { return ctx.ChangeChildren(*node, std::move(children)); } +TExprNode::TPtr OptimizeToFlow(const TExprNode::TPtr& node, TExprContext&) { + if (node->ChildrenSize() == 1 && node->Head().IsCallable("FromFlow")) { + YQL_CLOG(DEBUG, CorePeepHole) << "Drop ToFlow over FromFlow"; + return node->Head().HeadPtr(); + } + return node; +} + TExprNode::TPtr BuildCheckedBinaryOpOverDecimal(TPositionHandle pos, TStringBuf op, const TExprNode::TPtr& lhs, const TExprNode::TPtr& rhs, const TTypeAnnotationNode& resultType, TExprContext& ctx) { auto typeNode = ExpandType(pos, resultType, ctx); return ctx.Builder(pos) @@ -7701,6 +7738,7 @@ struct TPeepHoleRules { {"AggrGreater", &ExpandAggrCompare}, {"AggrLessOrEqual", &ExpandAggrCompare}, {"AggrGreaterOrEqual", &ExpandAggrCompare}, + {"ToFlow", &OptimizeToFlow}, }; const TExtPeepHoleOptimizerMap FinalStageExtRules = {}; diff --git a/ydb/library/yql/dq/opt/dq_opt_build.cpp b/ydb/library/yql/dq/opt/dq_opt_build.cpp index 51f6fccade9c..a8f231c575f2 100644 --- a/ydb/library/yql/dq/opt/dq_opt_build.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_build.cpp @@ -444,7 +444,7 @@ bool IsCompatibleWithBlocks(TPositionHandle pos, const TStructExprType& type, TE return resolveStatus == IArrowResolver::OK; } -TDqPhyStage RebuildStageInputsAsWide(const TDqPhyStage& stage, bool useChannelBlocks, TExprContext& ctx, TTypeAnnotationContext& typesCtx) { +TDqPhyStage RebuildStageInputsAsWide(const TDqPhyStage& stage, TExprContext& ctx) { TVector newArgs; newArgs.reserve(stage.Inputs().Size()); TNodeOnNodeOwnedMap argsMap; @@ -464,18 +464,6 @@ TDqPhyStage RebuildStageInputsAsWide(const TDqPhyStage& stage, bool useChannelBl needRebuild = true; auto itemType = arg.Ref().GetTypeAnn()->Cast()->GetItemType()->Cast(); TExprNode::TPtr newArgNode = newArg.Ptr(); - if (useChannelBlocks && IsCompatibleWithBlocks(arg.Pos(), *itemType, ctx, typesCtx)) { - // input will actually be wide block stream - convert it to wide stream first - newArgNode = ctx.Builder(arg.Pos()) - .Callable("FromFlow") - .Callable(0, "WideFromBlocks") - .Callable(0, "ToFlow") - .Add(0, newArg.Ptr()) - .Seal() - .Seal() - .Seal() - .Build(); - } // input will actually be wide stream - need to convert it back to stream auto argReplace = ctx.Builder(arg.Pos()) .Callable("FromFlow") @@ -522,8 +510,7 @@ TDqPhyStage RebuildStageInputsAsWide(const TDqPhyStage& stage, bool useChannelBl .Done(); } -TDqPhyStage RebuildStageOutputAsWide(const TDqPhyStage& stage, const TStructExprType& outputItemType, bool useChannelBlocks, - TExprContext& ctx, TTypeAnnotationContext& typesCtx) +TDqPhyStage RebuildStageOutputAsWide(const TDqPhyStage& stage, const TStructExprType& outputItemType, TExprContext& ctx) { TCoLambda program(ctx.DeepCopyLambda(stage.Program().Ref())); @@ -551,19 +538,6 @@ TDqPhyStage RebuildStageOutputAsWide(const TDqPhyStage& stage, const TStructExpr .Seal() .Build(); - if (useChannelBlocks && IsCompatibleWithBlocks(resultStream->Pos(), outputItemType, ctx, typesCtx)) { - // convert wide stream to wide block stream - resultStream = ctx.Builder(resultStream->Pos()) - .Callable("FromFlow") - .Callable(0, "WideToBlocks") - .Callable(0, "ToFlow") - .Add(0, resultStream) - .Seal() - .Seal() - .Seal() - .Build(); - } - return Build(ctx, stage.Pos()) .InitFrom(stage) .Program() @@ -575,20 +549,16 @@ TDqPhyStage RebuildStageOutputAsWide(const TDqPhyStage& stage, const TStructExpr .Done(); } -TDqPhyStage RebuildStageAsWide(const TDqPhyStage& stage, bool useChannelBlocks, TExprContext& ctx, TTypeAnnotationContext& typesCtx) { +TDqPhyStage RebuildStageAsWide(const TDqPhyStage& stage, TExprContext& ctx) { const TStructExprType* outputItemType = GetStageOutputItemType(stage); - return RebuildStageOutputAsWide(RebuildStageInputsAsWide(stage, useChannelBlocks, ctx, typesCtx), - *outputItemType, useChannelBlocks, ctx, typesCtx); + return RebuildStageOutputAsWide(RebuildStageInputsAsWide(stage, ctx), *outputItemType, ctx); } -IGraphTransformer::TStatus DqEnableWideChannels(EChannelMode mode, TExprNode::TPtr input, TExprNode::TPtr& output, - TExprContext& ctx, TTypeAnnotationContext& typesCtx) +IGraphTransformer::TStatus DqEnableWideChannels(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) { output = input; TNodeOnNodeOwnedMap replaces; TNodeSet processedStages; - YQL_ENSURE(mode == CHANNEL_WIDE || mode == CHANNEL_WIDE_BLOCK); - const bool useChannelBlocks = mode == CHANNEL_WIDE_BLOCK; VisitExpr(input, [&](const TExprNode::TPtr& node) { if (node->IsLambda()) { return false; @@ -599,7 +569,7 @@ IGraphTransformer::TStatus DqEnableWideChannels(EChannelMode mode, TExprNode::TP if (maybeConn && CanRebuildForWideChannelOutput(maybeConn.Cast())) { auto conn = maybeConn.Cast(); processedStages.insert(conn.Output().Stage().Raw()); - auto newStage = RebuildStageAsWide(conn.Output().Stage().Cast(), useChannelBlocks, ctx, typesCtx); + auto newStage = RebuildStageAsWide(conn.Output().Stage().Cast(), ctx); auto outputItemType = GetStageOutputItemType(conn.Output().Stage().Cast()); if (conn.Maybe()) { @@ -646,7 +616,7 @@ IGraphTransformer::TStatus DqEnableWideChannels(EChannelMode mode, TExprNode::TP auto stage = expr.Maybe().Cast(); if (!processedStages.contains(stage.Raw())) { processedStages.insert(stage.Raw()); - auto newStage = RebuildStageInputsAsWide(stage, useChannelBlocks, ctx, typesCtx); + auto newStage = RebuildStageInputsAsWide(stage, ctx); if (newStage.Raw() != stage.Raw()) { replaces[stage.Raw()] = newStage.Ptr(); } @@ -668,9 +638,180 @@ IGraphTransformer::TStatus DqEnableWideChannels(EChannelMode mode, TExprNode::TP return status; } +bool CanRebuildForWideBlockChannelOutput(bool forceBlocks, const TDqPhyStage& stage, TExprContext& ctx, TTypeAnnotationContext& typesCtx) { + auto outputItemType = stage.Program().Ref().GetTypeAnn()->Cast()->GetItemType(); + if (IsWideBlockType(*outputItemType)) { + // output is already wide block + return false; + } + + auto stageSettings = TDqStageSettings::Parse(stage); + if (!stageSettings.WideChannels) { + return false; + } + + YQL_ENSURE(stageSettings.OutputNarrowType); + + if (!IsCompatibleWithBlocks(stage.Pos(), *stageSettings.OutputNarrowType, ctx, typesCtx)) { + return false; + } + + if (!forceBlocks) { + // ensure that stage has blocks on top level (i.e. FromFlow(WideFromBlocks(...))) + if (!stage.Program().Body().Maybe() || + !stage.Program().Body().Cast().Input().Maybe()) + { + return false; + } + } + + return true; +} + +bool CanRebuildForWideBlockChannelOutput(bool forceBlocks, const TDqOutput& output, TExprContext& ctx, TTypeAnnotationContext& typesCtx) { + ui32 index = FromString(output.Index().Value()); + if (index != 0) { + // stage has multiple outputs + return false; + } + + return CanRebuildForWideBlockChannelOutput(forceBlocks, output.Stage().Cast(), ctx, typesCtx); +} + +bool CanRebuildForWideBlockChannelOutput(bool forceBlocks, const TDqConnection& conn, TExprContext& ctx, TTypeAnnotationContext& typesCtx) { + if (conn.Maybe() || conn.Maybe()) { + return false; + } + + ui32 index = FromString(conn.Output().Index().Value()); + if (index != 0) { + // stage has multiple outputs + return false; + } + + return CanRebuildForWideBlockChannelOutput(forceBlocks, conn.Output().Stage().Cast(), ctx, typesCtx); +} + +TDqPhyStage RebuildStageInputsAsWideBlock(bool forceBlocks, const TDqPhyStage& stage, TExprContext& ctx, TTypeAnnotationContext& typesCtx) { + TVector newArgs; + newArgs.reserve(stage.Inputs().Size()); + TNodeOnNodeOwnedMap argsMap; + + YQL_ENSURE(stage.Inputs().Size() == stage.Program().Args().Size()); + + bool needRebuild = false; + for (size_t i = 0; i < stage.Inputs().Size(); ++i) { + TCoArgument arg = stage.Program().Args().Arg(i); + + auto newArg = TCoArgument(ctx.NewArgument(arg.Pos(), arg.Name())); + newArgs.emplace_back(newArg); + + auto maybeConn = stage.Inputs().Item(i).Maybe(); + + if (maybeConn && CanRebuildForWideBlockChannelOutput(forceBlocks, maybeConn.Cast().Output(), ctx, typesCtx)) { + needRebuild = true; + // input will actually be wide block stream - convert it to wide stream first + TExprNode::TPtr newArgNode = ctx.Builder(arg.Pos()) + .Callable("FromFlow") + .Callable(0, "WideFromBlocks") + .Callable(0, "ToFlow") + .Add(0, newArg.Ptr()) + .Seal() + .Seal() + .Seal() + .Build(); + argsMap.emplace(arg.Raw(), newArgNode); + } else { + argsMap.emplace(arg.Raw(), newArg.Ptr()); + } + } + + if (!needRebuild) { + return stage; + } + + return Build(ctx, stage.Pos()) + .InitFrom(stage) + .Program() + .Args(newArgs) + .Body(ctx.ReplaceNodes(stage.Program().Body().Ptr(), argsMap)) + .Build() + .Done(); +} + +TDqPhyStage RebuildStageOutputAsWideBlock(const TDqPhyStage& stage, TExprContext& ctx) +{ + return Build(ctx, stage.Pos()) + .InitFrom(stage) + .Program() + .Args(stage.Program().Args()) + .Body() + .Input() + .Input() + .Input(stage.Program().Body()) + .Build() + .Build() + .Build() + .Build() + .Done(); +} + +TDqPhyStage RebuildStageAsWideBlock(bool forceBlocks, const TDqPhyStage& stage, TExprContext& ctx, TTypeAnnotationContext& typesCtx) { + return RebuildStageOutputAsWideBlock(RebuildStageInputsAsWideBlock(forceBlocks, stage, ctx, typesCtx), ctx); +} + +IGraphTransformer::TStatus DqEnableWideBlockChannels(bool forceBlocks, TExprNode::TPtr input, TExprNode::TPtr& output, + TExprContext& ctx, TTypeAnnotationContext& typesCtx) +{ + output = input; + TNodeOnNodeOwnedMap replaces; + TNodeSet processedStages; + VisitExpr(input, [&](const TExprNode::TPtr& node) { + if (node->IsLambda()) { + return false; + } + + TExprBase expr{node}; + auto maybeConn = expr.Maybe(); + if (maybeConn && CanRebuildForWideBlockChannelOutput(forceBlocks, maybeConn.Cast(), ctx, typesCtx)) { + auto conn = maybeConn.Cast(); + processedStages.insert(conn.Output().Stage().Raw()); + auto newStage = RebuildStageAsWideBlock(forceBlocks, conn.Output().Stage().Cast(), ctx, typesCtx); + auto newOutput = Build(ctx, conn.Output().Pos()) + .InitFrom(conn.Output()) + .Stage(newStage) + .Done(); + replaces[conn.Raw()] = ctx.ChangeChild(conn.Ref(), TDqConnection::idx_Output, newOutput.Ptr()); + } else if (expr.Maybe()) { + auto stage = expr.Maybe().Cast(); + if (!processedStages.contains(stage.Raw())) { + processedStages.insert(stage.Raw()); + auto newStage = RebuildStageInputsAsWideBlock(forceBlocks, stage, ctx, typesCtx); + if (newStage.Raw() != stage.Raw()) { + replaces[stage.Raw()] = newStage.Ptr(); + } + } + } + + return true; + }); + + if (replaces.empty()) { + return IGraphTransformer::TStatus::Ok; + } + + YQL_CLOG(INFO, CoreDq) << "[DQ/Build/EnableWideBlockChannels] " << "Enabled block channels for " << replaces.size() << " stages"; + TOptimizeExprSettings settings{nullptr}; + settings.VisitLambdas = false; + auto status = RemapExpr(input, output, replaces, ctx, settings); + YQL_CLOG(TRACE, CoreDq) << "[DQ/Build/EnableWideBlockChannels] " << "Dump: " << NCommon::ExprToPrettyString(ctx, *output); + return status; +} + } // namespace TAutoPtr CreateDqBuildPhyStagesTransformer(bool allowDependantConsumers, TTypeAnnotationContext& typesCtx, EChannelMode mode) { + Y_UNUSED(typesCtx); TVector transformers; transformers.push_back(TTransformStage(CreateFunctorTransformer( @@ -691,11 +832,24 @@ TAutoPtr CreateDqBuildPhyStagesTransformer(bool allowDependan TIssuesIds::DEFAULT_ERROR)); if (mode != CHANNEL_SCALAR) { + transformers.push_back(TTransformStage(CreateFunctorTransformer(&DqEnableWideChannels), + "EnableWideChannels", + TIssuesIds::DEFAULT_ERROR)); + } + + return CreateCompositeGraphTransformer(transformers, false); +} + +TAutoPtr CreateDqBuildWideBlockChannelsTransformer(TTypeAnnotationContext& typesCtx, EChannelMode mode) { + TVector transformers; + + if (mode == CHANNEL_WIDE_AUTO_BLOCK || mode == CHANNEL_WIDE_FORCE_BLOCK) { transformers.push_back(TTransformStage(CreateFunctorTransformer( [mode, &typesCtx](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { - return DqEnableWideChannels(mode, input, output, ctx, typesCtx); + const bool forceBlocks = mode == CHANNEL_WIDE_FORCE_BLOCK; + return DqEnableWideBlockChannels(forceBlocks, input, output, ctx, typesCtx); }), - "EnableWideChannels", + "EnableBlockChannels", TIssuesIds::DEFAULT_ERROR)); } diff --git a/ydb/library/yql/dq/opt/dq_opt_build.h b/ydb/library/yql/dq/opt/dq_opt_build.h index 6cd9911630f6..5cdebb250070 100644 --- a/ydb/library/yql/dq/opt/dq_opt_build.h +++ b/ydb/library/yql/dq/opt/dq_opt_build.h @@ -8,10 +8,14 @@ namespace NYql::NDq { enum EChannelMode { CHANNEL_SCALAR, - CHANNEL_WIDE, - CHANNEL_WIDE_BLOCK, + CHANNEL_WIDE_SCALAR, + CHANNEL_WIDE_AUTO_BLOCK, + CHANNEL_WIDE_FORCE_BLOCK, }; TAutoPtr CreateDqBuildPhyStagesTransformer(bool allowDependantConsumers, TTypeAnnotationContext& typesCtx, EChannelMode mode); +// This transformer should be run in final peephole stage. It enables block channels according to "mode" argument +TAutoPtr CreateDqBuildWideBlockChannelsTransformer(TTypeAnnotationContext& typesCtx, EChannelMode mode); + } // namespace NYql::NDq diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp index da5608602795..86a307f8e969 100644 --- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp +++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp @@ -227,6 +227,30 @@ struct TPublicIds { using TPtr = std::shared_ptr; }; +NDq::EChannelMode GetConfiguredChannelMode(const TDqStatePtr& state, const TTypeAnnotationContext& typesCtx) { + const bool useWideChannels = state->Settings->UseWideChannels.Get().GetOrElse(typesCtx.BlockEngineMode != EBlockEngineMode::Disable); + const TMaybe useChannelBlocks = state->Settings->UseWideBlockChannels.Get(); + NDq::EChannelMode mode; + if (!useWideChannels) { + mode = NDq::EChannelMode::CHANNEL_SCALAR; + } else if (useChannelBlocks.Defined()) { + mode = *useChannelBlocks ? NDq::EChannelMode::CHANNEL_WIDE_FORCE_BLOCK : NDq::EChannelMode::CHANNEL_WIDE_SCALAR; + } else { + switch (typesCtx.BlockEngineMode) { + case NYql::EBlockEngineMode::Auto: + mode = NDq::EChannelMode::CHANNEL_WIDE_AUTO_BLOCK; + break; + case NYql::EBlockEngineMode::Force: + mode = NDq::EChannelMode::CHANNEL_WIDE_FORCE_BLOCK; + break; + case NYql::EBlockEngineMode::Disable: + mode = NDq::EChannelMode::CHANNEL_WIDE_SCALAR; + break; + } + } + return mode; +} + struct TDqsPipelineConfigurator : public IPipelineConfigurator { public: TDqsPipelineConfigurator(const TDqStatePtr& state, const THashMap& providerParams) @@ -259,21 +283,10 @@ struct TDqsPipelineConfigurator : public IPipelineConfigurator { if (State_->Settings->UseBlockReader.Get().GetOrElse(false)) { pipeline->Add(NDqs::CreateDqsRewritePhyBlockReadOnDqIntegrationTransformer(*pipeline->GetTypeAnnotationContext()), "ReplaceWideReadsWithBlock"); } - bool useWideChannels = State_->Settings->UseWideChannels.Get().GetOrElse(false); - bool useChannelBlocks = State_->Settings->UseWideBlockChannels.Get().GetOrElse(false); - NDq::EChannelMode mode; - if (!useWideChannels) { - mode = NDq::EChannelMode::CHANNEL_SCALAR; - } else if (!useChannelBlocks) { - mode = NDq::EChannelMode::CHANNEL_WIDE; - } else { - mode = NDq::EChannelMode::CHANNEL_WIDE_BLOCK; - } + TTypeAnnotationContext& typesCtx = *pipeline->GetTypeAnnotationContext(); + NDq::EChannelMode mode = GetConfiguredChannelMode(State_, typesCtx); pipeline->Add( - NDq::CreateDqBuildPhyStagesTransformer( - !State_->Settings->SplitStageOnDqReplicate.Get().GetOrElse(true), - *pipeline->GetTypeAnnotationContext(), mode - ), + NDq::CreateDqBuildPhyStagesTransformer(!State_->Settings->SplitStageOnDqReplicate.Get().GetOrElse(true), typesCtx, mode), "BuildPhy"); pipeline->Add(NDqs::CreateDqsRewritePhyCallablesTransformer(*pipeline->GetTypeAnnotationContext()), "RewritePhyCallables"); } @@ -353,7 +366,10 @@ TExprNode::TPtr DqMarkBlockStage(const TDqPhyStage& stage, TExprContext& ctx) { struct TDqsFinalPipelineConfigurator : public IPipelineConfigurator { public: - TDqsFinalPipelineConfigurator() = default; + explicit TDqsFinalPipelineConfigurator(const TDqStatePtr& state) + : State_(state) + { + } private: void AfterCreate(TTransformationPipeline*) const final {} @@ -361,6 +377,10 @@ struct TDqsFinalPipelineConfigurator : public IPipelineConfigurator { void AfterOptimize(TTransformationPipeline* pipeline) const final { auto typeCtx = pipeline->GetTypeAnnotationContext(); + NDq::EChannelMode mode = GetConfiguredChannelMode(State_, *typeCtx); + pipeline->Add(NDq::CreateDqBuildWideBlockChannelsTransformer(*typeCtx, mode), + "DqBuildWideBlockChannels", + TIssuesIds::DEFAULT_ERROR); pipeline->Add(CreateFunctorTransformer( [typeCtx](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { TOptimizeExprSettings optSettings{typeCtx.Get()}; @@ -374,9 +394,10 @@ struct TDqsFinalPipelineConfigurator : public IPipelineConfigurator { }, ctx, optSettings); } ), - "DqAfterPeephole", + "DqMarkBlockStages", TIssuesIds::DEFAULT_ERROR); } + TDqStatePtr State_; }; class TDqExecTransformer: public TExecTransformerBase, TCounters @@ -1885,7 +1906,7 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters IGraphTransformer::TStatus PeepHole(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx, const THashMap& providerParams) const { TDqsPipelineConfigurator peepholeConfig(State, providerParams); - TDqsFinalPipelineConfigurator finalPeepholeConfg; + TDqsFinalPipelineConfigurator finalPeepholeConfg(State); TPeepholeSettings peepholeSettings; peepholeSettings.CommonConfig = &peepholeConfig; peepholeSettings.FinalConfig = &finalPeepholeConfg;