Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions ydb/core/tx/replication/service/logging.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#pragma once

#include <ydb/library/actors/core/log.h>
#include <ydb/library/services/services.pb.h>

#define LOG_T(stream) LOG_TRACE_S (*TlsActivationContext, NKikimrServices::REPLICATION_SERVICE, "" << SelfId() << " " << stream)
#define LOG_D(stream) LOG_DEBUG_S (*TlsActivationContext, NKikimrServices::REPLICATION_SERVICE, "" << SelfId() << " " << stream)
#define LOG_I(stream) LOG_INFO_S (*TlsActivationContext, NKikimrServices::REPLICATION_SERVICE, "" << SelfId() << " " << stream)
#define LOG_N(stream) LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::REPLICATION_SERVICE, "" << SelfId() << " " << stream)
#define LOG_W(stream) LOG_WARN_S (*TlsActivationContext, NKikimrServices::REPLICATION_SERVICE, "" << SelfId() << " " << stream)
#define LOG_E(stream) LOG_ERROR_S (*TlsActivationContext, NKikimrServices::REPLICATION_SERVICE, "" << SelfId() << " " << stream)
18 changes: 17 additions & 1 deletion ydb/core/tx/replication/service/topic_reader.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include "logging.h"
#include "topic_reader.h"
#include "worker.h"

Expand All @@ -12,23 +13,32 @@ class TRemoteTopicReader: public TActor<TRemoteTopicReader> {

void Handle(TEvWorker::TEvHandshake::TPtr& ev) {
Worker = ev->Sender;
LOG_D("Handshake"
<< ": worker# " << Worker);

Y_ABORT_UNLESS(!ReadSession);
Send(YdbProxy, new TEvYdbProxy::TEvCreateTopicReaderRequest(Settings));
}

void Handle(TEvYdbProxy::TEvCreateTopicReaderResponse::TPtr& ev) {
ReadSession = ev->Get()->Result;
LOG_D("Create read session"
<< ": session# " << ReadSession);

Y_ABORT_UNLESS(Worker);
Send(Worker, new TEvWorker::TEvHandshake());
}

void Handle(TEvWorker::TEvPoll::TPtr&) {
void Handle(TEvWorker::TEvPoll::TPtr& ev) {
LOG_D("Handle " << ev->Get()->ToString());

Y_ABORT_UNLESS(ReadSession);
Send(ReadSession, new TEvYdbProxy::TEvReadTopicRequest());

if (CommitOffset) {
LOG_D("Commit offset"
<< ": offset# " << CommitOffset);

Send(YdbProxy, new TEvYdbProxy::TEvCommitOffsetRequest(
Settings.Topics_[0].Path_,
Settings.Topics_[0].PartitionIds_[0],
Expand All @@ -39,6 +49,8 @@ class TRemoteTopicReader: public TActor<TRemoteTopicReader> {
}

void Handle(TEvYdbProxy::TEvReadTopicResponse::TPtr& ev) {
LOG_D("Handle " << ev->Get()->ToString());

auto& result = ev->Get()->Result;
TVector<TEvWorker::TEvData::TRecord> records(Reserve(result.Messages.size()));

Expand All @@ -53,12 +65,16 @@ class TRemoteTopicReader: public TActor<TRemoteTopicReader> {
}

void Handle(TEvYdbProxy::TEvCommitOffsetResponse::TPtr& ev) {
LOG_D("Handle " << ev->Get()->ToString());

if (!ev->Get()->Result.IsSuccess()) {
LOG_N("Unsuccessful commit offset");
Leave();
}
}

void Leave() {
LOG_I("Leave");
Send(Worker, new TEvents::TEvGone());
PassAway();
}
Expand Down
116 changes: 116 additions & 0 deletions ydb/core/tx/replication/service/topic_reader_ut.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
#include "topic_reader.h"
#include "worker.h"

#include <ydb/core/tx/replication/ut_helpers/test_env.h>
#include <ydb/core/tx/replication/ut_helpers/write_topic.h>
#include <ydb/core/tx/replication/ydb_proxy/ydb_proxy.h>
#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>

#include <library/cpp/testing/unittest/registar.h>

namespace NKikimr::NReplication::NService {

Y_UNIT_TEST_SUITE(RemoteTopicReader) {
template <typename Env>
TActorId CreateReader(Env& env, const NYdb::NTopic::TReadSessionSettings& settings) {
auto reader = env.GetRuntime().Register(CreateRemoteTopicReader(env.GetYdbProxy(), settings));
env.SendAsync(reader, new TEvWorker::TEvHandshake());

while (true) {
TAutoPtr<IEventHandle> handle;
auto result = env.GetRuntime().template GrabEdgeEventsRethrow<TEvWorker::TEvHandshake, TEvents::TEvGone>(handle);
if (handle->Sender != reader) {
continue;
}

if (auto* ev = std::get<TEvWorker::TEvHandshake*>(result)) {
return reader;
} else if (std::get<TEvents::TEvGone*>(result)) {
reader = env.GetRuntime().Register(CreateRemoteTopicReader(env.GetYdbProxy(), settings));
env.SendAsync(reader, new TEvWorker::TEvHandshake());
continue;
} else {
UNIT_ASSERT("Unexpected event");
}
}
}

template <typename Env>
auto ReadData(Env& env, TActorId& reader, const NYdb::NTopic::TReadSessionSettings& settings) {
reader = CreateReader(env, settings);
env.SendAsync(reader, new TEvWorker::TEvPoll());

while (true) {
TAutoPtr<IEventHandle> handle;
auto result = env.GetRuntime().template GrabEdgeEventsRethrow<TEvWorker::TEvData, TEvents::TEvGone>(handle);
if (handle->Sender != reader) {
continue;
}

if (auto* ev = std::get<TEvWorker::TEvData*>(result)) {
return ev->Records;
} else if (std::get<TEvents::TEvGone*>(result)) {
reader = CreateReader(env, settings);
env.SendAsync(reader, new TEvWorker::TEvPoll());
continue;
}
}
}

Y_UNIT_TEST(ReadTopic) {
TEnv env;
env.GetRuntime().SetLogPriority(NKikimrServices::REPLICATION_SERVICE, NLog::PRI_DEBUG);

// create topic
{
auto settings = NYdb::NTopic::TCreateTopicSettings()
.BeginAddConsumer()
.ConsumerName("consumer")
.EndAddConsumer();

auto ev = env.Send<TEvYdbProxy::TEvCreateTopicResponse>(
new TEvYdbProxy::TEvCreateTopicRequest("/Root/topic", settings));
UNIT_ASSERT(ev);
UNIT_ASSERT(ev->Get()->Result.IsSuccess());
}

auto settings = NYdb::NTopic::TReadSessionSettings()
.ConsumerName("consumer")
.AppendTopics(NYdb::NTopic::TTopicReadSettings()
.Path("/Root/topic")
.AppendPartitionIds(0)
);

TActorId reader;

// write, create reader & read
UNIT_ASSERT(WriteTopic(env, "/Root/topic", "message-1"));
{
auto records = ReadData(env, reader, settings);
UNIT_ASSERT_VALUES_EQUAL(records.size(), 1);

const auto& record = records.at(0);
UNIT_ASSERT_VALUES_EQUAL(record.Offset, 0);
UNIT_ASSERT_VALUES_EQUAL(record.Data, "message-1");
}

// trigger commit, write new data & kill reader
{
env.SendAsync(reader, new TEvWorker::TEvPoll());
UNIT_ASSERT(WriteTopic(env, "/Root/topic", "message-2"));
env.SendAsync(reader, new TEvents::TEvPoison());
}

// create reader again & read
{
auto records = ReadData(env, reader, settings);
UNIT_ASSERT_VALUES_EQUAL(records.size(), 1);

const auto& record = records.at(0);
UNIT_ASSERT_VALUES_EQUAL(record.Offset, 1);
UNIT_ASSERT_VALUES_EQUAL(record.Data, "message-2");
}
}
}

}
3 changes: 3 additions & 0 deletions ydb/core/tx/replication/service/ut/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
RECURSE(
../ut_topic_reader
)
22 changes: 22 additions & 0 deletions ydb/core/tx/replication/service/ut_topic_reader/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
UNITTEST_FOR(ydb/core/tx/replication/service)

FORK_SUBTESTS()

SIZE(MEDIUM)

TIMEOUT(600)

PEERDIR(
ydb/core/tx/replication/ut_helpers
ydb/core/tx/replication/ydb_proxy
ydb/public/sdk/cpp/client/ydb_topic
library/cpp/testing/unittest
)

SRCS(
topic_reader_ut.cpp
)

YQL_LAST_ABI_VERSION()

END()
1 change: 1 addition & 0 deletions ydb/core/tx/replication/service/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ PEERDIR(
ydb/core/io_formats/cell_maker
ydb/core/tx/replication/ydb_proxy
ydb/library/actors/core
ydb/library/services
library/cpp/json
)

Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/replication/ut/ya.make
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
RECURSE(
../service/ut
../ydb_proxy/ut
)
5 changes: 5 additions & 0 deletions ydb/core/tx/replication/ut_helpers/test_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include <ydb/core/protos/replication.pb.h>
#include <ydb/core/testlib/test_client.h>
#include <ydb/core/tx/schemeshard/schemeshard.h>
#include <ydb/core/tx/replication/ydb_proxy/ydb_proxy.h>

#include <library/cpp/testing/unittest/registar.h>

Expand Down Expand Up @@ -134,6 +135,10 @@ class TEnv {
return Sender;
}

const TActorId& GetYdbProxy() const {
return YdbProxy;
}

private:
TPortManager PortManager;
Tests::TServerSettings Settings;
Expand Down
24 changes: 24 additions & 0 deletions ydb/core/tx/replication/ut_helpers/write_topic.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>

namespace NKikimr::NReplication {

template <typename Env>
bool WriteTopic(const Env& env, const TString& topicPath, const TString& data) {
NYdb::NTopic::TTopicClient client(env.GetDriver(), NYdb::NTopic::TTopicClientSettings()
.DiscoveryEndpoint(env.GetEndpoint())
.Database(env.GetDatabase())
);

auto session = client.CreateSimpleBlockingWriteSession(NYdb::NTopic::TWriteSessionSettings()
.Path(topicPath)
.ProducerId("producer")
.MessageGroupId("producer")
);

const auto result = session->Write(data);
session->Close();

return result;
}

}
3 changes: 3 additions & 0 deletions ydb/core/tx/replication/ut_helpers/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ PEERDIR(
ydb/core/base
ydb/core/protos
ydb/core/testlib/default
ydb/core/tx/replication/ydb_proxy
ydb/public/sdk/cpp/client/ydb_topic
library/cpp/testing/unittest
)

SRCS(
test_env.h
write_topic.h
)

YQL_LAST_ABI_VERSION()
Expand Down
23 changes: 23 additions & 0 deletions ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <ydb/core/base/appdata.h>

#include <util/generic/hash_set.h>
#include <util/string/join.h>

#include <memory>
#include <mutex>
Expand All @@ -22,6 +23,20 @@ using namespace NYdb::NScheme;
using namespace NYdb::NTable;
using namespace NYdb::NTopic;

void TEvYdbProxy::TReadTopicResult::TMessage::Out(IOutputStream& out) const {
out << "{"
<< " Offset: " << Offset
<< " Data: " << Data.size() << "b"
<< " Codec: " << Codec
<< " }";
}

void TEvYdbProxy::TReadTopicResult::Out(IOutputStream& out) const {
out << "{"
<< " Messages [" << JoinSeq(",", Messages) << "]"
<< " }";
}

template <typename TDerived>
class TBaseProxyActor: public TActor<TDerived> {
class TRequest;
Expand Down Expand Up @@ -465,3 +480,11 @@ IActor* CreateYdbProxy(const TString& endpoint, const TString& database, const T
}

}

Y_DECLARE_OUT_SPEC(, NKikimr::NReplication::TEvYdbProxy::TReadTopicResult::TMessage, o, x) {
return x.Out(o);
}

Y_DECLARE_OUT_SPEC(, NKikimr::NReplication::TEvYdbProxy::TReadTopicResult, o, x) {
return x.Out(o);
}
23 changes: 23 additions & 0 deletions ydb/core/tx/replication/ydb_proxy/ydb_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,18 @@ struct TEvYdbProxy {
using TBase = TGenericRequest<TDerived, EventType, void>;
};

template <typename T>
class THasOutFunc {
template <typename U>
static constexpr std::false_type Detect(...);

template <typename U, typename = decltype(std::declval<U>().Out(std::declval<IOutputStream&>()))>
static constexpr std::true_type Detect(int);

public:
static constexpr bool Value = decltype(Detect<T>(0))::value;
};

template <typename TDerived, ui32 EventType, typename T>
struct TGenericResponse: public TEventLocal<TDerived, EventType> {
using TResult = T;
Expand All @@ -96,6 +108,14 @@ struct TEvYdbProxy {
{
}

TString ToString() const override {
auto ret = TStringBuilder() << this->ToStringHeader();
if constexpr (THasOutFunc<TResult>::Value) {
ret << " { Result: " << Result << " }";
}
return ret;
}

using TBase = TGenericResponse<TDerived, EventType, T>;
};

Expand Down Expand Up @@ -126,6 +146,7 @@ struct TEvYdbProxy {
const TString& GetData() const { return Data; }
TString& GetData() { return Data; }
ECodec GetCodec() const { return Codec; }
void Out(IOutputStream& out) const;

private:
ui64 Offset;
Expand All @@ -146,6 +167,8 @@ struct TEvYdbProxy {
}
}

void Out(IOutputStream& out) const;

TVector<TMessage> Messages;
};

Expand Down
Loading