diff --git a/ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp b/ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp index f15373fee867..a63323d090d7 100644 --- a/ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp +++ b/ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp @@ -93,7 +93,8 @@ class TKqpPeepholeTransformer : public TOptimizeTransformerBase { { #define HNDL(name) "KqpPeephole-"#name, Hndl(&TKqpPeepholeTransformer::name) AddHandler(0, &TDqReplicate::Match, HNDL(RewriteReplicate)); - AddHandler(0, &TDqPhyMapJoin::Match, HNDL(RewriteMapJoin)); + AddHandler(0, &TDqPhyGraceJoin::Match, HNDL(RewriteMapJoinWithGraceCore)); + AddHandler(0, &TDqPhyMapJoin::Match, HNDL(RewriteMapJoinWithMapCore)); AddHandler(0, &TDqPhyCrossJoin::Match, HNDL(RewriteCrossJoin)); AddHandler(0, &TDqPhyJoinDict::Match, HNDL(RewriteDictJoin)); AddHandler(0, &TDqJoin::Match, HNDL(RewritePureJoin)); @@ -110,9 +111,15 @@ class TKqpPeepholeTransformer : public TOptimizeTransformerBase { return output; } - TMaybeNode RewriteMapJoin(TExprBase node, TExprContext& ctx) { - TExprBase output = DqPeepholeRewriteMapJoin(node, ctx); - DumpAppliedRule("RewriteMapJoin", node.Ptr(), output.Ptr(), ctx); + TMaybeNode RewriteMapJoinWithGraceCore(TExprBase node, TExprContext& ctx) { + TExprBase output = DqPeepholeRewriteMapJoinWithGraceCore(node, ctx); + DumpAppliedRule("RewriteMapJoinWithGraceCore", node.Ptr(), output.Ptr(), ctx); + return output; + } + + TMaybeNode RewriteMapJoinWithMapCore(TExprBase node, TExprContext& ctx) { + TExprBase output = DqPeepholeRewriteMapJoinWithMapCore(node, ctx); + DumpAppliedRule("RewriteMapJoinWithMapCore", node.Ptr(), output.Ptr(), ctx); return output; } diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp index 0207e4955627..447169763105 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp @@ -430,7 +430,7 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase { // It is now possible as we don't use datashard transactions for reads in data queries. bool pushLeftStage = !KqpCtx.IsDataQuery() && AllowFuseJoinInputs(node); TExprBase output = DqBuildJoin(node, ctx, optCtx, *getParents(), IsGlobal, - pushLeftStage, KqpCtx.Config->GetHashJoinMode(), false + pushLeftStage, KqpCtx.Config->GetHashJoinMode(), false, KqpCtx.Config->UseGraceJoinCoreForMap.Get().GetOrElse(false) ); DumpAppliedRule("BuildJoin", node.Ptr(), output.Ptr(), ctx); return output; diff --git a/ydb/core/kqp/provider/yql_kikimr_settings.cpp b/ydb/core/kqp/provider/yql_kikimr_settings.cpp index 3b712925169f..85aea5cd7c3a 100644 --- a/ydb/core/kqp/provider/yql_kikimr_settings.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_settings.cpp @@ -83,7 +83,7 @@ TKikimrConfiguration::TKikimrConfiguration() { REGISTER_SETTING(*this, OptEnableOlapProvideComputeSharding); REGISTER_SETTING(*this, OverrideStatistics); REGISTER_SETTING(*this, OverridePlanner); - + REGISTER_SETTING(*this, UseGraceJoinCoreForMap); REGISTER_SETTING(*this, OptUseFinalizeByKey); REGISTER_SETTING(*this, CostBasedOptimizationLevel); diff --git a/ydb/core/kqp/provider/yql_kikimr_settings.h b/ydb/core/kqp/provider/yql_kikimr_settings.h index b84df7089001..8a335351716b 100644 --- a/ydb/core/kqp/provider/yql_kikimr_settings.h +++ b/ydb/core/kqp/provider/yql_kikimr_settings.h @@ -53,6 +53,7 @@ struct TKikimrSettings { NCommon::TConfSetting OverrideStatistics; NCommon::TConfSetting EnableSpillingNodes; NCommon::TConfSetting OverridePlanner; + NCommon::TConfSetting UseGraceJoinCoreForMap; /* Disable optimizer rules */ NCommon::TConfSetting OptDisableTopSort; diff --git a/ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json b/ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json index 0d29ed67202f..bac572a4f4c3 100644 --- a/ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json +++ b/ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json @@ -47,6 +47,11 @@ {"Index": 9, "Name": "Flags", "Type": "TCoAtomList", "Optional": true} ] }, + { + "Name": "TDqPhyGraceJoin", + "Base": "TDqJoinBase", + "Match": {"Type": "Callable", "Name": "DqPhyGraceJoin"} + }, { "Name": "TDqPhyMapJoin", "Base": "TDqJoinBase", diff --git a/ydb/library/yql/dq/opt/dq_opt_join.cpp b/ydb/library/yql/dq/opt/dq_opt_join.cpp index 4a374ce35ff0..1dda4093b383 100644 --- a/ydb/library/yql/dq/opt/dq_opt_join.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_join.cpp @@ -323,8 +323,8 @@ std::pair, TVector> GetJoinKeys(const TDqJoin& join, T } -TDqPhyMapJoin DqMakePhyMapJoin(const TDqJoin& join, const TExprBase& leftInput, const TExprBase& rightInput, - TExprContext& ctx) +TDqJoinBase DqMakePhyMapJoin(const TDqJoin& join, const TExprBase& leftInput, const TExprBase& rightInput, + TExprContext& ctx, bool useGraceCore) { static const std::set supportedTypes = {"Inner"sv, "Left"sv, "LeftOnly"sv, "LeftSemi"sv}; auto joinType = join.JoinType().Value(); @@ -349,16 +349,29 @@ TDqPhyMapJoin DqMakePhyMapJoin(const TDqJoin& join, const TExprBase& leftInput, auto leftFilteredInput = BuildSkipNullKeys(ctx, join.Pos(), leftInput, leftFilterKeys); auto rightFilteredInput = BuildSkipNullKeys(ctx, join.Pos(), rightInput, rightFilterKeys); - return Build(ctx, join.Pos()) - .LeftInput(leftFilteredInput) - .LeftLabel(join.LeftLabel()) - .RightInput(rightFilteredInput) - .RightLabel(join.RightLabel()) - .JoinType(join.JoinType()) - .JoinKeys(join.JoinKeys()) - .LeftJoinKeyNames(join.LeftJoinKeyNames()) - .RightJoinKeyNames(join.RightJoinKeyNames()) - .Done(); + if (useGraceCore) { + return Build(ctx, join.Pos()) + .LeftInput(leftFilteredInput) + .LeftLabel(join.LeftLabel()) + .RightInput(rightFilteredInput) + .RightLabel(join.RightLabel()) + .JoinType(join.JoinType()) + .JoinKeys(join.JoinKeys()) + .LeftJoinKeyNames(join.LeftJoinKeyNames()) + .RightJoinKeyNames(join.RightJoinKeyNames()) + .Done(); + } else { + return Build(ctx, join.Pos()) + .LeftInput(leftFilteredInput) + .LeftLabel(join.LeftLabel()) + .RightInput(rightFilteredInput) + .RightLabel(join.RightLabel()) + .JoinType(join.JoinType()) + .JoinKeys(join.JoinKeys()) + .LeftJoinKeyNames(join.LeftJoinKeyNames()) + .RightJoinKeyNames(join.RightJoinKeyNames()) + .Done(); + } } } // namespace @@ -609,7 +622,7 @@ TExprBase DqRewriteLeftPureJoin(const TExprBase node, TExprContext& ctx, const T .Done(); } -TExprBase DqBuildPhyJoin(const TDqJoin& join, bool pushLeftStage, TExprContext& ctx, IOptimizationContext& optCtx) { +TExprBase DqBuildPhyJoin(const TDqJoin& join, bool pushLeftStage, TExprContext& ctx, IOptimizationContext& optCtx, bool useGraceCoreForMap) { static const std::set supportedTypes = { "Inner"sv, "Left"sv, @@ -760,7 +773,7 @@ TExprBase DqBuildPhyJoin(const TDqJoin& join, bool pushLeftStage, TExprContext& TMaybeNode phyJoin; if (join.JoinType().Value() != "Cross"sv) { - phyJoin = DqMakePhyMapJoin(join, leftInputArg, joinRightInput, ctx); + phyJoin = DqMakePhyMapJoin(join, leftInputArg, joinRightInput, ctx, useGraceCoreForMap); } else { YQL_ENSURE(join.JoinKeys().Empty()); diff --git a/ydb/library/yql/dq/opt/dq_opt_join.h b/ydb/library/yql/dq/opt/dq_opt_join.h index 50e7fe21ac5e..148dda6faebc 100644 --- a/ydb/library/yql/dq/opt/dq_opt_join.h +++ b/ydb/library/yql/dq/opt/dq_opt_join.h @@ -16,10 +16,10 @@ NNodes::TExprBase DqRewriteEquiJoin(const NNodes::TExprBase& node, EHashJoinMode NNodes::TExprBase DqRewriteEquiJoin(const NNodes::TExprBase& node, EHashJoinMode mode, bool useCBO, TExprContext& ctx, const TTypeAnnotationContext& typeCtx, int& joinCounter); -NNodes::TExprBase DqBuildPhyJoin(const NNodes::TDqJoin& join, bool pushLeftStage, TExprContext& ctx, IOptimizationContext& optCtx); +NNodes::TExprBase DqBuildPhyJoin(const NNodes::TDqJoin& join, bool pushLeftStage, TExprContext& ctx, IOptimizationContext& optCtx, bool useGraceCoreForMap); NNodes::TExprBase DqBuildJoin(const NNodes::TExprBase& node, TExprContext& ctx, - IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage, bool pushLeftStage, EHashJoinMode hashJoin = EHashJoinMode::Off, bool shuffleMapJoin = true); + IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage, bool pushLeftStage, EHashJoinMode hashJoin = EHashJoinMode::Off, bool shuffleMapJoin = true, bool useGraceCoreForMap = false); NNodes::TExprBase DqBuildHashJoin(const NNodes::TDqJoin& join, EHashJoinMode mode, TExprContext& ctx, IOptimizationContext& optCtx); diff --git a/ydb/library/yql/dq/opt/dq_opt_peephole.cpp b/ydb/library/yql/dq/opt/dq_opt_peephole.cpp index 1fc1e59cd194..4342b68d1f79 100644 --- a/ydb/library/yql/dq/opt/dq_opt_peephole.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_peephole.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include @@ -130,8 +131,146 @@ TExprNode::TListType OriginalJoinOutputMembers(const TDqPhyMapJoin& mapJoin, TEx } return structMembers; } + +TExprNode::TPtr ExpandJoinInput(const TStructExprType& type, TExprNode::TPtr&& arg, TExprContext& ctx) { + return ctx.Builder(arg->Pos()) + .Callable("ExpandMap") + .Add(0, std::move(arg)) + .Lambda(1) + .Param("item") + .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { + auto i = 0U; + for (const auto& item : type.GetItems()) { + parent.Callable(i++, "Member") + .Arg(0, "item") + .Atom(1, item->GetName()) + .Seal(); + } + return parent; + }) + .Seal() + .Seal().Build(); +} + } // anonymous namespace end +TExprBase DqPeepholeRewriteMapJoinWithGraceCore(const TExprBase& node, TExprContext& ctx) { + if (!node.Maybe()) { + return node; + } + const auto graceJoin = node.Cast(); + const auto pos = graceJoin.Pos(); + + const TString leftTableLabel(GetTableLabel(graceJoin.LeftLabel())); + const TString rightTableLabel(GetTableLabel(graceJoin.RightLabel())); + + auto [leftKeyColumnNodes, rightKeyColumnNodes] = JoinKeysToAtoms(ctx, graceJoin, leftTableLabel, rightTableLabel); + const auto keyWidth = leftKeyColumnNodes.size(); + + const auto itemTypeLeft = GetSequenceItemType(graceJoin.LeftInput(), false, ctx)->Cast(); + const auto itemTypeRight = GetSequenceItemType(graceJoin.RightInput(), false, ctx)->Cast(); + + TExprNode::TListType leftRenames, rightRenames; + std::vector fullColNames; + ui32 outputIndex = 0; + + for (auto i = 0u; i < itemTypeLeft->GetSize(); i++) { + TString name(itemTypeLeft->GetItems()[i]->GetName()); + if (leftTableLabel) { + name = leftTableLabel + "." + name; + } + fullColNames.push_back(name); + leftRenames.emplace_back(ctx.NewAtom(pos, ctx.GetIndexAsString(i))); + leftRenames.emplace_back(ctx.NewAtom(pos, ctx.GetIndexAsString(outputIndex++))); + } + if (graceJoin.JoinType().Value() != "LeftOnly" && graceJoin.JoinType().Value() != "LeftSemi") { + for (auto i = 0u; i < itemTypeRight->GetSize(); i++) { + TString name(itemTypeRight->GetItems()[i]->GetName()); + if (rightTableLabel) { + name = rightTableLabel + "." + name; + } + fullColNames.push_back(name); + rightRenames.emplace_back(ctx.NewAtom(pos, ctx.GetIndexAsString(i))); + rightRenames.emplace_back(ctx.NewAtom(pos, ctx.GetIndexAsString(outputIndex++))); + } + } + + TTypeAnnotationNode::TListType keyTypesLeft(keyWidth); + TTypeAnnotationNode::TListType keyTypesRight(keyWidth); + TTypeAnnotationNode::TListType keyTypes(keyWidth); + for (auto i = 0U; i < keyTypes.size(); ++i) { + const auto keyTypeLeft = itemTypeLeft->FindItemType(leftKeyColumnNodes[i]->Content()); + const auto keyTypeRight = itemTypeRight->FindItemType(rightKeyColumnNodes[i]->Content()); + bool optKey = false; + keyTypes[i] = JoinDryKeyType(keyTypeLeft, keyTypeRight, optKey, ctx); + if (!keyTypes[i]) { + keyTypes.clear(); + keyTypesLeft.clear(); + keyTypesRight.clear(); + break; + } + keyTypesLeft[i] = optKey ? ctx.MakeType(keyTypes[i]) : keyTypes[i]; + keyTypesRight[i] = optKey ? ctx.MakeType(keyTypes[i]) : keyTypes[i]; + } + + auto leftInput = ExpandJoinInput(*itemTypeLeft, ctx.NewCallable(graceJoin.LeftInput().Pos(), "ToFlow", {graceJoin.LeftInput().Ptr()}), ctx); + auto rightInput = ExpandJoinInput(*itemTypeRight, ctx.NewCallable(graceJoin.RightInput().Pos(), "ToFlow", {graceJoin.RightInput().Ptr()}), ctx); + YQL_ENSURE(!keyTypes.empty()); + + for (auto i = 0U; i < leftKeyColumnNodes.size(); i++) { + const auto origName = TString(leftKeyColumnNodes[i]->Content()); + auto index = itemTypeLeft->FindItem(origName); + YQL_ENSURE(index); + leftKeyColumnNodes[i] = ctx.NewAtom(leftKeyColumnNodes[i]->Pos(), ctx.GetIndexAsString(*index)); + } + for (auto i = 0U; i < rightKeyColumnNodes.size(); i++) { + const auto origName = TString(rightKeyColumnNodes[i]->Content()); + auto index = itemTypeRight->FindItem(origName); + YQL_ENSURE(index); + rightKeyColumnNodes[i] = ctx.NewAtom(rightKeyColumnNodes[i]->Pos(), ctx.GetIndexAsString(*index)); + } + + auto [leftKeyColumnNodesCopy, rightKeyColumnNodesCopy] = JoinKeysToAtoms(ctx, graceJoin, leftTableLabel, rightTableLabel); + + auto graceJoinCore = Build(ctx, pos) + .LeftInput(std::move(leftInput)) + .RightInput(std::move(rightInput)) + .JoinKind(graceJoin.JoinType()) + .LeftKeysColumns(ctx.NewList(pos, std::move(leftKeyColumnNodes))) + .RightKeysColumns(ctx.NewList(pos, std::move(rightKeyColumnNodes))) + .LeftRenames(ctx.NewList(pos, std::move(leftRenames))) + .RightRenames(ctx.NewList(pos, std::move(rightRenames))) + .LeftKeysColumnNames(ctx.NewList(pos, std::move(leftKeyColumnNodesCopy))) + .RightKeysColumnNames(ctx.NewList(pos, std::move(rightKeyColumnNodesCopy))) + .Flags() + .Build() + .Done(); + + auto graceNode = ctx.Builder(pos) + .Callable("NarrowMap") + .Add(0, graceJoinCore.Ptr()) + .Lambda(1) + .Params("output", fullColNames.size()) + .Callable("AsStruct") + .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { + ui32 i = 0U; + for (const auto& colName : fullColNames) { + parent.List(i) + .Atom(0, colName) + .Arg(1, "output", i) + .Seal(); + i++; + } + return parent; + }) + .Seal() + .Seal() + .Seal() + .Build(); + + return TExprBase(graceNode); +} + /** * Rewrites a `KqpMapJoin` to the `MapJoinCore`. * @@ -142,10 +281,11 @@ TExprNode::TListType OriginalJoinOutputMembers(const TDqPhyMapJoin& mapJoin, TEx * (rely on the fact that there will be only one element in the `FlatMap`-stream) * - Align key types using `StrictCast`, use internal columns to store converted left keys */ -TExprBase DqPeepholeRewriteMapJoin(const TExprBase& node, TExprContext& ctx) { +TExprBase DqPeepholeRewriteMapJoinWithMapCore(const TExprBase& node, TExprContext& ctx) { if (!node.Maybe()) { return node; } + const auto mapJoin = node.Cast(); const auto pos = mapJoin.Pos(); diff --git a/ydb/library/yql/dq/opt/dq_opt_peephole.h b/ydb/library/yql/dq/opt/dq_opt_peephole.h index a8951dd12e83..dbe279df7d1f 100644 --- a/ydb/library/yql/dq/opt/dq_opt_peephole.h +++ b/ydb/library/yql/dq/opt/dq_opt_peephole.h @@ -10,7 +10,8 @@ namespace NYql::NDq { NNodes::TExprBase DqPeepholeRewriteCrossJoin(const NNodes::TExprBase& node, TExprContext& ctx); NNodes::TExprBase DqPeepholeRewriteJoinDict(const NNodes::TExprBase& node, TExprContext& ctx); -NNodes::TExprBase DqPeepholeRewriteMapJoin(const NNodes::TExprBase& node, TExprContext& ctx); +NNodes::TExprBase DqPeepholeRewriteMapJoinWithGraceCore(const NNodes::TExprBase& node, TExprContext& ctx); +NNodes::TExprBase DqPeepholeRewriteMapJoinWithMapCore(const NNodes::TExprBase& node, TExprContext& ctx); NNodes::TExprBase DqPeepholeRewriteReplicate(const NNodes::TExprBase& node, TExprContext& ctx); NNodes::TExprBase DqPeepholeRewritePureJoin(const NNodes::TExprBase& node, TExprContext& ctx); NNodes::TExprBase DqPeepholeDropUnusedInputs(const NNodes::TExprBase& node, TExprContext& ctx); diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.cpp b/ydb/library/yql/dq/opt/dq_opt_phy.cpp index 45cd2f9b5b6c..fabdbd1a3fcd 100644 --- a/ydb/library/yql/dq/opt/dq_opt_phy.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_phy.cpp @@ -2654,7 +2654,7 @@ TMaybeNode DqFlipJoin(const TDqJoin& join, TExprContext& ctx) { TExprBase DqBuildJoin(const TExprBase& node, TExprContext& ctx, IOptimizationContext& optCtx, - const TParentsMap& parentsMap, bool allowStageMultiUsage, bool pushLeftStage, EHashJoinMode hashJoin, bool shuffleMapJoin) + const TParentsMap& parentsMap, bool allowStageMultiUsage, bool pushLeftStage, EHashJoinMode hashJoin, bool shuffleMapJoin, bool useGraceCoreForMap) { if (!node.Maybe()) { return node; @@ -2704,7 +2704,7 @@ TExprBase DqBuildJoin(const TExprBase& node, TExprContext& ctx, IOptimizationCon // separate stage to receive data from both sides of join. // TODO: We can push MapJoin to existing stage for data query, if it doesn't have table reads. This // requires some additional knowledge, probably with use of constraints. - return DqBuildPhyJoin(join, pushLeftStage, ctx, optCtx); + return DqBuildPhyJoin(join, pushLeftStage, ctx, optCtx, useGraceCoreForMap); } TExprBase DqPrecomputeToInput(const TExprBase& node, TExprContext& ctx) { diff --git a/ydb/library/yql/dq/type_ann/dq_type_ann.cpp b/ydb/library/yql/dq/type_ann/dq_type_ann.cpp index 91ecf3dd571e..9241eb4c83ed 100644 --- a/ydb/library/yql/dq/type_ann/dq_type_ann.cpp +++ b/ydb/library/yql/dq/type_ann/dq_type_ann.cpp @@ -1054,6 +1054,10 @@ THolder CreateDqTypeAnnotationTransformer(TTypeAnnotationCont return AnnotateDqJoin(input, ctx); } + if (TDqPhyGraceJoin::Match(input.Get())) { + return AnnotateDqMapOrDictJoin(input, ctx); + } + if (TDqPhyMapJoin::Match(input.Get())) { return AnnotateDqMapOrDictJoin(input, ctx); } diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp b/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp index 24f16bd28312..46c49346b389 100644 --- a/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp +++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp @@ -115,6 +115,7 @@ TDqConfiguration::TDqConfiguration() { } return res; }); + REGISTER_SETTING(*this, UseGraceJoinCoreForMap); } } // namespace NYql diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.h b/ydb/library/yql/providers/dq/common/yql_dq_settings.h index 8fe9163d9037..f05ff0610cd8 100644 --- a/ydb/library/yql/providers/dq/common/yql_dq_settings.h +++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.h @@ -144,6 +144,7 @@ struct TDqSettings { NCommon::TConfSetting _MaxAttachmentsSize; NCommon::TConfSetting DisableCheckpoints; + NCommon::TConfSetting UseGraceJoinCoreForMap; // This options will be passed to executor_actor and worker_actor template diff --git a/ydb/library/yql/providers/dq/opt/dqs_opt.cpp b/ydb/library/yql/providers/dq/opt/dqs_opt.cpp index b31f184eb8fe..c8001eec4b96 100644 --- a/ydb/library/yql/providers/dq/opt/dqs_opt.cpp +++ b/ydb/library/yql/providers/dq/opt/dqs_opt.cpp @@ -57,7 +57,8 @@ namespace NYql::NDqs { TExprBase node{inputExpr}; PERFORM_RULE(DqPeepholeRewriteCrossJoin, node, ctx); PERFORM_RULE(DqPeepholeRewriteJoinDict, node, ctx); - PERFORM_RULE(DqPeepholeRewriteMapJoin, node, ctx); + PERFORM_RULE(DqPeepholeRewriteMapJoinWithGraceCore, node, ctx); + PERFORM_RULE(DqPeepholeRewriteMapJoinWithMapCore, node, ctx); PERFORM_RULE(DqPeepholeRewritePureJoin, node, ctx); PERFORM_RULE(DqPeepholeRewriteReplicate, node, ctx); PERFORM_RULE(DqPeepholeDropUnusedInputs, node, ctx); diff --git a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp index 0eba2f3be213..2b2a3493f370 100644 --- a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp +++ b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp @@ -255,7 +255,8 @@ class TDqsPhysicalOptProposalTransformer : public TOptimizeTransformerBase { const auto join = node.Cast(); const TParentsMap* parentsMap = getParents(); const auto mode = Config->HashJoinMode.Get().GetOrElse(EHashJoinMode::Off); - return DqBuildJoin(join, ctx, optCtx, *parentsMap, IsGlobal, /* pushLeftStage = */ false /* TODO */, mode); + const auto useGraceJoin = Config->UseGraceJoinCoreForMap.Get().GetOrElse(false); + return DqBuildJoin(join, ctx, optCtx, *parentsMap, IsGlobal, /* pushLeftStage = */ false /* TODO */, mode, true, useGraceJoin); } template diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_constraints.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_constraints.cpp index 1b5572ebf0ad..604a8ab8a5f9 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_constraints.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_constraints.cpp @@ -48,6 +48,7 @@ class TDqDataSinkConstraintTransformer : public TVisitorTransformerBase { AddHandler({TDqReplicate::CallableName()}, Hndl(&TDqDataSinkConstraintTransformer::HandleReplicate)); AddHandler({ TDqJoin::CallableName(), + TDqPhyGraceJoin::CallableName(), TDqPhyMapJoin::CallableName(), TDqPhyCrossJoin::CallableName(), TDqPhyJoinDict::CallableName(), diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp index e39d37272c37..d4bb24800a00 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp @@ -34,6 +34,7 @@ class TDqsDataSinkTypeAnnotationTransformer : public TVisitorTransformerBase { AddHandler({TDqReplicate::CallableName()}, Hndl(&TDqsDataSinkTypeAnnotationTransformer::AnnotateDqReplicateAlwaysError)); } AddHandler({TDqJoin::CallableName()}, Hndl(&NDq::AnnotateDqJoin)); + AddHandler({TDqPhyGraceJoin::CallableName()}, Hndl(&NDq::AnnotateDqMapOrDictJoin)); AddHandler({TDqPhyMapJoin::CallableName()}, Hndl(&NDq::AnnotateDqMapOrDictJoin)); AddHandler({TDqPhyCrossJoin::CallableName()}, Hndl(&NDq::AnnotateDqCrossJoin)); AddHandler({TDqPhyJoinDict::CallableName()}, Hndl(&NDq::AnnotateDqMapOrDictJoin)); diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_validate.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_validate.cpp index b01c855618d7..940848205333 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_validate.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_validate.cpp @@ -47,7 +47,7 @@ class TDqExecutionValidator { return !TDqConnection::Match(n.Get()) && !TDqPhyPrecompute::Match(n.Get()) && !TDqReadWrapBase::Match(n.Get()); }, [&readPerProvider_ = ReadsPerProvider_, &hasErrors, &hasJoin, &hasMapJoin, &ctx = Ctx_, &typeCtx = TypeCtx_](const TExprNode::TPtr& n) { - if (TDqPhyMapJoin::Match(n.Get())) { + if (TDqPhyMapJoin::Match(n.Get()) || TDqPhyGraceJoin::Match(n.Get())) { hasJoin = hasMapJoin = true; } else if (TCoGraceJoinCore::Match(n.Get()) || TCoGraceSelfJoinCore::Match(n.Get())) { hasJoin = true;