Skip to content

Commit e5de0d2

Browse files
committed
KIKIMR-20432, KIKIMR-20431: support pg and not null types for stream lookup join
fix(kqp): support pg and not null types for stream lookup join
1 parent 6155095 commit e5de0d2

File tree

7 files changed

+134
-16
lines changed

7 files changed

+134
-16
lines changed

ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -366,6 +366,12 @@ void BuildStreamLookupChannels(TKqpTasksGraph& graph, const TStageInfo& stageInf
366366
keyColumnProto->SetName(keyColumn);
367367
keyColumnProto->SetId(columnIt->second.Id);
368368
keyColumnProto->SetTypeId(columnIt->second.Type.GetTypeId());
369+
370+
if (columnIt->second.Type.GetTypeId() == NScheme::NTypeIds::Pg) {
371+
auto& typeInfo = *keyColumnProto->MutableTypeInfo();
372+
typeInfo.SetPgTypeId(NPg::PgTypeIdFromTypeDesc(columnIt->second.Type.GetTypeDesc()));
373+
typeInfo.SetPgTypeMod(columnIt->second.TypeMod);
374+
}
369375
}
370376

371377
for (const auto& keyColumn : streamLookup.GetKeyColumns()) {
@@ -382,6 +388,12 @@ void BuildStreamLookupChannels(TKqpTasksGraph& graph, const TStageInfo& stageInf
382388
columnProto->SetName(column);
383389
columnProto->SetId(columnIt->second.Id);
384390
columnProto->SetTypeId(columnIt->second.Type.GetTypeId());
391+
392+
if (columnIt->second.Type.GetTypeId() == NScheme::NTypeIds::Pg) {
393+
auto& typeInfo = *columnProto->MutableTypeInfo();
394+
typeInfo.SetPgTypeId(NPg::PgTypeIdFromTypeDesc(columnIt->second.Type.GetTypeDesc()));
395+
typeInfo.SetPgTypeMod(columnIt->second.TypeMod);
396+
}
385397
}
386398

387399
settings->SetLookupStrategy(streamLookup.GetLookupStrategy());

ydb/core/kqp/host/kqp_type_ann.cpp

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ bool RightJoinSideAllowed(const TStringBuf& joinType) {
2727
return joinType != "LeftOnly";
2828
}
2929

30+
bool RightJoinSideOptional(const TStringBuf& joinType) {
31+
return joinType == "Left";
32+
}
33+
3034
const TTypeAnnotationNode* MakeKqpEffectType(TExprContext& ctx) {
3135
return ctx.MakeType<TResourceExprType>(KqpEffectTag);
3236
}
@@ -1379,8 +1383,14 @@ TStatus AnnotateStreamIdxLookupJoin(const TExprNode::TPtr& node, TExprContext& c
13791383

13801384
if (RightJoinSideAllowed(joinType.Value())) {
13811385
for (const auto& member : rightDataType->Cast<TStructExprType>()->GetItems()) {
1386+
const bool makeOptional = RightJoinSideOptional(joinType.Value()) && !member->GetItemType()->IsOptionalOrNull();
1387+
1388+
const TTypeAnnotationNode* memberType = makeOptional
1389+
? ctx.MakeType<TOptionalExprType>(member->GetItemType())
1390+
: member->GetItemType();
1391+
13821392
resultStructItems.emplace_back(
1383-
ctx.MakeType<TItemExprType>(TString::Join(rightLabel.Value(), ".", member->GetName()), member->GetItemType())
1393+
ctx.MakeType<TItemExprType>(TString::Join(rightLabel.Value(), ".", member->GetName()), memberType)
13841394
);
13851395
}
13861396
}
@@ -1704,8 +1714,14 @@ TStatus AnnotateIndexLookupJoin(const TExprNode::TPtr& node, TExprContext& ctx)
17041714

17051715
if (RightJoinSideAllowed(joinType.Value())) {
17061716
for (const auto& item : rightRowType->Cast<TStructExprType>()->GetItems()) {
1717+
const bool makeOptional = RightJoinSideOptional(joinType.Value()) && !item->GetItemType()->IsOptionalOrNull();
1718+
1719+
const TTypeAnnotationNode* itemType = makeOptional
1720+
? ctx.MakeType<TOptionalExprType>(item->GetItemType())
1721+
: item->GetItemType();
1722+
17071723
resultStructItems.emplace_back(
1708-
ctx.MakeType<TItemExprType>(TString::Join(rightLabel.Value(), ".", item->GetName()), item->GetItemType())
1724+
ctx.MakeType<TItemExprType>(TString::Join(rightLabel.Value(), ".", item->GetName()), itemType)
17091725
);
17101726
}
17111727
}

ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -614,7 +614,7 @@ TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext
614614
return {};
615615
}
616616

617-
const bool useStreamIndexLookupJoin = kqpCtx.IsDataQuery()
617+
const bool useStreamIndexLookupJoin = (kqpCtx.IsDataQuery() || kqpCtx.IsGenericQuery())
618618
&& kqpCtx.Config->EnableKqpDataQueryStreamIdxLookupJoin
619619
&& supportedStreamJoinKinds.contains(join.JoinType().Value());
620620

ydb/core/kqp/runtime/kqp_program_builder.cpp

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,9 @@ bool RightJoinSideAllowed(const TString& joinType) {
163163
return joinType != "LeftOnly";
164164
}
165165

166+
bool RightJoinSideOptional(const TString& joinType) {
167+
return joinType == "Left";
168+
}
166169
} // namespace
167170

168171
TKqpProgramBuilder::TKqpProgramBuilder(const TTypeEnvironment& env, const IFunctionRegistry& functionRegistry)
@@ -355,7 +358,16 @@ TRuntimeNode TKqpProgramBuilder::KqpIndexLookupJoin(const TRuntimeNode& input, c
355358
for (ui32 i = 0; i < rightRowType->GetMembersCount(); ++i) {
356359
TString newMemberName = rightLabel.empty() ? TString(rightRowType->GetMemberName(i))
357360
: TString::Join(rightLabel, ".", rightRowType->GetMemberName(i));
358-
rowTypeBuilder.Add(newMemberName, rightRowType->GetMemberType(i));
361+
362+
const bool makeOptional = RightJoinSideOptional(joinType)
363+
&& rightRowType->GetMemberType(i)->GetKind() != TType::EKind::Optional
364+
&& rightRowType->GetMemberType(i)->GetKind() != TType::EKind::Pg;
365+
366+
TType* memberType = makeOptional
367+
? TOptionalType::Create(rightRowType->GetMemberType(i), GetTypeEnvironment())
368+
: rightRowType->GetMemberType(i);
369+
370+
rowTypeBuilder.Add(newMemberName, memberType);
359371
}
360372
}
361373

ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -150,8 +150,13 @@ TKqpStreamLookupWorker::TKqpStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSetti
150150
TSysTables::TTableColumnInfo{
151151
keyColumn.GetName(),
152152
keyColumn.GetId(),
153-
NScheme::TTypeInfo{static_cast<NScheme::TTypeId>(keyColumn.GetTypeId())},
154-
"",
153+
NScheme::TTypeInfo{
154+
static_cast<NScheme::TTypeId>(keyColumn.GetTypeId()),
155+
keyColumn.GetTypeId() == NScheme::NTypeIds::Pg
156+
? NPg::TypeDescFromPgTypeId(keyColumn.GetTypeInfo().GetPgTypeId())
157+
: nullptr
158+
},
159+
keyColumn.GetTypeInfo().GetPgTypeMod(),
155160
keyOrder++
156161
}
157162
);
@@ -169,7 +174,12 @@ TKqpStreamLookupWorker::TKqpStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSetti
169174
Columns.emplace_back(TSysTables::TTableColumnInfo{
170175
column.GetName(),
171176
column.GetId(),
172-
NScheme::TTypeInfo{static_cast<NScheme::TTypeId>(column.GetTypeId())}
177+
NScheme::TTypeInfo{static_cast<NScheme::TTypeId>(column.GetTypeId()),
178+
column.GetTypeId() == NScheme::NTypeIds::Pg
179+
? NPg::TypeDescFromPgTypeId(column.GetTypeInfo().GetPgTypeId())
180+
: nullptr,
181+
},
182+
column.GetTypeInfo().GetPgTypeMod()
173183
});
174184
}
175185
}

ydb/core/kqp/ut/opt/kqp_not_null_ut.cpp

Lines changed: 67 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1756,9 +1756,11 @@ Y_UNIT_TEST_SUITE(KqpNotNullColumns) {
17561756
}
17571757
}
17581758

1759-
1760-
Y_UNIT_TEST(JoinBothTablesWithNotNullPk) {
1761-
TKikimrRunner kikimr;
1759+
Y_UNIT_TEST_TWIN(JoinBothTablesWithNotNullPk, StreamLookup) {
1760+
NKikimrConfig::TAppConfig appConfig;
1761+
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(StreamLookup);
1762+
auto settings = TKikimrSettings().SetAppConfig(appConfig);
1763+
TKikimrRunner kikimr(settings);
17621764
auto client = kikimr.GetTableClient();
17631765
auto session = client.CreateSession().GetValueSync().GetSession();
17641766

@@ -1805,8 +1807,11 @@ Y_UNIT_TEST_SUITE(KqpNotNullColumns) {
18051807
}
18061808
}
18071809

1808-
Y_UNIT_TEST(JoinLeftTableWithNotNullPk) {
1809-
TKikimrRunner kikimr;
1810+
Y_UNIT_TEST_TWIN(JoinLeftTableWithNotNullPk, StreamLookup) {
1811+
NKikimrConfig::TAppConfig appConfig;
1812+
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(StreamLookup);
1813+
auto settings = TKikimrSettings().SetAppConfig(appConfig);
1814+
TKikimrRunner kikimr(settings);
18101815
auto client = kikimr.GetTableClient();
18111816
auto session = client.CreateSession().GetValueSync().GetSession();
18121817

@@ -1873,6 +1878,63 @@ Y_UNIT_TEST_SUITE(KqpNotNullColumns) {
18731878
}
18741879
}
18751880

1881+
Y_UNIT_TEST_TWIN(JoinRightTableWithNotNullColumns, StreamLookup) {
1882+
NKikimrConfig::TAppConfig appConfig;
1883+
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(StreamLookup);
1884+
auto settings = TKikimrSettings().SetAppConfig(appConfig);
1885+
TKikimrRunner kikimr(settings);
1886+
auto client = kikimr.GetTableClient();
1887+
auto session = client.CreateSession().GetValueSync().GetSession();
1888+
1889+
{
1890+
auto createTableResult = session.ExecuteSchemeQuery(Q1_(R"(
1891+
CREATE TABLE `/Root/Left` (
1892+
Key Uint64,
1893+
Value String,
1894+
PRIMARY KEY (Key)
1895+
);
1896+
)")).ExtractValueSync();
1897+
UNIT_ASSERT_C(createTableResult.IsSuccess(), createTableResult.GetIssues().ToString());
1898+
1899+
auto result = session.ExecuteDataQuery(Q1_(R"(
1900+
UPSERT INTO `/Root/Left` (Key, Value) VALUES (1, 'lValue1'), (2, 'lValue2');
1901+
)"), TTxControl::BeginTx().CommitTx()).ExtractValueSync();
1902+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
1903+
}
1904+
1905+
{
1906+
auto createTableResult = session.ExecuteSchemeQuery(Q1_(R"(
1907+
CREATE TABLE `/Root/Right` (
1908+
Key Uint64 NOT NULL,
1909+
Value String NOT NULL,
1910+
PRIMARY KEY (Key)
1911+
);
1912+
)")).ExtractValueSync();
1913+
UNIT_ASSERT_C(createTableResult.IsSuccess(), createTableResult.GetIssues().ToString());
1914+
1915+
auto result = session.ExecuteDataQuery(Q1_(R"(
1916+
UPSERT INTO `/Root/Right` (Key, Value) VALUES (1, 'rValue1'), (3, 'rValue3'), (4, 'rValue4');
1917+
)"), TTxControl::BeginTx().CommitTx()).ExtractValueSync();
1918+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
1919+
}
1920+
1921+
{ // left join
1922+
const auto query = Q1_(R"(
1923+
SELECT l.Key, l.Value, r.Key, r.Value FROM `/Root/Left` AS l LEFT
1924+
JOIN `/Root/Right` AS r
1925+
ON l.Key = r.Key
1926+
ORDER BY l.Key;
1927+
)");
1928+
1929+
auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
1930+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
1931+
CompareYson(R"([
1932+
[[1u];["lValue1"];[1u];["rValue1"]];
1933+
[[2u];["lValue2"];#;#]
1934+
])", FormatResultSetYson(result.GetResultSet(0)));
1935+
}
1936+
}
1937+
18761938
Y_UNIT_TEST(Describe) {
18771939
TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false));
18781940
auto client = kikimr.GetTableClient();

ydb/core/kqp/ut/pg/kqp_pg_ut.cpp

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2484,8 +2484,14 @@ Y_UNIT_TEST_SUITE(KqpPg) {
24842484
}
24852485
}
24862486

2487-
Y_UNIT_TEST(JoinWithQueryService) {
2488-
TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false));
2487+
Y_UNIT_TEST_TWIN(JoinWithQueryService, StreamLookup) {
2488+
NKikimrConfig::TAppConfig appConfig;
2489+
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(StreamLookup);
2490+
auto serverSettings = TKikimrSettings()
2491+
.SetAppConfig(appConfig)
2492+
.SetWithSampleTables(false);
2493+
2494+
TKikimrRunner kikimr(serverSettings);
24892495
auto client = kikimr.GetTableClient();
24902496
auto db = kikimr.GetQueryClient();
24912497
auto settings = NYdb::NQuery::TExecuteQuerySettings()
@@ -2506,8 +2512,8 @@ Y_UNIT_TEST_SUITE(KqpPg) {
25062512
const auto query = Q_(R"(
25072513
--!syntax_pg
25082514
CREATE TABLE t2(
2509-
id2 int4 PRIMARY KEY,
2510-
val2 text
2515+
id2 int4 PRIMARY KEY NOT NULL,
2516+
val2 text NOT NULL
25112517
))");
25122518
auto result = session.ExecuteSchemeQuery(query).ExtractValueSync();
25132519
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());

0 commit comments

Comments
 (0)