diff --git a/ydb/core/tx/replication/service/logging.h b/ydb/core/tx/replication/service/logging.h new file mode 100644 index 000000000000..7940d5b0e731 --- /dev/null +++ b/ydb/core/tx/replication/service/logging.h @@ -0,0 +1,11 @@ +#pragma once + +#include +#include + +#define LOG_T(stream) LOG_TRACE_S (*TlsActivationContext, NKikimrServices::REPLICATION_SERVICE, "" << SelfId() << " " << stream) +#define LOG_D(stream) LOG_DEBUG_S (*TlsActivationContext, NKikimrServices::REPLICATION_SERVICE, "" << SelfId() << " " << stream) +#define LOG_I(stream) LOG_INFO_S (*TlsActivationContext, NKikimrServices::REPLICATION_SERVICE, "" << SelfId() << " " << stream) +#define LOG_N(stream) LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::REPLICATION_SERVICE, "" << SelfId() << " " << stream) +#define LOG_W(stream) LOG_WARN_S (*TlsActivationContext, NKikimrServices::REPLICATION_SERVICE, "" << SelfId() << " " << stream) +#define LOG_E(stream) LOG_ERROR_S (*TlsActivationContext, NKikimrServices::REPLICATION_SERVICE, "" << SelfId() << " " << stream) diff --git a/ydb/core/tx/replication/service/topic_reader.cpp b/ydb/core/tx/replication/service/topic_reader.cpp index 3a549cac3cfd..18e2219b164d 100644 --- a/ydb/core/tx/replication/service/topic_reader.cpp +++ b/ydb/core/tx/replication/service/topic_reader.cpp @@ -1,3 +1,4 @@ +#include "logging.h" #include "topic_reader.h" #include "worker.h" @@ -12,6 +13,8 @@ class TRemoteTopicReader: public TActor { void Handle(TEvWorker::TEvHandshake::TPtr& ev) { Worker = ev->Sender; + LOG_D("Handshake" + << ": worker# " << Worker); Y_ABORT_UNLESS(!ReadSession); Send(YdbProxy, new TEvYdbProxy::TEvCreateTopicReaderRequest(Settings)); @@ -19,16 +22,23 @@ class TRemoteTopicReader: public TActor { void Handle(TEvYdbProxy::TEvCreateTopicReaderResponse::TPtr& ev) { ReadSession = ev->Get()->Result; + LOG_D("Create read session" + << ": session# " << ReadSession); Y_ABORT_UNLESS(Worker); Send(Worker, new TEvWorker::TEvHandshake()); } - void Handle(TEvWorker::TEvPoll::TPtr&) { + void Handle(TEvWorker::TEvPoll::TPtr& ev) { + LOG_D("Handle " << ev->Get()->ToString()); + Y_ABORT_UNLESS(ReadSession); Send(ReadSession, new TEvYdbProxy::TEvReadTopicRequest()); if (CommitOffset) { + LOG_D("Commit offset" + << ": offset# " << CommitOffset); + Send(YdbProxy, new TEvYdbProxy::TEvCommitOffsetRequest( Settings.Topics_[0].Path_, Settings.Topics_[0].PartitionIds_[0], @@ -39,6 +49,8 @@ class TRemoteTopicReader: public TActor { } void Handle(TEvYdbProxy::TEvReadTopicResponse::TPtr& ev) { + LOG_D("Handle " << ev->Get()->ToString()); + auto& result = ev->Get()->Result; TVector records(Reserve(result.Messages.size())); @@ -53,12 +65,16 @@ class TRemoteTopicReader: public TActor { } void Handle(TEvYdbProxy::TEvCommitOffsetResponse::TPtr& ev) { + LOG_D("Handle " << ev->Get()->ToString()); + if (!ev->Get()->Result.IsSuccess()) { + LOG_N("Unsuccessful commit offset"); Leave(); } } void Leave() { + LOG_I("Leave"); Send(Worker, new TEvents::TEvGone()); PassAway(); } diff --git a/ydb/core/tx/replication/service/topic_reader_ut.cpp b/ydb/core/tx/replication/service/topic_reader_ut.cpp new file mode 100644 index 000000000000..ee77fcfb8677 --- /dev/null +++ b/ydb/core/tx/replication/service/topic_reader_ut.cpp @@ -0,0 +1,116 @@ +#include "topic_reader.h" +#include "worker.h" + +#include +#include +#include +#include + +#include + +namespace NKikimr::NReplication::NService { + +Y_UNIT_TEST_SUITE(RemoteTopicReader) { + template + TActorId CreateReader(Env& env, const NYdb::NTopic::TReadSessionSettings& settings) { + auto reader = env.GetRuntime().Register(CreateRemoteTopicReader(env.GetYdbProxy(), settings)); + env.SendAsync(reader, new TEvWorker::TEvHandshake()); + + while (true) { + TAutoPtr handle; + auto result = env.GetRuntime().template GrabEdgeEventsRethrow(handle); + if (handle->Sender != reader) { + continue; + } + + if (auto* ev = std::get(result)) { + return reader; + } else if (std::get(result)) { + reader = env.GetRuntime().Register(CreateRemoteTopicReader(env.GetYdbProxy(), settings)); + env.SendAsync(reader, new TEvWorker::TEvHandshake()); + continue; + } else { + UNIT_ASSERT("Unexpected event"); + } + } + } + + template + auto ReadData(Env& env, TActorId& reader, const NYdb::NTopic::TReadSessionSettings& settings) { + reader = CreateReader(env, settings); + env.SendAsync(reader, new TEvWorker::TEvPoll()); + + while (true) { + TAutoPtr handle; + auto result = env.GetRuntime().template GrabEdgeEventsRethrow(handle); + if (handle->Sender != reader) { + continue; + } + + if (auto* ev = std::get(result)) { + return ev->Records; + } else if (std::get(result)) { + reader = CreateReader(env, settings); + env.SendAsync(reader, new TEvWorker::TEvPoll()); + continue; + } + } + } + + Y_UNIT_TEST(ReadTopic) { + TEnv env; + env.GetRuntime().SetLogPriority(NKikimrServices::REPLICATION_SERVICE, NLog::PRI_DEBUG); + + // create topic + { + auto settings = NYdb::NTopic::TCreateTopicSettings() + .BeginAddConsumer() + .ConsumerName("consumer") + .EndAddConsumer(); + + auto ev = env.Send( + new TEvYdbProxy::TEvCreateTopicRequest("/Root/topic", settings)); + UNIT_ASSERT(ev); + UNIT_ASSERT(ev->Get()->Result.IsSuccess()); + } + + auto settings = NYdb::NTopic::TReadSessionSettings() + .ConsumerName("consumer") + .AppendTopics(NYdb::NTopic::TTopicReadSettings() + .Path("/Root/topic") + .AppendPartitionIds(0) + ); + + TActorId reader; + + // write, create reader & read + UNIT_ASSERT(WriteTopic(env, "/Root/topic", "message-1")); + { + auto records = ReadData(env, reader, settings); + UNIT_ASSERT_VALUES_EQUAL(records.size(), 1); + + const auto& record = records.at(0); + UNIT_ASSERT_VALUES_EQUAL(record.Offset, 0); + UNIT_ASSERT_VALUES_EQUAL(record.Data, "message-1"); + } + + // trigger commit, write new data & kill reader + { + env.SendAsync(reader, new TEvWorker::TEvPoll()); + UNIT_ASSERT(WriteTopic(env, "/Root/topic", "message-2")); + env.SendAsync(reader, new TEvents::TEvPoison()); + } + + // create reader again & read + { + auto records = ReadData(env, reader, settings); + UNIT_ASSERT_VALUES_EQUAL(records.size(), 1); + + const auto& record = records.at(0); + UNIT_ASSERT_VALUES_EQUAL(record.Offset, 1); + UNIT_ASSERT_VALUES_EQUAL(record.Data, "message-2"); + } + } +} + +} diff --git a/ydb/core/tx/replication/service/ut/ya.make b/ydb/core/tx/replication/service/ut/ya.make new file mode 100644 index 000000000000..5dd6457d659e --- /dev/null +++ b/ydb/core/tx/replication/service/ut/ya.make @@ -0,0 +1,3 @@ +RECURSE( + ../ut_topic_reader +) diff --git a/ydb/core/tx/replication/service/ut_topic_reader/ya.make b/ydb/core/tx/replication/service/ut_topic_reader/ya.make new file mode 100644 index 000000000000..0f3385089f2f --- /dev/null +++ b/ydb/core/tx/replication/service/ut_topic_reader/ya.make @@ -0,0 +1,22 @@ +UNITTEST_FOR(ydb/core/tx/replication/service) + +FORK_SUBTESTS() + +SIZE(MEDIUM) + +TIMEOUT(600) + +PEERDIR( + ydb/core/tx/replication/ut_helpers + ydb/core/tx/replication/ydb_proxy + ydb/public/sdk/cpp/client/ydb_topic + library/cpp/testing/unittest +) + +SRCS( + topic_reader_ut.cpp +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/core/tx/replication/service/ya.make b/ydb/core/tx/replication/service/ya.make index 41b6abd8ffe7..f44bf75d1c04 100644 --- a/ydb/core/tx/replication/service/ya.make +++ b/ydb/core/tx/replication/service/ya.make @@ -10,6 +10,7 @@ PEERDIR( ydb/core/io_formats/cell_maker ydb/core/tx/replication/ydb_proxy ydb/library/actors/core + ydb/library/services library/cpp/json ) diff --git a/ydb/core/tx/replication/ut/ya.make b/ydb/core/tx/replication/ut/ya.make index 5111ece9394a..bfbed1f09eac 100644 --- a/ydb/core/tx/replication/ut/ya.make +++ b/ydb/core/tx/replication/ut/ya.make @@ -1,3 +1,4 @@ RECURSE( + ../service/ut ../ydb_proxy/ut ) diff --git a/ydb/core/tx/replication/ut_helpers/test_env.h b/ydb/core/tx/replication/ut_helpers/test_env.h index d4ca801b454a..182d95d6a4d2 100644 --- a/ydb/core/tx/replication/ut_helpers/test_env.h +++ b/ydb/core/tx/replication/ut_helpers/test_env.h @@ -2,6 +2,7 @@ #include #include #include +#include #include @@ -134,6 +135,10 @@ class TEnv { return Sender; } + const TActorId& GetYdbProxy() const { + return YdbProxy; + } + private: TPortManager PortManager; Tests::TServerSettings Settings; diff --git a/ydb/core/tx/replication/ut_helpers/write_topic.h b/ydb/core/tx/replication/ut_helpers/write_topic.h new file mode 100644 index 000000000000..3fc8955d7b77 --- /dev/null +++ b/ydb/core/tx/replication/ut_helpers/write_topic.h @@ -0,0 +1,24 @@ +#include + +namespace NKikimr::NReplication { + +template +bool WriteTopic(const Env& env, const TString& topicPath, const TString& data) { + NYdb::NTopic::TTopicClient client(env.GetDriver(), NYdb::NTopic::TTopicClientSettings() + .DiscoveryEndpoint(env.GetEndpoint()) + .Database(env.GetDatabase()) + ); + + auto session = client.CreateSimpleBlockingWriteSession(NYdb::NTopic::TWriteSessionSettings() + .Path(topicPath) + .ProducerId("producer") + .MessageGroupId("producer") + ); + + const auto result = session->Write(data); + session->Close(); + + return result; +} + +} diff --git a/ydb/core/tx/replication/ut_helpers/ya.make b/ydb/core/tx/replication/ut_helpers/ya.make index 43d2e7ab0264..d57bf0301616 100644 --- a/ydb/core/tx/replication/ut_helpers/ya.make +++ b/ydb/core/tx/replication/ut_helpers/ya.make @@ -4,11 +4,14 @@ PEERDIR( ydb/core/base ydb/core/protos ydb/core/testlib/default + ydb/core/tx/replication/ydb_proxy + ydb/public/sdk/cpp/client/ydb_topic library/cpp/testing/unittest ) SRCS( test_env.h + write_topic.h ) YQL_LAST_ABI_VERSION() diff --git a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp index 00de5a1f01c2..3c6b36ea0a25 100644 --- a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp +++ b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp @@ -10,6 +10,7 @@ #include #include +#include #include #include @@ -22,6 +23,20 @@ using namespace NYdb::NScheme; using namespace NYdb::NTable; using namespace NYdb::NTopic; +void TEvYdbProxy::TReadTopicResult::TMessage::Out(IOutputStream& out) const { + out << "{" + << " Offset: " << Offset + << " Data: " << Data.size() << "b" + << " Codec: " << Codec + << " }"; +} + +void TEvYdbProxy::TReadTopicResult::Out(IOutputStream& out) const { + out << "{" + << " Messages [" << JoinSeq(",", Messages) << "]" + << " }"; +} + template class TBaseProxyActor: public TActor { class TRequest; @@ -465,3 +480,11 @@ IActor* CreateYdbProxy(const TString& endpoint, const TString& database, const T } } + +Y_DECLARE_OUT_SPEC(, NKikimr::NReplication::TEvYdbProxy::TReadTopicResult::TMessage, o, x) { + return x.Out(o); +} + +Y_DECLARE_OUT_SPEC(, NKikimr::NReplication::TEvYdbProxy::TReadTopicResult, o, x) { + return x.Out(o); +} diff --git a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h index 2afc4f63ef32..ff384052b396 100644 --- a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h +++ b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h @@ -83,6 +83,18 @@ struct TEvYdbProxy { using TBase = TGenericRequest; }; + template + class THasOutFunc { + template + static constexpr std::false_type Detect(...); + + template ().Out(std::declval()))> + static constexpr std::true_type Detect(int); + + public: + static constexpr bool Value = decltype(Detect(0))::value; + }; + template struct TGenericResponse: public TEventLocal { using TResult = T; @@ -96,6 +108,14 @@ struct TEvYdbProxy { { } + TString ToString() const override { + auto ret = TStringBuilder() << this->ToStringHeader(); + if constexpr (THasOutFunc::Value) { + ret << " { Result: " << Result << " }"; + } + return ret; + } + using TBase = TGenericResponse; }; @@ -126,6 +146,7 @@ struct TEvYdbProxy { const TString& GetData() const { return Data; } TString& GetData() { return Data; } ECodec GetCodec() const { return Codec; } + void Out(IOutputStream& out) const; private: ui64 Offset; @@ -146,6 +167,8 @@ struct TEvYdbProxy { } } + void Out(IOutputStream& out) const; + TVector Messages; }; diff --git a/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp b/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp index 1fc3b8d97342..2a9161efd1dc 100644 --- a/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp +++ b/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp @@ -1,6 +1,7 @@ #include "ydb_proxy.h" #include +#include #include #include @@ -580,25 +581,6 @@ Y_UNIT_TEST_SUITE(YdbProxyTests) { return ev->Get()->Result; } - template - bool WriteTopic(const Env& env, const TString& topicPath, const TString& data) { - NYdb::NTopic::TTopicClient client(env.GetDriver(), NYdb::NTopic::TTopicClientSettings() - .DiscoveryEndpoint(env.GetEndpoint()) - .Database(env.GetDatabase()) - ); - - auto session = client.CreateSimpleBlockingWriteSession(NYdb::NTopic::TWriteSessionSettings() - .Path(topicPath) - .ProducerId("producer") - .MessageGroupId("producer") - ); - - const auto result = session->Write(data); - session->Close(); - - return result; - } - template TEvYdbProxy::TReadTopicResult ReadTopicData(Env& env, TActorId& reader, const TString& topicPath) { env.SendAsync(reader, new TEvYdbProxy::TEvReadTopicRequest()); diff --git a/ydb/library/services/services.proto b/ydb/library/services/services.proto index d9049aa6c895..788bde3f98e0 100644 --- a/ydb/library/services/services.proto +++ b/ydb/library/services/services.proto @@ -347,6 +347,7 @@ enum EServiceKikimr { // Replication REPLICATION_CONTROLLER = 1200; + REPLICATION_SERVICE = 1201; // Blob depot BLOB_DEPOT = 1300;