diff --git a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json index 337746019d2b..538862ca1f25 100644 --- a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json +++ b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json @@ -180,21 +180,10 @@ { "Name": "TKqlStreamLookupTable", "Base": "TKqlLookupTableBase", - "Match": {"Type": "Callable", "Name": "KqlStreamLookupTable"} - }, - { - "Name": "TKqlStreamIdxLookupJoin", - "Base": "TCallable", - "Match": {"Type": "Callable", "Name": "KqlStreamIdxLookupJoin"}, + "Match": {"Type": "Callable", "Name": "KqlStreamLookupTable"}, "Children": [ - {"Index": 0, "Name": "LeftInput", "Type": "TExprBase"}, - {"Index": 1, "Name": "LeftLabel", "Type": "TCoAtom"}, - {"Index": 2, "Name": "RightTable", "Type": "TKqpTable"}, - {"Index": 3, "Name": "RightColumns", "Type": "TCoAtomList"}, - {"Index": 4, "Name": "RightLabel", "Type": "TCoAtom"}, - {"Index": 5, "Name": "JoinType", "Type": "TCoAtom"} + {"Index": 3, "Name": "LookupStrategy", "Type": "TCoAtom"} ] - }, { "Name": "TKqlSequencer", @@ -468,9 +457,9 @@ ] }, { - "Name": "TKqpIndexLookupJoin", + "Name": "TKqlIndexLookupJoinBase", "Base": "TCallable", - "Match": {"Type": "Callable", "Name": "KqpIndexLookupJoin"}, + "Match": {"Type": "CallableBase"}, "Children": [ {"Index": 0, "Name": "Input", "Type": "TExprBase"}, {"Index": 1, "Name": "JoinType", "Type": "TCoAtom"}, @@ -478,6 +467,16 @@ {"Index": 3, "Name": "RightLabel", "Type": "TCoAtom"} ] }, + { + "Name": "TKqlIndexLookupJoin", + "Base": "TKqlIndexLookupJoinBase", + "Match": {"Type": "Callable", "Name": "KqlIndexLookupJoin"} + }, + { + "Name": "TKqpIndexLookupJoin", + "Base": "TKqlIndexLookupJoinBase", + "Match": {"Type": "Callable", "Name": "KqpIndexLookupJoin"} + }, { "Name": "TKqpCnSequencer", "Base": "TKqpConnection", diff --git a/ydb/core/kqp/host/kqp_type_ann.cpp b/ydb/core/kqp/host/kqp_type_ann.cpp index 2796aa882c4f..374c4e151907 100644 --- a/ydb/core/kqp/host/kqp_type_ann.cpp +++ 
b/ydb/core/kqp/host/kqp_type_ann.cpp @@ -425,7 +425,7 @@ TStatus AnnotateReadTableRanges(const TExprNode::TPtr& node, TExprContext& ctx, TStatus AnnotateLookupTable(const TExprNode::TPtr& node, TExprContext& ctx, const TString& cluster, const TKikimrTablesData& tablesData, bool withSystemColumns) { - if (!EnsureArgsCount(*node, TKqlLookupIndexBase::Match(node.Get()) ? 4 : 3, ctx)) { + if (!EnsureArgsCount(*node, TKqlLookupIndexBase::Match(node.Get()) || TKqlStreamLookupTable::Match(node.Get()) ? 4 : 3, ctx)) { return TStatus::Error; } @@ -467,10 +467,57 @@ TStatus AnnotateLookupTable(const TExprNode::TPtr& node, TExprContext& ctx, cons } YQL_ENSURE(lookupType); - if (!EnsureStructType(node->Pos(), *lookupType, ctx)) { - return TStatus::Error; + + const TStructExprType* structType = nullptr; + bool isStreamLookup = TKqlStreamLookupTable::Match(node.Get()); + if (isStreamLookup) { + auto lookupStrategy = node->Child(TKqlStreamLookupTable::idx_LookupStrategy); + if (!EnsureAtom(*lookupStrategy, ctx)) { + return TStatus::Error; + } + + if (lookupStrategy->Content() == TKqpStreamLookupJoinStrategyName) { + if (!EnsureTupleType(node->Pos(), *lookupType, ctx)) { + return TStatus::Error; + } + + if (!EnsureTupleTypeSize(node->Pos(), lookupType, 2, ctx)) { + return TStatus::Error; + } + + auto tupleType = lookupType->Cast(); + if (!EnsureStructType(node->Pos(), *tupleType->GetItems()[0], ctx)) { + return TStatus::Error; + } + + if (!EnsureStructType(node->Pos(), *tupleType->GetItems()[1], ctx)) { + return TStatus::Error; + } + + structType = tupleType->GetItems()[0]->Cast(); + auto leftRowType = tupleType->GetItems()[1]->Cast(); + + TVector outputTypes; + outputTypes.push_back(leftRowType); + outputTypes.push_back(ctx.MakeType(rowType)); + + rowType = ctx.MakeType(outputTypes); + } else { + if (!EnsureStructType(node->Pos(), *lookupType, ctx)) { + return TStatus::Error; + } + + structType = lookupType->Cast(); + } + } else { + if (!EnsureStructType(node->Pos(), *lookupType, 
ctx)) { + return TStatus::Error; + } + + structType = lookupType->Cast(); } - auto structType = lookupType->Cast(); + + YQL_ENSURE(structType); ui32 keyColumnsCount = 0; if (TKqlLookupIndexBase::Match(node.Get())) { @@ -1338,105 +1385,6 @@ TStatus AnnotateSequencer(const TExprNode::TPtr& node, TExprContext& ctx, const return TStatus::Ok; } -TStatus AnnotateStreamIdxLookupJoin(const TExprNode::TPtr& node, TExprContext& ctx, const TString& cluster, - const TKikimrTablesData& tablesData, bool withSystemColumns) -{ - if (!EnsureArgsCount(*node, 6, ctx)) { - return TStatus::Error; - } - - auto leftInputType = node->Child(TKqlStreamIdxLookupJoin::idx_LeftInput)->GetTypeAnn(); - const TTypeAnnotationNode* leftInputItemType; - if (!EnsureNewSeqType(node->Pos(), *leftInputType, ctx, &leftInputItemType)) { - return TStatus::Error; - } - - YQL_ENSURE(leftInputItemType); - if (!EnsureTupleType(node->Pos(), *leftInputItemType, ctx)) { - return TStatus::Error; - } - - if (!EnsureTupleTypeSize(node->Pos(), leftInputItemType, 2, ctx)) { - return TStatus::Error; - } - - auto leftInputTupleType = leftInputItemType->Cast(); - if (!EnsureStructType(node->Pos(), *leftInputTupleType->GetItems()[0], ctx)) { - return TStatus::Error; - } - - if (!EnsureStructType(node->Pos(), *leftInputTupleType->GetItems()[1], ctx)) { - return TStatus::Error; - } - - if (!EnsureAtom(*node->Child(TKqlStreamIdxLookupJoin::idx_LeftLabel), ctx)) { - return TStatus::Error; - } - - TCoAtom leftLabel(node->Child(TKqlStreamIdxLookupJoin::idx_LeftLabel)); - - auto rightTable = ResolveTable(node->Child(TKqlStreamIdxLookupJoin::idx_RightTable), ctx, cluster, tablesData); - if (!rightTable.second) { - return TStatus::Error; - } - - const TStructExprType* inputKeysType = leftInputTupleType->GetItems()[0]->Cast(); - for (const auto& inputKey : inputKeysType->GetItems()) { - if (!rightTable.second->GetKeyColumnIndex(TString(inputKey->GetName()))) { - return TStatus::Error; - } - } - - if 
(!EnsureTupleOfAtoms(*node->Child(TKqlStreamIdxLookupJoin::idx_RightColumns), ctx)) { - return TStatus::Error; - } - - TCoAtomList rightColumns{node->ChildPtr(TKqlStreamIdxLookupJoin::idx_RightColumns)}; - for (const auto& rightColumn : rightColumns) { - if (!rightTable.second->GetColumnType(TString(rightColumn.Value()))) { - return TStatus::Error; - } - } - - auto rightDataType = GetReadTableRowType(ctx, tablesData, cluster, rightTable.first, rightColumns, withSystemColumns); - if (!rightDataType) { - return TStatus::Error; - } - - if (!EnsureAtom(*node->Child(TKqlStreamIdxLookupJoin::idx_RightLabel), ctx)) { - return TStatus::Error; - } - - TCoAtom rightLabel(node->Child(TKqlStreamIdxLookupJoin::idx_RightLabel)); - TCoAtom joinType(node->Child(TKqlStreamIdxLookupJoin::idx_JoinType)); - - const TStructExprType* leftDataType = leftInputTupleType->GetItems()[1]->Cast(); - TVector resultStructItems; - for (const auto& member : leftDataType->GetItems()) { - resultStructItems.emplace_back( - ctx.MakeType(TString::Join(leftLabel.Value(), ".", member->GetName()), member->GetItemType()) - ); - } - - if (RightJoinSideAllowed(joinType.Value())) { - for (const auto& member : rightDataType->Cast()->GetItems()) { - const bool makeOptional = RightJoinSideOptional(joinType.Value()) && !member->GetItemType()->IsOptionalOrNull(); - - const TTypeAnnotationNode* memberType = makeOptional - ? 
ctx.MakeType(member->GetItemType()) - : member->GetItemType(); - - resultStructItems.emplace_back( - ctx.MakeType(TString::Join(rightLabel.Value(), ".", member->GetName()), memberType) - ); - } - } - - auto rowType = ctx.MakeType(resultStructItems); - node->SetTypeAnn(ctx.MakeType(rowType)); - return TStatus::Ok; -} - TStatus AnnotateKqpProgram(const TExprNode::TPtr& node, TExprContext& ctx) { if (!EnsureArgsCount(*node, 2, ctx)) { return TStatus::Error; @@ -1621,7 +1569,7 @@ TStatus AnnotateStreamLookupConnection(const TExprNode::TPtr& node, TExprContext YQL_ENSURE(inputItemType); - if (lookupStrategy.Value() == "LookupRows") { + if (lookupStrategy.Value() == TKqpStreamLookupStrategyName) { if (!EnsureStructType(node->Pos(), *inputItemType, ctx)) { return TStatus::Error; } @@ -1640,7 +1588,7 @@ TStatus AnnotateStreamLookupConnection(const TExprNode::TPtr& node, TExprContext node->SetTypeAnn(ctx.MakeType(rowType)); - } else if (lookupStrategy.Value() == "LookupJoinRows") { + } else if (lookupStrategy.Value() == TKqpStreamLookupJoinStrategyName) { if (!EnsureTupleType(node->Pos(), *inputItemType, ctx)) { return TStatus::Error; } @@ -1694,7 +1642,7 @@ TStatus AnnotateIndexLookupJoin(const TExprNode::TPtr& node, TExprContext& ctx) return TStatus::Error; } - auto inputType = node->Child(TKqpIndexLookupJoin::idx_Input)->GetTypeAnn(); + auto inputType = node->Child(TKqlIndexLookupJoinBase::idx_Input)->GetTypeAnn(); const TTypeAnnotationNode* inputItemType; if (!EnsureNewSeqType(node->Pos(), *inputType, ctx, &inputItemType)) { return TStatus::Error; @@ -1725,22 +1673,22 @@ TStatus AnnotateIndexLookupJoin(const TExprNode::TPtr& node, TExprContext& ctx) return TStatus::Error; } - if (!EnsureAtom(*node->Child(TKqpIndexLookupJoin::idx_JoinType), ctx)) { + if (!EnsureAtom(*node->Child(TKqlIndexLookupJoinBase::idx_JoinType), ctx)) { return TStatus::Error; } - if (!EnsureAtom(*node->Child(TKqpIndexLookupJoin::idx_LeftLabel), ctx)) { + if 
(!EnsureAtom(*node->Child(TKqlIndexLookupJoinBase::idx_LeftLabel), ctx)) { return TStatus::Error; } - TCoAtom leftLabel(node->Child(TKqpIndexLookupJoin::idx_LeftLabel)); + TCoAtom leftLabel(node->Child(TKqlIndexLookupJoinBase::idx_LeftLabel)); - if (!EnsureAtom(*node->Child(TKqpIndexLookupJoin::idx_RightLabel), ctx)) { + if (!EnsureAtom(*node->Child(TKqlIndexLookupJoinBase::idx_RightLabel), ctx)) { return TStatus::Error; } - TCoAtom rightLabel(node->Child(TKqpIndexLookupJoin::idx_RightLabel)); - TCoAtom joinType(node->Child(TKqpIndexLookupJoin::idx_JoinType)); + TCoAtom rightLabel(node->Child(TKqlIndexLookupJoinBase::idx_RightLabel)); + TCoAtom joinType(node->Child(TKqlIndexLookupJoinBase::idx_JoinType)); TVector resultStructItems; for (const auto& item : leftRowType->GetItems()) { @@ -1764,7 +1712,13 @@ TStatus AnnotateIndexLookupJoin(const TExprNode::TPtr& node, TExprContext& ctx) } auto outputRowType = ctx.MakeType(resultStructItems); - node->SetTypeAnn(ctx.MakeType(outputRowType)); + const bool isPhysical = TKqpIndexLookupJoin::Match(node.Get()); + if (isPhysical) { + node->SetTypeAnn(ctx.MakeType(outputRowType)); + } else { + node->SetTypeAnn(ctx.MakeType(outputRowType)); + } + return TStatus::Ok; } @@ -1892,7 +1846,7 @@ TAutoPtr CreateKqpTypeAnnotationTransformer(const TString& cl return AnnotateStreamLookupConnection(input, ctx, cluster, *tablesData, config->SystemColumnsEnabled()); } - if (TKqpIndexLookupJoin::Match(input.Get())) { + if (TKqlIndexLookupJoinBase::Match(input.Get())) { return AnnotateIndexLookupJoin(input, ctx); } @@ -1924,10 +1878,6 @@ TAutoPtr CreateKqpTypeAnnotationTransformer(const TString& cl return AnnotateSequencer(input, ctx, cluster, *tablesData); } - if (TKqlStreamIdxLookupJoin::Match(input.Get())) { - return AnnotateStreamIdxLookupJoin(input, ctx, cluster, *tablesData, config->SystemColumnsEnabled()); - } - if (TKqpProgram::Match(input.Get())) { return AnnotateKqpProgram(input, ctx); } diff --git 
a/ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp index e9b52fd947d2..6e1d01c5568f 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp @@ -214,6 +214,17 @@ TExprBase KqpApplyExtractMembersToLookupTable(TExprBase node, TExprContext& ctx, .Done(); } + if (auto maybeStreamLookup = lookup.Maybe()) { + auto streamLookup = maybeStreamLookup.Cast(); + + return Build(ctx, lookup.Pos()) + .Table(streamLookup.Table()) + .LookupKeys(streamLookup.LookupKeys()) + .Columns(usedColumns.Cast()) + .LookupStrategy(streamLookup.LookupStrategy()) + .Done(); + } + return Build(ctx, lookup.Pos()) .CallableName(lookup.CallableName()) .Table(lookup.Table()) diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp index 144d0ed31207..a872addcf58a 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp @@ -321,6 +321,7 @@ TExprBase DoRewriteIndexRead(const TReadMatch& read, TExprContext& ctx, .Table(read.Table()) .LookupKeys(readIndexTable.Ptr()) .Columns(read.Columns()) + .LookupStrategy().Build(TKqpStreamLookupStrategyName) .Done(); } else { return Build(ctx, read.Pos()) @@ -365,6 +366,7 @@ TExprBase KqpRewriteLookupIndex(const TExprBase& node, TExprContext& ctx, const .Table(BuildTableMeta(*indexMeta, node.Pos(), ctx)) .LookupKeys(lookupIndex.LookupKeys()) .Columns(lookupIndex.Columns()) + .LookupStrategy().Build(TKqpStreamLookupStrategyName) .Done(); } @@ -382,12 +384,14 @@ TExprBase KqpRewriteLookupIndex(const TExprBase& node, TExprContext& ctx, const .Table(BuildTableMeta(*indexMeta, node.Pos(), ctx)) .LookupKeys(lookupIndex.LookupKeys()) .Columns(keyColumnsList) + .LookupStrategy().Build(TKqpStreamLookupStrategyName) .Done(); return Build(ctx, node.Pos()) .Table(lookupIndex.Table()) .LookupKeys(lookupIndexTable.Ptr()) .Columns(lookupIndex.Columns()) + 
.LookupStrategy().Build(TKqpStreamLookupStrategyName) .Done(); } @@ -424,6 +428,7 @@ TExprBase KqpRewriteStreamLookupIndex(const TExprBase& node, TExprContext& ctx, .Table(BuildTableMeta(*indexMeta, node.Pos(), ctx)) .LookupKeys(streamLookupIndex.LookupKeys()) .Columns(streamLookupIndex.Columns()) + .LookupStrategy().Build(TKqpStreamLookupStrategyName) .Done(); } @@ -433,12 +438,14 @@ TExprBase KqpRewriteStreamLookupIndex(const TExprBase& node, TExprContext& ctx, .Table(BuildTableMeta(*indexMeta, node.Pos(), ctx)) .LookupKeys(streamLookupIndex.LookupKeys()) .Columns(keyColumnsList) + .LookupStrategy().Build(TKqpStreamLookupStrategyName) .Done(); return Build(ctx, node.Pos()) .Table(streamLookupIndex.Table()) .LookupKeys(lookupIndexTable.Ptr()) .Columns(streamLookupIndex.Columns()) + .LookupStrategy().Build(TKqpStreamLookupStrategyName) .Done(); } diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp index f54c7a3ad848..c03e01de88cd 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp @@ -236,6 +236,7 @@ TExprBase BuildLookupTable(TExprContext& ctx, const TPositionHandle pos, .Build() .Build() .Columns(columns) + .LookupStrategy().Build(TKqpStreamLookupStrategyName) .Done(); } @@ -249,6 +250,7 @@ TExprBase BuildLookupTable(TExprContext& ctx, const TPositionHandle pos, .Build() .Build() .Columns(columns) + .LookupStrategy().Build(TKqpStreamLookupStrategyName) .Done(); } @@ -338,15 +340,112 @@ bool IsParameterToListOfStructsRepack(const TExprBase& expr) { #define DBG(...) template -TMaybeNode BuildKqpStreamIndexLookupJoin(const TDqJoin& join, TExprBase leftInput, ReadType rightRead, TExprContext& ctx) { +TMaybeNode BuildKqpStreamIndexLookupJoin(const TDqJoin& join, TExprBase leftInput, const TKqpMatchReadResult& rightReadMatch, TExprContext& ctx) { TString leftLabel = join.LeftLabel().Maybe() ? 
TString(join.LeftLabel().Cast().Value()) : ""; TString rightLabel = join.RightLabel().Maybe() ? TString(join.RightLabel().Cast().Value()) : ""; + auto rightRead = rightReadMatch.Read.template Cast(); - return Build(ctx, join.Pos()) - .LeftInput(leftInput) + TExprBase lookupJoin = Build(ctx, join.Pos()) + .Table(rightRead.Table()) + .LookupKeys(leftInput) + .Columns(rightRead.Columns()) + .LookupStrategy().Build(TKqpStreamLookupJoinStrategyName) + .Done(); + + // Stream lookup join output: stream<tuple<left_row, optional<right_row>>> + // so we should apply filters to second element of tuple for each row + + if (rightReadMatch.ExtractMembers) { + lookupJoin = Build(ctx, join.Pos()) + .Input(lookupJoin) + .Lambda() + .Args({"tuple"}) + .Body() + .Add() + .Tuple("tuple") + .Index().Value("0").Build() + .Build() + .Add() + .Input() + .Tuple("tuple") + .Index().Value("1").Build() + .Build() + .Members(rightReadMatch.ExtractMembers.Cast().Members()) + .Build() + .Build() + .Build() + .Done(); + } + + if (rightReadMatch.FilterNullMembers) { + lookupJoin = Build(ctx, join.Pos()) + .Input(lookupJoin) + .Lambda() + .Args({"tuple"}) + .Body() + .Add() + .Tuple("tuple") + .Index().Value("0").Build() + .Build() + .Add() + .Input() + .Tuple("tuple") + .Index().Value("1").Build() + .Build() + .Members(rightReadMatch.FilterNullMembers.Cast().Members()) + .Build() + .Build() + .Build() + .Done(); + } + + if (rightReadMatch.SkipNullMembers) { + lookupJoin = Build(ctx, join.Pos()) + .Input(lookupJoin) + .Lambda() + .Args({"tuple"}) + .Body() + .Add() + .Tuple("tuple") + .Index().Value("0").Build() + .Build() + .Add() + .Input() + .Tuple("tuple") + .Index().Value("1").Build() + .Build() + .Members(rightReadMatch.SkipNullMembers.Cast().Members()) + .Build() + .Build() + .Build() + .Done(); + } + + if (rightReadMatch.FlatMap) { + lookupJoin = Build(ctx, join.Pos()) + .Input(lookupJoin) + .Lambda() + .Args({"tuple"}) + .Body() + .Add() + .Tuple("tuple") + .Index().Value("0").Build() + .Build() + .Add() + .Input() + 
.Tuple("tuple") + .Index().Value("1").Build() + .Build() + .Lambda(rightReadMatch.FlatMap.Cast().Lambda()) + .Build() + .Build() + .Build() + .Done(); + } + + return Build(ctx, join.Pos()) + .Input(lookupJoin) .LeftLabel().Build(leftLabel) - .RightTable(rightRead.Table()) - .RightColumns(rightRead.Columns()) .RightLabel().Build(rightLabel) .JoinType(join.JoinType()) .Done(); @@ -689,7 +788,7 @@ TMaybeNode KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext .Build() .Done(); - return BuildKqpStreamIndexLookupJoin(join, leftInput, rightRead, ctx); + return BuildKqpStreamIndexLookupJoin(join, leftInput, *rightReadMatch, ctx); } auto leftDataDeduplicated = DeduplicateByMembers(leftData, filter, deduplicateLeftColumns, ctx, join.Pos()); diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp index a4e8e2f094b1..7d28a665766c 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp @@ -273,6 +273,7 @@ TExprBase KqpPushPredicateToReadTable(TExprBase node, TExprContext& ctx, const T .Table(read.Table()) .LookupKeys(lookupKeys) .Columns(read.Columns()) + .LookupStrategy().Build(TKqpStreamLookupStrategyName) .Done(); } } else { @@ -433,6 +434,7 @@ TExprBase KqpRewriteLookupTable(const TExprBase& node, TExprContext& ctx, const .Table(lookup.Table()) .LookupKeys(lookup.LookupKeys()) .Columns(lookup.Columns()) + .LookupStrategy().Build(TKqpStreamLookupStrategyName) .Done(); } diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_ranges_predext.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_ranges_predext.cpp index abb55cd92879..1f627bc45af3 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_ranges_predext.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_ranges_predext.cpp @@ -399,6 +399,7 @@ TExprBase KqpPushExtractedPredicateToReadTable(TExprBase node, TExprContext& ctx .Table(read.Table()) .Columns(read.Columns()) .LookupKeys(keys) + 
.LookupStrategy().Build(TKqpStreamLookupStrategyName) .Done(); } } else { diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp index 330bcd400f93..5a356dcbce93 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp @@ -34,7 +34,7 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase { AddHandler(0, &TKqlReadTableRanges::Match, HNDL(BuildReadTableRangesStage)); AddHandler(0, &TKqlLookupTable::Match, HNDL(BuildLookupTableStage)); AddHandler(0, &TKqlStreamLookupTable::Match, HNDL(BuildStreamLookupTableStages)); - AddHandler(0, &TKqlStreamIdxLookupJoin::Match, HNDL(BuildStreamIdxLookupJoinStages)); + AddHandler(0, &TKqlIndexLookupJoin::Match, HNDL(BuildStreamIdxLookupJoinStages)); AddHandler(0, &TKqlSequencer::Match, HNDL(BuildSequencerStages)); AddHandler(0, [](auto) { return true; }, HNDL(RemoveRedundantSortByPk)); AddHandler(0, &TCoTake::Match, HNDL(ApplyLimitToReadTable)); diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp index 854ab938f319..74cba669f5ae 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp @@ -702,7 +702,7 @@ NYql::NNodes::TExprBase KqpBuildStreamLookupTableStages(NYql::NNodes::TExprBase .Table(lookup.Table()) .Columns(lookup.Columns()) .InputType(ExpandType(lookup.Pos(), *lookup.LookupKeys().Ref().GetTypeAnn(), ctx)) - .LookupStrategy().Build(TKqpStreamLookupStrategyName) + .LookupStrategy(lookup.LookupStrategy()) .Done(); } else if (lookup.LookupKeys().Maybe()) { @@ -713,7 +713,7 @@ NYql::NNodes::TExprBase KqpBuildStreamLookupTableStages(NYql::NNodes::TExprBase .Table(lookup.Table()) .Columns(lookup.Columns()) .InputType(ExpandType(lookup.Pos(), *output.Ref().GetTypeAnn(), ctx)) - .LookupStrategy().Build(TKqpStreamLookupStrategyName) + .LookupStrategy(lookup.LookupStrategy()) .Done(); } else { 
return node; @@ -738,32 +738,24 @@ NYql::NNodes::TExprBase KqpBuildStreamLookupTableStages(NYql::NNodes::TExprBase } NYql::NNodes::TExprBase KqpBuildStreamIdxLookupJoinStages(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx) { - if (!node.Maybe()) { + if (!node.Maybe()) { return node; } - const auto& idxLookupJoin = node.Cast(); - YQL_ENSURE(idxLookupJoin.LeftInput().Maybe(), "Expected UnionAll as left input"); - - auto output = idxLookupJoin.LeftInput().Cast().Output(); - auto cnStreamIdxLookupJoin = Build(ctx, idxLookupJoin.Pos()) - .Output(output) - .Table(idxLookupJoin.RightTable()) - .Columns(idxLookupJoin.RightColumns()) - .InputType(ExpandType(idxLookupJoin.Pos(), *output.Ref().GetTypeAnn(), ctx)) - .LookupStrategy().Build(TKqpStreamLookupJoinStrategyName) - .Done(); + const auto& idxLookupJoin = node.Cast(); return Build(ctx, node.Pos()) .Output() .Stage() .Inputs() - .Add(cnStreamIdxLookupJoin) + .Add(idxLookupJoin.Input()) .Build() .Program() .Args({"stream_lookup_join_output"}) .Body() - .Input("stream_lookup_join_output") + .Input() + .Input("stream_lookup_join_output") + .Build() .JoinType(idxLookupJoin.JoinType()) .LeftLabel(idxLookupJoin.LeftLabel()) .RightLabel(idxLookupJoin.RightLabel()) diff --git a/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp b/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp index 3c69410c54a6..9bd08fb8095a 100644 --- a/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp +++ b/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp @@ -61,7 +61,8 @@ void PrepareTables(TSession session) { (101, "Value21"), (102, "Value22"), (103, "Value23"), - (NULL, "Value24"); + (NULL, "Value24"), + (104, NULL); REPLACE INTO `/Root/LaunchByProcessIdAndPinned` (idx_processId, idx_pinned, idx_launchNumber) VALUES ("eProcess", false, 4), @@ -174,7 +175,7 @@ Y_UNIT_TEST(MultiJoins) { CompareYson(answer, FormatResultSetYson(result.GetResultSet(0))); } -Y_UNIT_TEST(Inner) { +Y_UNIT_TEST_TWIN(Inner, StreamLookup) { Test( R"( SELECT l.Key, l.Fk, 
l.Value, r.Key, r.Value @@ -186,10 +187,10 @@ Y_UNIT_TEST(Inner) { )", R"([ [[1];[101];["Value1"];[101];["Value21"]] - ])", 2); + ])", 2, StreamLookup); } -Y_UNIT_TEST(Left) { +Y_UNIT_TEST_TWIN(Left, StreamLookup) { Test( R"( SELECT l.Key, l.Fk, l.Value, r.Key, r.Value @@ -201,14 +202,14 @@ Y_UNIT_TEST(Left) { )", R"([ [[3];[103];["Value2"];[103];["Value23"]]; - [[4];[104];["Value2"];#;#]; + [[4];[104];["Value2"];[104];#]; [[5];[105];["Value3"];#;#]; [[6];#;["Value6"];#;#]; [[7];#;["Value7"];#;#] - ])", 1); + ])", 2, StreamLookup); } -Y_UNIT_TEST(LeftOnly) { +Y_UNIT_TEST_TWIN(LeftOnly, StreamLookup) { Test( R"( SELECT l.Key, l.Fk, l.Value @@ -219,11 +220,10 @@ Y_UNIT_TEST(LeftOnly) { ORDER BY l.Key )", R"([ - [[4];[104];["Value2"]]; [[5];[105];["Value3"]]; [[6];#;["Value6"]]; [[7];#;["Value7"]] - ])", 1); + ])", 2, StreamLookup); } Y_UNIT_TEST(LeftSemi) { @@ -236,8 +236,9 @@ Y_UNIT_TEST(LeftSemi) { WHERE l.Value != 'Value1' -- left table payload filter )", R"([ - [[3];[103];["Value2"]] - ])", 1); + [[3];[103];["Value2"]]; + [[4];[104];["Value2"]] + ])", 2); } Y_UNIT_TEST(RightSemi) { @@ -253,7 +254,7 @@ Y_UNIT_TEST(RightSemi) { R"([ [[101];["Value21"]]; [[103];["Value23"]] - ])", 3); + ])", 4); } Y_UNIT_TEST_TWIN(SimpleInnerJoin, StreamLookup) { @@ -268,8 +269,9 @@ Y_UNIT_TEST_TWIN(SimpleInnerJoin, StreamLookup) { R"([ [[1];[101];["Value1"];[101];["Value21"]]; [[2];[102];["Value1"];[102];["Value22"]]; - [[3];[103];["Value2"];[103];["Value23"]] - ])", 3, StreamLookup); + [[3];[103];["Value2"];[103];["Value23"]]; + [[4];[104];["Value2"];[104];#] + ])", 4, StreamLookup); } Y_UNIT_TEST_TWIN(InnerJoinCustomColumnOrder, StreamLookup) { @@ -284,8 +286,9 @@ Y_UNIT_TEST_TWIN(InnerJoinCustomColumnOrder, StreamLookup) { R"([ [["Value21"];[1];[101];["Value1"];[101]]; [["Value22"];[2];[102];["Value1"];[102]]; - [["Value23"];[3];[103];["Value2"];[103]] - ])", 3, StreamLookup); + [["Value23"];[3];[103];["Value2"];[103]]; + [#;[4];[104];["Value2"];[104]] + ])", 4, StreamLookup); } 
Y_UNIT_TEST_TWIN(InnerJoinOnlyRightColumn, StreamLookup) { @@ -298,10 +301,11 @@ Y_UNIT_TEST_TWIN(InnerJoinOnlyRightColumn, StreamLookup) { ORDER BY r.Value; )", R"([ + [#]; [["Value21"]]; [["Value22"]]; [["Value23"]] - ])", 3, StreamLookup); + ])", 4, StreamLookup); } Y_UNIT_TEST_TWIN(InnerJoinOnlyLeftColumn, StreamLookup) { @@ -316,8 +320,9 @@ Y_UNIT_TEST_TWIN(InnerJoinOnlyLeftColumn, StreamLookup) { R"([ [[101]]; [[102]]; - [[103]] - ])", 3, StreamLookup); + [[103]]; + [[104]] + ])", 4, StreamLookup); } Y_UNIT_TEST_TWIN(InnerJoinLeftFilter, StreamLookup) { @@ -331,8 +336,9 @@ Y_UNIT_TEST_TWIN(InnerJoinLeftFilter, StreamLookup) { ORDER BY l.Key; )", R"([ - [[3];[103];["Value2"];[103];["Value23"]] - ])", 1, StreamLookup); + [[3];[103];["Value2"];[103];["Value23"]]; + [[4];[104];["Value2"];[104];#] + ])", 2, StreamLookup); } Y_UNIT_TEST_TWIN(SimpleLeftJoin, StreamLookup) { @@ -348,11 +354,11 @@ Y_UNIT_TEST_TWIN(SimpleLeftJoin, StreamLookup) { [[1];[101];["Value1"];[101];["Value21"]]; [[2];[102];["Value1"];[102];["Value22"]]; [[3];[103];["Value2"];[103];["Value23"]]; - [[4];[104];["Value2"];#;#]; + [[4];[104];["Value2"];[104];#]; [[5];[105];["Value3"];#;#]; [[6];#;["Value6"];#;#]; [[7];#;["Value7"];#;#] - ])", 3, StreamLookup); + ])", 4, StreamLookup); } Y_UNIT_TEST_TWIN(LeftJoinCustomColumnOrder, StreamLookup) { @@ -368,11 +374,11 @@ Y_UNIT_TEST_TWIN(LeftJoinCustomColumnOrder, StreamLookup) { [["Value21"];[1];[101];["Value1"];[101]]; [["Value22"];[2];[102];["Value1"];[102]]; [["Value23"];[3];[103];["Value2"];[103]]; - [#;[4];#;["Value2"];[104]]; + [#;[4];[104];["Value2"];[104]]; [#;[5];#;["Value3"];[105]]; [#;[6];#;["Value6"];#]; [#;[7];#;["Value7"];#] - ])", 3, StreamLookup); + ])", 4, StreamLookup); } Y_UNIT_TEST_TWIN(LeftJoinOnlyRightColumn, StreamLookup) { @@ -392,7 +398,7 @@ Y_UNIT_TEST_TWIN(LeftJoinOnlyRightColumn, StreamLookup) { [["Value21"]]; [["Value22"]]; [["Value23"]] - ])", 3, StreamLookup); + ])", 4, StreamLookup); } 
Y_UNIT_TEST_TWIN(LeftJoinOnlyLeftColumn, StreamLookup) { @@ -412,41 +418,74 @@ Y_UNIT_TEST_TWIN(LeftJoinOnlyLeftColumn, StreamLookup) { [[103]]; [[104]]; [[105]] - ])", 3, StreamLookup); + ])", 4, StreamLookup); } Y_UNIT_TEST_TWIN(SimpleLeftOnlyJoin, StreamLookup) { Test( R"( - SELECT l.Key, l.Fk, l.Value - FROM `/Root/Left` AS l - LEFT ONLY JOIN `/Root/Right` AS r - ON l.Fk = r.Key - ORDER BY l.Key - )", - R"([ - [[4];[104];["Value2"]]; - [[5];[105];["Value3"]]; - [[6];#;["Value6"]]; - [[7];#;["Value7"]] - ])", 3, StreamLookup); + SELECT l.Key, l.Fk, l.Value + FROM `/Root/Left` AS l + LEFT ONLY JOIN `/Root/Right` AS r + ON l.Fk = r.Key + ORDER BY l.Key + )", + R"([ + [[5];[105];["Value3"]]; + [[6];#;["Value6"]]; + [[7];#;["Value7"]] + ])", 4, StreamLookup); } Y_UNIT_TEST_TWIN(LeftOnlyJoinValueColumn, StreamLookup) { Test( R"( - SELECT l.Value - FROM `/Root/Left` AS l - LEFT ONLY JOIN `/Root/Right` AS r - ON l.Fk = r.Key - ORDER BY l.Value - )", - R"([ - [["Value2"]]; - [["Value3"]]; - [["Value6"]]; - [["Value7"]] - ])", 3, StreamLookup); + SELECT l.Value + FROM `/Root/Left` AS l + LEFT ONLY JOIN `/Root/Right` AS r + ON l.Fk = r.Key + ORDER BY l.Value + )", + R"([ + [["Value3"]]; + [["Value6"]]; + [["Value7"]] + ])", 4, StreamLookup); +} + +Y_UNIT_TEST_TWIN(LeftJoinRightNullFilter, StreamLookup) { + Test( + R"( + SELECT l.Value, r.Value + FROM `/Root/Left` AS l + LEFT JOIN `/Root/Right` AS r + ON l.Fk = r.Key + WHERE r.Value IS NULL + ORDER BY l.Value + )", + R"([ + [["Value2"];#]; + [["Value3"];#]; + [["Value6"];#]; + [["Value7"];#] + ])", 4, StreamLookup); +} + +Y_UNIT_TEST_TWIN(LeftJoinSkipNullFilter, StreamLookup) { + Test( + R"( + SELECT l.Value, r.Value + FROM `/Root/Left` AS l + LEFT JOIN `/Root/Right` AS r + ON l.Fk = r.Key + WHERE r.Value IS NOT NULL + ORDER BY l.Value + )", + R"([ + [["Value1"];["Value21"]]; + [["Value1"];["Value22"]]; + [["Value2"];["Value23"]] + ])", 4, StreamLookup); } void CreateSimpleTableWithKeyType(TSession session, const TString& 
columnType) {