Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 119 additions & 87 deletions ydb/core/persqueue/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,22 @@ struct TMirrorerInfo {
TTabletCountersBase Baseline;
};

template <class T>
T& TPartition::GetUserActionAndTransactionEventsFront()
{
Y_ABORT_UNLESS(!UserActionAndTransactionEvents.empty());
auto* ptr = get_if<TSimpleSharedPtr<T>>(&UserActionAndTransactionEvents.front());
Y_ABORT_UNLESS(ptr);
return **ptr;
}

template <class T>
bool TPartition::UserActionAndTransactionEventsFrontIs() const
{
Y_ABORT_UNLESS(!UserActionAndTransactionEvents.empty());
return get_if<TSimpleSharedPtr<T>>(&UserActionAndTransactionEvents.front());
}

const TString& TPartition::TopicName() const {
return TopicConverter->GetClientsideName();
}
Expand Down Expand Up @@ -192,9 +208,10 @@ TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, ui
TabletCounters.Populate(Counters);

if (!distrTxs.empty()) {
std::move(distrTxs.begin(), distrTxs.end(),
std::back_inserter(DistrTxs));
TxInProgress = DistrTxs.front().Predicate.Defined();
for (auto& tx : distrTxs) {
UserActionAndTransactionEvents.emplace_back(new TTransaction(std::move(tx)));
}
TxInProgress = GetUserActionAndTransactionEventsFront<TTransaction>().Predicate.Defined();
}
}

Expand Down Expand Up @@ -566,7 +583,6 @@ void TPartition::Handle(TEvPQ::TEvChangePartitionConfig::TPtr& ev, const TActorC


void TPartition::Handle(TEvPQ::TEvPipeDisconnected::TPtr& ev, const TActorContext& ctx) {

const TString& owner = ev->Get()->Owner;
const TActorId& pipeClient = ev->Get()->PipeClient;

Expand Down Expand Up @@ -854,7 +870,7 @@ void TPartition::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TAc
return;
}

if (ImmediateTxs.size() > MAX_TXS) {
if (ImmediateTxCount >= MAX_TXS) {
ReplyPropose(ctx,
event,
NKikimrPQ::TEvProposeTransactionResult::OVERLOADED);
Expand Down Expand Up @@ -1397,55 +1413,61 @@ void TPartition::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext&

void TPartition::PushBackDistrTx(TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> event)
{
DistrTxs.emplace_back(std::move(event));
UserActionAndTransactionEvents.emplace_back(new TTransaction(std::move(event)));
}

void TPartition::PushBackDistrTx(TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> event)
{
DistrTxs.emplace_back(std::move(event), true);
UserActionAndTransactionEvents.emplace_back(new TTransaction(std::move(event), true));
}

void TPartition::PushFrontDistrTx(TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> event)
{
DistrTxs.emplace_front(std::move(event), false);
UserActionAndTransactionEvents.emplace_front(new TTransaction(std::move(event), false));
}

void TPartition::PushBackDistrTx(TSimpleSharedPtr<TEvPQ::TEvProposePartitionConfig> event)
{
DistrTxs.emplace_back(std::move(event));
UserActionAndTransactionEvents.emplace_back(new TTransaction(std::move(event)));
}

void TPartition::AddImmediateTx(TSimpleSharedPtr<TEvPersQueue::TEvProposeTransaction> tx)
{
ImmediateTxs.push_back(std::move(tx));
UserActionAndTransactionEvents.emplace_back(std::move(tx));
++ImmediateTxCount;
}

void TPartition::AddUserAct(TSimpleSharedPtr<TEvPQ::TEvSetClientInfo> act)
{
UserActs.push_back(std::move(act));
++UserActCount[UserActs.back()->ClientId];
TString clientId = act->ClientId;
UserActionAndTransactionEvents.emplace_back(std::move(act));
++UserActCount[clientId];
}

void TPartition::RemoveImmediateTx()
{
Y_ABORT_UNLESS(!ImmediateTxs.empty());
Y_ABORT_UNLESS(!UserActionAndTransactionEvents.empty());
Y_ABORT_UNLESS(UserActionAndTransactionEventsFrontIs<TEvPersQueue::TEvProposeTransaction>());

ImmediateTxs.pop_front();
UserActionAndTransactionEvents.pop_front();
--ImmediateTxCount;
}

void TPartition::RemoveUserAct()
{
Y_ABORT_UNLESS(!UserActs.empty());
Y_ABORT_UNLESS(!UserActionAndTransactionEvents.empty());
Y_ABORT_UNLESS(UserActionAndTransactionEventsFrontIs<TEvPQ::TEvSetClientInfo>());

auto p = UserActCount.find(UserActs.front()->ClientId);
TString clientId = GetUserActionAndTransactionEventsFront<TEvPQ::TEvSetClientInfo>().ClientId;
auto p = UserActCount.find(clientId);
Y_ABORT_UNLESS(p != UserActCount.end());

Y_ABORT_UNLESS(p->second > 0);
if (!--p->second) {
UserActCount.erase(p);
}

UserActs.pop_front();
UserActionAndTransactionEvents.pop_front();
}

size_t TPartition::GetUserActCount(const TString& consumer) const
Expand All @@ -1459,7 +1481,7 @@ size_t TPartition::GetUserActCount(const TString& consumer) const

void TPartition::ProcessTxsAndUserActs(const TActorContext& ctx)
{
if (UsersInfoWriteInProgress || (ImmediateTxs.empty() && UserActs.empty() && DistrTxs.empty()) || TxInProgress) {
if (UsersInfoWriteInProgress || UserActionAndTransactionEvents.empty() || TxInProgress) {
return;
}

Expand All @@ -1472,17 +1494,36 @@ void TPartition::ProcessTxsAndUserActs(const TActorContext& ctx)

void TPartition::ContinueProcessTxsAndUserActs(const TActorContext& ctx)
{
if (!DistrTxs.empty()) {
ProcessDistrTxs(ctx);
Y_ABORT_UNLESS(!UsersInfoWriteInProgress);
Y_ABORT_UNLESS(!TxInProgress);

if (!UserActionAndTransactionEvents.empty()) {
auto visitor = [this, &ctx](const auto& event) -> bool {
return this->ProcessUserActionOrTransaction(*event, ctx);
};

size_t index = UserActionAndTransactionEvents.front().index();
while (!UserActionAndTransactionEvents.empty()) {
auto& front = UserActionAndTransactionEvents.front();

if (index != front.index()) {
break;
}

if (!std::visit(visitor, front)) {
break;
}

if (TxInProgress) {
break;
}
}

if (TxInProgress) {
return;
}
}

ProcessUserActs(ctx);
ProcessImmediateTxs(ctx);

THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest);
request->Record.SetCookie(SET_OFFSET_COOKIE);

Expand All @@ -1499,19 +1540,52 @@ void TPartition::ContinueProcessTxsAndUserActs(const TActorContext& ctx)

void TPartition::RemoveDistrTx()
{
Y_ABORT_UNLESS(!DistrTxs.empty());
Y_ABORT_UNLESS(!UserActionAndTransactionEvents.empty());
Y_ABORT_UNLESS(UserActionAndTransactionEventsFrontIs<TTransaction>());

DistrTxs.pop_front();
UserActionAndTransactionEvents.pop_front();
PendingPartitionConfig = nullptr;
}

void TPartition::ProcessDistrTxs(const TActorContext& ctx)
bool TPartition::ProcessUserActionOrTransaction(TTransaction& t,
const TActorContext& ctx)
{
Y_ABORT_UNLESS(!TxInProgress);

while (!TxInProgress && !DistrTxs.empty()) {
ProcessDistrTx(ctx);
if (t.Tx) {
t.Predicate = BeginTransaction(*t.Tx, ctx);

ctx.Send(Tablet,
MakeHolder<TEvPQ::TEvTxCalcPredicateResult>(t.Tx->Step,
t.Tx->TxId,
Partition,
*t.Predicate).Release());

TxInProgress = true;
} else if (t.ProposeConfig) {
t.Predicate = BeginTransaction(*t.ProposeConfig);

PendingPartitionConfig = GetPartitionConfig(t.ProposeConfig->Config, Partition);
//Y_VERIFY_DEBUG_S(PendingPartitionConfig, "Partition " << Partition << " config not found");

ctx.Send(Tablet,
MakeHolder<TEvPQ::TEvProposePartitionConfigResult>(t.ProposeConfig->Step,
t.ProposeConfig->TxId,
Partition).Release());

TxInProgress = true;
} else {
Y_ABORT_UNLESS(!ChangeConfig);

ChangeConfig = t.ChangeConfig;
PendingPartitionConfig = GetPartitionConfig(ChangeConfig->Config, Partition);
SendChangeConfigReply = t.SendReply;
BeginChangePartitionConfig(ChangeConfig->Config, ctx);

RemoveDistrTx();
}

return false;
}

bool TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPredicate& tx,
Expand Down Expand Up @@ -1602,8 +1676,7 @@ void TPartition::EndTransaction(const TEvPQ::TEvTxCommit& event,

Y_ABORT_UNLESS(TxInProgress);

Y_ABORT_UNLESS(!DistrTxs.empty());
TTransaction& t = DistrTxs.front();
TTransaction& t = GetUserActionAndTransactionEventsFront<TTransaction>();

if (t.Tx) {
Y_ABORT_UNLESS(GetStepAndTxId(event) == GetStepAndTxId(*t.Tx));
Expand Down Expand Up @@ -1649,8 +1722,7 @@ void TPartition::EndTransaction(const TEvPQ::TEvTxRollback& event,

Y_ABORT_UNLESS(TxInProgress);

Y_ABORT_UNLESS(!DistrTxs.empty());
TTransaction& t = DistrTxs.front();
TTransaction& t = GetUserActionAndTransactionEventsFront<TTransaction>();

if (t.Tx) {
Y_ABORT_UNLESS(GetStepAndTxId(event) == GetStepAndTxId(*t.Tx));
Expand All @@ -1666,7 +1738,6 @@ void TPartition::EndTransaction(const TEvPQ::TEvTxRollback& event,
Y_ABORT_UNLESS(t.ChangeConfig);
}


RemoveDistrTx();
}

Expand Down Expand Up @@ -1717,7 +1788,6 @@ void TPartition::BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& co
void TPartition::OnProcessTxsAndUserActsWriteComplete(ui64 cookie, const TActorContext& ctx) {
Y_ABORT_UNLESS(cookie == SET_OFFSET_COOKIE);


if (ChangeConfig) {
EndChangePartitionConfig(ChangeConfig->Config,
ChangeConfig->TopicConverter,
Expand Down Expand Up @@ -1782,7 +1852,6 @@ void TPartition::OnProcessTxsAndUserActsWriteComplete(ui64 cookie, const TActorC
PendingPartitionConfig = nullptr;
}


ProcessTxsAndUserActs(ctx);

if (ChangeConfig && CurrentStateFunc() == &TThis::StateIdle) {
Expand Down Expand Up @@ -1847,56 +1916,17 @@ void TPartition::ResendPendingEvents(const TActorContext& ctx)
}
}

void TPartition::ProcessDistrTx(const TActorContext& ctx)
bool TPartition::ProcessUserActionOrTransaction(const TEvPersQueue::TEvProposeTransaction& event,
const TActorContext& ctx)
{
Y_ABORT_UNLESS(!TxInProgress);

Y_ABORT_UNLESS(!DistrTxs.empty());
TTransaction& t = DistrTxs.front();

if (t.Tx) {
t.Predicate = BeginTransaction(*t.Tx, ctx);

ctx.Send(Tablet,
MakeHolder<TEvPQ::TEvTxCalcPredicateResult>(t.Tx->Step,
t.Tx->TxId,
Partition,
*t.Predicate).Release());

TxInProgress = true;
} else if (t.ProposeConfig) {
t.Predicate = BeginTransaction(*t.ProposeConfig);

PendingPartitionConfig = GetPartitionConfig(t.ProposeConfig->Config, Partition);
//Y_VERIFY_DEBUG_S(PendingPartitionConfig, "Partition " << Partition << " config not found");

ctx.Send(Tablet,
MakeHolder<TEvPQ::TEvProposePartitionConfigResult>(t.ProposeConfig->Step,
t.ProposeConfig->TxId,
Partition).Release());

TxInProgress = true;
} else {
Y_ABORT_UNLESS(!ChangeConfig);

ChangeConfig = t.ChangeConfig;
PendingPartitionConfig = GetPartitionConfig(ChangeConfig->Config, Partition);
SendChangeConfigReply = t.SendReply;
BeginChangePartitionConfig(ChangeConfig->Config, ctx);

RemoveDistrTx();
if (AffectedUsers.size() >= MAX_USERS) {
return false;
}
}

void TPartition::ProcessImmediateTxs(const TActorContext& ctx)
{
Y_ABORT_UNLESS(!UsersInfoWriteInProgress);

while (!ImmediateTxs.empty() && (AffectedUsers.size() < MAX_USERS)) {
ProcessImmediateTx(ImmediateTxs.front()->Record, ctx);
ProcessImmediateTx(event.Record, ctx);
RemoveImmediateTx();

RemoveImmediateTx();
}
return true;
}

void TPartition::ProcessImmediateTx(const NKikimrPQ::TEvProposeTransaction& tx,
Expand Down Expand Up @@ -1948,15 +1978,17 @@ void TPartition::ProcessImmediateTx(const NKikimrPQ::TEvProposeTransaction& tx,
NKikimrPQ::TEvProposeTransactionResult::COMPLETE);
}

void TPartition::ProcessUserActs(const TActorContext& ctx)
bool TPartition::ProcessUserActionOrTransaction(TEvPQ::TEvSetClientInfo& act,
const TActorContext& ctx)
{
Y_ABORT_UNLESS(!UsersInfoWriteInProgress);
if (AffectedUsers.size() >= MAX_USERS) {
return false;
}

while (!UserActs.empty() && (AffectedUsers.size() < MAX_USERS)) {
ProcessUserAct(*UserActs.front(), ctx);
ProcessUserAct(act, ctx);
RemoveUserAct();

RemoveUserAct();
}
return true;
}

void TPartition::ProcessUserAct(TEvPQ::TEvSetClientInfo& act,
Expand Down
Loading