From 9669a1efac406c136b3056798c635b6d0bff9839 Mon Sep 17 00:00:00 2001 From: ulya-sidorina Date: Thu, 12 Sep 2024 13:04:06 +0000 Subject: [PATCH] fix(kqp): pass lockNodeId to stream lookup actor --- .../kqp/executer_actor/kqp_executer_impl.h | 1 + .../kqp/executer_actor/kqp_tasks_graph.cpp | 1 + ydb/core/kqp/executer_actor/kqp_tasks_graph.h | 5 +++++ .../kqp/runtime/kqp_stream_lookup_actor.cpp | 6 ++++++ ydb/core/protos/kqp.proto | 19 ++++++++++--------- 5 files changed, 23 insertions(+), 9 deletions(-) diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 83dd14ae8eca..8de618408606 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -512,6 +512,7 @@ class TKqpExecuterBase : public TActorBootstrapped { } TasksGraph.GetMeta().SetLockTxId(lockTxId); + TasksGraph.GetMeta().SetLockNodeId(SelfId().NodeId()); LWTRACK(KqpBaseExecuterHandleReady, ResponseEv->Orbit, TxId); if (IsDebugLogEnabled()) { diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp index 6ebb045bd53a..fb53a62369bb 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp @@ -1128,6 +1128,7 @@ void FillInputDesc(const TKqpTasksGraph& tasksGraph, NYql::NDqProto::TTaskInput& if (lockTxId) { input.Meta.StreamLookupSettings->SetLockTxId(*lockTxId); + input.Meta.StreamLookupSettings->SetLockNodeId(tasksGraph.GetMeta().LockNodeId); } transformProto->MutableSettings()->PackFrom(*input.Meta.StreamLookupSettings); } else if (input.Meta.SequencerSettings) { diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h index e9141c7d5f84..fe0d1a42fbb3 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h @@ -91,6 +91,7 @@ struct TStageInfoMeta { struct TGraphMeta { IKqpGateway::TKqpSnapshot Snapshot; TMaybe LockTxId; + ui32 LockNodeId; std::unordered_map ResultChannelProxies; TActorId ExecuterId; bool UseFollowers = false; @@ -117,6 +118,10 @@ struct TGraphMeta { void SetLockTxId(TMaybe lockTxId) { LockTxId = lockTxId; } + + void SetLockNodeId(ui32 lockNodeId) { + LockNodeId = lockNodeId; + } }; struct TTaskInputMeta { diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp index e986a9cd3e96..17ef66580bbf 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp @@ -38,6 +38,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped()) + , NodeLockId(settings.HasLockNodeId() ? settings.GetLockNodeId() : TMaybe()) , SchemeCacheRequestTimeout(SCHEME_CACHE_REQUEST_TIMEOUT) , StreamLookupWorker(CreateStreamLookupWorker(std::move(settings), args.TypeEnv, args.HolderFactory, args.InputDesc)) , Counters(counters) @@ -456,6 +457,10 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrappedRecord; record.SetMaxRows(defaultSettings.GetMaxRows()); record.SetMaxBytes(defaultSettings.GetMaxBytes()); @@ -586,6 +591,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped LockTxId; + const TMaybe NodeLockId; std::unordered_map Reads; std::unordered_map ReadsPerShard; std::shared_ptr> Partitioning; diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index c9306c8bdad8..ec1a8cf8f58c 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -755,15 +755,16 @@ message TKqpTableSinkSettings { } message TKqpStreamLookupSettings { - optional NKqpProto.TKqpPhyTableId Table = 1; - repeated TKqpColumnMetadataProto KeyColumns = 2; - repeated TKqpColumnMetadataProto Columns = 3; - optional TKqpSnapshot Snapshot = 4; - optional uint64 LockTxId = 5; - optional bool ImmediateTx = 6; - repeated string LookupKeyColumns = 7; - optional NKqpProto.EStreamLookupStrategy LookupStrategy = 8; - optional bool AllowInconsistentReads = 9 [default = false]; + optional NKqpProto.TKqpPhyTableId Table = 1; + repeated TKqpColumnMetadataProto KeyColumns = 2; + repeated TKqpColumnMetadataProto Columns = 3; + optional TKqpSnapshot Snapshot = 4; + optional uint64 LockTxId = 5; + optional bool ImmediateTx = 6; + repeated string LookupKeyColumns = 7; + optional NKqpProto.EStreamLookupStrategy LookupStrategy = 8; + optional bool AllowInconsistentReads = 9 [default = false]; + optional uint32 LockNodeId = 10; } message TKqpSequencerSettings {