Skip to content

Commit

Permalink
Send EndPartitionSession control message (ydb-platform#4351)
Browse files Browse the repository at this point in the history
  • Loading branch information
nshestakov authored and MrLolthe1st committed May 28, 2024
1 parent 68d7b4c commit 595ba23
Show file tree
Hide file tree
Showing 28 changed files with 503 additions and 64 deletions.
16 changes: 8 additions & 8 deletions ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,13 @@ void TKafkaOffsetCommitActor::Handle(NGRpcProxy::V1::TEvPQProxy::TEvAuthResultOk
continue;
}

auto tabletIdIt = topicIt->second.PartitionIdToTabletId.find(partitionRequest.PartitionIndex);
if (tabletIdIt == topicIt->second.PartitionIdToTabletId.end()) {
auto tabletIdIt = topicIt->second.Partitions.find(partitionRequest.PartitionIndex);
if (tabletIdIt == topicIt->second.Partitions.end()) {
AddPartitionResponse(UNKNOWN_TOPIC_OR_PARTITION, topicReq.Name.value(), partitionRequest.PartitionIndex, ctx);
continue;
}

ui64 tabletId = tabletIdIt->second;
ui64 tabletId = tabletIdIt->second.TabletId;

if (!TabletIdToPipe.contains(tabletId)) {
NTabletPipe::TClientConfig clientConfig;
Expand All @@ -114,9 +114,9 @@ void TKafkaOffsetCommitActor::Handle(NGRpcProxy::V1::TEvPQProxy::TEvAuthResultOk
commit->SetStrict(true);

PendingResponses++;
KAFKA_LOG_D("Send commit request for group# " << Message->GroupId.value() <<
", topic# " << topicIt->second.TopicNameConverter->GetPrimaryPath() <<
", partition# " << partitionRequest.PartitionIndex <<
KAFKA_LOG_D("Send commit request for group# " << Message->GroupId.value() <<
", topic# " << topicIt->second.TopicNameConverter->GetPrimaryPath() <<
", partition# " << partitionRequest.PartitionIndex <<
", offset# " << partitionRequest.CommittedOffset);

TAutoPtr<TEvPersQueue::TEvRequest> req(new TEvPersQueue::TEvRequest);
Expand Down Expand Up @@ -172,7 +172,7 @@ void TKafkaOffsetCommitActor::AddPartitionResponse(EKafkaErrors error, const TSt

void TKafkaOffsetCommitActor::Bootstrap(const NActors::TActorContext& ctx) {
THashSet<TString> topicsToResolve;
for (auto topicReq: Message->Topics) {
for (auto topicReq: Message->Topics) {
topicsToResolve.insert(NormalizePath(Context->DatabasePath, topicReq.Name.value()));
}

Expand All @@ -183,7 +183,7 @@ void TKafkaOffsetCommitActor::Bootstrap(const NActors::TActorContext& ctx) {
auto topicHandler = std::make_unique<NPersQueue::TTopicsListController>(
topicConverterFactory
);

auto topicsToConverter = topicHandler->GetReadTopicsList(topicsToResolve, false, Context->DatabasePath);
if (!topicsToConverter.IsValid) {
KAFKA_LOG_CRIT("Commit offsets failed. reason# topicsToConverter is not valid");
Expand Down
14 changes: 14 additions & 0 deletions ydb/core/persqueue/partition_read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,20 @@ TAutoPtr<TEvPersQueue::TEvHasDataInfoResponse> TPartition::MakeHasDataInfoRespon
res->Record.SetCookie(*cookie);
}
res->Record.SetReadingFinished(readingFinished);
if (readingFinished) {
ui32 partitionId = Partition.OriginalPartitionId;

auto* node = PartitionGraph.GetPartition(partitionId);
for (auto* child : node->Children) {
res->Record.AddChildPartitionIds(child->Id);

for (auto* p : child->Parents) {
if (p->Id != partitionId) {
res->Record.AddAdjacentPartitionIds(p->Id);
}
}
}
}

return res;
}
Expand Down
38 changes: 27 additions & 11 deletions ydb/core/persqueue/read_balancer__balancing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ void TPartitionFamily::Release(const TActorContext& ctx, ETargetStatus targetSta

--Session->ActiveFamilyCount;
++Session->ReleasingFamilyCount;
--Consumer.ActiveFamilyCount;

for (auto partitionId : LockedPartitions) {
ctx.Send(Session->Sender, MakeEvReleasePartition(partitionId).release());
Expand Down Expand Up @@ -223,10 +222,6 @@ bool TPartitionFamily::Reset(const TActorContext& ctx) {
}

bool TPartitionFamily::Reset(ETargetStatus targetStatus, const TActorContext& ctx) {
if (IsActive()) {
--Consumer.ActiveFamilyCount;
}

Session->Families.erase(this->Id);
Session = nullptr;

Expand Down Expand Up @@ -310,7 +305,6 @@ void TPartitionFamily::StartReading(TSession& session, const TActorContext& ctx)
Session->InactivePartitionCount += InactivePartitionCount;

++Session->ActiveFamilyCount;
++Consumer.ActiveFamilyCount;

LastPipe = Session->Pipe;

Expand Down Expand Up @@ -411,11 +405,28 @@ void TPartitionFamily::Merge(TPartitionFamily* other) {
other->ChangePartitionCounters(-other->ActivePartitionCount, -other->InactivePartitionCount);

UpdateSpecialSessions();

if (other->IsActive()) {
--other->Session->ActiveFamilyCount;
}
}

TString TPartitionFamily::DebugStr() const {
return TStringBuilder() << "family=" << Id << " (Status=" << Status
<< ", Partitions=[" << JoinRange(", ", Partitions.begin(), Partitions.end()) << "], SpecialSessions=" << SpecialSessions.size() << ")";
TStringBuilder sb;
sb << "family=" << Id << " (Status=" << Status
<< ", Partitions=[" << JoinRange(", ", Partitions.begin(), Partitions.end()) << "]";
if (!WantedPartitions.empty()) {
sb << ", WantedPartitions=[" << JoinRange(", ", WantedPartitions.begin(), WantedPartitions.end()) << "]";
}
if (!SpecialSessions.empty()) {
sb << ", SpecialSessions=" << SpecialSessions.size();
}
if (Session) {
sb << ", Session=" << Session->DebugStr();
}
sb << ")";

return sb;
}

TPartition* TPartitionFamily::GetPartition(ui32 partitionId) {
Expand Down Expand Up @@ -549,7 +560,6 @@ TConsumer::TConsumer(TBalancer& balancer, const TString& consumerName)
: Balancer(balancer)
, ConsumerName(consumerName)
, NextFamilyId(0)
, ActiveFamilyCount(0)
, BalanceScheduled(false)
{
}
Expand Down Expand Up @@ -615,6 +625,7 @@ TPartitionFamily* TConsumer::CreateFamily(std::vector<ui32>&& partitions, TParti
auto [it, _] = Families.emplace(id, std::make_unique<TPartitionFamily>(*this, id, std::move(partitions)));
auto* family = it->second.get();

family->Status = status;
if (status == TPartitionFamily::EStatus::Free) {
UnreadableFamilies[id] = family;
}
Expand Down Expand Up @@ -702,6 +713,9 @@ bool TConsumer::BreakUpFamily(TPartitionFamily* family, ui32 partitionId, bool d
f->LastPipe = family->LastPipe;
if (f->Session) {
f->Session->Families.try_emplace(f->Id, f);
if (f->IsActive()) {
++f->Session->ActiveFamilyCount;
}
}

newFamilies.push_back(f);
Expand Down Expand Up @@ -814,7 +828,7 @@ void TConsumer::RegisterReadingSession(TSession* session, const TActorContext& c

if (session->WithGroups()) {
for (auto& [_, family] : Families) {
if (session->AllPartitionsReadable(family->Partitions)) {
if (session->AllPartitionsReadable(family->Partitions) && session->AllPartitionsReadable(family->WantedPartitions)) {
family->SpecialSessions[session->Pipe] = session;
FamiliesRequireBalancing[family->Id] = family.get();
}
Expand Down Expand Up @@ -1180,6 +1194,7 @@ void TConsumer::Balance(const TActorContext& ctx) {
if (sit == sessions.end()) {
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER,
GetPrefix() << "balancing of the " << family->DebugStr() << " failed because there are no suitable reading sessions.");

continue;
}

Expand Down Expand Up @@ -1264,7 +1279,7 @@ void TConsumer::Balance(const TActorContext& ctx) {
}

bool hasGoodestSession = false;
size_t targetPartitionCount = family->Session->ActivePartitionCount - family->ActivePartitionCount;
size_t targetPartitionCount = family->Session->ActiveFamilyCount - 1;
for (auto [_, s] : family->SpecialSessions) {
if (s == family->Session) {
continue;
Expand All @@ -1281,6 +1296,7 @@ void TConsumer::Balance(const TActorContext& ctx) {
} else {
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER,
GetPrefix() << "skip balancing " << family->DebugStr() << " because it is already being read by the best session.");
++it;
}
}
}
Expand Down
1 change: 0 additions & 1 deletion ydb/core/persqueue/read_balancer__balancing.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,6 @@ struct TConsumer {

std::unordered_map<ui32, TPartition> Partitions;

size_t ActiveFamilyCount;
bool BalanceScheduled;

TConsumer(TBalancer& balancer, const TString& consumerName);
Expand Down
Loading

0 comments on commit 595ba23

Please sign in to comment.