Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 14 additions & 15 deletions ydb/core/kqp/expr_nodes/kqp_expr_nodes.json
Original file line number Diff line number Diff line change
Expand Up @@ -180,21 +180,10 @@
{
"Name": "TKqlStreamLookupTable",
"Base": "TKqlLookupTableBase",
"Match": {"Type": "Callable", "Name": "KqlStreamLookupTable"}
},
{
"Name": "TKqlStreamIdxLookupJoin",
"Base": "TCallable",
"Match": {"Type": "Callable", "Name": "KqlStreamIdxLookupJoin"},
"Match": {"Type": "Callable", "Name": "KqlStreamLookupTable"},
"Children": [
{"Index": 0, "Name": "LeftInput", "Type": "TExprBase"},
{"Index": 1, "Name": "LeftLabel", "Type": "TCoAtom"},
{"Index": 2, "Name": "RightTable", "Type": "TKqpTable"},
{"Index": 3, "Name": "RightColumns", "Type": "TCoAtomList"},
{"Index": 4, "Name": "RightLabel", "Type": "TCoAtom"},
{"Index": 5, "Name": "JoinType", "Type": "TCoAtom"}
{"Index": 3, "Name": "LookupStrategy", "Type": "TCoAtom"}
]

},
{
"Name": "TKqlSequencer",
Expand Down Expand Up @@ -468,16 +457,26 @@
]
},
{
"Name": "TKqpIndexLookupJoin",
"Name": "TKqlIndexLookupJoinBase",
"Base": "TCallable",
"Match": {"Type": "Callable", "Name": "KqpIndexLookupJoin"},
"Match": {"Type": "CallableBase"},
"Children": [
{"Index": 0, "Name": "Input", "Type": "TExprBase"},
{"Index": 1, "Name": "JoinType", "Type": "TCoAtom"},
{"Index": 2, "Name": "LeftLabel", "Type": "TCoAtom"},
{"Index": 3, "Name": "RightLabel", "Type": "TCoAtom"}
]
},
{
"Name": "TKqlIndexLookupJoin",
"Base": "TKqlIndexLookupJoinBase",
"Match": {"Type": "Callable", "Name": "KqlIndexLookupJoin"}
},
{
"Name": "TKqpIndexLookupJoin",
"Base": "TKqlIndexLookupJoinBase",
"Match": {"Type": "Callable", "Name": "KqpIndexLookupJoin"}
},
{
"Name": "TKqpCnSequencer",
"Base": "TKqpConnection",
Expand Down
186 changes: 68 additions & 118 deletions ydb/core/kqp/host/kqp_type_ann.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ TStatus AnnotateReadTableRanges(const TExprNode::TPtr& node, TExprContext& ctx,
TStatus AnnotateLookupTable(const TExprNode::TPtr& node, TExprContext& ctx, const TString& cluster,
const TKikimrTablesData& tablesData, bool withSystemColumns)
{
if (!EnsureArgsCount(*node, TKqlLookupIndexBase::Match(node.Get()) ? 4 : 3, ctx)) {
if (!EnsureArgsCount(*node, TKqlLookupIndexBase::Match(node.Get()) || TKqlStreamLookupTable::Match(node.Get()) ? 4 : 3, ctx)) {
return TStatus::Error;
}

Expand Down Expand Up @@ -467,10 +467,57 @@ TStatus AnnotateLookupTable(const TExprNode::TPtr& node, TExprContext& ctx, cons
}

YQL_ENSURE(lookupType);
if (!EnsureStructType(node->Pos(), *lookupType, ctx)) {
return TStatus::Error;

const TStructExprType* structType = nullptr;
bool isStreamLookup = TKqlStreamLookupTable::Match(node.Get());
if (isStreamLookup) {
auto lookupStrategy = node->Child(TKqlStreamLookupTable::idx_LookupStrategy);
if (!EnsureAtom(*lookupStrategy, ctx)) {
return TStatus::Error;
}

if (lookupStrategy->Content() == TKqpStreamLookupJoinStrategyName) {
if (!EnsureTupleType(node->Pos(), *lookupType, ctx)) {
return TStatus::Error;
}

if (!EnsureTupleTypeSize(node->Pos(), lookupType, 2, ctx)) {
return TStatus::Error;
}

auto tupleType = lookupType->Cast<TTupleExprType>();
if (!EnsureStructType(node->Pos(), *tupleType->GetItems()[0], ctx)) {
return TStatus::Error;
}

if (!EnsureStructType(node->Pos(), *tupleType->GetItems()[1], ctx)) {
return TStatus::Error;
}

structType = tupleType->GetItems()[0]->Cast<TStructExprType>();
auto leftRowType = tupleType->GetItems()[1]->Cast<TStructExprType>();

TVector<const TTypeAnnotationNode*> outputTypes;
outputTypes.push_back(leftRowType);
outputTypes.push_back(ctx.MakeType<TOptionalExprType>(rowType));

rowType = ctx.MakeType<TTupleExprType>(outputTypes);
} else {
if (!EnsureStructType(node->Pos(), *lookupType, ctx)) {
return TStatus::Error;
}

structType = lookupType->Cast<TStructExprType>();
}
} else {
if (!EnsureStructType(node->Pos(), *lookupType, ctx)) {
return TStatus::Error;
}

structType = lookupType->Cast<TStructExprType>();
}
auto structType = lookupType->Cast<TStructExprType>();

YQL_ENSURE(structType);

ui32 keyColumnsCount = 0;
if (TKqlLookupIndexBase::Match(node.Get())) {
Expand Down Expand Up @@ -1338,105 +1385,6 @@ TStatus AnnotateSequencer(const TExprNode::TPtr& node, TExprContext& ctx, const
return TStatus::Ok;
}

TStatus AnnotateStreamIdxLookupJoin(const TExprNode::TPtr& node, TExprContext& ctx, const TString& cluster,
const TKikimrTablesData& tablesData, bool withSystemColumns)
{
if (!EnsureArgsCount(*node, 6, ctx)) {
return TStatus::Error;
}

auto leftInputType = node->Child(TKqlStreamIdxLookupJoin::idx_LeftInput)->GetTypeAnn();
const TTypeAnnotationNode* leftInputItemType;
if (!EnsureNewSeqType<false>(node->Pos(), *leftInputType, ctx, &leftInputItemType)) {
return TStatus::Error;
}

YQL_ENSURE(leftInputItemType);
if (!EnsureTupleType(node->Pos(), *leftInputItemType, ctx)) {
return TStatus::Error;
}

if (!EnsureTupleTypeSize(node->Pos(), leftInputItemType, 2, ctx)) {
return TStatus::Error;
}

auto leftInputTupleType = leftInputItemType->Cast<TTupleExprType>();
if (!EnsureStructType(node->Pos(), *leftInputTupleType->GetItems()[0], ctx)) {
return TStatus::Error;
}

if (!EnsureStructType(node->Pos(), *leftInputTupleType->GetItems()[1], ctx)) {
return TStatus::Error;
}

if (!EnsureAtom(*node->Child(TKqlStreamIdxLookupJoin::idx_LeftLabel), ctx)) {
return TStatus::Error;
}

TCoAtom leftLabel(node->Child(TKqlStreamIdxLookupJoin::idx_LeftLabel));

auto rightTable = ResolveTable(node->Child(TKqlStreamIdxLookupJoin::idx_RightTable), ctx, cluster, tablesData);
if (!rightTable.second) {
return TStatus::Error;
}

const TStructExprType* inputKeysType = leftInputTupleType->GetItems()[0]->Cast<TStructExprType>();
for (const auto& inputKey : inputKeysType->GetItems()) {
if (!rightTable.second->GetKeyColumnIndex(TString(inputKey->GetName()))) {
return TStatus::Error;
}
}

if (!EnsureTupleOfAtoms(*node->Child(TKqlStreamIdxLookupJoin::idx_RightColumns), ctx)) {
return TStatus::Error;
}

TCoAtomList rightColumns{node->ChildPtr(TKqlStreamIdxLookupJoin::idx_RightColumns)};
for (const auto& rightColumn : rightColumns) {
if (!rightTable.second->GetColumnType(TString(rightColumn.Value()))) {
return TStatus::Error;
}
}

auto rightDataType = GetReadTableRowType(ctx, tablesData, cluster, rightTable.first, rightColumns, withSystemColumns);
if (!rightDataType) {
return TStatus::Error;
}

if (!EnsureAtom(*node->Child(TKqlStreamIdxLookupJoin::idx_RightLabel), ctx)) {
return TStatus::Error;
}

TCoAtom rightLabel(node->Child(TKqlStreamIdxLookupJoin::idx_RightLabel));
TCoAtom joinType(node->Child(TKqlStreamIdxLookupJoin::idx_JoinType));

const TStructExprType* leftDataType = leftInputTupleType->GetItems()[1]->Cast<TStructExprType>();
TVector<const TItemExprType*> resultStructItems;
for (const auto& member : leftDataType->GetItems()) {
resultStructItems.emplace_back(
ctx.MakeType<TItemExprType>(TString::Join(leftLabel.Value(), ".", member->GetName()), member->GetItemType())
);
}

if (RightJoinSideAllowed(joinType.Value())) {
for (const auto& member : rightDataType->Cast<TStructExprType>()->GetItems()) {
const bool makeOptional = RightJoinSideOptional(joinType.Value()) && !member->GetItemType()->IsOptionalOrNull();

const TTypeAnnotationNode* memberType = makeOptional
? ctx.MakeType<TOptionalExprType>(member->GetItemType())
: member->GetItemType();

resultStructItems.emplace_back(
ctx.MakeType<TItemExprType>(TString::Join(rightLabel.Value(), ".", member->GetName()), memberType)
);
}
}

auto rowType = ctx.MakeType<TStructExprType>(resultStructItems);
node->SetTypeAnn(ctx.MakeType<TListExprType>(rowType));
return TStatus::Ok;
}

TStatus AnnotateKqpProgram(const TExprNode::TPtr& node, TExprContext& ctx) {
if (!EnsureArgsCount(*node, 2, ctx)) {
return TStatus::Error;
Expand Down Expand Up @@ -1621,7 +1569,7 @@ TStatus AnnotateStreamLookupConnection(const TExprNode::TPtr& node, TExprContext

YQL_ENSURE(inputItemType);

if (lookupStrategy.Value() == "LookupRows") {
if (lookupStrategy.Value() == TKqpStreamLookupStrategyName) {
if (!EnsureStructType(node->Pos(), *inputItemType, ctx)) {
return TStatus::Error;
}
Expand All @@ -1640,7 +1588,7 @@ TStatus AnnotateStreamLookupConnection(const TExprNode::TPtr& node, TExprContext

node->SetTypeAnn(ctx.MakeType<TStreamExprType>(rowType));

} else if (lookupStrategy.Value() == "LookupJoinRows") {
} else if (lookupStrategy.Value() == TKqpStreamLookupJoinStrategyName) {
if (!EnsureTupleType(node->Pos(), *inputItemType, ctx)) {
return TStatus::Error;
}
Expand Down Expand Up @@ -1694,7 +1642,7 @@ TStatus AnnotateIndexLookupJoin(const TExprNode::TPtr& node, TExprContext& ctx)
return TStatus::Error;
}

auto inputType = node->Child(TKqpIndexLookupJoin::idx_Input)->GetTypeAnn();
auto inputType = node->Child(TKqlIndexLookupJoinBase::idx_Input)->GetTypeAnn();
const TTypeAnnotationNode* inputItemType;
if (!EnsureNewSeqType<false>(node->Pos(), *inputType, ctx, &inputItemType)) {
return TStatus::Error;
Expand Down Expand Up @@ -1725,22 +1673,22 @@ TStatus AnnotateIndexLookupJoin(const TExprNode::TPtr& node, TExprContext& ctx)
return TStatus::Error;
}

if (!EnsureAtom(*node->Child(TKqpIndexLookupJoin::idx_JoinType), ctx)) {
if (!EnsureAtom(*node->Child(TKqlIndexLookupJoinBase::idx_JoinType), ctx)) {
return TStatus::Error;
}

if (!EnsureAtom(*node->Child(TKqpIndexLookupJoin::idx_LeftLabel), ctx)) {
if (!EnsureAtom(*node->Child(TKqlIndexLookupJoinBase::idx_LeftLabel), ctx)) {
return TStatus::Error;
}

TCoAtom leftLabel(node->Child(TKqpIndexLookupJoin::idx_LeftLabel));
TCoAtom leftLabel(node->Child(TKqlIndexLookupJoinBase::idx_LeftLabel));

if (!EnsureAtom(*node->Child(TKqpIndexLookupJoin::idx_RightLabel), ctx)) {
if (!EnsureAtom(*node->Child(TKqlIndexLookupJoinBase::idx_RightLabel), ctx)) {
return TStatus::Error;
}

TCoAtom rightLabel(node->Child(TKqpIndexLookupJoin::idx_RightLabel));
TCoAtom joinType(node->Child(TKqpIndexLookupJoin::idx_JoinType));
TCoAtom rightLabel(node->Child(TKqlIndexLookupJoinBase::idx_RightLabel));
TCoAtom joinType(node->Child(TKqlIndexLookupJoinBase::idx_JoinType));

TVector<const TItemExprType*> resultStructItems;
for (const auto& item : leftRowType->GetItems()) {
Expand All @@ -1764,7 +1712,13 @@ TStatus AnnotateIndexLookupJoin(const TExprNode::TPtr& node, TExprContext& ctx)
}

auto outputRowType = ctx.MakeType<TStructExprType>(resultStructItems);
node->SetTypeAnn(ctx.MakeType<TStreamExprType>(outputRowType));
const bool isPhysical = TKqpIndexLookupJoin::Match(node.Get());
if (isPhysical) {
node->SetTypeAnn(ctx.MakeType<TStreamExprType>(outputRowType));
} else {
node->SetTypeAnn(ctx.MakeType<TListExprType>(outputRowType));
}

return TStatus::Ok;
}

Expand Down Expand Up @@ -1892,7 +1846,7 @@ TAutoPtr<IGraphTransformer> CreateKqpTypeAnnotationTransformer(const TString& cl
return AnnotateStreamLookupConnection(input, ctx, cluster, *tablesData, config->SystemColumnsEnabled());
}

if (TKqpIndexLookupJoin::Match(input.Get())) {
if (TKqlIndexLookupJoinBase::Match(input.Get())) {
return AnnotateIndexLookupJoin(input, ctx);
}

Expand Down Expand Up @@ -1924,10 +1878,6 @@ TAutoPtr<IGraphTransformer> CreateKqpTypeAnnotationTransformer(const TString& cl
return AnnotateSequencer(input, ctx, cluster, *tablesData);
}

if (TKqlStreamIdxLookupJoin::Match(input.Get())) {
return AnnotateStreamIdxLookupJoin(input, ctx, cluster, *tablesData, config->SystemColumnsEnabled());
}

if (TKqpProgram::Match(input.Get())) {
return AnnotateKqpProgram(input, ctx);
}
Expand Down
11 changes: 11 additions & 0 deletions ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,17 @@ TExprBase KqpApplyExtractMembersToLookupTable(TExprBase node, TExprContext& ctx,
.Done();
}

if (auto maybeStreamLookup = lookup.Maybe<TKqlStreamLookupTable>()) {
auto streamLookup = maybeStreamLookup.Cast();

return Build<TKqlStreamLookupTable>(ctx, lookup.Pos())
.Table(streamLookup.Table())
.LookupKeys(streamLookup.LookupKeys())
.Columns(usedColumns.Cast())
.LookupStrategy(streamLookup.LookupStrategy())
.Done();
}

return Build<TKqlLookupTableBase>(ctx, lookup.Pos())
.CallableName(lookup.CallableName())
.Table(lookup.Table())
Expand Down
7 changes: 7 additions & 0 deletions ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ TExprBase DoRewriteIndexRead(const TReadMatch& read, TExprContext& ctx,
.Table(read.Table())
.LookupKeys(readIndexTable.Ptr())
.Columns(read.Columns())
.LookupStrategy().Build(TKqpStreamLookupStrategyName)
.Done();
} else {
return Build<TKqlLookupTable>(ctx, read.Pos())
Expand Down Expand Up @@ -365,6 +366,7 @@ TExprBase KqpRewriteLookupIndex(const TExprBase& node, TExprContext& ctx, const
.Table(BuildTableMeta(*indexMeta, node.Pos(), ctx))
.LookupKeys(lookupIndex.LookupKeys())
.Columns(lookupIndex.Columns())
.LookupStrategy().Build(TKqpStreamLookupStrategyName)
.Done();
}

Expand All @@ -382,12 +384,14 @@ TExprBase KqpRewriteLookupIndex(const TExprBase& node, TExprContext& ctx, const
.Table(BuildTableMeta(*indexMeta, node.Pos(), ctx))
.LookupKeys(lookupIndex.LookupKeys())
.Columns(keyColumnsList)
.LookupStrategy().Build(TKqpStreamLookupStrategyName)
.Done();

return Build<TKqlStreamLookupTable>(ctx, node.Pos())
.Table(lookupIndex.Table())
.LookupKeys(lookupIndexTable.Ptr())
.Columns(lookupIndex.Columns())
.LookupStrategy().Build(TKqpStreamLookupStrategyName)
.Done();
}

Expand Down Expand Up @@ -424,6 +428,7 @@ TExprBase KqpRewriteStreamLookupIndex(const TExprBase& node, TExprContext& ctx,
.Table(BuildTableMeta(*indexMeta, node.Pos(), ctx))
.LookupKeys(streamLookupIndex.LookupKeys())
.Columns(streamLookupIndex.Columns())
.LookupStrategy().Build(TKqpStreamLookupStrategyName)
.Done();
}

Expand All @@ -433,12 +438,14 @@ TExprBase KqpRewriteStreamLookupIndex(const TExprBase& node, TExprContext& ctx,
.Table(BuildTableMeta(*indexMeta, node.Pos(), ctx))
.LookupKeys(streamLookupIndex.LookupKeys())
.Columns(keyColumnsList)
.LookupStrategy().Build(TKqpStreamLookupStrategyName)
.Done();

return Build<TKqlStreamLookupTable>(ctx, node.Pos())
.Table(streamLookupIndex.Table())
.LookupKeys(lookupIndexTable.Ptr())
.Columns(streamLookupIndex.Columns())
.LookupStrategy().Build(TKqpStreamLookupStrategyName)
.Done();
}

Expand Down
Loading