Skip to content

Commit

Permalink
More test for autopartitionin of topics (ydb-platform#10613)
Browse files Browse the repository at this point in the history
  • Loading branch information
nshestakov authored Oct 18, 2024
1 parent 2a74bac commit 7b9578c
Showing 1 changed file with 58 additions and 31 deletions.
89 changes: 58 additions & 31 deletions ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -929,9 +929,63 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
}

ui64 GetBalancerTabletId(TTopicSdkTestSetup& setup, const TString& topicPath) {
auto pathDescr = setup.GetServer().AnnoyingClient->Ls(topicPath)->Record.GetPathDescription().GetSelf();
auto balancerTabletId = pathDescr.GetBalancerTabletID();
Cerr << ">>>>> BalancerTabletID=" << balancerTabletId << Endl << Flush;
UNIT_ASSERT(balancerTabletId);
return balancerTabletId;
}

void SplitPartition(TTopicSdkTestSetup& setup, const TString& topicPath, ui32 partitionId) {
auto balancerTabletId = GetBalancerTabletId(setup, topicPath);
auto edge = setup.GetRuntime().AllocateEdgeActor();
setup.GetRuntime().SendToPipe(balancerTabletId, edge, new TEvPQ::TEvPartitionScaleStatusChanged(partitionId, NKikimrPQ::EScaleStatus::NEED_SPLIT));
}

void AssertPartitionCount(TTopicSdkTestSetup& setup, const TString& topicPath, size_t expectedCount) {
auto client = setup.MakeClient();
auto describe = client.DescribeTopic(topicPath).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitions().size(), expectedCount);
}

void WaitAndAssertPartitionCount(TTopicSdkTestSetup& setup, const TString& topicPath, size_t expectedCount) {
auto client = setup.MakeClient();
size_t partitionCount = 0;
for (size_t i = 0; i < 10; ++i) {
Sleep(TDuration::Seconds(1));
auto describe = client.DescribeTopic(topicPath).GetValueSync();
partitionCount = describe.GetTopicDescription().GetPartitions().size();
if (partitionCount == expectedCount) {
break;
}
}
UNIT_ASSERT_VALUES_EQUAL(partitionCount, expectedCount);
}

Y_UNIT_TEST(WithDir_PartitionSplit_AutosplitByLoad) {
TTopicSdkTestSetup setup = CreateSetup();
auto tableClient = setup.MakeTableClient();
auto session = tableClient.CreateSession().GetValueSync().GetSession();

setup.GetServer().AnnoyingClient->MkDir("/Root", "dir");

ExecuteQuery(session, R"(
--!syntax_v1
CREATE TOPIC `/Root/dir/origin`
WITH (
AUTO_PARTITIONING_STRATEGY = 'SCALE_UP',
MAX_ACTIVE_PARTITIONS = 50
);
)");

AssertPartitionCount(setup, "/Root/dir/origin", 1);
SplitPartition(setup, "/Root/dir/origin", 0);
WaitAndAssertPartitionCount(setup, "/Root/dir/origin", 3);
}

Y_UNIT_TEST(CDC_PartitionSplit_AutosplitByLoad) {
TTopicSdkTestSetup setup = CreateSetup();
auto client = setup.MakeClient();
auto tableClient = setup.MakeTableClient();
auto session = tableClient.CreateSession().GetValueSync().GetSession();

Expand All @@ -954,36 +1008,9 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
);
)");

{
auto describe = client.DescribeTopic("/Root/origin/feed").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitions().size(), 1);
}

ui64 balancerTabletId;
{
auto pathDescr = setup.GetServer().AnnoyingClient->Ls("/Root/origin/feed/streamImpl")->Record.GetPathDescription().GetSelf();
balancerTabletId = pathDescr.GetBalancerTabletID();
Cerr << ">>>>> BalancerTabletID=" << balancerTabletId << Endl << Flush;
UNIT_ASSERT(balancerTabletId);
}

{
const auto edge = setup.GetRuntime().AllocateEdgeActor();
setup.GetRuntime().SendToPipe(balancerTabletId, edge, new TEvPQ::TEvPartitionScaleStatusChanged(0, NKikimrPQ::EScaleStatus::NEED_SPLIT));
}

{
size_t partitionCount = 0;
for (size_t i = 0; i < 10; ++i) {
Sleep(TDuration::Seconds(1));
auto describe = client.DescribeTopic("/Root/origin/feed").GetValueSync();
partitionCount = describe.GetTopicDescription().GetPartitions().size();
if (partitionCount == 3) {
break;
}
}
UNIT_ASSERT_VALUES_EQUAL(partitionCount, 3);
}
AssertPartitionCount(setup, "/Root/origin/feed", 1);
SplitPartition(setup, "/Root/origin/feed/streamImpl", 0);
WaitAndAssertPartitionCount(setup, "/Root/origin/feed", 3);
}

Y_UNIT_TEST(MidOfRange) {
Expand Down

0 comments on commit 7b9578c

Please sign in to comment.