diff --git a/ydb/core/base/events.h b/ydb/core/base/events.h index d192a702ffc8..93b717207aa3 100644 --- a/ydb/core/base/events.h +++ b/ydb/core/base/events.h @@ -171,6 +171,7 @@ struct TKikimrEvents : TEvents { ES_TABLE_CREATOR, ES_PQ_PARTITION_CHOOSER, ES_GRAPH, + ES_CHANGE_EXCHANGE_READER, }; }; diff --git a/ydb/core/change_mirroring/reader.h b/ydb/core/change_mirroring/reader.h new file mode 100644 index 000000000000..75992fce0108 --- /dev/null +++ b/ydb/core/change_mirroring/reader.h @@ -0,0 +1,206 @@ +#pragma once + +#include + +#include + +#include +#include +#include +#include +#include + +namespace NKikimr::NChangeMirroring { + +template +class TDerivedMixin { +public: + TDerived& Derived() { return static_cast(*this); } + const TDerived& Derived() const { return static_cast(*this); } + TDerived* DerivedPtr() { return static_cast(this); } + const TDerived* DerivedPtr() const { return static_cast(this); } +}; + +#define DEFINE_DERIVED_STATEFN() \ + template \ + using TFn = void (TStateBase::*)(TAutoPtr& ev); \ + template fn> \ + void DerivedStateFn(TAutoPtr& ev) { \ + (static_cast(this)->*fn)(ev); \ + } \ + +/* AI stands for actor interface */ +class AIReader { +public: + struct TEvReader { + enum { + EvPoll = EventSpaceBegin(TKikimrEvents::ES_CHANGE_EXCHANGE_READER), + EvPollResult, + EvEnd, + }; + + struct TEvPoll: public NActors::TEventLocal { + }; + + struct TEvPollResult : public NActors::TEventLocal { + TEvPollResult(TVector data) + : Data(data) + {} + TVector Data; + }; + }; + + struct Tag {}; + struct TReaderActorHandle { + const NActors::TActorId ActorId; + }; + + class IClient { + public: + virtual ~IClient() {}; + virtual void PollResult(Tag, const TEvReader::TEvPollResult& result) { Y_UNUSED(result); }; + }; + + class IServer { + public: + virtual ~IServer() {}; + virtual void Poll(Tag, const AIReader::TEvReader::TEvPoll::TPtr& result) { Y_UNUSED(result); }; + }; + + template + class TClientDerivedCaller + : public TDerivedMixin + { + public: + void OnPollResult(const TEvReader::TEvPollResult::TPtr &ev) { + this->DerivedPtr()->PollResult(Tag{}, *ev->Get()); + } + public: + ~TClientDerivedCaller() { + static_assert(std::derived_from, "TDerived should be derived from IClient"); + } + }; + + class TClientEventDispatcherBase {}; + + template + class TClientEventDispatcher + : public TClientEventDispatcherBase + , public TImpl + { + public: + using TClientEventDispatcherType = TClientEventDispatcher; + STATEFN(StateWork) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvReader::TEvPollResult, TImpl::OnPollResult); + } + } + }; + + template + using TClientEventDerivedDispatcher = TClientEventDispatcher>; + + template + class TDefaultClientBase + : public IClient + , public TClientEventDerivedDispatcher + { + public: + using TBase = TDefaultClientBase; + + TDefaultClientBase(const TReaderActorHandle impl) + : Impl(impl.ActorId) + {} + + void BecomeStateWork() { + auto ptr = &TDerived::template DerivedStateFn< + typename TDefaultClientBase::template TClientEventDispatcher>, + &TDefaultClientBase::template TClientEventDispatcher>::StateWork>; + this->Derived().Become(ptr); + } + + template + void Reply(const TIn& in, const TOut& out) { + this->Derived().Send(in->Sender, out); + } + + NActors::IActorOps& ActorOps() { + return static_cast(this->Derived()); + } + + std::unique_ptr Poll() const { + return std::make_unique(); + } + + NActors::TActorId ReaderActor() const { + return Impl; + } + + private: + NActors::TActorId Impl; + }; + + template + class TServerDerivedCaller + : public TDerivedMixin + { + public: + void OnPoll(TEvReader::TEvPoll::TPtr &ev) { + this->DerivedPtr()->Poll(Tag{}, ev); + } + public: + ~TServerDerivedCaller() { + static_assert(std::derived_from, "TDerived should be derived from IServer"); + } + }; + + class TServerEventDispatcherBase {}; + + template + class TServerEventDispatcher + : public TServerEventDispatcherBase + , public TImpl + { + public: + using TServerEventDispatcherType = TServerEventDispatcher; + STATEFN(StateWork) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvReader::TEvPoll, TImpl::OnPoll); + } + } + }; + + template + using TServerEventDerivedDispatcher = TServerEventDispatcher>; + + template + class TDefaultServerBase + : public IServer + , public TServerEventDerivedDispatcher + { + public: + using TBase = TDefaultServerBase; + + void BecomeStateWork() { + auto ptr = &TDerived::template DerivedStateFn< + typename TDefaultServerBase::template TServerEventDispatcher>, + &TDefaultServerBase::template TServerEventDispatcher>::StateWork>; + this->Derived().Become(ptr); + } + + template + void Reply(const TIn& in, const TOut& out) { + this->Derived().Send(in->Sender, out); + } + + NActors::IActorOps& ActorOps() { + return static_cast(this->Derived()); + } + + std::unique_ptr PollResult(TVector data) const { + return std::make_unique(data); + } + }; +}; + +} // namespace NKikimr::NChangeMirroring diff --git a/ydb/core/change_mirroring/reader_mock.h b/ydb/core/change_mirroring/reader_mock.h new file mode 100644 index 000000000000..9daeb7fd5e85 --- /dev/null +++ b/ydb/core/change_mirroring/reader_mock.h @@ -0,0 +1,63 @@ +#pragma once + +#include "reader.h" + +#include + +namespace NKikimr::NChangeMirroring { + +class TReaderClientMock + : public NActors::TActorBootstrapped + , public AIReader::TDefaultClientBase +{ +public: + using TBase::TBase; + + /* both can be reduced by patching actorlib */ + friend TBase; + DEFINE_DERIVED_STATEFN(); + + void Bootstrap() { + if (OnBootstrap) { + OnBootstrap(*this); + } + } + + void PollResult(AIReader::Tag, const AIReader::TEvReader::TEvPollResult& result) override { + if (OnPollResult) { + OnPollResult(*this, result); + } + } + + std::function OnBootstrap; + std::function OnPollResult; +}; + +class TReaderServerMock + : public NActors::TActorBootstrapped + , public AIReader::TDefaultServerBase +{ +public: + using TBase::TBase; + + /* both can be reduced by patching actorlib */ + friend TBase; + DEFINE_DERIVED_STATEFN(); + + void Bootstrap() { + if (OnBootstrap) { + OnBootstrap(*this); + } + } + + void Poll(AIReader::Tag, const AIReader::TEvReader::TEvPoll::TPtr& result) override { + if (OnPoll) { + OnPoll(*this, result); + } + } + + std::function OnBootstrap; + std::function OnPoll; +}; + +} // namespace NKikimr::NChangeMirroring diff --git a/ydb/core/change_mirroring/reader_ut.cpp b/ydb/core/change_mirroring/reader_ut.cpp new file mode 100644 index 000000000000..ccbe7ea48fd0 --- /dev/null +++ b/ydb/core/change_mirroring/reader_ut.cpp @@ -0,0 +1,223 @@ +#include "reader.h" +#include "reader_mock.h" + +#include + +#include + +#include +#include +#include +#include + +namespace NKikimr::NChangeMirroring { + +using namespace NActors; + +class TExampleReaderClientImpl + : public TReaderClientMock +{ +public: + using TReaderClientMock::TReaderClientMock; + + void PollResult(AIReader::Tag tag, const AIReader::TEvReader::TEvPollResult& result) override { + Data = result.Data; + Offset = 0; + Polled = true; + TReaderClientMock::PollResult(tag, result); + } + + bool HasNext() const { + return Offset < Data.size(); + } + + ui64 GetRemaining() const { + return Data.size() - Offset; + } + + ui64 GetGlobalOffset() const { + return GlobalOffset; + } + + TRcBuf ReadNext() { + Y_VERIFY(Offset < Data.size()); + GlobalOffset++; + return Data[Offset++]; + } + + bool NeedPoll() const { + return Polled && !Eof() && (Data.size() == Offset); + } + + bool Eof() const { + return Polled && (Data.size() == 0); + } + +private: + TVector Data; + ui64 Offset = 0; + bool Polled = false; + ui64 GlobalOffset = 0; +}; + +class TExampleReaderServerImpl + : public TReaderServerMock +{ +public: + void Poll(AIReader::Tag tag, const AIReader::TEvReader::TEvPoll::TPtr& request) override { + if (Offset != Data.size()) { + Reply(request, PollResult(Data[Offset++]).release()); + } else { + Reply(request, PollResult({}).release()); + } + TReaderServerMock::Poll(tag, request); + } + + void SetData(TVector> data) { + Data = data; + } + +private: + TVector> Data; + ui64 Offset = 0; +}; + +TVector Slice(const TString& str) { + TVector slicedData; + auto buf = TRcBuf(str); + // slice data into pieces [1] [2, 3] [4, 5, 6] ... + for(ui64 i = 1, offset = 0; offset < str.size(); ++i) { + TRcBuf piece(buf); + piece.TrimFront(str.size() - offset); + ui64 size = std::min(i, str.size() - offset); + piece.TrimBack(size); + Y_ABORT_UNLESS(piece.size() == i || piece.size() == str.size() - offset); + offset += i; + slicedData.push_back(piece); + } + return slicedData; +} + +Y_UNIT_TEST_SUITE(Reader) { + Y_UNIT_TEST(TestActorReaderClient) { + TTestActorRuntime runtime; + runtime.Initialize(NKikimr::TAppPrepare().Unwrap()); + const auto edge = runtime.AllocateEdgeActor(0); + + TString result; + + auto* readerClient = new TExampleReaderClientImpl(AIReader::TReaderActorHandle{edge}); + readerClient->OnBootstrap = [&](auto&) { + readerClient->BecomeStateWork(); + readerClient->ActorOps().Send(readerClient->ReaderActor(), readerClient->Poll().release()); + }; + readerClient->OnPollResult = [&](auto&, const AIReader::TEvReader::TEvPollResult&) { + while (!readerClient->Eof()) { + if (readerClient->NeedPoll()) { + readerClient->ActorOps().Send(readerClient->ReaderActor(), readerClient->Poll().release()); + return; + } + + while (readerClient->HasNext()) { + TRcBuf piece = readerClient->ReadNext(); + result += TString(piece.Data(), piece.size()); + } + } + + readerClient->ActorOps().Send(edge, new TEvents::TEvWakeup()); + }; + runtime.Register(readerClient); + + TAutoPtr handle; + TString expectedResult; + + TString data1 = "Lorem ipsum dolor sit amet," + " consectetur adipiscing elit, sed do eiusmod tempor" + " incididunt ut labore et dolore magna aliqua."; + expectedResult += data1; + auto slicedData1 = Slice(data1); + + runtime.GrabEdgeEventRethrow(handle); + runtime.Send(new IEventHandle( + handle->Sender, + handle->Recipient, + new AIReader::TEvReader::TEvPollResult(slicedData1), + 0, // flags + handle->Cookie + ), 0); + + TString data2 = "Another non-empty string"; + expectedResult += data2; + auto slicedData2 = Slice(data2); + + runtime.GrabEdgeEventRethrow(handle); + runtime.Send(new IEventHandle( + handle->Sender, + handle->Recipient, + new AIReader::TEvReader::TEvPollResult(slicedData2), + 0, // flags + handle->Cookie + ), 0); + + runtime.GrabEdgeEventRethrow(handle); + runtime.Send(new IEventHandle( + handle->Sender, + handle->Recipient, + new AIReader::TEvReader::TEvPollResult({}), + 0, // flags + handle->Cookie + ), 0); + + runtime.GrabEdgeEventRethrow(handle); + + UNIT_ASSERT_VALUES_EQUAL(expectedResult, result); + } + + Y_UNIT_TEST(TestActorReaderServer) { + TTestActorRuntime runtime; + runtime.Initialize(NKikimr::TAppPrepare().Unwrap()); + const auto edge = runtime.AllocateEdgeActor(0); + + TString expectedResult; + TString data1 = "Lorem ipsum dolor sit amet," + " consectetur adipiscing elit, sed do eiusmod tempor" + " incididunt ut labore et dolore magna aliqua."; + expectedResult += data1; + auto slicedData1 = Slice(data1); + TString data2 = "Another non-empty string"; + expectedResult += data2; + auto slicedData2 = Slice(data2); + + auto* readerServer = new TExampleReaderServerImpl(); + readerServer->OnBootstrap = [&](auto& mock) { + readerServer->BecomeStateWork(); + mock.ActorOps().Send(edge, new TEvents::TEvWakeup); + }; + readerServer->SetData({slicedData1, slicedData2}); + auto readerServerId = runtime.Register(readerServer); + + TAutoPtr handle; + + runtime.GrabEdgeEventRethrow(handle); + + TString result; + NKikimr::NChangeMirroring::AIReader::TEvReader::TEvPollResult* pollResult = nullptr; + do { + runtime.Send(new IEventHandle( + readerServerId, + edge, + new AIReader::TEvReader::TEvPoll(), + 0, // flags + 0 + ), 0); + pollResult = runtime.GrabEdgeEventRethrow(handle); + for (auto& data : pollResult->Data) { + result += TString(data.data(), data.size()); + } + } while(pollResult && pollResult->Data.size() != 0); + + UNIT_ASSERT_VALUES_EQUAL(expectedResult, result); + } +} + +} // namespace NKikimr::NChangeMirroring diff --git a/ydb/core/change_mirroring/ut/ya.make b/ydb/core/change_mirroring/ut/ya.make new file mode 100644 index 000000000000..276ca16616ce --- /dev/null +++ b/ydb/core/change_mirroring/ut/ya.make @@ -0,0 +1,18 @@ +UNITTEST_FOR(ydb/core/change_mirroring) + +SIZE(SMALL) + +PEERDIR( + library/cpp/testing/unittest + ydb/core/testlib + # temporary hack to fix linkage + ydb/library/yql/parser/pg_wrapper + ydb/library/yql/sql/pg + ydb/library/yql/sql +) + +SRCS( + reader_ut.cpp +) + +END() diff --git a/ydb/core/change_mirroring/ya.make b/ydb/core/change_mirroring/ya.make new file mode 100644 index 000000000000..7863419acf8a --- /dev/null +++ b/ydb/core/change_mirroring/ya.make @@ -0,0 +1,13 @@ +LIBRARY() + +PEERDIR( + ydb/library/actors/core +) + +SRCS() + +END() + +RECURSE_FOR_TESTS( + ut +) diff --git a/ydb/core/ya.make b/ydb/core/ya.make index ddc2239a6b06..0e74c09eb119 100644 --- a/ydb/core/ya.make +++ b/ydb/core/ya.make @@ -5,6 +5,7 @@ RECURSE( blob_depot blockstore change_exchange + change_mirroring client cms control