diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index b996667d5386..52cb5034fce0 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -50,6 +50,22 @@ struct TMirrorerInfo { TTabletCountersBase Baseline; }; +template +T& TPartition::GetUserActionAndTransactionEventsFront() +{ + Y_ABORT_UNLESS(!UserActionAndTransactionEvents.empty()); + auto* ptr = get_if>(&UserActionAndTransactionEvents.front()); + Y_ABORT_UNLESS(ptr); + return **ptr; +} + +template +bool TPartition::UserActionAndTransactionEventsFrontIs() const +{ + Y_ABORT_UNLESS(!UserActionAndTransactionEvents.empty()); + return get_if>(&UserActionAndTransactionEvents.front()); +} + const TString& TPartition::TopicName() const { return TopicConverter->GetClientsideName(); } @@ -192,9 +208,10 @@ TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, ui TabletCounters.Populate(Counters); if (!distrTxs.empty()) { - std::move(distrTxs.begin(), distrTxs.end(), - std::back_inserter(DistrTxs)); - TxInProgress = DistrTxs.front().Predicate.Defined(); + for (auto& tx : distrTxs) { + UserActionAndTransactionEvents.emplace_back(new TTransaction(std::move(tx))); + } + TxInProgress = GetUserActionAndTransactionEventsFront().Predicate.Defined(); } } @@ -566,7 +583,6 @@ void TPartition::Handle(TEvPQ::TEvChangePartitionConfig::TPtr& ev, const TActorC void TPartition::Handle(TEvPQ::TEvPipeDisconnected::TPtr& ev, const TActorContext& ctx) { - const TString& owner = ev->Get()->Owner; const TActorId& pipeClient = ev->Get()->PipeClient; @@ -854,7 +870,7 @@ void TPartition::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TAc return; } - if (ImmediateTxs.size() > MAX_TXS) { + if (ImmediateTxCount >= MAX_TXS) { ReplyPropose(ctx, event, NKikimrPQ::TEvProposeTransactionResult::OVERLOADED); @@ -1397,47 +1413,53 @@ void TPartition::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& void TPartition::PushBackDistrTx(TSimpleSharedPtr event) { - DistrTxs.emplace_back(std::move(event)); + UserActionAndTransactionEvents.emplace_back(new TTransaction(std::move(event))); } void TPartition::PushBackDistrTx(TSimpleSharedPtr event) { - DistrTxs.emplace_back(std::move(event), true); + UserActionAndTransactionEvents.emplace_back(new TTransaction(std::move(event), true)); } void TPartition::PushFrontDistrTx(TSimpleSharedPtr event) { - DistrTxs.emplace_front(std::move(event), false); + UserActionAndTransactionEvents.emplace_front(new TTransaction(std::move(event), false)); } void TPartition::PushBackDistrTx(TSimpleSharedPtr event) { - DistrTxs.emplace_back(std::move(event)); + UserActionAndTransactionEvents.emplace_back(new TTransaction(std::move(event))); } void TPartition::AddImmediateTx(TSimpleSharedPtr tx) { - ImmediateTxs.push_back(std::move(tx)); + UserActionAndTransactionEvents.emplace_back(std::move(tx)); + ++ImmediateTxCount; } void TPartition::AddUserAct(TSimpleSharedPtr act) { - UserActs.push_back(std::move(act)); - ++UserActCount[UserActs.back()->ClientId]; + TString clientId = act->ClientId; + UserActionAndTransactionEvents.emplace_back(std::move(act)); + ++UserActCount[clientId]; } void TPartition::RemoveImmediateTx() { - Y_ABORT_UNLESS(!ImmediateTxs.empty()); + Y_ABORT_UNLESS(!UserActionAndTransactionEvents.empty()); + Y_ABORT_UNLESS(UserActionAndTransactionEventsFrontIs()); - ImmediateTxs.pop_front(); + UserActionAndTransactionEvents.pop_front(); + --ImmediateTxCount; } void TPartition::RemoveUserAct() { - Y_ABORT_UNLESS(!UserActs.empty()); + Y_ABORT_UNLESS(!UserActionAndTransactionEvents.empty()); + Y_ABORT_UNLESS(UserActionAndTransactionEventsFrontIs()); - auto p = UserActCount.find(UserActs.front()->ClientId); + TString clientId = GetUserActionAndTransactionEventsFront().ClientId; + auto p = UserActCount.find(clientId); Y_ABORT_UNLESS(p != UserActCount.end()); Y_ABORT_UNLESS(p->second > 0); @@ -1445,7 +1467,7 @@ void TPartition::RemoveUserAct() UserActCount.erase(p); } - UserActs.pop_front(); + UserActionAndTransactionEvents.pop_front(); } size_t TPartition::GetUserActCount(const TString& consumer) const @@ -1459,7 +1481,7 @@ size_t TPartition::GetUserActCount(const TString& consumer) const void TPartition::ProcessTxsAndUserActs(const TActorContext& ctx) { - if (UsersInfoWriteInProgress || (ImmediateTxs.empty() && UserActs.empty() && DistrTxs.empty()) || TxInProgress) { + if (UsersInfoWriteInProgress || UserActionAndTransactionEvents.empty() || TxInProgress) { return; } @@ -1472,17 +1494,36 @@ void TPartition::ProcessTxsAndUserActs(const TActorContext& ctx) void TPartition::ContinueProcessTxsAndUserActs(const TActorContext& ctx) { - if (!DistrTxs.empty()) { - ProcessDistrTxs(ctx); + Y_ABORT_UNLESS(!UsersInfoWriteInProgress); + Y_ABORT_UNLESS(!TxInProgress); + + if (!UserActionAndTransactionEvents.empty()) { + auto visitor = [this, &ctx](const auto& event) -> bool { + return this->ProcessUserActionOrTransaction(*event, ctx); + }; + + size_t index = UserActionAndTransactionEvents.front().index(); + while (!UserActionAndTransactionEvents.empty()) { + auto& front = UserActionAndTransactionEvents.front(); + + if (index != front.index()) { + break; + } + + if (!std::visit(visitor, front)) { + break; + } + + if (TxInProgress) { + break; + } + } if (TxInProgress) { return; } } - ProcessUserActs(ctx); - ProcessImmediateTxs(ctx); - THolder request(new TEvKeyValue::TEvRequest); request->Record.SetCookie(SET_OFFSET_COOKIE); @@ -1499,19 +1540,52 @@ void TPartition::ContinueProcessTxsAndUserActs(const TActorContext& ctx) void TPartition::RemoveDistrTx() { - Y_ABORT_UNLESS(!DistrTxs.empty()); + Y_ABORT_UNLESS(!UserActionAndTransactionEvents.empty()); + Y_ABORT_UNLESS(UserActionAndTransactionEventsFrontIs()); - DistrTxs.pop_front(); + UserActionAndTransactionEvents.pop_front(); PendingPartitionConfig = nullptr; } -void TPartition::ProcessDistrTxs(const TActorContext& ctx) +bool TPartition::ProcessUserActionOrTransaction(TTransaction& t, + const TActorContext& ctx) { Y_ABORT_UNLESS(!TxInProgress); - while (!TxInProgress && !DistrTxs.empty()) { - ProcessDistrTx(ctx); + if (t.Tx) { + t.Predicate = BeginTransaction(*t.Tx, ctx); + + ctx.Send(Tablet, + MakeHolder(t.Tx->Step, + t.Tx->TxId, + Partition, + *t.Predicate).Release()); + + TxInProgress = true; + } else if (t.ProposeConfig) { + t.Predicate = BeginTransaction(*t.ProposeConfig); + + PendingPartitionConfig = GetPartitionConfig(t.ProposeConfig->Config, Partition); + //Y_VERIFY_DEBUG_S(PendingPartitionConfig, "Partition " << Partition << " config not found"); + + ctx.Send(Tablet, + MakeHolder(t.ProposeConfig->Step, + t.ProposeConfig->TxId, + Partition).Release()); + + TxInProgress = true; + } else { + Y_ABORT_UNLESS(!ChangeConfig); + + ChangeConfig = t.ChangeConfig; + PendingPartitionConfig = GetPartitionConfig(ChangeConfig->Config, Partition); + SendChangeConfigReply = t.SendReply; + BeginChangePartitionConfig(ChangeConfig->Config, ctx); + + RemoveDistrTx(); } + + return false; } bool TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPredicate& tx, @@ -1602,8 +1676,7 @@ void TPartition::EndTransaction(const TEvPQ::TEvTxCommit& event, Y_ABORT_UNLESS(TxInProgress); - Y_ABORT_UNLESS(!DistrTxs.empty()); - TTransaction& t = DistrTxs.front(); + TTransaction& t = GetUserActionAndTransactionEventsFront(); if (t.Tx) { Y_ABORT_UNLESS(GetStepAndTxId(event) == GetStepAndTxId(*t.Tx)); @@ -1649,8 +1722,7 @@ void TPartition::EndTransaction(const TEvPQ::TEvTxRollback& event, Y_ABORT_UNLESS(TxInProgress); - Y_ABORT_UNLESS(!DistrTxs.empty()); - TTransaction& t = DistrTxs.front(); + TTransaction& t = GetUserActionAndTransactionEventsFront(); if (t.Tx) { Y_ABORT_UNLESS(GetStepAndTxId(event) == GetStepAndTxId(*t.Tx)); @@ -1666,7 +1738,6 @@ void TPartition::EndTransaction(const TEvPQ::TEvTxRollback& event, Y_ABORT_UNLESS(t.ChangeConfig); } - RemoveDistrTx(); } @@ -1717,7 +1788,6 @@ void TPartition::BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& co void TPartition::OnProcessTxsAndUserActsWriteComplete(ui64 cookie, const TActorContext& ctx) { Y_ABORT_UNLESS(cookie == SET_OFFSET_COOKIE); - if (ChangeConfig) { EndChangePartitionConfig(ChangeConfig->Config, ChangeConfig->TopicConverter, @@ -1782,7 +1852,6 @@ void TPartition::OnProcessTxsAndUserActsWriteComplete(ui64 cookie, const TActorC PendingPartitionConfig = nullptr; } - ProcessTxsAndUserActs(ctx); if (ChangeConfig && CurrentStateFunc() == &TThis::StateIdle) { @@ -1847,56 +1916,17 @@ void TPartition::ResendPendingEvents(const TActorContext& ctx) } } -void TPartition::ProcessDistrTx(const TActorContext& ctx) +bool TPartition::ProcessUserActionOrTransaction(const TEvPersQueue::TEvProposeTransaction& event, + const TActorContext& ctx) { - Y_ABORT_UNLESS(!TxInProgress); - - Y_ABORT_UNLESS(!DistrTxs.empty()); - TTransaction& t = DistrTxs.front(); - - if (t.Tx) { - t.Predicate = BeginTransaction(*t.Tx, ctx); - - ctx.Send(Tablet, - MakeHolder(t.Tx->Step, - t.Tx->TxId, - Partition, - *t.Predicate).Release()); - - TxInProgress = true; - } else if (t.ProposeConfig) { - t.Predicate = BeginTransaction(*t.ProposeConfig); - - PendingPartitionConfig = GetPartitionConfig(t.ProposeConfig->Config, Partition); - //Y_VERIFY_DEBUG_S(PendingPartitionConfig, "Partition " << Partition << " config not found"); - - ctx.Send(Tablet, - MakeHolder(t.ProposeConfig->Step, - t.ProposeConfig->TxId, - Partition).Release()); - - TxInProgress = true; - } else { - Y_ABORT_UNLESS(!ChangeConfig); - - ChangeConfig = t.ChangeConfig; - PendingPartitionConfig = GetPartitionConfig(ChangeConfig->Config, Partition); - SendChangeConfigReply = t.SendReply; - BeginChangePartitionConfig(ChangeConfig->Config, ctx); - - RemoveDistrTx(); + if (AffectedUsers.size() >= MAX_USERS) { + return false; } -} -void TPartition::ProcessImmediateTxs(const TActorContext& ctx) -{ - Y_ABORT_UNLESS(!UsersInfoWriteInProgress); - - while (!ImmediateTxs.empty() && (AffectedUsers.size() < MAX_USERS)) { - ProcessImmediateTx(ImmediateTxs.front()->Record, ctx); + ProcessImmediateTx(event.Record, ctx); + RemoveImmediateTx(); - RemoveImmediateTx(); - } + return true; } void TPartition::ProcessImmediateTx(const NKikimrPQ::TEvProposeTransaction& tx, @@ -1948,15 +1978,17 @@ void TPartition::ProcessImmediateTx(const NKikimrPQ::TEvProposeTransaction& tx, NKikimrPQ::TEvProposeTransactionResult::COMPLETE); } -void TPartition::ProcessUserActs(const TActorContext& ctx) +bool TPartition::ProcessUserActionOrTransaction(TEvPQ::TEvSetClientInfo& act, + const TActorContext& ctx) { - Y_ABORT_UNLESS(!UsersInfoWriteInProgress); + if (AffectedUsers.size() >= MAX_USERS) { + return false; + } - while (!UserActs.empty() && (AffectedUsers.size() < MAX_USERS)) { - ProcessUserAct(*UserActs.front(), ctx); + ProcessUserAct(act, ctx); + RemoveUserAct(); - RemoveUserAct(); - } + return true; } void TPartition::ProcessUserAct(TEvPQ::TEvSetClientInfo& act, diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index dd0fa7e36eae..2b21f8e09345 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -23,6 +23,7 @@ #include +#include namespace NKikimr::NPQ { @@ -635,12 +636,22 @@ class TPartition : public TActorBootstrapped { TMaybe UsersInfoStorage; + template T& GetUserActionAndTransactionEventsFront(); + template bool UserActionAndTransactionEventsFrontIs() const; + + bool ProcessUserActionOrTransaction(TEvPQ::TEvSetClientInfo& event, const TActorContext& ctx); + bool ProcessUserActionOrTransaction(const TEvPersQueue::TEvProposeTransaction& event, const TActorContext& ctx); + bool ProcessUserActionOrTransaction(TTransaction& tx, const TActorContext& ctx); + // // user actions and transactions // - std::deque> UserActs; - std::deque> ImmediateTxs; - std::deque DistrTxs; + using TUserActionAndTransactionEvent = + std::variant, // user actions + TSimpleSharedPtr, // immediate transaction + TSimpleSharedPtr>; // distributed transaction or update config + std::deque UserActionAndTransactionEvents; + size_t ImmediateTxCount = 0; THashMap UserActCount; THashMap PendingUsersInfo; TVector>> Replies; diff --git a/ydb/core/persqueue/ut/partition_ut.cpp b/ydb/core/persqueue/ut/partition_ut.cpp index 2ea5da2d6a67..1ab06bd0d817 100644 --- a/ydb/core/persqueue/ut/partition_ut.cpp +++ b/ydb/core/persqueue/ut/partition_ut.cpp @@ -950,6 +950,65 @@ Y_UNIT_TEST_F(SetOffset, TPartitionFixture) WaitProxyResponse({.Cookie=5, .Status=NMsgBusProxy::MSTATUS_OK}); } +Y_UNIT_TEST_F(TooManyImmediateTxs, TPartitionFixture) +{ + const ui32 partition = 0; + const ui64 begin = 0; + const ui64 end = 2'000; + const TString client = "client"; + const TString session = "session"; + + CreatePartition({.Partition=partition, .Begin=begin, .End=end}); + + CreateSession(client, session); + + for (ui64 txId = 1; txId <= 1'002; ++txId) { + SendProposeTransactionRequest(partition, + txId - 1, txId, // range + client, + "topic-path", + true, + txId); + } + + // + // the first command in the queue will start writing + // + WaitCmdWrite({.Count=2, .UserInfos={{0, {.Session=session, .Offset=1}}}}); + + // + // messages from 2 to 1001 will be queued and the OVERLOADED error will be returned to the last one + // + WaitProposeTransactionResponse({.TxId=1'002, .Status=NKikimrPQ::TEvProposeTransactionResult::OVERLOADED}); + + // + // the writing has ended + // + SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); + WaitProposeTransactionResponse({.TxId=1, .Status=NKikimrPQ::TEvProposeTransactionResult::COMPLETE}); + + // + // the commands from the queue will be executed as one + // + WaitCmdWrite({.Count=2, .UserInfos={{0, {.Session=session, .Offset=1'001}}}}); + + // + // while the writing is in progress, another command has arrived + // + SendProposeTransactionRequest(partition, + 1'001, 1'002, // range + client, + "topic-path", + true, + 1'003); + SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); + + // + // it will be processed + // + WaitCmdWrite({.Count=2, .UserInfos={{0, {.Session=session, .Offset=1'002}}}}); +} + Y_UNIT_TEST_F(CommitOffsetRanges, TPartitionFixture) { const ui32 partition = 0; @@ -1278,13 +1337,10 @@ Y_UNIT_TEST_F(ChangeConfig, TPartitionFixture) SendCommitTx(step, txId_1); // - // consumer 'client-2' was deleted + // update config // - WaitCalcPredicateResult({.Step=step, .TxId=txId_2, .Partition=partition, .Predicate=false}); - SendRollbackTx(step, txId_2); - WaitCmdWrite({.Count=8, - .PlanStep=step, .TxId=txId_2, + .PlanStep=step, .TxId=txId_1, .UserInfos={ {1, {.Consumer="client-1", .Session="session-1", .Offset=2}}, {3, {.Consumer="client-3", .Session="", .Offset=0, .ReadRuleGeneration=7}} @@ -1295,6 +1351,12 @@ Y_UNIT_TEST_F(ChangeConfig, TPartitionFixture) SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); WaitPartitionConfigChanged({.Partition=partition}); + + // + // consumer 'client-2' was deleted + // + WaitCalcPredicateResult({.Step=step, .TxId=txId_2, .Partition=partition, .Predicate=false}); + SendRollbackTx(step, txId_2); } Y_UNIT_TEST_F(TabletConfig_Is_Newer_That_PartitionConfig, TPartitionFixture)