Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 50 additions & 53 deletions ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1369,20 +1369,30 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {

client.CreateTopic(TEST_TOPIC, createSettings).Wait();

auto msg = TString(1_MB, 'a');
auto commit = [&](const std::string& sessionId, ui64 offset) {
return setup.Commit(TString(TEST_TOPIC), TEST_CONSUMER, 0, offset, sessionId);
};

auto getConsumerState = [&](ui32 partition) {
auto description = setup.DescribeConsumer(TString(TEST_TOPIC), TEST_CONSUMER);

auto stats = description.GetPartitions().at(partition).GetPartitionConsumerStats();
UNIT_ASSERT(stats);
return stats;
};

auto msg = TString("msg-value-1");

auto writeSession_1 = CreateWriteSession(client, "producer-1", 0, std::string{TEST_TOPIC}, false);
auto writeSession_2 = CreateWriteSession(client, "producer-2", 0, std::string{TEST_TOPIC}, false);
auto seqNo = 1;
{
UNIT_ASSERT(writeSession_1->Write(Msg(msg, seqNo++)));
UNIT_ASSERT(writeSession_1->Write(Msg(msg, seqNo++)));
Sleep(TDuration::Seconds(5));

auto describe = client.DescribeTopic(TEST_TOPIC).GetValueSync();
UNIT_ASSERT_EQUAL(describe.GetTopicDescription().GetPartitions().size(), 1);
}

{
UNIT_ASSERT(writeSession_1->Write(Msg(msg, seqNo++)));
UNIT_ASSERT(writeSession_1->Write(Msg(msg, seqNo++)));
UNIT_ASSERT(writeSession_1->Write(Msg(msg, seqNo++)));
Expand All @@ -1392,15 +1402,21 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {

UNIT_ASSERT(writeSession_2->Write(Msg(msg, seqNo++)));
writeSession_2->Close();
Sleep(TDuration::Seconds(15));
}

{
ui64 txId = 1006;
SplitPartition(setup, ++txId, 0, "a");

auto describe = client.DescribeTopic(TEST_TOPIC).GetValueSync();
UNIT_ASSERT_EQUAL(describe.GetTopicDescription().GetPartitions().size(), 3);
}

auto writeSession_3 = CreateWriteSession(client, "producer-2", 1, std::string{TEST_TOPIC}, false);
UNIT_ASSERT(writeSession_3->Write(Msg(TStringBuilder() << "message-" << seqNo, seqNo++)));
UNIT_ASSERT(writeSession_3->Write(Msg(TStringBuilder() << "message-" << seqNo, seqNo++)));

{
UNIT_ASSERT(writeSession_3->Write(Msg(TStringBuilder() << "message-" << seqNo, seqNo++)));
UNIT_ASSERT(writeSession_3->Write(Msg(TStringBuilder() << "message-" << seqNo, seqNo++)));
}
auto reader = client.CreateReadSession(
TReadSessionSettings()
.AutoPartitioningSupport(true)
Expand Down Expand Up @@ -1429,61 +1445,40 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
if (message.GetSeqNo() == 6) {
if (!commitSent) {
commitSent = true;
Sleep(TDuration::MilliSeconds(300));

readSessionId = message.GetPartitionSession()->GetReadSessionId();
TCommitOffsetSettings commitSettings {.ReadSessionId_ = message.GetPartitionSession()->GetReadSessionId()};
auto status = client.CommitOffset(TEST_TOPIC, 0, TEST_CONSUMER, 8, commitSettings).GetValueSync();
UNIT_ASSERT(status.IsSuccess());

{
auto describeConsumerSettings = TDescribeConsumerSettings().IncludeStats(true);
auto result = client.DescribeConsumer(TEST_TOPIC, TEST_CONSUMER, describeConsumerSettings).GetValueSync();
UNIT_ASSERT(result.IsSuccess());

auto description = result.GetConsumerDescription();
auto status = commit(message.GetPartitionSession()->GetReadSessionId(), 8);
UNIT_ASSERT(status.IsSuccess());

auto stats = description.GetPartitions().at(0).GetPartitionConsumerStats();
UNIT_ASSERT(stats);
Sleep(TDuration::MilliSeconds(500));

UNIT_ASSERT(stats->GetCommittedOffset() == 8);
auto stats = getConsumerState(0);
UNIT_ASSERT_VALUES_EQUAL(stats->GetCommittedOffset(), 8);
}

// must be ignored, because commit to past
TCommitOffsetSettings commitToPastSettings {.ReadSessionId_ = message.GetPartitionSession()->GetReadSessionId()};
auto commitToPastStatus = client.CommitOffset(TEST_TOPIC, 0, TEST_CONSUMER, 0, commitToPastSettings).GetValueSync();
UNIT_ASSERT(commitToPastStatus.IsSuccess());

{
auto describeConsumerSettings = TDescribeConsumerSettings().IncludeStats(true);
auto result = client.DescribeConsumer(TEST_TOPIC, TEST_CONSUMER, describeConsumerSettings).GetValueSync();
UNIT_ASSERT(result.IsSuccess());

auto description = result.GetConsumerDescription();
// must be ignored, because commit to past
auto status = commit(message.GetPartitionSession()->GetReadSessionId(), 0);
UNIT_ASSERT(status.IsSuccess());

auto stats = description.GetPartitions().at(0).GetPartitionConsumerStats();
UNIT_ASSERT(stats);
Sleep(TDuration::MilliSeconds(500));

UNIT_ASSERT(stats->GetCommittedOffset() == 8);
auto stats = getConsumerState(0);
UNIT_ASSERT_VALUES_EQUAL(stats->GetCommittedOffset(), 8);
}

TCommitOffsetSettings commitSettingsWrongSession {.ReadSessionId_ = "random_session"};
auto statusWrongSession = client.CommitOffset(TEST_TOPIC, 0, TEST_CONSUMER, 0, commitSettingsWrongSession).GetValueSync();
UNIT_ASSERT(!statusWrongSession.IsSuccess());

/* TODO uncomment this
{
auto describeConsumerSettings = TDescribeConsumerSettings().IncludeStats(true);
auto result = client.DescribeConsumer(TEST_TOPIC, TEST_CONSUMER, describeConsumerSettings).GetValueSync();
UNIT_ASSERT(result.IsSuccess());

auto description = result.GetConsumerDescription();
// must be ignored, because wrong sessionid
auto status = commit("random session", 0);
UNIT_ASSERT(!status.IsSuccess());

auto stats = description.GetPartitions().at(0).GetPartitionConsumerStats();
UNIT_ASSERT(stats);
Sleep(TDuration::MilliSeconds(500));

UNIT_ASSERT(stats->GetCommittedOffset() == 8);
auto stats = getConsumerState(0);
UNIT_ASSERT_VALUES_EQUAL(stats->GetCommittedOffset(), 8);
}

*/
} else {
UNIT_ASSERT(false);
}
Expand All @@ -1493,28 +1488,30 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
}
UNIT_ASSERT(writeSession_3->Write(Msg(TStringBuilder() << "message-" << seqNo, seqNo++)));
} else if (auto* x = std::get_if<NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&event)) {
x->Confirm();
Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush;
x->Confirm();
} else if (auto* x = std::get_if<NYdb::NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent>(&event)) {
Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush;
} else if (auto* x = std::get_if<NYdb::NTopic::TReadSessionEvent::TPartitionSessionStatusEvent>(&event)) {
Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush;
} else if (auto* x = std::get_if<NYdb::NTopic::TReadSessionEvent::TStopPartitionSessionEvent>(&event)) {
x->Confirm();
Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush;
x->Confirm();
} else if (auto* x = std::get_if<NYdb::NTopic::TReadSessionEvent::TPartitionSessionClosedEvent>(&event)) {
Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush;
} else if (auto* x = std::get_if<NYdb::NTopic::TReadSessionEvent::TEndPartitionSessionEvent>(&event)) {
x->Confirm();
Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush;
x->Confirm();
} else if (auto* sessionClosedEvent = std::get_if<NYdb::NTopic::TSessionClosedEvent>(&event)) {
Cerr << sessionClosedEvent->DebugString() << Endl << Flush;
Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush;
} else {
Cerr << "SESSION EVENT unhandled \n";
Cerr << "SESSION EVENT unhandled " << x->DebugString() << Endl << Flush;
}
}
Sleep(TDuration::MilliSeconds(250));
}

writeSession_3->Close();
}

Y_UNIT_TEST(PartitionSplit_DistributedTxCommit_CheckOffsetCommitForDifferentCases_NotSplitedTopic) {
Expand Down
88 changes: 88 additions & 0 deletions ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ Y_UNIT_TEST_SUITE(WithSDK) {
}
UNIT_ASSERT_C(endTime > TInstant::Now(), "Unable wait");
}

session->Close(TDuration::Seconds(1));
}

// Check describe for topic wich contains messages, has commited offset of first message and read second message
Expand All @@ -158,8 +160,94 @@ Y_UNIT_TEST_SUITE(WithSDK) {
UNIT_ASSERT_TIME_EQUAL(TInstant::Now(), c->GetLastReadTime(), TDuration::Seconds(3));
UNIT_ASSERT_VALUES_EQUAL(2, c->GetLastReadOffset());
}
}

Y_UNIT_TEST(CommitWithWrongSessionId) {
TTopicSdkTestSetup setup = CreateSetup();
setup.CreateTopic(std::string{TEST_TOPIC}, std::string{TEST_CONSUMER}, 1);

setup.Write("message-1");
setup.Write("message-2");
setup.Write("message-3");

{
auto result = setup.Commit(TString(TEST_TOPIC), TEST_CONSUMER, 0, 1, "wrong-read-session-id");
UNIT_ASSERT_C(!result.IsSuccess(), "Commit doesn`t work with wrong session id");

auto desc = setup.DescribeConsumer(TString(TEST_TOPIC), TEST_CONSUMER);
UNIT_ASSERT_VALUES_EQUAL(0, desc.GetPartitions().at(0).GetPartitionConsumerStats()->GetCommittedOffset());
}
}

Y_UNIT_TEST(CommitToPastWithWrongSessionId) {
TTopicSdkTestSetup setup = CreateSetup();
setup.CreateTopic(std::string{TEST_TOPIC}, std::string{TEST_CONSUMER}, 1);

setup.Write("message-1");
setup.Write("message-2");
setup.Write("message-3");

{
auto result = setup.Commit(TString(TEST_TOPIC), TEST_CONSUMER, 0, 2);
UNIT_ASSERT_C(result.IsSuccess(), "Commited without session id. It is reset mode");

auto desc = setup.DescribeConsumer(TString(TEST_TOPIC), TEST_CONSUMER);
UNIT_ASSERT_VALUES_EQUAL(2, desc.GetPartitions().at(0).GetPartitionConsumerStats()->GetCommittedOffset());
}

{
auto result = setup.Commit(TString(TEST_TOPIC), TEST_CONSUMER, 0, 0, "wrong-read-session-id");
UNIT_ASSERT_C(!result.IsSuccess(), "Commit doesn`t work with wrong session id");

auto desc = setup.DescribeConsumer(TString(TEST_TOPIC), TEST_CONSUMER);
UNIT_ASSERT_VALUES_EQUAL(2, desc.GetPartitions().at(0).GetPartitionConsumerStats()->GetCommittedOffset());
}
}

/* TODO Uncomment this test
Y_UNIT_TEST(CommitToParentPartitionWithWrongSessionId) {
TTopicSdkTestSetup setup = CreateSetup();
setup.CreateTopicWithAutoscale();

setup.Write("message-1", 0);

{
ui64 txId = 1006;
SplitPartition(setup, ++txId, 0, "a");
}

setup.Write("message-2", 1);

Cerr << ">>>>> BEGIN 0" << Endl << Flush;
{
auto result = setup.Commit(TString(TEST_TOPIC), TEST_CONSUMER, 0, 1);
UNIT_ASSERT_C(result.IsSuccess(), "Commited without session id. It is reset mode");

auto desc = setup.DescribeConsumer(TString(TEST_TOPIC), TEST_CONSUMER);
UNIT_ASSERT_VALUES_EQUAL(1, desc.GetPartitions().at(0).GetPartitionConsumerStats()->GetCommittedOffset());
}

Cerr << ">>>>> BEGIN 1" << Endl << Flush;
{
auto result = setup.Commit(TString(TEST_TOPIC), TEST_CONSUMER, 1, 1);
UNIT_ASSERT_C(result.IsSuccess(), "Commited without session id. It is reset mode");

auto desc = setup.DescribeConsumer(TString(TEST_TOPIC), TEST_CONSUMER);
UNIT_ASSERT_VALUES_EQUAL(1, desc.GetPartitions().at(1).GetPartitionConsumerStats()->GetCommittedOffset());
}

Cerr << ">>>>> BEGIN 2" << Endl << Flush;
{
auto result = setup.Commit(TString(TEST_TOPIC), TEST_CONSUMER, 0, 0, "wrong-read-session-id");
UNIT_ASSERT_C(!result.IsSuccess(), "Commit doesn`t work with wrong session id");

auto desc = setup.DescribeConsumer(TString(TEST_TOPIC), TEST_CONSUMER);
UNIT_ASSERT_VALUES_EQUAL_C(1, desc.GetPartitions().at(0).GetPartitionConsumerStats()->GetCommittedOffset(), "Offset doesn`t changed");
}
Cerr << ">>>>> END" << Endl << Flush;

}
*/
}

} // namespace NKikimr
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,26 @@ TConsumerDescription TTopicSdkTestSetup::DescribeConsumer(const TString& path, c
return status.GetConsumerDescription();
}

TStatus TTopicSdkTestSetup::Commit(const TString& path, const TString& consumerName, size_t partitionId, size_t offset) {
void TTopicSdkTestSetup::Write(const std::string& message, ui32 partitionId) {
TTopicClient client(MakeDriver());

return client.CommitOffset(path, partitionId, consumerName, offset).GetValueSync();
TWriteSessionSettings settings;
settings.Path(TEST_TOPIC);
settings.PartitionId(partitionId);
settings.DeduplicationEnabled(false);
auto session = client.CreateSimpleBlockingWriteSession(settings);

TWriteMessage msg(TStringBuilder() << message);
UNIT_ASSERT(session->Write(std::move(msg)));

session->Close(TDuration::Seconds(5));
}

TStatus TTopicSdkTestSetup::Commit(const TString& path, const TString& consumerName, size_t partitionId, size_t offset, std::optional<std::string> sessionId) {
TTopicClient client(MakeDriver());

TCommitOffsetSettings commitSettings {.ReadSessionId_ = sessionId};
return client.CommitOffset(path, partitionId, consumerName, offset, commitSettings).GetValueSync();
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ class TTopicSdkTestSetup {
TTopicDescription DescribeTopic(const TString& path = TString{TEST_TOPIC});
TConsumerDescription DescribeConsumer(const TString& path = TString{TEST_TOPIC}, const TString& consumer = TString{TEST_CONSUMER});

TStatus Commit(const TString& path, const TString& consumerName, size_t partitionId, size_t offset);
void Write(const std::string& message, ui32 partitionId = 0);
TStatus Commit(const TString& path, const TString& consumerName, size_t partitionId, size_t offset, std::optional<std::string> sessionId = std::nullopt);

TString GetEndpoint() const;
TString GetTopicPath(const TString& name = TString{TEST_TOPIC}) const;
Expand Down
9 changes: 7 additions & 2 deletions ydb/services/persqueue_v1/actors/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -408,19 +408,21 @@ struct TEvPQProxy {


struct TEvCommitDone : public NActors::TEventLocal<TEvCommitDone, EvCommitDone> {
explicit TEvCommitDone(const ui64 assignId, const ui64 startCookie, const ui64 lastCookie, const ui64 offset, const ui64 endOffset)
explicit TEvCommitDone(const ui64 assignId, const ui64 startCookie, const ui64 lastCookie, const ui64 offset, const ui64 endOffset, const bool readingFinishedSent)
: AssignId(assignId)
, StartCookie(startCookie)
, LastCookie(lastCookie)
, Offset(offset)
, EndOffset(endOffset)
, ReadingFinishedSent(readingFinishedSent)
{ }

ui64 AssignId;
ui64 StartCookie;
ui64 LastCookie;
ui64 Offset;
ui64 EndOffset;
bool ReadingFinishedSent;
};

struct TEvParentCommitedToFinish : public NActors::TEventLocal<TEvParentCommitedToFinish, EvParentCommitedToFinish> {
Expand Down Expand Up @@ -645,12 +647,13 @@ struct TEvPQProxy {
};

struct TEvReadingFinished : public TEventLocal<TEvReadingFinished, EvReadingFinished> {
TEvReadingFinished(const TString& topic, ui32 partitionId, bool first, std::vector<ui32>&& adjacentPartitionIds, std::vector<ui32> childPartitionIds)
TEvReadingFinished(const TString& topic, ui32 partitionId, bool first, std::vector<ui32>&& adjacentPartitionIds, std::vector<ui32> childPartitionIds, ui64 endOffset)
: Topic(topic)
, PartitionId(partitionId)
, FirstMessage(first)
, AdjacentPartitionIds(std::move(adjacentPartitionIds))
, ChildPartitionIds(std::move(childPartitionIds))
, EndOffset(endOffset)
{}

TString Topic;
Expand All @@ -659,6 +662,8 @@ struct TEvPQProxy {

std::vector<ui32> AdjacentPartitionIds;
std::vector<ui32> ChildPartitionIds;

ui64 EndOffset;
};

struct TEvAlterTopicResponse : public TEventLocal<TEvAlterTopicResponse, EvAlterTopicResponse>
Expand Down
Loading
Loading