Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 65 additions & 21 deletions ydb/core/kqp/opt/logical/kqp_opt_log_effects.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,43 +16,87 @@ TExprBase KqpDeleteOverLookup(const TExprBase& node, TExprContext& ctx, const TK
auto deleteRows = node.Cast<TKqlDeleteRows>();

TMaybeNode<TKqlLookupTableBase> lookup;
TMaybeNode<TKqlReadTable> read;
TMaybeNode<TCoSkipNullMembers> skipNulMembers;

if (deleteRows.Input().Maybe<TKqlLookupTableBase>()) {
lookup = deleteRows.Input().Cast<TKqlLookupTableBase>();
} else if (deleteRows.Input().Maybe<TCoSkipNullMembers>().Input().Maybe<TKqlLookupTableBase>()) {
skipNulMembers = deleteRows.Input().Cast<TCoSkipNullMembers>();
lookup = skipNulMembers.Input().Cast<TKqlLookupTableBase>();
} else if (deleteRows.Input().Maybe<TKqlReadTable>()) {
read = deleteRows.Input().Cast<TKqlReadTable>();
} else {
return node;
}

YQL_ENSURE(lookup);
if (deleteRows.Table().Raw() != lookup.Cast().Table().Raw()) {
return node;
}

auto lookupKeysType = lookup.Cast().LookupKeys().Ref().GetTypeAnn();
auto lookupKeyType = lookupKeysType->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>();
YQL_ENSURE(lookupKeyType);

// Only consider complete PK lookups
TMaybeNode<TExprBase> deleteInput;
const auto& tableDesc = GetTableData(*kqpCtx.Tables, kqpCtx.Cluster, deleteRows.Table().Path());
if (lookupKeyType->GetSize() != tableDesc.Metadata->KeyColumnNames.size()) {
return node;
}

TExprBase deleteInput = lookup.Cast().LookupKeys();
if (skipNulMembers) {
deleteInput = Build<TCoSkipNullMembers>(ctx, skipNulMembers.Cast().Pos())
.Input(deleteInput)
.Members(skipNulMembers.Cast().Members())
if (lookup) {
if (deleteRows.Table().Raw() != lookup.Cast().Table().Raw()) {
return node;
}

auto lookupKeysType = lookup.Cast().LookupKeys().Ref().GetTypeAnn();
auto lookupKeyType = lookupKeysType->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>();
YQL_ENSURE(lookupKeyType);

// Only consider complete PK lookups
if (lookupKeyType->GetSize() != tableDesc.Metadata->KeyColumnNames.size()) {
return node;
}

deleteInput = lookup.Cast().LookupKeys();
if (skipNulMembers) {
deleteInput = Build<TCoSkipNullMembers>(ctx, skipNulMembers.Cast().Pos())
.Input(deleteInput.Cast())
.Members(skipNulMembers.Cast().Members())
.Done();
}
} else if (read) {
if (deleteRows.Table().Raw() != read.Cast().Table().Raw()) {
return node;
}

const auto& rangeFrom = read.Cast().Range().From();
const auto& rangeTo = read.Cast().Range().To();

if (!rangeFrom.Maybe<TKqlKeyInc>() || !rangeTo.Maybe<TKqlKeyInc>()) {
return node;
}

if (rangeFrom.Raw() != rangeTo.Raw()) {
return node;
}

if (rangeFrom.ArgCount() != tableDesc.Metadata->KeyColumnNames.size()) {
return node;
}

TVector<TExprBase> structMembers;
for (ui32 i = 0; i < rangeFrom.ArgCount(); ++i) {
TCoAtom columnNameAtom(ctx.NewAtom(node.Pos(), tableDesc.Metadata->KeyColumnNames[i]));

auto member = Build<TCoNameValueTuple>(ctx, node.Pos())
.Name(columnNameAtom)
.Value(rangeFrom.Arg(i))
.Done();

structMembers.push_back(member);
}

deleteInput = Build<TCoAsList>(ctx, node.Pos())
.Add<TCoAsStruct>()
.Add(structMembers)
.Build()
.Done();
}
}

YQL_ENSURE(deleteInput);

return Build<TKqlDeleteRows>(ctx, deleteRows.Pos())
.Table(deleteRows.Table())
.Input(deleteInput)
.Input(deleteInput.Cast())
.Done();
}

Expand Down
11 changes: 11 additions & 0 deletions ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,17 @@ TExprBase KqpApplyExtractMembersToReadTableRanges(TExprBase node, TExprContext&
.Done();
}

if (auto readRange = node.Maybe<TKqlReadTableRanges>()) {
return Build<TKqlReadTableRanges>(ctx, read.Pos())
.Table(read.Table())
.Ranges(read.Ranges())
.Columns(usedColumns.Cast())
.Settings(read.Settings())
.ExplainPrompt(read.ExplainPrompt())
.PrefixPointsExpr(readRange.PrefixPointsExpr())
.Done();
}

return Build<TKqlReadTableRangesBase>(ctx, read.Pos())
.CallableName(read.CallableName())
.Table(read.Table())
Expand Down
56 changes: 53 additions & 3 deletions ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,59 @@ TMaybeNode<TExprBase> KqpRewriteLiteralLookup(const TExprBase& node, TExprContex
lookupKeys = skipNullMembers.Input();
}

TKqpReadTableSettings settings;
if (skipNullMembers) {
auto skipNullColumns = skipNullMembers.Cast().Members();

if (skipNullColumns) {
for (const auto &column : skipNullColumns.Cast()) {
settings.AddSkipNullKey(TString(column.Value()));
}
}
}

const auto& table = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, lookup.Table().Path().Value());
if (auto lookupKeysFlatMap = lookupKeys.Maybe<TCoFlatMapBase>()) {
auto flatMapRangeInput = lookupKeysFlatMap.Cast().Input().Maybe<TCoRangeFinalize>();

// This rule should depend on feature flag for safety
if (!flatMapRangeInput || !kqpCtx.Config->EnableKqpDataQueryStreamLookup) {
return {};
}

auto lookupKeysType = lookupKeys.Ref().GetTypeAnn();
YQL_ENSURE(lookupKeysType);
YQL_ENSURE(lookupKeysType->GetKind() == ETypeAnnotationKind::List);
auto itemType = lookupKeysType->Cast<TListExprType>()->GetItemType();
YQL_ENSURE(itemType->GetKind() == ETypeAnnotationKind::Struct);
auto structType = itemType->Cast<TStructExprType>();

TVector<TString> usedColumns;
usedColumns.reserve(structType->GetSize());
for (const auto& keyColumnName : table.Metadata->KeyColumnNames) {
if (!structType->FindItem(keyColumnName)) {
break;
}

usedColumns.emplace_back(keyColumnName);
}

YQL_ENSURE(usedColumns.size() == structType->GetSize());

TKqpReadTableExplainPrompt prompt;
prompt.SetUsedKeyColumns(std::move(usedColumns));
prompt.SetPointPrefixLen(structType->GetSize());


return Build<TKqlReadTableRanges>(ctx, lookup.Pos())
.Table(lookup.Table())
.Ranges(flatMapRangeInput.Cast())
.Columns(lookup.Columns())
.Settings(settings.BuildNode(ctx, lookup.Pos()))
.ExplainPrompt(prompt.BuildNode(ctx, lookup.Pos()))
.Done();
}

auto maybeAsList = lookupKeys.Maybe<TCoAsList>();
if (!maybeAsList) {
return {};
Expand All @@ -369,7 +422,6 @@ TMaybeNode<TExprBase> KqpRewriteLiteralLookup(const TExprBase& node, TExprContex
}

// full pk expected
const auto& table = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, lookup.Table().Path().Value());
if (table.Metadata->KeyColumnNames.size() != maybeStruct.Cast().ArgCount()) {
return {};
}
Expand All @@ -380,7 +432,6 @@ TMaybeNode<TExprBase> KqpRewriteLiteralLookup(const TExprBase& node, TExprContex
keyColumnsStruct.insert({TString(tuple.Name().Value()), tuple.Value().Cast()});
}

TKqpReadTableSettings settings;
TVector<TExprBase> keyValues;
keyValues.reserve(maybeStruct.Cast().ArgCount());
for (const auto& name : table.Metadata->KeyColumnNames) {
Expand All @@ -396,7 +447,6 @@ TMaybeNode<TExprBase> KqpRewriteLiteralLookup(const TExprBase& node, TExprContex
for (const auto &column : skipNullColumns.Cast()) {
settings.AddSkipNullKey(TString(column.Value()));
}

}
}

Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/runtime/kqp_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -728,8 +728,8 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
if (intersection == 0) {
newShard->AddPoint(std::move(points[pointIndex]));
CA_LOG_D("Add point to new shardId: " << partition.ShardId);
}
if (intersection < 0) {
} else {
YQL_ENSURE(intersection > 0, "Missed intersection of point and partition ranges.");
break;
}
pointIndex += 1;
Expand Down
10 changes: 5 additions & 5 deletions ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3520,14 +3520,14 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {

NJson::TJsonValue plan;
NJson::ReadJsonTree(result.GetQueryPlan(), &plan, true);
auto streamLookup = FindPlanNodeByKv(plan, "Node Type", "TableLookup");
auto streamLookup = FindPlanNodeByKv(plan, "Node Type", "TableRangeScan");
UNIT_ASSERT(streamLookup.IsDefined());

auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/KeyValue");
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 2);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access(0).name(), "/Root/KeyValue");
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access(0).reads().rows(), 2);
}
}

Expand Down
31 changes: 14 additions & 17 deletions ydb/core/tx/datashard/datashard_ut_trace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -293,26 +293,23 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {

std::string canon;
if (server->GetSettings().AppConfig->GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
auto lookupActorSpan = trace.Root.BFSFindOne("LookupActor");
UNIT_ASSERT(lookupActorSpan);
auto readActorSpan = trace.Root.BFSFindOne("ReadActor");
UNIT_ASSERT(readActorSpan);

auto dsReads = lookupActorSpan->get().FindAll("Datashard.Read"); // Lookup actor sends EvRead to each shard.
auto dsReads = readActorSpan->get().FindAll("Datashard.Read"); // Read actor sends EvRead to each shard.
UNIT_ASSERT_EQUAL(dsReads.size(), 2);

canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) "
", (DataExecuter -> [(WaitForTableResolve) , (WaitForSnapshot) , (ComputeActor) "
", (ComputeActor -> [(LookupActor -> [(WaitForShardsResolve) , (Datashard.Read "
"-> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) "
", (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) "
", (Tablet.Transaction.Execute -> [(Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) "
", (Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog "
"-> [(Tablet.WriteLog.LogEntry)])])]) , (Datashard.Read "
"-> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) "
", (Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) "
", (Tablet.Transaction.Execute -> [(Datashard.Unit)]) , (Tablet.Transaction.Wait) "
", (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit) "
", (Datashard.Unit)]) , (Tablet.WriteLog -> [(Tablet.WriteLog.LogEntry)])])"
"])])]) , (ComputeActor) , (RunTasks)])])";
canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) , (LiteralExecuter) "
", (DataExecuter -> [(WaitForTableResolve) , (WaitForShardsResolve) , (WaitForSnapshot) , (ComputeActor) "
", (RunTasks) , (KqpNode.SendTasks) , (ComputeActor -> [(ReadActor -> [(WaitForShardsResolve) , (Datashard.Read "
"-> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) "
", (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit)]) "
", (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit) "
", (Datashard.Unit)]) , (Tablet.WriteLog -> [(Tablet.WriteLog.LogEntry)])])]) , (Datashard.Read -> [(Tablet.Transaction "
"-> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.Transaction.Wait) "
", (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit)]) , (Tablet.Transaction.Wait) "
", (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog "
"-> [(Tablet.WriteLog.LogEntry)])])])])])])])";
} else {
auto deSpan = trace.Root.BFSFindOne("DataExecuter");
UNIT_ASSERT(deSpan);
Expand Down