Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ namespace NKikimr::NKqp {
using namespace NYql::NDq;
using namespace NYql::NDqProto;

IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId, ui64 lockTxId, ui32 lockNodeId,
IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId, TMaybe<ui64> lockTxId, ui32 lockNodeId,
TDqTask* task, IDqAsyncIoFactory::TPtr asyncIoFactory,
const NYql::NDq::TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
TIntrusivePtr<NActors::TProtoArenaHolder> arena, TComputeActorSchedulingOptions schedulingOptions) {
Expand All @@ -141,7 +141,7 @@ IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId, ui64 lo

IActor* CreateKqpScanFetcher(const NKikimrKqp::TKqpSnapshot& snapshot, std::vector<NActors::TActorId>&& computeActors,
const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta, const NYql::NDq::TComputeRuntimeSettings& settings,
const ui64 txId, ui64 lockTxId, ui32 lockNodeId, const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId) {
const ui64 txId, TMaybe<ui64> lockTxId, ui32 lockNodeId, const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId) {
return new NScanPrivate::TKqpScanFetcherActor(snapshot, settings, std::move(computeActors), txId, lockTxId, lockNodeId, meta, shardsScanningPolicy, counters, std::move(traceId));
}

Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/compute_actor/kqp_compute_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,14 @@ IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NYql::NDqPr
TIntrusivePtr<NActors::TProtoArenaHolder> arena,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings, TComputeActorSchedulingOptions);

IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId, ui64 lockTxId, ui32 lockNodeId,
IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId, TMaybe<ui64> lockTxId, ui32 lockNodeId,
NYql::NDqProto::TDqTask* task, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
TIntrusivePtr<NActors::TProtoArenaHolder> arena, TComputeActorSchedulingOptions);

IActor* CreateKqpScanFetcher(const NKikimrKqp::TKqpSnapshot& snapshot, std::vector<NActors::TActorId>&& computeActors,
const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta, const NYql::NDq::TComputeRuntimeSettings& settings,
const ui64 txId, ui64 lockTxId, ui32 lockNodeId, const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId);
const ui64 txId, TMaybe<ui64> lockTxId, ui32 lockNodeId, const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId);

NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(
TIntrusivePtr<TKqpCounters> counters,
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ struct IKqpNodeComputeActorFactory {
struct TCreateArgs {
const NActors::TActorId& ExecuterId;
const ui64 TxId;
const ui64 LockTxId;
const TMaybe<ui64> LockTxId;
const ui32 LockNodeId;
NYql::NDqProto::TDqTask* Task;
TIntrusivePtr<NRm::TTxState> TxInfo;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ static constexpr TDuration RL_MAX_BATCH_DELAY = TDuration::Seconds(50);

} // anonymous namespace

TKqpScanComputeActor::TKqpScanComputeActor(TComputeActorSchedulingOptions cpuOptions, const TActorId& executerId, ui64 txId, ui64 lockTxId, ui32 lockNodeId,
TKqpScanComputeActor::TKqpScanComputeActor(TComputeActorSchedulingOptions cpuOptions, const TActorId& executerId, ui64 txId, TMaybe<ui64> lockTxId, ui32 lockNodeId,
NDqProto::TDqTask* task, IDqAsyncIoFactory::TPtr asyncIoFactory,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
TIntrusivePtr<NActors::TProtoArenaHolder> arena)
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class TKqpScanComputeActor: public TSchedulableComputeActorBase<TKqpScanComputeA

std::set<NActors::TActorId> Fetchers;
NMiniKQL::TKqpScanComputeContext::TScanData* ScanData = nullptr;
const ui64 LockTxId;
const TMaybe<ui64> LockTxId;
const ui32 LockNodeId;

struct TLockHash {
Expand Down Expand Up @@ -67,7 +67,7 @@ class TKqpScanComputeActor: public TSchedulableComputeActorBase<TKqpScanComputeA
return NKikimrServices::TActivity::KQP_SCAN_COMPUTE_ACTOR;
}

TKqpScanComputeActor(TComputeActorSchedulingOptions, const TActorId& executerId, ui64 txId, ui64 lockTxId, ui32 lockNodeId,
TKqpScanComputeActor(TComputeActorSchedulingOptions, const TActorId& executerId, ui64 txId, TMaybe<ui64> lockTxId, ui32 lockNodeId,
NYql::NDqProto::TDqTask* task, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
TIntrusivePtr<NActors::TProtoArenaHolder> arena);
Expand Down
9 changes: 6 additions & 3 deletions ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ static constexpr ui64 MAX_SHARD_RESOLVES = 3;


TKqpScanFetcherActor::TKqpScanFetcherActor(const NKikimrKqp::TKqpSnapshot& snapshot,
const TComputeRuntimeSettings& settings, std::vector<NActors::TActorId>&& computeActors, const ui64 txId, const ui64 lockTxId, const ui32 lockNodeId,
const TComputeRuntimeSettings& settings, std::vector<NActors::TActorId>&& computeActors, const ui64 txId, const TMaybe<ui64> lockTxId, const ui32 lockNodeId,
const NKikimrTxDataShard::TKqpTransaction_TScanTaskMeta& meta, const TShardsScanningPolicy& shardsScanningPolicy,
TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId)
: Meta(meta)
Expand Down Expand Up @@ -417,7 +417,9 @@ std::unique_ptr<NKikimr::TEvDataShard::TEvKqpScan> TKqpScanFetcherActor::BuildEv
ev->Record.SetStatsMode(RuntimeSettings.StatsMode);
ev->Record.SetScanId(scanId);
ev->Record.SetTxId(std::get<ui64>(TxId));
ev->Record.SetLockTxId(LockTxId);
if (LockTxId) {
ev->Record.SetLockTxId(*LockTxId);
}
ev->Record.SetLockNodeId(LockNodeId);
ev->Record.SetTablePath(ScanDataMeta.TablePath);
ev->Record.SetSchemaVersion(ScanDataMeta.TableId.SchemaVersion);
Expand Down Expand Up @@ -466,7 +468,8 @@ void TKqpScanFetcherActor::ProcessPendingScanDataItem(TEvKqpCompute::TEvScanData

state->LastKey = std::move(msg.LastKey);
const ui64 rowsCount = msg.GetRowsCount();
AFL_ENSURE(LockTxId == 0 || !msg.LocksInfo.Locks.empty() || !msg.LocksInfo.BrokenLocks.empty());
AFL_ENSURE(!LockTxId || !msg.LocksInfo.Locks.empty() || !msg.LocksInfo.BrokenLocks.empty());
AFL_ENSURE(LockTxId || (msg.LocksInfo.Locks.empty() && msg.LocksInfo.BrokenLocks.empty()));
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("action","got EvScanData")("rows", rowsCount)("finished", msg.Finished)("exceeded", msg.RequestedBytesLimitReached)
("scan", ScanId)("packs_to_send", InFlightComputes.GetPacksToSendCount())
("from", ev->Sender)("shards remain", PendingShards.size())
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,15 @@ class TKqpScanFetcherActor: public NActors::TActorBootstrapped<TKqpScanFetcherAc
const NMiniKQL::TScanDataMetaFull ScanDataMeta;
const NYql::NDq::TComputeRuntimeSettings RuntimeSettings;
const NYql::NDq::TTxId TxId;
const ui64 LockTxId;
const TMaybe<ui64> LockTxId;
const ui32 LockNodeId;
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::KQP_SCAN_FETCH_ACTOR;
}

TKqpScanFetcherActor(const NKikimrKqp::TKqpSnapshot& snapshot, const NYql::NDq::TComputeRuntimeSettings& settings,
std::vector<NActors::TActorId>&& computeActors, const ui64 txId, const ui64 lockTxId, const ui32 lockNodeId,
std::vector<NActors::TActorId>&& computeActors, const ui64 txId, const TMaybe<ui64> lockTxId, const ui32 lockNodeId,
const NKikimrTxDataShard::TKqpTransaction_TScanTaskMeta& meta,
const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId);

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2473,7 +2473,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
Planner = CreateKqpPlanner({
.TasksGraph = TasksGraph,
.TxId = TxId,
.LockTxId = lockTxId.GetOrElse(0),
.LockTxId = lockTxId,
.LockNodeId = SelfId().NodeId(),
.Executer = SelfId(),
.Snapshot = GetSnapshot(),
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/kqp/executer_actor/kqp_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,9 @@ std::unique_ptr<TEvKqpNode::TEvStartKqpTasksRequest> TKqpPlanner::SerializeReque
auto result = std::make_unique<TEvKqpNode::TEvStartKqpTasksRequest>(TasksGraph.GetMeta().GetArenaIntrusivePtr());
auto& request = result->Record;
request.SetTxId(TxId);
request.SetLockTxId(LockTxId);
if (LockTxId) {
request.SetLockTxId(*LockTxId);
}
request.SetLockNodeId(LockNodeId);
ActorIdToProto(ExecuterId, request.MutableExecuterActorId());

Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/executer_actor/kqp_planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class TKqpPlanner {
struct TArgs {
TKqpTasksGraph& TasksGraph;
const ui64 TxId;
const ui64 LockTxId;
const TMaybe<ui64> LockTxId;
const ui32 LockNodeId;
const TActorId& Executer;
const IKqpGateway::TKqpSnapshot& Snapshot;
Expand Down Expand Up @@ -103,7 +103,7 @@ class TKqpPlanner {

private:
const ui64 TxId;
const ui64 LockTxId;
const TMaybe<ui64> LockTxId;
const ui32 LockNodeId;
const TActorId ExecuterId;
TVector<ui64> ComputeTasks;
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/kqp/node_service/kqp_node_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,9 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
auto requester = ev->Sender;

ui64 txId = msg.GetTxId();
ui64 lockTxId = msg.GetLockTxId();
TMaybe<ui64> lockTxId = msg.HasLockTxId()
? TMaybe<ui64>(msg.GetLockTxId())
: Nothing();
ui32 lockNodeId = msg.GetLockNodeId();

YQL_ENSURE(msg.GetStartAllOrFail()); // todo: support partial start
Expand Down