diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_effects.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_effects.cpp index 7dcf855ce037..aa4dfb187fde 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_effects.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_effects.cpp @@ -16,6 +16,7 @@ TExprBase KqpDeleteOverLookup(const TExprBase& node, TExprContext& ctx, const TK auto deleteRows = node.Cast(); TMaybeNode lookup; + TMaybeNode read; TMaybeNode skipNulMembers; if (deleteRows.Input().Maybe()) { @@ -23,36 +24,79 @@ TExprBase KqpDeleteOverLookup(const TExprBase& node, TExprContext& ctx, const TK } else if (deleteRows.Input().Maybe().Input().Maybe()) { skipNulMembers = deleteRows.Input().Cast(); lookup = skipNulMembers.Input().Cast(); + } else if (deleteRows.Input().Maybe()) { + read = deleteRows.Input().Cast(); } else { return node; } - YQL_ENSURE(lookup); - if (deleteRows.Table().Raw() != lookup.Cast().Table().Raw()) { - return node; - } - - auto lookupKeysType = lookup.Cast().LookupKeys().Ref().GetTypeAnn(); - auto lookupKeyType = lookupKeysType->Cast()->GetItemType()->Cast(); - YQL_ENSURE(lookupKeyType); - - // Only consider complete PK lookups + TMaybeNode deleteInput; const auto& tableDesc = GetTableData(*kqpCtx.Tables, kqpCtx.Cluster, deleteRows.Table().Path()); - if (lookupKeyType->GetSize() != tableDesc.Metadata->KeyColumnNames.size()) { - return node; - } - - TExprBase deleteInput = lookup.Cast().LookupKeys(); - if (skipNulMembers) { - deleteInput = Build(ctx, skipNulMembers.Cast().Pos()) - .Input(deleteInput) - .Members(skipNulMembers.Cast().Members()) + if (lookup) { + if (deleteRows.Table().Raw() != lookup.Cast().Table().Raw()) { + return node; + } + + auto lookupKeysType = lookup.Cast().LookupKeys().Ref().GetTypeAnn(); + auto lookupKeyType = lookupKeysType->Cast()->GetItemType()->Cast(); + YQL_ENSURE(lookupKeyType); + + // Only consider complete PK lookups + if (lookupKeyType->GetSize() != tableDesc.Metadata->KeyColumnNames.size()) { + return node; + } + + deleteInput = lookup.Cast().LookupKeys(); + if (skipNulMembers) { + deleteInput = Build(ctx, skipNulMembers.Cast().Pos()) + .Input(deleteInput.Cast()) + .Members(skipNulMembers.Cast().Members()) + .Done(); + } + } else if (read) { + if (deleteRows.Table().Raw() != read.Cast().Table().Raw()) { + return node; + } + + const auto& rangeFrom = read.Cast().Range().From(); + const auto& rangeTo = read.Cast().Range().To(); + + if (!rangeFrom.Maybe() || !rangeTo.Maybe()) { + return node; + } + + if (rangeFrom.Raw() != rangeTo.Raw()) { + return node; + } + + if (rangeFrom.ArgCount() != tableDesc.Metadata->KeyColumnNames.size()) { + return node; + } + + TVector structMembers; + for (ui32 i = 0; i < rangeFrom.ArgCount(); ++i) { + TCoAtom columnNameAtom(ctx.NewAtom(node.Pos(), tableDesc.Metadata->KeyColumnNames[i])); + + auto member = Build(ctx, node.Pos()) + .Name(columnNameAtom) + .Value(rangeFrom.Arg(i)) + .Done(); + + structMembers.push_back(member); + } + + deleteInput = Build(ctx, node.Pos()) + .Add() + .Add(structMembers) + .Build() .Done(); - } + } + + YQL_ENSURE(deleteInput); return Build(ctx, deleteRows.Pos()) .Table(deleteRows.Table()) - .Input(deleteInput) + .Input(deleteInput.Cast()) .Done(); } diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp index 6e1d01c5568f..72fe19fd8a11 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp @@ -124,6 +124,17 @@ TExprBase KqpApplyExtractMembersToReadTableRanges(TExprBase node, TExprContext& .Done(); } + if (auto readRange = node.Maybe()) { + return Build(ctx, read.Pos()) + .Table(read.Table()) + .Ranges(read.Ranges()) + .Columns(usedColumns.Cast()) + .Settings(read.Settings()) + .ExplainPrompt(read.ExplainPrompt()) + .PrefixPointsExpr(readRange.PrefixPointsExpr()) + .Done(); + } + return Build(ctx, read.Pos()) .CallableName(read.CallableName()) .Table(read.Table()) diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp index 7d28a665766c..a5eba5e404a5 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp @@ -353,6 +353,59 @@ TMaybeNode KqpRewriteLiteralLookup(const TExprBase& node, TExprContex lookupKeys = skipNullMembers.Input(); } + TKqpReadTableSettings settings; + if (skipNullMembers) { + auto skipNullColumns = skipNullMembers.Cast().Members(); + + if (skipNullColumns) { + for (const auto &column : skipNullColumns.Cast()) { + settings.AddSkipNullKey(TString(column.Value())); + } + } + } + + const auto& table = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, lookup.Table().Path().Value()); + if (auto lookupKeysFlatMap = lookupKeys.Maybe()) { + auto flatMapRangeInput = lookupKeysFlatMap.Cast().Input().Maybe(); + + // This rule should depend on feature flag for safety + if (!flatMapRangeInput || !kqpCtx.Config->EnableKqpDataQueryStreamLookup) { + return {}; + } + + auto lookupKeysType = lookupKeys.Ref().GetTypeAnn(); + YQL_ENSURE(lookupKeysType); + YQL_ENSURE(lookupKeysType->GetKind() == ETypeAnnotationKind::List); + auto itemType = lookupKeysType->Cast()->GetItemType(); + YQL_ENSURE(itemType->GetKind() == ETypeAnnotationKind::Struct); + auto structType = itemType->Cast(); + + TVector usedColumns; + usedColumns.reserve(structType->GetSize()); + for (const auto& keyColumnName : table.Metadata->KeyColumnNames) { + if (!structType->FindItem(keyColumnName)) { + break; + } + + usedColumns.emplace_back(keyColumnName); + } + + YQL_ENSURE(usedColumns.size() == structType->GetSize()); + + TKqpReadTableExplainPrompt prompt; + prompt.SetUsedKeyColumns(std::move(usedColumns)); + prompt.SetPointPrefixLen(structType->GetSize()); + + + return Build(ctx, lookup.Pos()) + .Table(lookup.Table()) + .Ranges(flatMapRangeInput.Cast()) + .Columns(lookup.Columns()) + .Settings(settings.BuildNode(ctx, lookup.Pos())) + .ExplainPrompt(prompt.BuildNode(ctx, lookup.Pos())) + .Done(); + } + auto maybeAsList = lookupKeys.Maybe(); if (!maybeAsList) { return {}; @@ -369,7 +422,6 @@ TMaybeNode KqpRewriteLiteralLookup(const TExprBase& node, TExprContex } // full pk expected - const auto& table = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, lookup.Table().Path().Value()); if (table.Metadata->KeyColumnNames.size() != maybeStruct.Cast().ArgCount()) { return {}; } @@ -380,7 +432,6 @@ TMaybeNode KqpRewriteLiteralLookup(const TExprBase& node, TExprContex keyColumnsStruct.insert({TString(tuple.Name().Value()), tuple.Value().Cast()}); } - TKqpReadTableSettings settings; TVector keyValues; keyValues.reserve(maybeStruct.Cast().ArgCount()); for (const auto& name : table.Metadata->KeyColumnNames) { @@ -396,7 +447,6 @@ TMaybeNode KqpRewriteLiteralLookup(const TExprBase& node, TExprContex for (const auto &column : skipNullColumns.Cast()) { settings.AddSkipNullKey(TString(column.Value())); } - } } diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp index 9d1815092fab..22f1bf9bcfc9 100644 --- a/ydb/core/kqp/runtime/kqp_read_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp @@ -728,8 +728,8 @@ class TKqpReadActor : public TActorBootstrapped, public NYql::NDq if (intersection == 0) { newShard->AddPoint(std::move(points[pointIndex])); CA_LOG_D("Add point to new shardId: " << partition.ShardId); - } - if (intersection < 0) { + } else { + YQL_ENSURE(intersection > 0, "Missed intersection of point and partition ranges."); break; } pointIndex += 1; diff --git a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp index d79e0f3440a1..2c0067a1a755 100644 --- a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp +++ b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp @@ -3520,14 +3520,14 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { NJson::TJsonValue plan; NJson::ReadJsonTree(result.GetQueryPlan(), &plan, true); - auto streamLookup = FindPlanNodeByKv(plan, "Node Type", "TableLookup"); + auto streamLookup = FindPlanNodeByKv(plan, "Node Type", "TableRangeScan"); UNIT_ASSERT(streamLookup.IsDefined()); auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 1); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/KeyValue"); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 2); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access().size(), 1); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access(0).name(), "/Root/KeyValue"); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access(0).reads().rows(), 2); } } diff --git a/ydb/core/tx/datashard/datashard_ut_trace.cpp b/ydb/core/tx/datashard/datashard_ut_trace.cpp index 16719754f7a4..b1a2780486a5 100644 --- a/ydb/core/tx/datashard/datashard_ut_trace.cpp +++ b/ydb/core/tx/datashard/datashard_ut_trace.cpp @@ -293,26 +293,23 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) { std::string canon; if (server->GetSettings().AppConfig->GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) { - auto lookupActorSpan = trace.Root.BFSFindOne("LookupActor"); - UNIT_ASSERT(lookupActorSpan); + auto readActorSpan = trace.Root.BFSFindOne("ReadActor"); + UNIT_ASSERT(readActorSpan); - auto dsReads = lookupActorSpan->get().FindAll("Datashard.Read"); // Lookup actor sends EvRead to each shard. + auto dsReads = readActorSpan->get().FindAll("Datashard.Read"); // Read actor sends EvRead to each shard. UNIT_ASSERT_EQUAL(dsReads.size(), 2); - canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) " - ", (DataExecuter -> [(WaitForTableResolve) , (WaitForSnapshot) , (ComputeActor) " - ", (ComputeActor -> [(LookupActor -> [(WaitForShardsResolve) , (Datashard.Read " - "-> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) " - ", (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) " - ", (Tablet.Transaction.Execute -> [(Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) " - ", (Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog " - "-> [(Tablet.WriteLog.LogEntry)])])]) , (Datashard.Read " - "-> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) " - ", (Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) " - ", (Tablet.Transaction.Execute -> [(Datashard.Unit)]) , (Tablet.Transaction.Wait) " - ", (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit) " - ", (Datashard.Unit)]) , (Tablet.WriteLog -> [(Tablet.WriteLog.LogEntry)])])" - "])])]) , (ComputeActor) , (RunTasks)])])"; + canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) , (LiteralExecuter) " + ", (DataExecuter -> [(WaitForTableResolve) , (WaitForShardsResolve) , (WaitForSnapshot) , (ComputeActor) " + ", (RunTasks) , (KqpNode.SendTasks) , (ComputeActor -> [(ReadActor -> [(WaitForShardsResolve) , (Datashard.Read " + "-> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) " + ", (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit)]) " + ", (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit) " + ", (Datashard.Unit)]) , (Tablet.WriteLog -> [(Tablet.WriteLog.LogEntry)])])]) , (Datashard.Read -> [(Tablet.Transaction " + "-> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.Transaction.Wait) " + ", (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit)]) , (Tablet.Transaction.Wait) " + ", (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog " + "-> [(Tablet.WriteLog.LogEntry)])])])])])])])"; } else { auto deSpan = trace.Root.BFSFindOne("DataExecuter"); UNIT_ASSERT(deSpan);