From d59364c51f0e8a99af9471e271b41fb06b4f0b1f Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Fri, 11 Apr 2025 14:13:11 +0000 Subject: [PATCH] More test utils --- ydb/tests/functional/replication/utils.h | 53 ++++++++++++++++++++---- 1 file changed, 45 insertions(+), 8 deletions(-) diff --git a/ydb/tests/functional/replication/utils.h b/ydb/tests/functional/replication/utils.h index 03c4d54be618..0fd899288c54 100644 --- a/ydb/tests/functional/replication/utils.h +++ b/ydb/tests/functional/replication/utils.h @@ -47,11 +47,21 @@ struct Checker : public IChecker { UNIT_ASSERT_VALUES_EQUAL_C(Get(value), Expected, msg); } - T Get(const ::Ydb::Value& value); + virtual T Get(const ::Ydb::Value& value); T Expected; }; +struct DateTimeChecker : public Checker { + DateTimeChecker(TInstant&& expected) + : Checker(std::move(expected)) { + } + + TInstant Get(const ::Ydb::Value& value) override { + return TInstant::Seconds(value.uint32_value()); + } +}; + template<> inline bool Checker::Get(const ::Ydb::Value& value) { return value.bool_value(); @@ -95,6 +105,14 @@ std::pair> _C(TString&& name, T&& expected) { }; } +template +std::pair> _T(TString&& name, T&& expected) { + return { + std::move(name), + std::make_shared(std::move(expected)) + }; +} + struct TMessage { TString Message; std::optional Partition = std::nullopt; @@ -150,6 +168,7 @@ struct MainTestCase { , ConnectionString(GetEnv("YDB_ENDPOINT") + "/?database=" + GetEnv("YDB_DATABASE")) , TopicName(TStringBuilder() << "Topic_" << Id) , SourceTableName(TStringBuilder() << "SourceTable_" << Id) + , ChangefeedName(TStringBuilder() << "cdc_" << Id) , TableName(TStringBuilder() << "Table_" << Id) , ReplicationName(TStringBuilder() << "Replication_" << Id) , TransferName(TStringBuilder() << "Transfer_" << Id) @@ -209,6 +228,16 @@ struct MainTestCase { ExecuteDDL(Sprintf("DROP TABLE `%s`", SourceTableName.data())); } + void AddChangefeed() { + ExecuteDDL(Sprintf(R"( + ALTER TABLE `%s` + ADD CHANGEFEED `%s` WITH ( + MODE = 'UPDATES', + FORMAT = 'JSON' + ) + )", SourceTableName.data(), ChangefeedName.data())); + } + void CreateTopic(size_t partitionCount = 10) { ExecuteDDL(Sprintf(R"( CREATE TOPIC `%s` @@ -230,14 +259,19 @@ struct MainTestCase { } struct CreateTransferSettings { + std::optional TopicName = std::nullopt; std::optional ConsumerName = std::nullopt; - std::optional FlushInterval; - std::optional BatchSizeBytes; + std::optional FlushInterval = TDuration::Seconds(1); + std::optional BatchSizeBytes = 8_MB; + + CreateTransferSettings() {}; - CreateTransferSettings() - : ConsumerName(std::nullopt) - , FlushInterval(TDuration::Seconds(1)) - , BatchSizeBytes(8_MB) {} + static CreateTransferSettings WithTopic(const TString& topicName, std::optional consumerName = std::nullopt) { + CreateTransferSettings result; + result.TopicName = topicName; + result.ConsumerName = consumerName; + return result; + } static CreateTransferSettings WithConsumerName(const TString& consumerName) { CreateTransferSettings result; @@ -265,6 +299,8 @@ struct MainTestCase { sb << ", BATCH_SIZE_BYTES = " << *settings.BatchSizeBytes << Endl; } + TString topicName = settings.TopicName.value_or(TopicName); + auto ddl = Sprintf(R"( %s; @@ -274,7 +310,7 @@ struct MainTestCase { CONNECTION_STRING = 'grpc://%s' %s ); - )", lambda.data(), TransferName.data(), TopicName.data(), TableName.data(), ConnectionString.data(), sb.data()); + )", lambda.data(), TransferName.data(), topicName.data(), TableName.data(), ConnectionString.data(), sb.data()); ExecuteDDL(ddl); } @@ -558,6 +594,7 @@ struct MainTestCase { const TString TopicName; const TString SourceTableName; + const TString ChangefeedName; const TString TableName; const TString ReplicationName; const TString TransferName;