Skip to content

Commit 2360165

Browse files
committed
fix(kqp): use ReadTableRange instead LookupTable
1 parent 4fe7784 commit 2360165

File tree

4 files changed

+119
-45
lines changed

4 files changed

+119
-45
lines changed

ydb/core/kqp/opt/logical/kqp_opt_log_effects.cpp

Lines changed: 83 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,43 +16,106 @@ TExprBase KqpDeleteOverLookup(const TExprBase& node, TExprContext& ctx, const TK
1616
auto deleteRows = node.Cast<TKqlDeleteRows>();
1717

1818
TMaybeNode<TKqlLookupTableBase> lookup;
19+
TMaybeNode<TKqlReadTable> read;
20+
TMaybeNode<TKqlReadTableRanges> rangeRead;
1921
TMaybeNode<TCoSkipNullMembers> skipNulMembers;
2022

2123
if (deleteRows.Input().Maybe<TKqlLookupTableBase>()) {
2224
lookup = deleteRows.Input().Cast<TKqlLookupTableBase>();
2325
} else if (deleteRows.Input().Maybe<TCoSkipNullMembers>().Input().Maybe<TKqlLookupTableBase>()) {
2426
skipNulMembers = deleteRows.Input().Cast<TCoSkipNullMembers>();
2527
lookup = skipNulMembers.Input().Cast<TKqlLookupTableBase>();
28+
} else if (deleteRows.Input().Maybe<TKqlReadTable>()) {
29+
read = deleteRows.Input().Cast<TKqlReadTable>();
30+
} else if (deleteRows.Input().Maybe<TKqlReadTableRanges>()) {
31+
rangeRead = deleteRows.Input().Cast<TKqlReadTableRanges>();
2632
} else {
2733
return node;
2834
}
2935

30-
YQL_ENSURE(lookup);
31-
if (deleteRows.Table().Raw() != lookup.Cast().Table().Raw()) {
32-
return node;
33-
}
34-
35-
auto lookupKeysType = lookup.Cast().LookupKeys().Ref().GetTypeAnn();
36-
auto lookupKeyType = lookupKeysType->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>();
37-
YQL_ENSURE(lookupKeyType);
38-
39-
// Only consider complete PK lookups
36+
TMaybeNode<TExprBase> deleteInput;
4037
const auto& tableDesc = GetTableData(*kqpCtx.Tables, kqpCtx.Cluster, deleteRows.Table().Path());
41-
if (lookupKeyType->GetSize() != tableDesc.Metadata->KeyColumnNames.size()) {
42-
return node;
43-
}
44-
45-
TExprBase deleteInput = lookup.Cast().LookupKeys();
46-
if (skipNulMembers) {
47-
deleteInput = Build<TCoSkipNullMembers>(ctx, skipNulMembers.Cast().Pos())
48-
.Input(deleteInput)
49-
.Members(skipNulMembers.Cast().Members())
38+
if (lookup) {
39+
if (deleteRows.Table().Raw() != lookup.Cast().Table().Raw()) {
40+
return node;
41+
}
42+
43+
auto lookupKeysType = lookup.Cast().LookupKeys().Ref().GetTypeAnn();
44+
auto lookupKeyType = lookupKeysType->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>();
45+
YQL_ENSURE(lookupKeyType);
46+
47+
// Only consider complete PK lookups
48+
if (lookupKeyType->GetSize() != tableDesc.Metadata->KeyColumnNames.size()) {
49+
return node;
50+
}
51+
52+
deleteInput = lookup.Cast().LookupKeys();
53+
if (skipNulMembers) {
54+
deleteInput = Build<TCoSkipNullMembers>(ctx, skipNulMembers.Cast().Pos())
55+
.Input(deleteInput.Cast())
56+
.Members(skipNulMembers.Cast().Members())
57+
.Done();
58+
}
59+
} else if (read) {
60+
if (deleteRows.Table().Raw() != read.Cast().Table().Raw()) {
61+
return node;
62+
}
63+
64+
const auto& rangeFrom = read.Cast().Range().From();
65+
const auto& rangeTo = read.Cast().Range().To();
66+
67+
if (!rangeFrom.Maybe<TKqlKeyInc>() || !rangeTo.Maybe<TKqlKeyInc>()) {
68+
return node;
69+
}
70+
71+
if (rangeFrom.Raw() != rangeTo.Raw()) {
72+
return node;
73+
}
74+
75+
if (rangeFrom.ArgCount() != tableDesc.Metadata->KeyColumnNames.size()) {
76+
return node;
77+
}
78+
79+
TVector<TExprBase> structMembers;
80+
for (ui32 i = 0; i < rangeFrom.ArgCount(); ++i) {
81+
TCoAtom columnNameAtom(ctx.NewAtom(node.Pos(), tableDesc.Metadata->KeyColumnNames[i]));
82+
83+
auto member = Build<TCoNameValueTuple>(ctx, node.Pos())
84+
.Name(columnNameAtom)
85+
.Value(rangeFrom.Arg(i))
86+
.Done();
87+
88+
structMembers.push_back(member);
89+
}
90+
91+
deleteInput = Build<TCoAsList>(ctx, node.Pos())
92+
.Add<TCoAsStruct>()
93+
.Add(structMembers)
94+
.Build()
5095
.Done();
96+
} else if (rangeRead) {
97+
YQL_ENSURE(rangeRead);
98+
if (deleteRows.Table().Raw() != rangeRead.Cast().Table().Raw()) {
99+
return node;
100+
}
101+
102+
if (rangeRead.Cast().Ranges().Maybe<TCoVoid>()) {
103+
return node;
104+
}
105+
106+
auto prompt = TKqpReadTableExplainPrompt::Parse(rangeRead.Cast());
107+
if (prompt.PointPrefixLen != tableDesc.Metadata->KeyColumnNames.size()) {
108+
return node;
109+
}
110+
111+
deleteInput = rangeRead.Cast().PrefixPointsExpr();
51112
}
52113

114+
YQL_ENSURE(deleteInput);
115+
53116
return Build<TKqlDeleteRows>(ctx, deleteRows.Pos())
54117
.Table(deleteRows.Table())
55-
.Input(deleteInput)
118+
.Input(deleteInput.Cast())
56119
.Done();
57120
}
58121

ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,8 @@ TExprBase KqpPushPredicateToReadTable(TExprBase node, TExprContext& ctx, const T
233233
// NOTE: Use more efficient full key lookup implementation in datashard.
234234
// Consider using lookup for partial keys as well once better constant folding
235235
// is available, currently it can introduce redundant compute stage.
236-
useDataOrGenericQueryLookup = (kqpCtx.IsDataQuery() || kqpCtx.IsGenericQuery()) && isFullKey;
236+
useDataOrGenericQueryLookup = (kqpCtx.IsDataQuery() || kqpCtx.IsGenericQuery()) && isFullKey
237+
&& !kqpCtx.Config->EnableKqpDataQuerySourceRead;
237238
useScanQueryLookup = kqpCtx.IsScanQuery() && isFullKey
238239
&& kqpCtx.Config->EnableKqpScanQueryStreamLookup;
239240
}

ydb/core/kqp/opt/logical/kqp_opt_log_ranges_predext.cpp

Lines changed: 33 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,29 @@ TExprBase KqpPushExtractedPredicateToReadTable(TExprBase node, TExprContext& ctx
367367
YQL_CLOG(DEBUG, ProviderKqp) << "Ranges extracted: " << KqpExprToPrettyString(*ranges, ctx);
368368
YQL_CLOG(DEBUG, ProviderKqp) << "Residual lambda: " << KqpExprToPrettyString(*residualLambda, ctx);
369369

370+
auto buildRangeRead = [&](const TMaybeNode<TExprBase>& prefix, TMaybe<TExprBase>& result) {
371+
if (indexName) {
372+
result = Build<TKqlReadTableIndexRanges>(ctx, read.Pos())
373+
.Table(read.Table())
374+
.Ranges(ranges)
375+
.Columns(read.Columns())
376+
.Settings(read.Settings())
377+
.ExplainPrompt(prompt.BuildNode(ctx, read.Pos()))
378+
.Index(indexName.Cast())
379+
.PrefixPointsExpr(prefix)
380+
.Done();
381+
} else {
382+
result = Build<TKqlReadTableRanges>(ctx, read.Pos())
383+
.Table(read.Table())
384+
.Ranges(ranges)
385+
.Columns(read.Columns())
386+
.Settings(read.Settings())
387+
.ExplainPrompt(prompt.BuildNode(ctx, read.Pos()))
388+
.PrefixPointsExpr(prefix)
389+
.Done();
390+
}
391+
};
392+
370393
TMaybe<TExprBase> input;
371394
if (kqpCtx.Config->PredicateExtract20 &&
372395
(tableDesc.Metadata->Kind == EKikimrTableKind::Datashard ||
@@ -413,8 +436,14 @@ TExprBase KqpPushExtractedPredicateToReadTable(TExprBase node, TExprContext& ctx
413436
};
414437

415438
if (buildResult.LiteralRange) {
416-
bool ispoint = buildResult.PointPrefixLen == tableDesc.Metadata->KeyColumnNames.size();
417-
if (ispoint) {
439+
const bool isFullKey = buildResult.PointPrefixLen == tableDesc.Metadata->KeyColumnNames.size();
440+
const bool useDataOrGenericQueryLookup = isFullKey
441+
&& (kqpCtx.IsDataQuery() || kqpCtx.IsGenericQuery())
442+
&& !kqpCtx.Config->EnableKqpDataQuerySourceRead;
443+
const bool useScanQueryLookup = isFullKey
444+
&& kqpCtx.IsScanQuery()
445+
&& kqpCtx.Config->EnableKqpScanQueryStreamLookup;
446+
if (useDataOrGenericQueryLookup || useScanQueryLookup) {
418447
TVector<TExprBase> structMembers;
419448
for (size_t i = 0; i < tableDesc.Metadata->KeyColumnNames.size(); ++i) {
420449
auto member = Build<TCoNameValueTuple>(ctx, node.Pos())
@@ -465,7 +494,7 @@ TExprBase KqpPushExtractedPredicateToReadTable(TExprBase node, TExprContext& ctx
465494
} else if (buildResult.PointPrefixLen == tableDesc.Metadata->KeyColumnNames.size()) {
466495
YQL_ENSURE(prefixPointsExpr);
467496
residualLambda = pointsExtractionResult.PrunedLambda;
468-
buildLookup(prefixPointsExpr, input);
497+
buildRangeRead(prefixPointsExpr, input);
469498
}
470499
}
471500

@@ -475,26 +504,7 @@ TExprBase KqpPushExtractedPredicateToReadTable(TExprBase node, TExprContext& ctx
475504
prefix = prefixPointsExpr;
476505
}
477506

478-
if (indexName) {
479-
input = Build<TKqlReadTableIndexRanges>(ctx, read.Pos())
480-
.Table(read.Table())
481-
.Ranges(ranges)
482-
.Columns(read.Columns())
483-
.Settings(read.Settings())
484-
.ExplainPrompt(prompt.BuildNode(ctx, read.Pos()))
485-
.Index(indexName.Cast())
486-
.PrefixPointsExpr(prefix)
487-
.Done();
488-
} else {
489-
input = Build<TKqlReadTableRanges>(ctx, read.Pos())
490-
.Table(read.Table())
491-
.Ranges(ranges)
492-
.Columns(read.Columns())
493-
.Settings(read.Settings())
494-
.ExplainPrompt(prompt.BuildNode(ctx, read.Pos()))
495-
.PrefixPointsExpr(prefix)
496-
.Done();
497-
}
507+
buildRangeRead(prefix, input);
498508
}
499509

500510
*input = readMatch->BuildProcessNodes(*input, ctx);

ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) {
171171
NJson::ReadJsonTree(stats.query_plan(), &plan, true);
172172

173173
auto stages = FindPlanStages(plan);
174-
UNIT_ASSERT_VALUES_EQUAL(stages.size(), 2);
174+
UNIT_ASSERT_VALUES_EQUAL(stages.size(), QueryService ? 2 : 3);
175175

176176
i64 totalTasks = 0;
177177
for (const auto& stage : stages) {

0 commit comments

Comments
 (0)