Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 35 additions & 2 deletions ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,12 @@ class TBaseProxyActor: public TActor<TDerived> {

class TTopicReader: public TBaseProxyActor<TTopicReader> {
void Handle(TEvYdbProxy::TEvReadTopicRequest::TPtr& ev) {
WaitEvent(ev->Sender, ev->Cookie);
}

void WaitEvent(const TActorId& sender, ui64 cookie) {
auto request = MakeRequest(SelfId());
auto cb = [request, sender = ev->Sender, cookie = ev->Cookie](const NThreading::TFuture<void>&) {
auto cb = [request, sender, cookie](const NThreading::TFuture<void>&) {
if (auto r = request.lock()) {
r->Complete(new TEvPrivate::TEvTopicEventReady(sender, cookie));
}
Expand All @@ -173,7 +177,31 @@ class TTopicReader: public TBaseProxyActor<TTopicReader> {
void Handle(TEvPrivate::TEvTopicEventReady::TPtr& ev) {
auto event = Session->GetEvent(true);
Y_ABORT_UNLESS(event.Defined());
Send(ev->Get()->Sender, new TEvYdbProxy::TEvReadTopicResponse(std::move(*event)), 0, ev->Get()->Cookie);

if (auto* x = std::get_if<TReadSessionEvent::TStartPartitionSessionEvent>(&*event)) {
x->Confirm();
return WaitEvent(ev->Get()->Sender, ev->Get()->Cookie);
} else if (auto* x = std::get_if<TReadSessionEvent::TStopPartitionSessionEvent>(&*event)) {
x->Confirm();
return WaitEvent(ev->Get()->Sender, ev->Get()->Cookie);
} else if (auto* x = std::get_if<TReadSessionEvent::TDataReceivedEvent>(&*event)) {
return (void)Send(ev->Get()->Sender, new TEvYdbProxy::TEvReadTopicResponse(*x), 0, ev->Get()->Cookie);
} else if (std::get_if<TReadSessionEvent::TCommitOffsetAcknowledgementEvent>(&*event)) {
return WaitEvent(ev->Get()->Sender, ev->Get()->Cookie);
} else if (std::get_if<TReadSessionEvent::TPartitionSessionStatusEvent>(&*event)) {
return WaitEvent(ev->Get()->Sender, ev->Get()->Cookie);
} else if (std::get_if<TReadSessionEvent::TPartitionSessionClosedEvent>(&*event)) {
return Leave(ev->Get()->Sender);
} else if (std::get_if<TSessionClosedEvent>(&*event)) {
return Leave(ev->Get()->Sender);
} else {
Y_ABORT("Unexpected event");
}
}

void Leave(const TActorId& client) {
Send(client, new TEvents::TEvGone());
PassAway();
}

void PassAway() override {
Expand Down Expand Up @@ -354,6 +382,10 @@ class TYdbProxy: public TBaseProxyActor<TYdbProxy> {
Send(ev->Sender, new TEvYdbProxy::TEvCreateTopicReaderResponse(RegisterWithSameMailbox(new TTopicReader(session))));
}

void Handle(TEvYdbProxy::TEvCommitOffsetRequest::TPtr& ev) {
Call<TEvYdbProxy::TEvCommitOffsetResponse>(ev, &TTopicClient::CommitOffset);
}

static TCommonClientSettings MakeSettings(const TString& endpoint, const TString& database) {
return TCommonClientSettings()
.DiscoveryEndpoint(endpoint)
Expand Down Expand Up @@ -405,6 +437,7 @@ class TYdbProxy: public TBaseProxyActor<TYdbProxy> {
hFunc(TEvYdbProxy::TEvDescribeTopicRequest, Handle);
hFunc(TEvYdbProxy::TEvDescribeConsumerRequest, Handle);
hFunc(TEvYdbProxy::TEvCreateTopicReaderRequest, Handle);
hFunc(TEvYdbProxy::TEvCommitOffsetRequest, Handle);

default:
return StateBase(ev);
Expand Down
54 changes: 53 additions & 1 deletion ydb/core/tx/replication/ydb_proxy/ydb_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ struct TEvYdbProxy {
EV_REQUEST_RESPONSE(DescribeConsumer),
EV_REQUEST_RESPONSE(CreateTopicReader),
EV_REQUEST_RESPONSE(ReadTopic),
EV_REQUEST_RESPONSE(CommitOffset),

EvEnd,
};
Expand Down Expand Up @@ -98,6 +99,56 @@ struct TEvYdbProxy {
using TBase = TGenericResponse<TDerived, EventType, T>;
};

struct TReadTopicResult {
class TMessage {
using TDataEvent = NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent;
using ECodec = NYdb::NTopic::ECodec;

explicit TMessage(const TDataEvent::TMessageBase& msg, ECodec codec)
: Offset(msg.GetOffset())
, Data(msg.GetData())
, Codec(codec)
{
}

public:
explicit TMessage(const TDataEvent::TMessage& msg)
: TMessage(msg, ECodec::RAW)
{
}

explicit TMessage(const TDataEvent::TCompressedMessage& msg)
: TMessage(msg, msg.GetCodec())
{
}

ui64 GetOffset() const { return Offset; }
const TString& GetData() const { return Data; }
TString& GetData() { return Data; }
ECodec GetCodec() const { return Codec; }

private:
ui64 Offset;
TString Data;
ECodec Codec;
};

explicit TReadTopicResult(const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent& event) {
Messages.reserve(event.GetMessagesCount());
if (event.HasCompressedMessages()) {
for (const auto& msg : event.GetCompressedMessages()) {
Messages.emplace_back(msg);
}
} else {
for (const auto& msg : event.GetMessages()) {
Messages.emplace_back(msg);
}
}
}

TVector<TMessage> Messages;
};

#define DEFINE_GENERIC_REQUEST(name, ...) \
struct TEv##name##Request: public TGenericRequest<TEv##name##Request, Ev##name##Request, __VA_ARGS__> { \
using TBase::TBase; \
Expand Down Expand Up @@ -134,7 +185,8 @@ struct TEvYdbProxy {
DEFINE_GENERIC_REQUEST_RESPONSE(DescribeTopic, NYdb::NTopic::TDescribeTopicResult, TString, NYdb::NTopic::TDescribeTopicSettings);
DEFINE_GENERIC_REQUEST_RESPONSE(DescribeConsumer, NYdb::NTopic::TDescribeConsumerResult, TString, TString, NYdb::NTopic::TDescribeConsumerSettings);
DEFINE_GENERIC_REQUEST_RESPONSE(CreateTopicReader, TActorId, NYdb::NTopic::TReadSessionSettings);
DEFINE_GENERIC_REQUEST_RESPONSE(ReadTopic, NYdb::NTopic::TReadSessionEvent::TEvent, void);
DEFINE_GENERIC_REQUEST_RESPONSE(ReadTopic, TReadTopicResult, void);
DEFINE_GENERIC_REQUEST_RESPONSE(CommitOffset, NYdb::TStatus, TString, ui64, TString, ui64, NYdb::NTopic::TCommitOffsetSettings);

#undef DEFINE_GENERIC_REQUEST_RESPONSE
#undef DEFINE_GENERIC_RESPONSE
Expand Down
139 changes: 49 additions & 90 deletions ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,26 +108,21 @@ Y_UNIT_TEST_SUITE(YdbProxyTests) {
Server.GetRuntime()->Send(new IEventHandle(recipient, Sender, ev));
}

template <typename TEvResponse>
auto Wait(bool rethrow = false) {
if (rethrow) {
return Server.GetRuntime()->GrabEdgeEventRethrow<TEvResponse>(Sender);
} else {
return Server.GetRuntime()->GrabEdgeEvent<TEvResponse>(Sender);
}
}

template <typename TEvResponse>
auto Send(const TActorId& recipient, IEventBase* ev) {
SendAsync(recipient, ev);
return Wait<TEvResponse>();
return Server.GetRuntime()->GrabEdgeEvent<TEvResponse>(Sender);
}

template <typename TEvResponse>
auto Send(IEventBase* ev) {
return Send<TEvResponse>(YdbProxy, ev);
}

auto& GetRuntime() {
return *Server.GetRuntime();
}

const NYdb::TDriver& GetDriver() const {
return Server.GetDriver();
}
Expand All @@ -140,6 +135,10 @@ Y_UNIT_TEST_SUITE(YdbProxyTests) {
return Database;
}

const TActorId& GetSender() const {
return Sender;
}

private:
TPortManager PortManager;
Tests::TServerSettings Settings;
Expand Down Expand Up @@ -742,43 +741,24 @@ Y_UNIT_TEST_SUITE(YdbProxyTests) {
return result;
}

template <typename TEvent>
TEvent ReadTopicAsync(TEvYdbProxy::TEvReadTopicResponse::TPtr& ev) {
const auto* event = std::get_if<TEvent>(&ev->Get()->Result);
UNIT_ASSERT_C(event, "Unexpected event: " << ev->Get()->Result.index());

return *event;
}

template <typename TEvent, typename Env>
TEvent ReadTopic(Env& env, const TActorId& reader) {
auto ev = env.template Send<TEvYdbProxy::TEvReadTopicResponse>(reader,
new TEvYdbProxy::TEvReadTopicRequest());
UNIT_ASSERT(ev);

return ReadTopicAsync<TEvent>(ev);
}

using TReadSessionEvent = NYdb::NTopic::TReadSessionEvent;

template <typename Env>
TReadSessionEvent::TDataReceivedEvent ReadTopicData(Env& env, TActorId& reader, const TString& topicPath) {
TEvYdbProxy::TReadTopicResult ReadTopicData(Env& env, TActorId& reader, const TString& topicPath) {
env.SendAsync(reader, new TEvYdbProxy::TEvReadTopicRequest());

while (true) {
auto ev = env.template Send<TEvYdbProxy::TEvReadTopicResponse>(reader,
new TEvYdbProxy::TEvReadTopicRequest());
UNIT_ASSERT(ev);
TAutoPtr<IEventHandle> handle;
auto result = env.GetRuntime().template GrabEdgeEventsRethrow<TEvYdbProxy::TEvReadTopicResponse, TEvents::TEvGone>(handle);
if (handle->Recipient != env.GetSender()) {
continue;
}

switch (ev->Get()->Result.index()) {
case 0:
return ReadTopicAsync<TReadSessionEvent::TDataReceivedEvent>(ev);
case 5: // TPartitionSessionClosedEvent
env.SendAsync(reader, new TEvents::TEvPoison());
if (auto* ev = std::get<TEvYdbProxy::TEvReadTopicResponse*>(result)) {
return ev->Result;
} else if (std::get<TEvents::TEvGone*>(result)) {
reader = CreateTopicReader(env, topicPath);
ReadTopic<TReadSessionEvent::TStartPartitionSessionEvent>(env, reader).Confirm();
break;
default:
UNIT_ASSERT_C(false, "Unexpected event: " << ev->Get()->Result.index());
break;
env.SendAsync(reader, new TEvYdbProxy::TEvReadTopicRequest());
} else {
UNIT_ASSERT("Unexpected event");
}
}
}
Expand All @@ -803,15 +783,17 @@ Y_UNIT_TEST_SUITE(YdbProxyTests) {

UNIT_ASSERT(WriteTopic(env, "/Root/topic", "message-1"));
{
ReadTopic<TReadSessionEvent::TStartPartitionSessionEvent>(env, reader).Confirm();

auto data = ReadTopicData(env, reader, "/Root/topic");
UNIT_ASSERT_VALUES_EQUAL(data.GetMessages().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(data.GetMessages().at(0).GetData(), "message-1");
data.Commit();
UNIT_ASSERT_VALUES_EQUAL(data.Messages.size(), 1);

auto ack = ReadTopic<TReadSessionEvent::TCommitOffsetAcknowledgementEvent>(env, reader);
UNIT_ASSERT_VALUES_EQUAL(ack.GetCommittedOffset(), 1);
const auto& msg = data.Messages.at(0);
UNIT_ASSERT_VALUES_EQUAL(msg.GetOffset(), 0);
UNIT_ASSERT_VALUES_EQUAL(msg.GetData(), "message-1");

auto ev = env.Send<TEvYdbProxy::TEvCommitOffsetResponse>(
new TEvYdbProxy::TEvCommitOffsetRequest("/Root/topic", 0, "consumer", msg.GetOffset() + 1, {}));
UNIT_ASSERT(ev);
UNIT_ASSERT(ev->Get()->Result.IsSuccess());
}

// wait next event
Expand All @@ -821,53 +803,30 @@ Y_UNIT_TEST_SUITE(YdbProxyTests) {
// wait next event
env.SendAsync(newReader, new TEvYdbProxy::TEvReadTopicRequest());

bool stopped = false;
bool closed = false;
bool started = false;
while (!stopped || !closed || !started) {
// wait response from any reader
TEvYdbProxy::TEvReadTopicResponse::TPtr ev;
try {
ev = env.Wait<TEvYdbProxy::TEvReadTopicResponse>(true);
} catch (yexception&) {
// bad luck, previous session was not closed, close it manually
env.SendAsync(reader, new TEvents::TEvPoison());
stopped = closed = true;
continue;
}

if (ev->Sender == reader) {
if (!stopped) {
ReadTopicAsync<TReadSessionEvent::TStopPartitionSessionEvent>(ev).Confirm();
env.SendAsync(reader, new TEvYdbProxy::TEvReadTopicRequest());
stopped = true;
} else if (!closed) {
ReadTopicAsync<TReadSessionEvent::TPartitionSessionClosedEvent>(ev);
closed = true;
} else {
UNIT_ASSERT_C(false, "Unexpected event from previous reader");
}
} else if (ev->Sender == newReader) {
if (!started) {
ReadTopicAsync<TReadSessionEvent::TStartPartitionSessionEvent>(ev).Confirm();
started = true;
} else {
UNIT_ASSERT_C(false, "Unexpected event from new reader");
}
} else {
UNIT_ASSERT_C(false, "Unknown reader");
// wait event from previous session
try {
auto ev = env.GetRuntime().GrabEdgeEventRethrow<TEvents::TEvGone>(env.GetSender());
if (ev->Sender != reader) {
ythrow yexception();
}
} catch (yexception&) {
// bad luck, previous session was not closed, close it manually
env.SendAsync(reader, new TEvents::TEvPoison());
}

UNIT_ASSERT(WriteTopic(env, "/Root/topic", "message-2"));
{
auto data = ReadTopicData(env, newReader, "/Root/topic");
UNIT_ASSERT_VALUES_EQUAL(data.GetMessages().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(data.GetMessages().at(0).GetData(), "message-2");
data.Commit();
UNIT_ASSERT_VALUES_EQUAL(data.Messages.size(), 1);

const auto& msg = data.Messages.at(0);
UNIT_ASSERT_VALUES_EQUAL(msg.GetOffset(), 1);
UNIT_ASSERT_VALUES_EQUAL(msg.GetData(), "message-2");

auto ack = ReadTopic<TReadSessionEvent::TCommitOffsetAcknowledgementEvent>(env, newReader);
UNIT_ASSERT_VALUES_EQUAL(ack.GetCommittedOffset(), 2);
auto ev = env.Send<TEvYdbProxy::TEvCommitOffsetResponse>(
new TEvYdbProxy::TEvCommitOffsetRequest("/Root/topic", 0, "consumer", msg.GetOffset() + 1, {}));
UNIT_ASSERT(ev);
UNIT_ASSERT(ev->Get()->Result.IsSuccess());
}
}

Expand Down