Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/core/kqp/executer_actor/kqp_executer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
}

TasksGraph.GetMeta().SetLockTxId(lockTxId);
TasksGraph.GetMeta().SetLockNodeId(SelfId().NodeId());

LWTRACK(KqpBaseExecuterHandleReady, ResponseEv->Orbit, TxId);
if (IsDebugLogEnabled()) {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1128,6 +1128,7 @@ void FillInputDesc(const TKqpTasksGraph& tasksGraph, NYql::NDqProto::TTaskInput&

if (lockTxId) {
input.Meta.StreamLookupSettings->SetLockTxId(*lockTxId);
input.Meta.StreamLookupSettings->SetLockNodeId(tasksGraph.GetMeta().LockNodeId);
}
transformProto->MutableSettings()->PackFrom(*input.Meta.StreamLookupSettings);
} else if (input.Meta.SequencerSettings) {
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/kqp/executer_actor/kqp_tasks_graph.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ struct TStageInfoMeta {
struct TGraphMeta {
IKqpGateway::TKqpSnapshot Snapshot;
TMaybe<ui64> LockTxId;
ui32 LockNodeId;
std::unordered_map<ui64, TActorId> ResultChannelProxies;
TActorId ExecuterId;
bool UseFollowers = false;
Expand All @@ -117,6 +118,10 @@ struct TGraphMeta {
void SetLockTxId(TMaybe<ui64> lockTxId) {
LockTxId = lockTxId;
}

void SetLockNodeId(ui32 lockNodeId) {
LockNodeId = lockNodeId;
}
};

struct TTaskInputMeta {
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
, Snapshot(settings.GetSnapshot().GetStep(), settings.GetSnapshot().GetTxId())
, AllowInconsistentReads(settings.GetAllowInconsistentReads())
, LockTxId(settings.HasLockTxId() ? settings.GetLockTxId() : TMaybe<ui64>())
, NodeLockId(settings.HasLockNodeId() ? settings.GetLockNodeId() : TMaybe<ui32>())
, SchemeCacheRequestTimeout(SCHEME_CACHE_REQUEST_TIMEOUT)
, StreamLookupWorker(CreateStreamLookupWorker(std::move(settings), args.TypeEnv, args.HolderFactory, args.InputDesc))
, Counters(counters)
Expand Down Expand Up @@ -456,6 +457,10 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
record.SetLockTxId(*LockTxId);
}

if (NodeLockId) {
record.SetLockNodeId(*NodeLockId);
}

auto defaultSettings = GetDefaultReadSettings()->Record;
record.SetMaxRows(defaultSettings.GetMaxRows());
record.SetMaxBytes(defaultSettings.GetMaxBytes());
Expand Down Expand Up @@ -586,6 +591,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
IKqpGateway::TKqpSnapshot Snapshot;
const bool AllowInconsistentReads;
const TMaybe<ui64> LockTxId;
const TMaybe<ui32> NodeLockId;
std::unordered_map<ui64, TReadState> Reads;
std::unordered_map<ui64, TShardState> ReadsPerShard;
std::shared_ptr<const TVector<TKeyDesc::TPartitionInfo>> Partitioning;
Expand Down
19 changes: 10 additions & 9 deletions ydb/core/protos/kqp.proto
Original file line number Diff line number Diff line change
Expand Up @@ -755,15 +755,16 @@ message TKqpTableSinkSettings {
}

message TKqpStreamLookupSettings {
optional NKqpProto.TKqpPhyTableId Table = 1;
repeated TKqpColumnMetadataProto KeyColumns = 2;
repeated TKqpColumnMetadataProto Columns = 3;
optional TKqpSnapshot Snapshot = 4;
optional uint64 LockTxId = 5;
optional bool ImmediateTx = 6;
repeated string LookupKeyColumns = 7;
optional NKqpProto.EStreamLookupStrategy LookupStrategy = 8;
optional bool AllowInconsistentReads = 9 [default = false];
optional NKqpProto.TKqpPhyTableId Table = 1;
repeated TKqpColumnMetadataProto KeyColumns = 2;
repeated TKqpColumnMetadataProto Columns = 3;
optional TKqpSnapshot Snapshot = 4;
optional uint64 LockTxId = 5;
optional bool ImmediateTx = 6;
repeated string LookupKeyColumns = 7;
optional NKqpProto.EStreamLookupStrategy LookupStrategy = 8;
optional bool AllowInconsistentReads = 9 [default = false];
optional uint32 LockNodeId = 10;
}

message TKqpSequencerSettings {
Expand Down