Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/core/persqueue/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2757,7 +2757,7 @@ void TPartition::EndChangePartitionConfig(NKikimrPQ::TPQTabletConfig&& config,

TString TPartition::GetKeyConfig() const
{
return Sprintf("_config_%u", Partition.OriginalPartitionId);
return Sprintf("_config_%u", Partition.InternalPartitionId);
}

void TPartition::ChangePlanStepAndTxId(ui64 step, ui64 txId)
Expand Down
10 changes: 8 additions & 2 deletions ydb/core/persqueue/partition_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,11 @@ void TInitConfigStep::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorCon
Y_ABORT("bad status");
};

// There should be no consumers in the configuration of the background partition. When creating a partition,
// the PQ tablet specifically removes all consumer settings from the config.
Y_ABORT_UNLESS(!Partition()->IsSupportive() ||
(Partition()->Config.GetConsumers().empty() && Partition()->TabletConfig.GetConsumers().empty()));

Partition()->PartitionConfig = GetPartitionConfig(Partition()->Config, Partition()->Partition.OriginalPartitionId);
Partition()->PartitionGraph = MakePartitionGraph(Partition()->Config);

Expand Down Expand Up @@ -395,6 +400,7 @@ void TInitInfoRangeStep::Handle(TEvKeyValue::TEvResponse::TPtr &ev, const TActor
case NKikimrProto::OVERRUN: {
auto& sourceIdStorage = Partition()->SourceIdStorage;
auto& usersInfoStorage = Partition()->UsersInfoStorage;
const bool isSupportive = Partition()->IsSupportive();

for (ui32 i = 0; i < range.PairSize(); ++i) {
const auto& pair = range.GetPair(i);
Expand All @@ -416,9 +422,9 @@ void TInitInfoRangeStep::Handle(TEvKeyValue::TEvResponse::TPtr &ev, const TActor
sourceIdStorage.LoadSourceIdInfo(*key, pair.GetValue(), now);
} else if (type == TKeyPrefix::MarkProtoSourceId) {
sourceIdStorage.LoadSourceIdInfo(*key, pair.GetValue(), now);
} else if (type == TKeyPrefix::MarkUser) {
} else if ((type == TKeyPrefix::MarkUser) && !isSupportive) {
usersInfoStorage->Parse(*key, pair.GetValue(), ctx);
} else if (type == TKeyPrefix::MarkUserDeprecated) {
} else if ((type == TKeyPrefix::MarkUserDeprecated) && !isSupportive) {
usersInfoStorage->ParseDeprecated(*key, pair.GetValue(), ctx);
}
}
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/persqueue/partition_read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ void TPartition::SendReadingFinished(const TString& consumer) {
}

void TPartition::FillReadFromTimestamps(const TActorContext& ctx) {
if (IsSupportive()) {
return;
}

TSet<TString> hasReadRule;

for (auto& [consumer, userInfo] : UsersInfoStorage->GetAll()) {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/persqueue/pq_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -865,6 +865,7 @@ NKikimrPQ::TPQTabletConfig TPersQueue::MakeSupportivePartitionConfig() const
partitionConfig.MutableReadRuleServiceTypes()->Clear();
partitionConfig.MutableReadRuleVersions()->Clear();
partitionConfig.MutableReadRuleGenerations()->Clear();
partitionConfig.MutableConsumers()->Clear();

return partitionConfig;
}
Expand Down
Binary file not shown.
Binary file not shown.
109 changes: 109 additions & 0 deletions ydb/public/sdk/cpp/src/client/topic/ut/topic_to_table_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <library/cpp/logger/stream.h>
#include <library/cpp/testing/unittest/registar.h>
#include <library/cpp/streams/bzip2/bzip2.h>

namespace NYdb::NTopic::NTests {

Expand Down Expand Up @@ -87,6 +88,7 @@ class TFixture : public NUnitTest::TBaseFixture {
TTopicDescription DescribeTopic(const TString& path);

void AddConsumer(const TString& topicPath, const TVector<TString>& consumers);
void DropConsumer(const TString& topicPath, const TVector<TString>& consumers);
void AlterAutoPartitioning(const TString& topicPath,
ui64 minActivePartitions,
ui64 maxActivePartitions,
Expand Down Expand Up @@ -148,6 +150,9 @@ class TFixture : public NUnitTest::TBaseFixture {
void RestartLongTxService();
void RestartPQTablet(const TString& topicPath, ui32 partition);
void DumpPQTabletKeys(const TString& topicName, ui32 partition);
void PQTabletPrepareFromResource(const TString& topicPath,
ui32 partitionId,
const TString& resourceName);

void DeleteSupportivePartition(const TString& topicName,
ui32 partition);
Expand Down Expand Up @@ -483,6 +488,20 @@ void TFixture::AddConsumer(const TString& topicPath,
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}

void TFixture::DropConsumer(const TString& topicPath,
const TVector<TString>& consumers)
{
NTopic::TTopicClient client(GetDriver());
NTopic::TAlterTopicSettings settings;

for (const auto& consumer : consumers) {
settings.AppendDropConsumers(consumer);
}

auto result = client.AlterTopic(topicPath, settings).GetValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}

void TFixture::AlterAutoPartitioning(const TString& topicPath,
ui64 minActivePartitions,
ui64 maxActivePartitions,
Expand Down Expand Up @@ -1843,6 +1862,51 @@ void TFixture::DumpPQTabletKeys(const TString& topicName)
}
}

void TFixture::PQTabletPrepareFromResource(const TString& topicPath,
ui32 partitionId,
const TString& resourceName)
{
auto& runtime = Setup->GetRuntime();
TActorId edge = runtime.AllocateEdgeActor();
ui64 tabletId = GetTopicTabletId(edge, "/Root/" + topicPath, partitionId);

auto request = MakeHolder<TEvKeyValue::TEvRequest>();
size_t count = 0;

for (TStringStream stream(NResource::Find(resourceName)); true; ++count) {
TString key, encoded;

if (!stream.ReadTo(key, ' ')) {
break;
}
encoded = stream.ReadLine();

auto decoded = Base64Decode(encoded);
TStringInput decodedStream(decoded);
TBZipDecompress decompressor(&decodedStream);

auto* cmd = request->Record.AddCmdWrite();
cmd->SetKey(key);
cmd->SetValue(decompressor.ReadAll());
}

runtime.SendToPipe(tabletId, edge, request.Release(), 0, GetPipeConfigWithRetries());

TAutoPtr<IEventHandle> handle;
auto* response = runtime.GrabEdgeEvent<TEvKeyValue::TEvResponse>(handle);
UNIT_ASSERT(response);
UNIT_ASSERT(response->Record.HasStatus());
UNIT_ASSERT_EQUAL(response->Record.GetStatus(), NMsgBusProxy::MSTATUS_OK);

UNIT_ASSERT_VALUES_EQUAL(response->Record.WriteResultSize(), count);

for (size_t i = 0; i < response->Record.WriteResultSize(); ++i) {
const auto &result = response->Record.GetWriteResult(i);
UNIT_ASSERT(result.HasStatus());
UNIT_ASSERT_EQUAL(result.GetStatus(), NKikimrProto::OK);
}
}

void TFixture::TestTheCompletionOfATransaction(const TTransactionCompletionTestDescription& d)
{
for (auto& topic : d.Topics) {
Expand Down Expand Up @@ -3059,6 +3123,51 @@ Y_UNIT_TEST_F(Write_Random_Sized_Messages_In_Wide_Transactions, TFixture)
}
}

Y_UNIT_TEST_F(The_Configuration_Is_Changing_As_We_Write_To_The_Topic, TFixture)
{
// To test that you can change the topic configuration while writing to the partition

CreateTopic("topic_A", TEST_CONSUMER, 2);

AddConsumer("topic_A", {"consumer1", "consumer2"});

auto session = CreateTableSession();
auto tx = BeginTx(session);

WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_1, TString(100, 'x'), &tx, 0);
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_2, TString(100, 'x'), &tx, 1);

WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID_1);
WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID_2);

RestartPQTablet("topic_A", 0);

DropConsumer("topic_A", {"consumer1"});

WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_3, TString(100, 'x'), &tx, 0);
WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID_3);

RestartPQTablet("topic_A", 0);

CommitTx(tx);

CheckTabletKeys("topic_A");
}

Y_UNIT_TEST_F(The_Transaction_Starts_On_One_Version_And_Ends_On_The_Other, TFixture)
{
// In the test, we check the compatibility between versions `24-4-2` and `24-4-*/25-1-*`. To do this, the data
// obtained on the `24-4-2` version is loaded into the PQ tablets.

CreateTopic("topic_A", TEST_CONSUMER, 2);

PQTabletPrepareFromResource("topic_A", 0, "topic_A_partition_0_v24-4-2.dat");
PQTabletPrepareFromResource("topic_A", 1, "topic_A_partition_1_v24-4-2.dat");

RestartPQTablet("topic_A", 0);
RestartPQTablet("topic_A", 1);
}

}

}
5 changes: 5 additions & 0 deletions ydb/public/sdk/cpp/src/client/topic/ut/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,9 @@ SRCS(
trace_ut.cpp
)

RESOURCE(
ydb/public/sdk/cpp/src/client/topic/ut/resources/topic_A_partition_0_v24-4-2.dat topic_A_partition_0_v24-4-2.dat
ydb/public/sdk/cpp/src/client/topic/ut/resources/topic_A_partition_1_v24-4-2.dat topic_A_partition_1_v24-4-2.dat
)

END()
Loading