Skip to content

Commit

Permalink
Fix balancing of partitions for special reading sessions (#10779)
Browse files Browse the repository at this point in the history
  • Loading branch information
nshestakov authored Oct 23, 2024
1 parent 22600a9 commit 8a7394e
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 23 deletions.
85 changes: 62 additions & 23 deletions ydb/core/persqueue/read_balancer__balancing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,12 @@ bool TPartitionFamily::Reset(ETargetStatus targetStatus, const TActorContext& ct
GetPrefix() << " has been released for merge but target family is not exists.");
return true;
}
Consumer.MergeFamilies(it->second.get(), this, ctx);
auto* targetFamily = it->second.get();
if (targetFamily->CanAttach(Partitions) && targetFamily->CanAttach(WantedPartitions)) {
Consumer.MergeFamilies(targetFamily, this, ctx);
} else {
WantedPartitions.clear();
}

return true;
}
Expand Down Expand Up @@ -477,6 +482,23 @@ bool TPartitionFamily::PossibleForBalance(TSession* session) {
return session->Pipe != LastPipe;
}

// Reports whether the given partitions may be attached to this family.
//
// Returns true when there is nothing to attach, when the consumer has
// WithCommonSessions set, or when at least one entry of SpecialSessions
// answers AllPartitionsReadable(partitionsIds) positively. Otherwise false.
template<typename TCollection>
bool TPartitionFamily::CanAttach(const TCollection& partitionsIds) {
    // Both shortcuts lead to the same answer; the emptiness check still runs first.
    const bool nothingToCheck = partitionsIds.empty();
    if (nothingToCheck || Consumer.WithCommonSessions) {
        return true;
    }

    // SpecialSessions is iterated as key/value entries; only the session
    // pointer stored in `.second` is consulted.
    const auto readsEverything = [&partitionsIds](const auto& entry) {
        return entry.second->AllPartitionsReadable(partitionsIds);
    };
    return AnyOf(SpecialSessions, readsEverything);
}

// Explicit instantiations of CanAttach for the two collection types listed below.
// NOTE(review): the template body lives in this .cpp file, so presumably these
// instantiations are what make it usable from other translation units — confirm
// before adding a call with a different collection type.
template bool TPartitionFamily::CanAttach(const std::unordered_set<ui32>& partitionsIds);
template bool TPartitionFamily::CanAttach(const std::vector<ui32>& partitionsIds);

void TPartitionFamily::ClassifyPartitions() {
auto [activePartitionCount, inactivePartitionCount] = ClassifyPartitions(Partitions);
Expand Down Expand Up @@ -586,6 +608,7 @@ TConsumer::TConsumer(TBalancer& balancer, const TString& consumerName)
: Balancer(balancer)
, ConsumerName(consumerName)
, NextFamilyId(0)
, WithCommonSessions(false)
, BalanceScheduled(false)
{
}
Expand Down Expand Up @@ -881,6 +904,7 @@ void TConsumer::RegisterReadingSession(TSession* session, const TActorContext& c
}
} else {
OrderedSessions.reset();
WithCommonSessions = true;
}
}

Expand All @@ -901,6 +925,9 @@ void TConsumer::UnregisterReadingSession(TSession* session, const TActorContext&
Sessions.erase(session->Pipe);
if (!session->WithGroups()) {
OrderedSessions.reset();
WithCommonSessions = AnyOf(Sessions, [](const auto s) {
return !s.second->WithGroups();
});
}

for (auto* family : Snapshot(Families)) {
Expand All @@ -920,6 +947,11 @@ void TConsumer::UnregisterReadingSession(TSession* session, const TActorContext&
}
}
}

if (!family->CanAttach(family->WantedPartitions)) {
targetStatus = TPartitionFamily::ETargetStatus::Destroy;
}

if (family->Reset(targetStatus, ctx)) {
UnreadableFamilies[family->Id] = family;
FamiliesRequireBalancing.erase(family->Id);
Expand Down Expand Up @@ -1020,34 +1052,41 @@ bool TConsumer::ProccessReadingFinished(ui32 partitionId, bool wasInactive, cons
});

if (partition.NeedReleaseChildren()) {
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER,
GetPrefix() << "Attache partitions [" << JoinRange(", ", newPartitions.begin(), newPartitions.end()) << "] to " << family->DebugStr());
for (auto id : newPartitions) {
auto* node = GetPartitionGraph().GetPartition(id);
bool allParentsMerged = true;
if (node->Parents.size() > 1) {
// The partition was obtained as a result of the merge.
for (auto* c : node->Parents) {
auto* other = FindFamily(c->Id);
if (!other) {
allParentsMerged = false;
continue;
}
if (family->CanAttach(std::vector{id})) {
auto* node = GetPartitionGraph().GetPartition(id);
bool allParentsMerged = true;
if (node->Parents.size() > 1) {
// The partition was obtained as a result of the merge.
for (auto* c : node->Parents) {
auto* other = FindFamily(c->Id);
if (!other) {
allParentsMerged = false;
continue;
}

if (other != family) {
auto [f, v] = MergeFamilies(family, other, ctx);
allParentsMerged = v;
family = f;
if (other != family) {
auto [f, v] = MergeFamilies(family, other, ctx);
allParentsMerged = v;
family = f;
}
}
}
}

if (allParentsMerged) {
auto* other = FindFamily(id);
if (other && other != family) {
auto [f, _] = MergeFamilies(family, other, ctx);
family = f;
} else {
family->AttachePartitions({id}, ctx);
if (allParentsMerged) {
auto* other = FindFamily(id);
if (other && other != family) {
auto [f, _] = MergeFamilies(family, other, ctx);
family = f;
} else {
family->AttachePartitions({id}, ctx);
}
}
} else {
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER,
GetPrefix() << "Can't attache partition " << id << " to " << family->DebugStr());
}
}
} else {
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/persqueue/read_balancer__balancing.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ struct TPartitionFamily {
void InactivatePartition(ui32 partitionId);

bool PossibleForBalance(TSession* session);
template<typename TCollection>
bool CanAttach(const TCollection& partitionsIds);

TString DebugStr() const;

Expand Down Expand Up @@ -186,6 +188,7 @@ struct TConsumer {
// All reading sessions in which the family is currently being read.
std::unordered_map<TActorId, TSession*> Sessions;
std::optional<TOrderedSessions> OrderedSessions;
bool WithCommonSessions;

// Families that are not being read right now.
std::unordered_map<size_t, TPartitionFamily*> UnreadableFamilies;
Expand Down
37 changes: 37 additions & 0 deletions ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1051,6 +1051,43 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
readSession2->Close();
}

// Scenario: one message is written, partition 0 is split, and two read sessions
// are opened with explicit partition lists ({1} and {0}). After the session
// created with .Partitions = {0} has consumed the message and (if an
// end-of-partition event arrived in time) committed, it is closed and reopened;
// the reopened session must be given partition 0 again.
Y_UNIT_TEST(ReBalancingAfterSplit_sessionsWithPartition) {
    TTopicSdkTestSetup setup = CreateSetup();
    setup.CreateTopicWithAutoscale(TEST_TOPIC, TEST_CONSUMER, 2, 100);

    TTopicClient client = setup.MakeClient();

    // Produce the single message the test expects to read back.
    auto producer = CreateWriteSession(client, "producer-1", 0);
    UNIT_ASSERT(producer->Write(Msg("message_1.1", 2)));
    producer->Close();

    ui64 txId = 1023;
    SplitPartition(setup, ++txId, 0, "a");

    // The session for partition 1 is created before the one for partition 0.
    auto sessionForPartition1 = CreateTestReadSession({
        .Name="Session-1",
        .Setup=setup,
        .Sdk = SdkVersion::Topic,
        .AutoCommit = false,
        .Partitions = {1},
        .AutoPartitioningSupport = true
    });
    auto sessionForPartition0 = CreateTestReadSession({
        .Name="Session-0",
        .Setup=setup,
        .Sdk = SdkVersion::Topic,
        .ExpectedMessagesCount = 1,
        .AutoCommit = false,
        .Partitions = {0},
        .AutoPartitioningSupport = true
    });

    sessionForPartition0->WaitAndAssertPartitions({0}, "Must read partition 0");
    sessionForPartition0->WaitAllMessages();

    // Poll for an end-of-partition event, at most ten times with a one second
    // pause between polls; commit as soon as one shows up. If none arrives the
    // test simply proceeds without committing.
    constexpr size_t maxAttempts = 10;
    for (size_t attempt = 0; attempt < maxAttempts; ++attempt) {
        auto endedEvents = sessionForPartition0->GetEndedPartitionEvents();
        if (!endedEvents.empty()) {
            sessionForPartition0->Commit();
            break;
        }
        Sleep(TDuration::Seconds(1));
    }

    sessionForPartition0->Close();

    // Reopen a session for partition 0 and check that it gets the partition.
    sessionForPartition0 = CreateTestReadSession({
        .Name="Session-0",
        .Setup=setup,
        .Sdk = SdkVersion::Topic,
        .AutoCommit = false,
        .Partitions = {0},
        .AutoPartitioningSupport = true
    });
    sessionForPartition0->WaitAndAssertPartitions({0}, "Must read partition 0 because no more readers of it");

    sessionForPartition0->Close();
}

Y_UNIT_TEST(MidOfRange) {
auto AsString = [](std::vector<ui16> vs) {
TStringBuilder a;
Expand Down

0 comments on commit 8a7394e

Please sign in to comment.