diff --git a/ydb/core/persqueue/blob.cpp b/ydb/core/persqueue/blob.cpp index 3d528f481fca..10050fa9c4d5 100644 --- a/ydb/core/persqueue/blob.cpp +++ b/ydb/core/persqueue/blob.cpp @@ -667,6 +667,7 @@ ui32 THead::GetCount() const //how much offsets before last batch and how much offsets in last batch Y_ABORT_UNLESS(Batches.front().GetOffset() == Offset); + return Batches.back().GetOffset() - Offset + Batches.back().GetCount(); } @@ -940,16 +941,23 @@ auto TPartitionedBlob::Add(TClientBlob&& blob) -> std::optional auto TPartitionedBlob::Add(const TKey& oldKey, ui32 size) -> std::optional { + if (HeadSize + BlobsSize == 0) { //if nothing to compact at all + NeedCompactHead = false; + } + std::optional res; if (NeedCompactHead) { NeedCompactHead = false; - GlueNewHead = false; res = CreateFormedBlob(0, false); + + StartOffset = NewHead.Offset + NewHead.GetCount(); + NewHead.Clear(); + NewHead.Offset = StartOffset; } TKey newKey(TKeyPrefix::TypeData, Partition, - NewHead.Offset + oldKey.GetOffset(), + StartOffset, oldKey.GetPartNo(), oldKey.GetCount(), oldKey.GetInternalPartsCount(), diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h index fd87b41bbce7..7eb4af4d4cc6 100644 --- a/ydb/core/persqueue/events/internal.h +++ b/ydb/core/persqueue/events/internal.h @@ -816,6 +816,7 @@ struct TEvPQ { ui64 TxId; TVector Operations; TActorId SupportivePartitionActor; + bool ForceFalse = false; }; struct TEvTxCalcPredicateResult : public TEventLocal { diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 599185157922..3f6a3025568f 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -251,7 +251,7 @@ ui64 TPartition::UserDataSize() const { return 0; } - // We assume that DataKyesBody contains an up-to-date set of blobs, their relevance is + // We assume that DataKeysBody contains an up-to-date set of blobs, their relevance is // maintained by the background process. However, the last block may contain several irrelevant // messages. Because of them, we throw out the size of the entire blob. auto size = Size(); @@ -964,15 +964,13 @@ void TPartition::Handle(TEvPQ::TEvProposePartitionConfig::TPtr& ev, const TActor void TPartition::HandleOnInit(TEvPQ::TEvTxCalcPredicate::TPtr& ev, const TActorContext&) { + PQ_LOG_D("HandleOnInit TEvPQ::TEvTxCalcPredicate"); + PendingEvents.emplace_back(ev->ReleaseBase().Release()); } void TPartition::HandleOnInit(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext&) { - PQ_LOG_D("HandleOnInit TEvPQ::TEvTxCommit" << - " Step " << ev->Get()->Step << - ", TxId " << ev->Get()->TxId); - PendingEvents.emplace_back(ev->ReleaseBase().Release()); } @@ -983,9 +981,32 @@ void TPartition::HandleOnInit(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContex void TPartition::HandleOnInit(TEvPQ::TEvProposePartitionConfig::TPtr& ev, const TActorContext&) { - PQ_LOG_D("HandleOnInit TEvPQ::TEvProposePartitionConfig" << - " Step " << ev->Get()->Step << - ", TxId " << ev->Get()->TxId); + PendingEvents.emplace_back(ev->ReleaseBase().Release()); +} + +void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorContext&) +{ + PQ_LOG_D("HandleOnInit TEvPQ::TEvGetWriteInfoRequest"); + + Y_ABORT_UNLESS(IsSupportive()); + + PendingEvents.emplace_back(ev->ReleaseBase().Release()); +} + +void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoResponse::TPtr& ev, const TActorContext&) +{ + PQ_LOG_D("HandleOnInit TEvPQ::TEvGetWriteInfoResponse"); + + Y_ABORT_UNLESS(!IsSupportive()); + + PendingEvents.emplace_back(ev->ReleaseBase().Release()); +} + +void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoError::TPtr& ev, const TActorContext&) +{ + PQ_LOG_D("HandleOnInit TEvPQ::TEvGetWriteInfoError"); + + Y_ABORT_UNLESS(!IsSupportive()); PendingEvents.emplace_back(ev->ReleaseBase().Release()); } @@ -1084,7 +1105,9 @@ void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx } void TPartition::Handle(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorContext& ctx) { + PQ_LOG_D("Handle TEvPQ::TEvGetWriteInfoRequest"); if (ClosedInternalPartition || WaitingForPreviousBlobQuota() || (CurrentStateFunc() != &TThis::StateIdle)) { + PQ_LOG_D("Send TEvPQ::TEvGetWriteInfoError"); auto* response = new TEvPQ::TEvGetWriteInfoError(Partition.InternalPartitionId, "Write info requested while writes are not complete"); ctx.Send(ev->Sender, response); @@ -1110,6 +1133,7 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorCon response->MessagesSizes = std::move(MessageSize.GetValues()); response->InputLags = std::move(SupportivePartitionTimeLag); + PQ_LOG_D("Send TEvPQ::TEvGetWriteInfoResponse"); ctx.Send(ev->Sender, response); } @@ -1204,7 +1228,9 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoResponse::TPtr& ev, const TActorCo void TPartition::Handle(TEvPQ::TEvGetWriteInfoError::TPtr& ev, const TActorContext& ctx) { - PQ_LOG_D("Handle TEvPQ::TEvGetWriteInfoError"); + PQ_LOG_D("Handle TEvPQ::TEvGetWriteInfoError " << + "Cookie " << ev->Get()->Cookie << + ", Message " << ev->Get()->Message); WriteInfoResponseHandler(ev->Sender, ev->Release(), ctx); } @@ -1890,7 +1916,6 @@ void TPartition::ProcessCommitQueue() { return this->ExecUserActionOrTransaction(event, request); }; while (!UserActionAndTxPendingCommit.empty()) { - // UserActionAndTxPendingCommit.pop_front(); auto& front = UserActionAndTxPendingCommit.front(); auto state = ECommitState::Committed; if (auto* tx = get_if>(&front.Event)) { @@ -2059,8 +2084,9 @@ TPartition::EProcessResult TPartition::PreProcessUserActionOrTransaction(TSimple Y_ABORT_UNLESS(t->ChangeConfig); Y_ABORT_UNLESS(!ChangeConfig && !ChangingConfig); - if (!FirstEvent) + if (!FirstEvent) { return EProcessResult::Blocked; + } ChangingConfig = true; // Should remove this and add some id to TEvChangeConfig if we want to batch change of configs t->State = ECommitState::Committed; @@ -2216,6 +2242,9 @@ void TPartition::CommitWriteOperations(TTransaction& t) PQ_LOG_D("Head=" << Head << ", NewHead=" << NewHead); if (!t.WriteInfo->BodyKeys.empty()) { + bool needCompactHead = + (Parameters->FirstCommitWriteOperations ? Head : NewHead).PackedSize != 0; + PartitionedBlob = TPartitionedBlob(Partition, NewHead.Offset, "", // SourceId @@ -2225,7 +2254,7 @@ void TPartition::CommitWriteOperations(TTransaction& t) Head, NewHead, Parameters->HeadCleared, // headCleared - Head.PackedSize != 0, // needCompactHead + needCompactHead, // needCompactHead MaxBlobSize); for (auto& k : t.WriteInfo->BodyKeys) { @@ -2236,9 +2265,9 @@ void TPartition::CommitWriteOperations(TTransaction& t) CompactedKeys.emplace_back(write->Key, write->Value.size()); ClearOldHead(write->Key.GetOffset(), write->Key.GetPartNo(), PersistRequest.Get()); } + Parameters->CurOffset += k.Key.GetCount(); } - PQ_LOG_D("PartitionedBlob.GetFormedBlobs().size=" << PartitionedBlob.GetFormedBlobs().size()); if (const auto& formedBlobs = PartitionedBlob.GetFormedBlobs(); !formedBlobs.empty()) { ui32 curWrites = RenameTmpCmdWrites(PersistRequest.Get()); @@ -2249,17 +2278,15 @@ void TPartition::CommitWriteOperations(TTransaction& t) ctx); } - const auto& last = t.WriteInfo->BodyKeys.back(); - - NewHead.Offset += (last.Key.GetOffset() + last.Key.GetCount()); + NewHead.Clear(); + NewHead.Offset = Parameters->CurOffset; } if (!t.WriteInfo->BlobsFromHead.empty()) { auto& first = t.WriteInfo->BlobsFromHead.front(); NewHead.PartNo = first.GetPartNo(); - Parameters->CurOffset = NewHead.Offset; - Parameters->HeadCleared = !t.WriteInfo->BodyKeys.empty(); + Parameters->HeadCleared = Parameters->HeadCleared || !t.WriteInfo->BodyKeys.empty(); PartitionedBlob = TPartitionedBlob(Partition, NewHead.Offset, @@ -2304,6 +2331,8 @@ void TPartition::CommitWriteOperations(TTransaction& t) } } + Parameters->FirstCommitWriteOperations = false; + WriteInfosApplied.emplace_back(std::move(t.WriteInfo)); } @@ -2580,6 +2609,8 @@ void TPartition::ChangePlanStepAndTxId(ui64 step, ui64 txId) void TPartition::ResendPendingEvents(const TActorContext& ctx) { + PQ_LOG_D("Resend pending events. Count " << PendingEvents.size()); + while (!PendingEvents.empty()) { ctx.Schedule(TDuration::Zero(), PendingEvents.front().release()); PendingEvents.pop_front(); @@ -3452,8 +3483,6 @@ void TPartition::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const T void TPartition::HandleOnInit(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext&) { - PQ_LOG_D("HandleOnInit TEvPQ::TEvDeletePartition"); - Y_ABORT_UNLESS(IsSupportive()); PendingEvents.emplace_back(ev->ReleaseBase().Release()); diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index d892ff1b4125..ec25e25707ef 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -417,6 +417,9 @@ class TPartition : public TActorBootstrapped { void HandleOnInit(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext& ctx); void HandleOnInit(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx); void HandleOnInit(TEvPQ::TEvProposePartitionConfig::TPtr& ev, const TActorContext& ctx); + void HandleOnInit(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorContext& ctx); + void HandleOnInit(TEvPQ::TEvGetWriteInfoResponse::TPtr& ev, const TActorContext& ctx); + void HandleOnInit(TEvPQ::TEvGetWriteInfoError::TPtr& ev, const TActorContext& ctx); void ChangePlanStepAndTxId(ui64 step, ui64 txId); @@ -527,10 +530,10 @@ class TPartition : public TActorBootstrapped { HFuncTraced(NReadQuoterEvents::TEvQuotaUpdated, Handle); HFuncTraced(NReadQuoterEvents::TEvAccountQuotaCountersUpdated, Handle); HFuncTraced(NReadQuoterEvents::TEvQuotaCountersUpdated, Handle); - HFuncTraced(TEvPQ::TEvGetWriteInfoRequest, Handle); + HFuncTraced(TEvPQ::TEvGetWriteInfoRequest, HandleOnInit); - HFuncTraced(TEvPQ::TEvGetWriteInfoResponse, Handle); - HFuncTraced(TEvPQ::TEvGetWriteInfoError, Handle); + HFuncTraced(TEvPQ::TEvGetWriteInfoResponse, HandleOnInit); + HFuncTraced(TEvPQ::TEvGetWriteInfoError, HandleOnInit); HFuncTraced(TEvPQ::TEvDeletePartition, HandleOnInit); IgnoreFunc(TEvPQ::TEvTxBatchComplete); default: @@ -622,6 +625,7 @@ class TPartition : public TActorBootstrapped { ui64 CurOffset; bool OldPartsCleared; bool HeadCleared; + bool FirstCommitWriteOperations = true; }; static void RemoveMessages(TMessageQueue& src, TMessageQueue& dst); diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp index 1823b02969b7..a1ad01d61363 100644 --- a/ydb/core/persqueue/partition_write.cpp +++ b/ydb/core/persqueue/partition_write.cpp @@ -469,7 +469,7 @@ void TPartition::OnHandleWriteResponse(const TActorContext& ctx) void TPartition::Handle(TEvPQ::TEvHandleWriteResponse::TPtr&, const TActorContext& ctx) { - PQ_LOG_T("TPartition::HandleOnWrite TEvHandleWriteResponse."); + PQ_LOG_T("TPartition::Handle TEvHandleWriteResponse."); OnHandleWriteResponse(ctx); } diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 496461f24254..cfc914324e40 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -821,9 +822,7 @@ void TPersQueue::MoveTopTxToCalculating(TDistributedTransaction& tx, Y_ABORT_UNLESS(false); } - tx.State = NKikimrPQ::TTransaction::CALCULATING; - PQ_LOG_D("TxId " << tx.TxId << - ", NewState " << NKikimrPQ::TTransaction_EState_Name(tx.State)); + TryChangeTxState(tx, NKikimrPQ::TTransaction::CALCULATING); } void TPersQueue::AddSupportivePartition(const TPartitionId& partitionId) @@ -866,8 +865,6 @@ void TPersQueue::CreateSupportivePartitionActor(const TPartitionId& partitionId, void TPersQueue::InitTxWrites(const NKikimrPQ::TTabletTxInfo& info, const TActorContext& ctx) { - TxWrites.clear(); - if (info.HasNextSupportivePartitionId()) { NextSupportivePartitionId = info.GetNextSupportivePartitionId(); } else { @@ -948,6 +945,8 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult& return; } + TxWrites.clear(); + for (const auto& readRange : readRanges) { Y_ABORT_UNLESS(readRange.HasStatus()); if (readRange.GetStatus() != NKikimrProto::OK && @@ -967,10 +966,14 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult& NKikimrPQ::TTransaction tx; Y_ABORT_UNLESS(tx.ParseFromString(pair.GetValue())); - PQ_LOG_D("Load tx " << tx.ShortDebugString()); + PQ_LOG_D("Restore Tx. " << + "TxId: " << tx.GetTxId() << + ", Step: " << tx.GetStep() << + ", State: " << NKikimrPQ::TTransaction_EState_Name(tx.GetState()) << + ", WriteId: " << tx.GetWriteId().ShortDebugString()); if (tx.GetState() == NKikimrPQ::TTransaction::CALCULATED) { - PQ_LOG_D("fix tx state"); + PQ_LOG_D("Fix tx state"); tx.SetState(NKikimrPQ::TTransaction::PLANNED); } @@ -981,6 +984,11 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult& PlannedTxs.emplace_back(tx.GetStep(), tx.GetTxId()); } } + + if (tx.HasWriteId()) { + PQ_LOG_D("Link TxId " << tx.GetTxId() << " with WriteId " << GetWriteId(tx)); + TxWrites[GetWriteId(tx)].TxId = tx.GetTxId(); + } } } @@ -1145,6 +1153,7 @@ void TPersQueue::BeginWriteTabletState(const TActorContext& ctx, NKikimrPQ::ETab kvCmd->SetKey(KeyState()); kvCmd->SetValue(strState); kvCmd->SetTactic(AppData(ctx)->PQConfig.GetTactic()); + kvCmd->SetStorageChannel(NKikimrClient::TKeyValueRequest::INLINE); ctx.Send(ctx.SelfID, kvRequest.Release()); } @@ -1219,7 +1228,7 @@ TPartitionInfo& TPersQueue::GetPartitionInfo(const TPartitionId& partitionId) void TPersQueue::Handle(TEvPQ::TEvPartitionCounters::TPtr& ev, const TActorContext& ctx) { - PQ_LOG_D("Handle TEvPQ::TEvPartitionCounters" << + PQ_LOG_T("Handle TEvPQ::TEvPartitionCounters" << " PartitionId " << ev->Get()->Partition); const auto& partitionId = ev->Get()->Partition; @@ -1647,6 +1656,7 @@ void TPersQueue::AddCmdWriteConfig(TEvKeyValue::TEvRequest* request, write->SetKey(KeyConfig()); write->SetValue(str); write->SetTactic(AppData(ctx)->PQConfig.GetTactic()); + write->SetStorageChannel(NKikimrClient::TKeyValueRequest::INLINE); TSourceIdWriter sourceIdWriter(ESourceIdFormat::Proto); for (const auto& mg : bootstrapCfg.GetExplicitMessageGroups()) { @@ -1763,7 +1773,7 @@ void TPersQueue::ProcessStatusRequests(const TActorContext &ctx) { void TPersQueue::Handle(TEvPersQueue::TEvStatus::TPtr& ev, const TActorContext& ctx) { - PQ_LOG_D("Handle TEvPersQueue::TEvStatus"); + PQ_LOG_T("Handle TEvPersQueue::TEvStatus"); ReadBalancerActorId = ev->Sender; @@ -2721,6 +2731,7 @@ void TPersQueue::Handle(TEvPersQueue::TEvRequest::TPtr& ev, const TActorContext& } else { ans = CreateResponseProxy(ev->Sender, ctx.SelfID, TopicName, p, m, s, c, ResourceMetrics, ctx); } + ResponseProxy[responseCookie] = ans; Counters->Simple()[COUNTER_PQ_TABLET_INFLIGHT].Set(ResponseProxy.size()); @@ -2831,7 +2842,7 @@ void TPersQueue::Handle(TEvPersQueue::TEvRequest::TPtr& ev, const TActorContext& void TPersQueue::Handle(TEvTabletPipe::TEvServerConnected::TPtr& ev, const TActorContext& ctx) { - PQ_LOG_D("Handle TEvTabletPipe::TEvServerConnected"); + PQ_LOG_T("Handle TEvTabletPipe::TEvServerConnected"); auto it = PipesInfo.insert({ev->Get()->ClientId, {}}).first; it->second.ServerActors++; @@ -2844,7 +2855,7 @@ void TPersQueue::Handle(TEvTabletPipe::TEvServerConnected::TPtr& ev, const TActo void TPersQueue::Handle(TEvTabletPipe::TEvServerDisconnected::TPtr& ev, const TActorContext& ctx) { - PQ_LOG_D("Handle TEvTabletPipe::TEvServerDisconnected"); + PQ_LOG_T("Handle TEvTabletPipe::TEvServerDisconnected"); //inform partition if needed; auto it = PipesInfo.find(ev->Get()->ClientId); @@ -3307,10 +3318,18 @@ void TPersQueue::Handle(TEvTxProcessing::TEvReadSet::TPtr& ev, const TActorConte } if (auto tx = GetTransaction(ctx, event.GetTxId()); tx && tx->PredicatesReceived.contains(event.GetTabletProducer())) { + if (tx->State >= NKikimrPQ::TTransaction::EXECUTED) { + if (ack) { + PQ_LOG_D("send TEvReadSetAck to " << event.GetTabletProducer()); + ctx.Send(ev->Sender, ack.release()); + return; + } + } + tx->OnReadSet(event, ev->Sender, std::move(ack)); if (tx->State == NKikimrPQ::TTransaction::WAIT_RS) { - CheckTxState(ctx, *tx); + TryExecuteTxs(ctx, *tx); TryWriteTxs(ctx); } @@ -3339,7 +3358,7 @@ void TPersQueue::Handle(TEvTxProcessing::TEvReadSetAck::TPtr& ev, const TActorCo tx->UnbindMsgsFromPipe(event.GetTabletConsumer()); if (tx->State == NKikimrPQ::TTransaction::WAIT_RS_ACKS) { - CheckTxState(ctx, *tx); + TryExecuteTxs(ctx, *tx); TryWriteTxs(ctx); } @@ -3364,7 +3383,7 @@ void TPersQueue::Handle(TEvPQ::TEvTxCalcPredicateResult::TPtr& ev, const TActorC tx->OnTxCalcPredicateResult(event); - CheckTxState(ctx, *tx); + TryExecuteTxs(ctx, *tx); TryWriteTxs(ctx); } @@ -3387,7 +3406,7 @@ void TPersQueue::Handle(TEvPQ::TEvProposePartitionConfigResult::TPtr& ev, const tx->OnProposePartitionConfigResult(event); - CheckTxState(ctx, *tx); + TryExecuteTxs(ctx, *tx); TryWriteTxs(ctx); } @@ -3410,7 +3429,7 @@ void TPersQueue::Handle(TEvPQ::TEvTxCommitDone::TPtr& ev, const TActorContext& c tx->OnTxCommitDone(event); - CheckTxState(ctx, *tx); + TryExecuteTxs(ctx, *tx); TryWriteTxs(ctx); } @@ -3461,7 +3480,7 @@ void TPersQueue::UnsubscribeWriteId(const TWriteId& writeId, void TPersQueue::CreateSupportivePartitionActors(const TActorContext& ctx) { - for (auto& partitionId : PendingSupportivePartitions) { + for (const auto& partitionId : PendingSupportivePartitions) { CreateSupportivePartitionActor(partitionId, ctx); } @@ -3561,6 +3580,7 @@ void TPersQueue::ProcessProposeTransactionQueue(const TActorContext& ctx) case NKikimrPQ::TTransaction::UNKNOWN: tx.OnProposeTransaction(event, GetAllowedStep(), TabletID()); + PQ_LOG_D("Propose TxId " << tx.TxId << ", WriteId " << tx.WriteId); if (tx.Kind == NKikimrPQ::TTransaction::KIND_CONFIG) { UpdateReadRuleGenerations(tx.TabletConfig); @@ -3572,10 +3592,11 @@ void TPersQueue::ProcessProposeTransactionQueue(const TActorContext& ctx) "PQ %" PRIu64 ", TxId %" PRIu64 ", WriteId {%" PRIu64 ", %" PRIu64 "}", TabletID(), tx.TxId, writeId.NodeId, writeId.KeyId); TTxWriteInfo& writeInfo = TxWrites.at(writeId); + PQ_LOG_D("Link TxId " << tx.TxId << " with WriteId " << writeId); writeInfo.TxId = tx.TxId; } - CheckTxState(ctx, tx); + TryExecuteTxs(ctx, tx); break; case NKikimrPQ::TTransaction::PREPARING: case NKikimrPQ::TTransaction::PREPARED: @@ -3632,9 +3653,9 @@ void TPersQueue::ProcessPlanStepQueue(const TActorContext& ctx) Y_ABORT_UNLESS(TxQueue.empty() || (TxQueue.back() < std::make_pair(step, txId))); tx.OnPlanStep(step); - CheckTxState(ctx, tx); + TryExecuteTxs(ctx, tx); - TxQueue.emplace(step, txId); + TxQueue.emplace_back(step, txId); } else { LOG_WARN_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << @@ -3679,7 +3700,7 @@ void TPersQueue::ProcessWriteTxs(const TActorContext& ctx, tx->AddCmdWrite(request, state); - ChangedTxs.insert(txId); + ChangedTxs.emplace(tx->Step, txId); } WriteTxs.clear(); @@ -3698,7 +3719,7 @@ void TPersQueue::ProcessDeleteTxs(const TActorContext& ctx, auto tx = GetTransaction(ctx, txId); if (tx) { - ChangedTxs.insert(txId); + ChangedTxs.emplace(tx->Step, txId); } } @@ -3746,6 +3767,7 @@ void TPersQueue::AddCmdWriteTabletTxInfo(NKikimrClient::TKeyValueRequest& reques auto command = request.AddCmdWrite(); command->SetKey(KeyTxInfo()); command->SetValue(value); + command->SetStorageChannel(NKikimrClient::TKeyValueRequest::INLINE); } void TPersQueue::SavePlanStep(NKikimrPQ::TTabletTxInfo& info) @@ -3884,10 +3906,22 @@ TMaybe TPersQueue::FindPartitionId(const NKikimrPQ::TDataTransacti void TPersQueue::SendEvTxCalcPredicateToPartitions(const TActorContext& ctx, TDistributedTransaction& tx) { + auto OriginalPartitionExists = [this](ui32 partitionId) { + return Partitions.contains(TPartitionId(partitionId)); + }; + + // if the predicate is violated, the transaction will end with the ABORTED code + bool forceFalse = false; THashMap> events; for (auto& operation : tx.Operations) { ui32 originalPartitionId = operation.GetPartitionId(); + + if (!OriginalPartitionExists(originalPartitionId)) { + forceFalse = true; + continue; + } + auto& event = events[originalPartitionId]; if (!event) { event = std::make_unique(tx.Step, tx.TxId); @@ -3902,29 +3936,43 @@ void TPersQueue::SendEvTxCalcPredicateToPartitions(const TActorContext& ctx, if (tx.WriteId.Defined()) { const TWriteId& writeId = *tx.WriteId; - Y_ABORT_UNLESS(TxWrites.contains(writeId), - "PQ %" PRIu64 ", TxId %" PRIu64 ", WriteId {%" PRIu64 ", %" PRIu64 "}", - TabletID(), tx.TxId, writeId.NodeId, writeId.KeyId); - const TTxWriteInfo& writeInfo = TxWrites.at(writeId); + if (TxWrites.contains(writeId)) { + const TTxWriteInfo& writeInfo = TxWrites.at(writeId); + + for (auto& [originalPartitionId, partitionId] : writeInfo.Partitions) { + if (!OriginalPartitionExists(originalPartitionId)) { + PQ_LOG_W("Unknown partition " << originalPartitionId << " for TxId " << tx.TxId); + forceFalse = true; + continue; + } - for (auto& [originalPartitionId, partitionId] : writeInfo.Partitions) { - Y_ABORT_UNLESS(Partitions.contains(partitionId)); - const TPartitionInfo& partition = Partitions.at(partitionId); + auto& event = events[originalPartitionId]; + if (!event) { + event = std::make_unique(tx.Step, tx.TxId); + } - auto& event = events[originalPartitionId]; - if (!event) { - event = std::make_unique(tx.Step, tx.TxId); - } + if (!Partitions.contains(partitionId)) { + PQ_LOG_W("Unknown partition " << partitionId << " for TxId " << tx.TxId); + forceFalse = true; + continue; + } + + const TPartitionInfo& partition = Partitions.at(partitionId); - event->SupportivePartitionActor = partition.Actor; + event->SupportivePartitionActor = partition.Actor; + } + } else { + PQ_LOG_W("Unknown WriteId " << writeId << " for TxId " << tx.TxId); + forceFalse = true; } } for (auto& [originalPartitionId, event] : events) { TPartitionId partitionId(originalPartitionId); - Y_ABORT_UNLESS(Partitions.contains(partitionId)); const TPartitionInfo& partition = Partitions.at(partitionId); + event->ForceFalse = forceFalse; + ctx.Send(partition.Actor, event.release()); } @@ -3942,7 +3990,7 @@ void TPersQueue::SendEvTxCommitToPartitions(const TActorContext& ctx, auto p = Partitions.find(TPartitionId(partitionId)); Y_ABORT_UNLESS(p != Partitions.end(), - "Tablet %" PRIu64 ", Partition %" PRIu32 ", TxId %" PRIu64, + "PQ %" PRIu64 ", Partition %" PRIu32 ", TxId %" PRIu64, TabletID(), partitionId, tx.TxId); ctx.Send(p->second.Actor, event.release()); @@ -3982,7 +4030,8 @@ void TPersQueue::SendEvProposeTransactionResult(const TActorContext& ctx, result->Record.SetTxId(tx.TxId); result->Record.SetStep(tx.Step); - PQ_LOG_D("send TEvPersQueue::TEvProposeTransactionResult(" << + PQ_LOG_D("TxId: " << tx.TxId << + " send TEvPersQueue::TEvProposeTransactionResult(" << NKikimrPQ::TEvProposeTransactionResult_EStatus_Name(result->Record.GetStatus()) << ")"); ctx.Send(tx.SourceActor, std::move(result)); @@ -4035,12 +4084,138 @@ TDistributedTransaction* TPersQueue::GetTransaction(const TActorContext& ctx, return &p->second; } +void TPersQueue::PushTxInQueue(TDistributedTransaction& tx, TDistributedTransaction::EState state) +{ + auto& txQueue = TxsOrder[state]; + txQueue.push_back(tx.TxId); + tx.Pending = txQueue.size() > 1; +} + +void TPersQueue::ChangeTxState(TDistributedTransaction& tx, + TDistributedTransaction::EState newState) +{ + tx.State = newState; + + PQ_LOG_D("TxId " << tx.TxId << + ", NewState " << NKikimrPQ::TTransaction_EState_Name(tx.State)); +} + +bool TPersQueue::TryChangeTxState(TDistributedTransaction& tx, + TDistributedTransaction::EState newState) +{ + auto oldState = tx.State; + Y_ABORT_UNLESS(TxsOrder.contains(oldState) || (oldState == NKikimrPQ::TTransaction::PLANNING), + "PQ %" PRIu64 ", TxId %" PRIu64 ", State %s", + TabletID(), tx.TxId, NKikimrPQ::TTransaction_EState_Name(oldState).data()); + + if (oldState != NKikimrPQ::TTransaction::PLANNING) { + Y_ABORT_UNLESS(TxsOrder.contains(oldState), + "PQ %" PRIu64 ", TxId %" PRIu64 ", State %s", + TabletID(), tx.TxId, + NKikimrPQ::TTransaction_EState_Name(oldState).data()); + Y_ABORT_UNLESS(TxsOrder[oldState].front() == tx.TxId, + "PQ %" PRIu64 ", TxId %" PRIu64 ", State %s, FrontTxId %" PRIu64, + TabletID(), tx.TxId, + NKikimrPQ::TTransaction_EState_Name(oldState).data(), + TxsOrder[oldState].front()); + } + + ChangeTxState(tx, newState); + + if (oldState != NKikimrPQ::TTransaction::PLANNING) { + TxsOrder[oldState].pop_front(); + } + if (TxsOrder.contains(newState)) { + PushTxInQueue(tx, newState); + } + + PQ_LOG_D("TxId " << tx.TxId << " moved from " << + NKikimrPQ::TTransaction_EState_Name(oldState) << + " to " << + NKikimrPQ::TTransaction_EState_Name(newState)); + + return true; +} + +bool TPersQueue::CanExecute(const TDistributedTransaction& tx) +{ + if (tx.Pending) { + return false; + } + + if (!TxsOrder.contains(tx.State)) { + return true; + } + + auto& txQueue = TxsOrder[tx.State]; + Y_ABORT_UNLESS(!txQueue.empty(), + "PQ %" PRIu64 ", TxId %" PRIu64 ", State %s", + TabletID(), tx.TxId, NKikimrPQ::TTransaction_EState_Name(tx.State).data()); + + PQ_LOG_D("TxId " << tx.TxId << + " State " << NKikimrPQ::TTransaction_EState_Name(tx.State) << + " FrontTxId " << txQueue.front()); + + return txQueue.front() == tx.TxId; +} + +void TPersQueue::TryExecuteTxs(const TActorContext& ctx, + TDistributedTransaction& tx) +{ + PQ_LOG_D("Try execute txs with state " << NKikimrPQ::TTransaction_EState_Name(tx.State)); + + TDistributedTransaction::EState oldState = tx.State; + + CheckTxState(ctx, tx); + + if (!TxsOrder.contains(oldState)) { + // This transaction is either not scheduled or has already been completed. + return; + } + + if (oldState == tx.State) { + // The transaction status has not changed. There is no point in watching the transactions behind her. + PQ_LOG_D("TxId " << tx.TxId << " status has not changed"); + return; + } + + auto& txQueue = TxsOrder[oldState]; + while (!txQueue.empty()) { + PQ_LOG_D("There are " << txQueue.size() << " txs in the queue " << NKikimrPQ::TTransaction_EState_Name(oldState)); + ui64 txId = txQueue.front(); + Y_ABORT_UNLESS(Txs.contains(txId), "unknown TxId %" PRIu64, txId); + auto& tx = Txs.at(txId); + PQ_LOG_D("Try execute TxId " << tx.TxId << " Pending " << tx.Pending); + + if (!tx.Pending) { + // The transaction was not postponed for execution. + break; + } + tx.Pending = false; + + CheckTxState(ctx, tx); + + if (oldState == tx.State) { + // The transaction status has not changed. There is no point in watching the transactions behind her. + PQ_LOG_D("TxId " << tx.TxId << " status has not changed"); + break; + } + } +} + void TPersQueue::CheckTxState(const TActorContext& ctx, TDistributedTransaction& tx) { PQ_LOG_D("TxId " << tx.TxId << ", State " << NKikimrPQ::TTransaction_EState_Name(tx.State)); + if (!CanExecute(tx)) { + PQ_LOG_D("Can't execute TxId " << tx.TxId << " Pending " << tx.Pending); + tx.Pending = true; + PQ_LOG_D("Wait for TxId " << tx.TxId); + return; + } + switch (tx.State) { case NKikimrPQ::TTransaction::UNKNOWN: Y_ABORT_UNLESS(tx.TxId != Max(), @@ -4050,9 +4225,7 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, WriteTx(tx, NKikimrPQ::TTransaction::PREPARED); ScheduleProposeTransactionResult(tx); - tx.State = NKikimrPQ::TTransaction::PREPARING; - PQ_LOG_D("TxId " << tx.TxId << - ", NewState " << NKikimrPQ::TTransaction_EState_Name(tx.State)); + ChangeTxState(tx, NKikimrPQ::TTransaction::PREPARING); break; @@ -4065,9 +4238,7 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, // scheduled events will be sent to EndWriteTxs - tx.State = NKikimrPQ::TTransaction::PREPARED; - PQ_LOG_D("TxId " << tx.TxId << - ", NewState " << NKikimrPQ::TTransaction_EState_Name(tx.State)); + ChangeTxState(tx, NKikimrPQ::TTransaction::PREPARED); break; @@ -4078,9 +4249,7 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, WriteTx(tx, NKikimrPQ::TTransaction::PLANNED); - tx.State = NKikimrPQ::TTransaction::PLANNING; - PQ_LOG_D("TxId " << tx.TxId << - ", NewState " << NKikimrPQ::TTransaction_EState_Name(tx.State)); + ChangeTxState(tx, NKikimrPQ::TTransaction::PLANNING); break; @@ -4093,22 +4262,29 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, // scheduled events will be sent to EndWriteTxs - tx.State = NKikimrPQ::TTransaction::PLANNED; - PQ_LOG_D("TxId " << tx.TxId << - ", NewState " << NKikimrPQ::TTransaction_EState_Name(tx.State)); + TryChangeTxState(tx, NKikimrPQ::TTransaction::PLANNED); + + if (tx.TxId != TxsOrder[tx.State].front()) { + break; + } [[fallthrough]]; case NKikimrPQ::TTransaction::PLANNED: + Y_ABORT_UNLESS(tx.TxId == TxsOrder[tx.State].front(), + "PQ %" PRIu64 ", TxId %" PRIu64 ", FrontTxId %" PRIu64, + TabletID(), tx.TxId, TxsOrder[tx.State].front()); + PQ_LOG_D("TxQueue.size " << TxQueue.size()); - if (!TxQueue.empty() && (TxQueue.front().second == tx.TxId)) { - MoveTopTxToCalculating(tx, ctx); - } + MoveTopTxToCalculating(tx, ctx); break; case NKikimrPQ::TTransaction::CALCULATING: + Y_ABORT_UNLESS(tx.TxId == TxsOrder[tx.State].front(), + "PQ %" PRIu64 ", TxId %" PRIu64 ", FrontTxId %" PRIu64, + TabletID(), tx.TxId, TxsOrder[tx.State].front()); Y_ABORT_UNLESS(tx.PartitionRepliesCount <= tx.PartitionRepliesExpected, "PQ %" PRIu64 ", TxId %" PRIu64 ", PartitionRepliesCount %" PRISZT ", PartitionRepliesExpected %" PRISZT, TabletID(), tx.TxId, @@ -4123,9 +4299,7 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, case NKikimrPQ::TTransaction::KIND_CONFIG: WriteTx(tx, NKikimrPQ::TTransaction::CALCULATED); - tx.State = NKikimrPQ::TTransaction::CALCULATED; - PQ_LOG_D("TxId " << tx.TxId << - ", NewState " << NKikimrPQ::TTransaction_EState_Name(tx.State)); + TryChangeTxState(tx, NKikimrPQ::TTransaction::CALCULATED); break; @@ -4137,24 +4311,25 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, break; case NKikimrPQ::TTransaction::CALCULATED: - tx.State = NKikimrPQ::TTransaction::WAIT_RS; - PQ_LOG_D("TxId " << tx.TxId << - ", NewState " << NKikimrPQ::TTransaction_EState_Name(tx.State)); + Y_ABORT_UNLESS(tx.TxId == TxsOrder[tx.State].front(), + "PQ %" PRIu64 ", TxId %" PRIu64 ", FrontTxId %" PRIu64, + TabletID(), tx.TxId, TxsOrder[tx.State].front()); - // - // the number of TEvReadSetAck sent should not be greater than the number of senders - // from TEvProposeTransaction - // - Y_ABORT_UNLESS(tx.ReadSetAcks.size() <= tx.PredicatesReceived.size(), - "PQ %" PRIu64 ", TxId %" PRIu64 ", ReadSetAcks.size %" PRISZT ", PredicatesReceived.size %" PRISZT, - TabletID(), tx.TxId, - tx.ReadSetAcks.size(), tx.PredicatesReceived.size()); + TryChangeTxState(tx, NKikimrPQ::TTransaction::WAIT_RS); SendEvReadSetToReceivers(ctx, tx); + if (tx.TxId != TxsOrder[tx.State].front()) { + break; + } + [[fallthrough]]; case NKikimrPQ::TTransaction::WAIT_RS: + Y_ABORT_UNLESS(tx.TxId == TxsOrder[tx.State].front(), + "PQ %" PRIu64 ", TxId %" PRIu64 ", FrontTxId %" PRIu64, + TabletID(), tx.TxId, TxsOrder[tx.State].front()); + PQ_LOG_D("HaveParticipantsDecision " << tx.HaveParticipantsDecision()); if (tx.HaveParticipantsDecision()) { @@ -4164,16 +4339,21 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, SendEvTxRollbackToPartitions(ctx, tx); } - tx.State = NKikimrPQ::TTransaction::EXECUTING; - PQ_LOG_D("TxId " << tx.TxId << - ", NewState " << NKikimrPQ::TTransaction_EState_Name(tx.State)); + TryChangeTxState(tx, NKikimrPQ::TTransaction::EXECUTING); } else { break; } + if (tx.TxId != TxsOrder[tx.State].front()) { + break; + } + [[fallthrough]]; case NKikimrPQ::TTransaction::EXECUTING: + Y_ABORT_UNLESS(tx.TxId == TxsOrder[tx.State].front(), + "PQ %" PRIu64 ", TxId %" PRIu64 ", FrontTxId %" PRIu64, + TabletID(), tx.TxId, TxsOrder[tx.State].front()); Y_ABORT_UNLESS(tx.PartitionRepliesCount <= tx.PartitionRepliesExpected, "PQ %" PRIu64 ", TxId %" PRIu64 ", PartitionRepliesCount %" PRISZT ", PartitionRepliesExpected %" PRISZT, TabletID(), tx.TxId, @@ -4183,14 +4363,8 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, ", Expected " << tx.PartitionRepliesExpected); if (tx.PartitionRepliesCount == tx.PartitionRepliesExpected) { - Y_ABORT_UNLESS(!TxQueue.empty(), - "PQ %" PRIu64 ", TxId %" PRIu64, - TabletID(), tx.TxId); - Y_ABORT_UNLESS(TxQueue.front().second == tx.TxId, - "PQ %" PRIu64 ", TxId %" PRIu64, - TabletID(), tx.TxId); - SendEvProposeTransactionResult(ctx, tx); + PQ_LOG_D("complete TxId " << tx.TxId); switch (tx.Kind) { case NKikimrPQ::TTransaction::KIND_DATA: @@ -4209,19 +4383,19 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, PQ_LOG_D("delete partitions for TxId " << tx.TxId); BeginDeletePartitions(tx); - tx.State = NKikimrPQ::TTransaction::EXECUTED; - PQ_LOG_D("TxId " << tx.TxId << - ", NewState " << NKikimrPQ::TTransaction_EState_Name(tx.State)); + TryChangeTxState(tx, NKikimrPQ::TTransaction::EXECUTED); } break; case NKikimrPQ::TTransaction::EXECUTED: + Y_ABORT_UNLESS(tx.TxId == TxsOrder[tx.State].front(), + "PQ %" PRIu64 ", TxId %" PRIu64 ", FrontTxId %" PRIu64, + TabletID(), tx.TxId, TxsOrder[tx.State].front()); + SendEvReadSetAckToSenders(ctx, tx); - tx.State = NKikimrPQ::TTransaction::WAIT_RS_ACKS; - PQ_LOG_D("TxId " << tx.TxId << - ", NewState " << NKikimrPQ::TTransaction_EState_Name(tx.State)); + TryChangeTxState(tx, NKikimrPQ::TTransaction::WAIT_RS_ACKS); [[fallthrough]]; @@ -4238,8 +4412,7 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, case NKikimrPQ::TTransaction::DELETING: // The PQ tablet has persisted its state. Now she can delete the transaction and take the next one. if (!TxQueue.empty() && (TxQueue.front().second == tx.TxId)) { - TxQueue.pop(); - TryStartTransaction(ctx); + TxQueue.pop_front(); } DeleteWriteId(tx.WriteId); @@ -4296,9 +4469,7 @@ void TPersQueue::DeleteTx(TDistributedTransaction& tx) DeleteTxs.insert(tx.TxId); - tx.State = NKikimrPQ::TTransaction::DELETING; - PQ_LOG_D("TxId " << tx.TxId << - ", NewState " << NKikimrPQ::TTransaction_EState_Name(tx.State)); + ChangeTxState(tx, NKikimrPQ::TTransaction::DELETING); tx.WriteInProgress = true; } @@ -4313,14 +4484,16 @@ void TPersQueue::SendReplies(const TActorContext& ctx) void TPersQueue::CheckChangedTxStates(const TActorContext& ctx) { - for (ui64 txId : ChangedTxs) { + for (const auto& p : ChangedTxs) { + ui64 txId = p.second; auto tx = GetTransaction(ctx, txId); Y_ABORT_UNLESS(tx, "PQ %" PRIu64 ", TxId %" PRIu64, TabletID(), txId); - CheckTxState(ctx, *tx); + TryExecuteTxs(ctx, *tx); } + ChangedTxs.clear(); } @@ -4374,7 +4547,8 @@ void TPersQueue::SendProposeTransactionAbort(const TActorId& target, error->SetReason(reason); } - PQ_LOG_D("send TEvPersQueue::TEvProposeTransactionResult(" << + PQ_LOG_D("TxId: " << txId << + " send TEvPersQueue::TEvProposeTransactionResult(" << NKikimrPQ::TEvProposeTransactionResult_EStatus_Name(event->Record.GetStatus()) << ")"); ctx.Send(target, std::move(event)); @@ -4452,7 +4626,9 @@ void TPersQueue::CreateNewPartitions(NKikimrPQ::TPQTabletConfig& config, { EnsurePartitionsAreNotDeleted(config); - Y_ABORT_UNLESS(ConfigInited && AllOriginalPartitionsInited()); + Y_ABORT_UNLESS(ConfigInited, + "ConfigInited=%d", + static_cast(ConfigInited)); for (const auto& partition : config.GetPartitions()) { const TPartitionId partitionId(partition.GetPartitionId()); @@ -4486,36 +4662,121 @@ void TPersQueue::BeginInitTransactions() Txs.clear(); TxQueue.clear(); + InitTxsOrder(); + PlannedTxs.clear(); } +std::array GetTxsStatesDirectOrder() +{ + return { + NKikimrPQ::TTransaction::PLANNED, + NKikimrPQ::TTransaction::CALCULATING, + NKikimrPQ::TTransaction::CALCULATED, + NKikimrPQ::TTransaction::WAIT_RS, + NKikimrPQ::TTransaction::EXECUTING, + NKikimrPQ::TTransaction::EXECUTED + }; +} + +std::array GetTxsStatesReverseOrder() +{ + auto states = GetTxsStatesDirectOrder(); + std::reverse(states.begin(), states.end()); + return states; +} + +void TPersQueue::InitTxsOrder() +{ + TxsOrder.clear(); + + static const auto txStates = GetTxsStatesDirectOrder(); + + for (auto state : txStates) { + TxsOrder[state].clear(); + } +} + void TPersQueue::EndInitTransactions() { PQ_LOG_D("Txs.size=" << Txs.size() << ", PlannedTxs.size=" << PlannedTxs.size()); std::sort(PlannedTxs.begin(), PlannedTxs.end()); for (auto& item : PlannedTxs) { - TxQueue.push(item); + TxQueue.push_back(item); } if (!TxQueue.empty()) { PQ_LOG_D("top tx queue (" << TxQueue.front().first << ", " << TxQueue.front().second << ")"); } + + for (const auto& [_, txId] : TxQueue) { + Y_ABORT_UNLESS(Txs.contains(txId), + "PQ %" PRIu64 ", unknown TxId %" PRIu64, + TabletID(), txId); + auto& tx = Txs.at(txId); + + Y_ABORT_UNLESS(txId == tx.TxId); + + if (!TxsOrder.contains(tx.State)) { + PQ_LOG_D("TxsOrder: " << + txId << " " << NKikimrPQ::TTransaction_EState_Name(tx.State) << " skip"); + continue; + } + + PushTxInQueue(tx, tx.State); + + PQ_LOG_D("TxsOrder: " << + txId << " " << NKikimrPQ::TTransaction_EState_Name(tx.State) << " " << tx.Pending); + } } void TPersQueue::TryStartTransaction(const TActorContext& ctx) { - if (TxQueue.empty()) { - PQ_LOG_D("empty tx queue"); - return; + static const auto txStates = GetTxsStatesReverseOrder(); + + ResendEvReadSetToReceivers(ctx); + DeleteSupportivePartitions(ctx); + + for (auto state : txStates) { + const auto& txQueue = TxsOrder[state]; + if (txQueue.empty()) { + continue; + } + + auto next = GetTransaction(ctx, txQueue.front()); + Y_ABORT_UNLESS(next); + + TryExecuteTxs(ctx, *next); + + TryWriteTxs(ctx); } +} - auto next = GetTransaction(ctx, TxQueue.front().second); - Y_ABORT_UNLESS(next); +void TPersQueue::ResendEvReadSetToReceivers(const TActorContext& ctx) +{ + ResendEvReadSetToReceiversForState(ctx, NKikimrPQ::TTransaction::CALCULATED); + ResendEvReadSetToReceiversForState(ctx, NKikimrPQ::TTransaction::EXECUTED); +} - CheckTxState(ctx, *next); +void TPersQueue::ResendEvReadSetToReceiversForState(const TActorContext& ctx, NKikimrPQ::TTransaction::EState state) +{ + for (ui64 txId : TxsOrder[state]) { + auto tx = GetTransaction(ctx, txId); + Y_ABORT_UNLESS(tx, "unknown TxId %" PRIu64, txId); - TryWriteTxs(ctx); + SendEvReadSetToReceivers(ctx, *tx); + } +} + +void TPersQueue::DeleteSupportivePartitions(const TActorContext& ctx) +{ + for (ui64 txId : TxsOrder[NKikimrPQ::TTransaction::EXECUTED]) { + auto tx = GetTransaction(ctx, txId); + Y_ABORT_UNLESS(tx, "unknown TxId %" PRIu64, txId); + + BeginDeletePartitions(*tx); + } } void TPersQueue::OnInitComplete(const TActorContext& ctx) @@ -4629,6 +4890,10 @@ void TPersQueue::Handle(NLongTxService::TEvLongTxService::TEvLockStatus::TPtr& e } TTxWriteInfo& writeInfo = TxWrites.at(writeId); + PQ_LOG_D("TxWriteInfo: " << + "WriteId " << writeId << + ", TxId " << writeInfo.TxId << + ", Status " << NKikimrLongTxService::TEvLockStatus_EStatus_Name(writeInfo.LongTxSubscriptionStatus)); writeInfo.LongTxSubscriptionStatus = record.GetStatus(); if (writeInfo.LongTxSubscriptionStatus == NKikimrLongTxService::TEvLockStatus::STATUS_SUBSCRIBED) { @@ -4698,7 +4963,7 @@ void TPersQueue::Handle(TEvPQ::TEvDeletePartitionDone::TPtr& ev, const TActorCon if (writeInfo.TxId.Defined()) { if (auto tx = GetTransaction(ctx, *writeInfo.TxId); tx) { if (tx->State == NKikimrPQ::TTransaction::WAIT_RS_ACKS) { - CheckTxState(ctx, *tx); + TryExecuteTxs(ctx, *tx); } } } diff --git a/ydb/core/persqueue/pq_impl.h b/ydb/core/persqueue/pq_impl.h index 4101afe61fba..2fdc0d1bf16e 100644 --- a/ydb/core/persqueue/pq_impl.h +++ b/ydb/core/persqueue/pq_impl.h @@ -274,7 +274,7 @@ class TPersQueue : public NKeyValue::TKeyValueFlat { // транзакции // THashMap Txs; - TQueue> TxQueue; + TDeque> TxQueue; ui64 PlanStep = 0; ui64 PlanTxId = 0; ui64 ExecStep = 0; @@ -284,9 +284,18 @@ class TPersQueue : public NKeyValue::TKeyValueFlat { TDeque>> EvPlanStepQueue; THashMap WriteTxs; THashSet DeleteTxs; - THashSet ChangedTxs; + TSet> ChangedTxs; TMaybe TabletConfigTx; TMaybe BootstrapConfigTx; + + // PLANNED -> CALCULATING -> CALCULATED -> WAIT_RS -> EXECUTING -> EXECUTED + THashMap> TxsOrder; + + void PushTxInQueue(TDistributedTransaction& tx, TDistributedTransaction::EState state); + void ChangeTxState(TDistributedTransaction& tx, TDistributedTransaction::EState newState); + bool TryChangeTxState(TDistributedTransaction& tx, TDistributedTransaction::EState newState); + bool CanExecute(const TDistributedTransaction& tx); + bool WriteTxsInProgress = false; struct TReplyToActor; @@ -334,6 +343,8 @@ class TPersQueue : public NKeyValue::TKeyValueFlat { void CheckTxState(const TActorContext& ctx, TDistributedTransaction& tx); + void TryExecuteTxs(const TActorContext& ctx, + TDistributedTransaction& tx); void WriteTx(TDistributedTransaction& tx, NKikimrPQ::TTransaction::EState state); void DeleteTx(TDistributedTransaction& tx); @@ -515,6 +526,8 @@ class TPersQueue : public NKeyValue::TKeyValueFlat { void BeginInitTransactions(); void EndInitTransactions(); + void InitTxsOrder(); + void EndReadConfig(const TActorContext& ctx); void AddCmdReadTransactionRange(TEvKeyValue::TEvRequest& request, @@ -533,6 +546,11 @@ class TPersQueue : public NKeyValue::TKeyValueFlat { void DeleteWriteId(const TMaybe& writeId); void UpdateReadRuleGenerations(NKikimrPQ::TPQTabletConfig& cfg) const; + + void ResendEvReadSetToReceivers(const TActorContext& ctx); + void ResendEvReadSetToReceiversForState(const TActorContext& ctx, NKikimrPQ::TTransaction::EState state); + + void DeleteSupportivePartitions(const TActorContext& ctx); }; diff --git a/ydb/core/persqueue/transaction.h b/ydb/core/persqueue/transaction.h index 0e61fe93dba8..d3cf152c9c24 100644 --- a/ydb/core/persqueue/transaction.h +++ b/ydb/core/persqueue/transaction.h @@ -115,6 +115,8 @@ struct TDistributedTransaction { bool HasWriteOperations = false; size_t PredicateAcksCount = 0; + + bool Pending = false; }; } diff --git a/ydb/core/persqueue/ut/pqtablet_ut.cpp b/ydb/core/persqueue/ut/pqtablet_ut.cpp index f6388c452904..34169c7707c7 100644 --- a/ydb/core/persqueue/ut/pqtablet_ut.cpp +++ b/ydb/core/persqueue/ut/pqtablet_ut.cpp @@ -230,6 +230,12 @@ class TPQTabletFixture : public NUnitTest::TBaseFixture { std::unique_ptr MakeGetOwnershipRequest(const TGetOwnershipRequestParams& params, const TActorId& pipe) const; + void TestMultiplePQTablets(const TString& consumer1, const TString& consumer2); + void TestParallelTransactions(const TString& consumer1, const TString& consumer2); + + void StartPQCalcPredicateObserver(size_t& received); + void WaitForPQCalcPredicate(size_t& received, size_t expected); + // // TODO(abcdef): для тестирования повторных вызовов нужны примитивы Send+Wait // @@ -802,10 +808,16 @@ NHelpers::TPQTabletMock* TPQTabletFixture::CreatePQTabletMock(ui64 tabletId) return mock; } -Y_UNIT_TEST_F(Multiple_PQTablets, TPQTabletFixture) +void TPQTabletFixture::TestMultiplePQTablets(const TString& consumer1, const TString& consumer2) { + TVector> consumers; + consumers.emplace_back(consumer1, true); + if (consumer1 != consumer2) { + consumers.emplace_back(consumer2, true); + } + NHelpers::TPQTabletMock* tablet = CreatePQTabletMock(22222); - PQTabletPrepare({.partitions=1}, {}, *Ctx); + PQTabletPrepare({.partitions=1}, consumers, *Ctx); const ui64 txId_1 = 67890; const ui64 txId_2 = 67891; @@ -813,7 +825,7 @@ Y_UNIT_TEST_F(Multiple_PQTablets, TPQTabletFixture) SendProposeTransactionRequest({.TxId=txId_1, .Senders={22222}, .Receivers={22222}, .TxOps={ - {.Partition=0, .Consumer="user", .Begin=0, .End=0, .Path="/topic"}, + {.Partition=0, .Consumer=consumer1, .Begin=0, .End=0, .Path="/topic"}, }}); WaitProposeTransactionResponse({.TxId=txId_1, .Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED}); @@ -821,7 +833,7 @@ Y_UNIT_TEST_F(Multiple_PQTablets, TPQTabletFixture) SendProposeTransactionRequest({.TxId=txId_2, .Senders={22222}, .Receivers={22222}, .TxOps={ - {.Partition=0, .Consumer="user", .Begin=0, .End=0, .Path="/topic"}, + {.Partition=0, .Consumer=consumer2, .Begin=0, .End=0, .Path="/topic"}, }}); WaitProposeTransactionResponse({.TxId=txId_2, .Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED}); @@ -829,36 +841,125 @@ Y_UNIT_TEST_F(Multiple_PQTablets, TPQTabletFixture) SendPlanStep({.Step=100, .TxIds={txId_2}}); SendPlanStep({.Step=200, .TxIds={txId_1}}); - // - // TODO(abcdef): проверить, что в команде CmdWrite есть информация о транзакции - // - - WaitPlanStepAck({.Step=100, .TxIds={txId_2}}); // TEvPlanStepAck для координатора + WaitPlanStepAck({.Step=100, .TxIds={txId_2}}); // TEvPlanStepAck for Coordinator WaitPlanStepAccepted({.Step=100}); - WaitPlanStepAck({.Step=200, .TxIds={txId_1}}); // TEvPlanStepAck для координатора + WaitPlanStepAck({.Step=200, .TxIds={txId_1}}); // TEvPlanStepAck for Coordinator WaitPlanStepAccepted({.Step=200}); - // - // транзакция txId_2 - // WaitReadSet(*tablet, {.Step=100, .TxId=txId_2, .Source=Ctx->TabletId, .Target=22222, .Decision=NKikimrTx::TReadSetData::DECISION_COMMIT, .Producer=Ctx->TabletId}); tablet->SendReadSet(*Ctx->Runtime, {.Step=100, .TxId=txId_2, .Target=Ctx->TabletId, .Decision=NKikimrTx::TReadSetData::DECISION_COMMIT}); + WaitReadSet(*tablet, {.Step=200, .TxId=txId_1, .Source=Ctx->TabletId, .Target=22222, .Decision=NKikimrTx::TReadSetData::DECISION_COMMIT, .Producer=Ctx->TabletId}); + tablet->SendReadSet(*Ctx->Runtime, {.Step=200, .TxId=txId_1, .Target=Ctx->TabletId, .Decision=NKikimrTx::TReadSetData::DECISION_COMMIT}); + WaitProposeTransactionResponse({.TxId=txId_2, .Status=NKikimrPQ::TEvProposeTransactionResult::COMPLETE}); - tablet->SendReadSetAck(*Ctx->Runtime, {.Step=100, .TxId=txId_2, .Source=Ctx->TabletId}); - WaitReadSetAck(*tablet, {.Step=100, .TxId=txId_2, .Source=22222, .Target=Ctx->TabletId, .Consumer=Ctx->TabletId}); + WaitProposeTransactionResponse({.TxId=txId_1, + .Status=NKikimrPQ::TEvProposeTransactionResult::COMPLETE}); +} - // - // TODO(abcdef): проверить, что удалена информация о транзакции - // +Y_UNIT_TEST_F(Multiple_PQTablets_1, TPQTabletFixture) +{ + TestMultiplePQTablets("consumer", "consumer"); +} - // - // транзакция txId_1 - // - WaitReadSet(*tablet, {.Step=200, .TxId=txId_1, .Source=Ctx->TabletId, .Target=22222, .Decision=NKikimrTx::TReadSetData::DECISION_COMMIT, .Producer=Ctx->TabletId}); +Y_UNIT_TEST_F(Multiple_PQTablets_2, TPQTabletFixture) +{ + TestMultiplePQTablets("consumer-1", "consumer-2"); +} + +void TPQTabletFixture::TestParallelTransactions(const TString& consumer1, const TString& consumer2) +{ + TVector> consumers; + consumers.emplace_back(consumer1, true); + if (consumer1 != consumer2) { + consumers.emplace_back(consumer2, true); + } + + NHelpers::TPQTabletMock* tablet = CreatePQTabletMock(22222); + PQTabletPrepare({.partitions=1}, consumers, *Ctx); + + const ui64 txId_1 = 67890; + const ui64 txId_2 = 67891; + + SendProposeTransactionRequest({.TxId=txId_1, + .Senders={22222}, .Receivers={22222}, + .TxOps={ + {.Partition=0, .Consumer=consumer1, .Begin=0, .End=0, .Path="/topic"}, + }}); + WaitProposeTransactionResponse({.TxId=txId_1, + .Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED}); + + SendProposeTransactionRequest({.TxId=txId_2, + .Senders={22222}, .Receivers={22222}, + .TxOps={ + {.Partition=0, .Consumer=consumer2, .Begin=0, .End=0, .Path="/topic"}, + }}); + WaitProposeTransactionResponse({.TxId=txId_2, + .Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED}); + + size_t calcPredicateResultCount = 0; + StartPQCalcPredicateObserver(calcPredicateResultCount); + + // Transactions are planned in reverse order + SendPlanStep({.Step=100, .TxIds={txId_2}}); + SendPlanStep({.Step=200, .TxIds={txId_1}}); + + WaitPlanStepAck({.Step=100, .TxIds={txId_2}}); // TEvPlanStepAck for Coordinator + WaitPlanStepAccepted({.Step=100}); + + WaitPlanStepAck({.Step=200, .TxIds={txId_1}}); // TEvPlanStepAck for Coordinator + WaitPlanStepAccepted({.Step=200}); + + // The PQ tablet sends to the TEvTxCalcPredicate partition for both transactions + WaitForPQCalcPredicate(calcPredicateResultCount, 2); + + // TEvReadSet messages arrive in any order + tablet->SendReadSet(*Ctx->Runtime, {.Step=200, .TxId=txId_1, .Target=Ctx->TabletId, .Decision=NKikimrTx::TReadSetData::DECISION_COMMIT}); + tablet->SendReadSet(*Ctx->Runtime, {.Step=100, .TxId=txId_2, .Target=Ctx->TabletId, .Decision=NKikimrTx::TReadSetData::DECISION_COMMIT}); + + // Transactions will be executed in the order they were planned + WaitProposeTransactionResponse({.TxId=txId_2, + .Status=NKikimrPQ::TEvProposeTransactionResult::COMPLETE}); + + WaitProposeTransactionResponse({.TxId=txId_1, + .Status=NKikimrPQ::TEvProposeTransactionResult::COMPLETE}); +} + +void TPQTabletFixture::StartPQCalcPredicateObserver(size_t& received) +{ + received = 0; + + auto observer = [&received](TAutoPtr& event) { + if (auto* msg = event->CastAsLocal()) { + ++received; + } + + return TTestActorRuntimeBase::EEventAction::PROCESS; + }; + + Ctx->Runtime->SetObserverFunc(observer); +} + +void TPQTabletFixture::WaitForPQCalcPredicate(size_t& received, size_t expected) +{ + TDispatchOptions options; + options.CustomFinalCondition = [&received, expected]() { + return received >= expected; + }; + UNIT_ASSERT(Ctx->Runtime->DispatchEvents(options)); +} + +Y_UNIT_TEST_F(Parallel_Transactions_1, TPQTabletFixture) +{ + TestParallelTransactions("consumer", "consumer"); +} + +Y_UNIT_TEST_F(Parallel_Transactions_2, TPQTabletFixture) +{ + TestParallelTransactions("consumer-1", "consumer-2"); } Y_UNIT_TEST_F(Single_PQTablet_And_Multiple_Partitions, TPQTabletFixture)