Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions ydb/core/backup/impl/local_partition_reader.cpp
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
#include "local_partition_reader.h"
#include "logging.h"

#include <ydb/library/actors/core/actor.h>
#include <ydb/library/services/services.pb.h>

#include <ydb/core/persqueue/events/global.h>
#include <ydb/core/protos/grpc_pq_old.pb.h>
#include <ydb/core/tx/replication/service/worker.h>
#include <ydb/core/tx/replication/ydb_proxy/topic_message.h>
#include <ydb/library/actors/core/actor.h>
#include <ydb/library/services/services.pb.h>

using namespace NActors;
using namespace NKikimr::NReplication::NService;
Expand Down Expand Up @@ -131,11 +131,11 @@ class TLocalPartitionReader
}

auto gotOffset = Offset;
TVector<TEvWorker::TEvData::TRecord> records(::Reserve(readResult.ResultSize()));
TVector<NReplication::TTopicMessage> records(::Reserve(readResult.ResultSize()));

for (auto& result : readResult.GetResult()) {
gotOffset = std::max(gotOffset, result.GetOffset());
records.emplace_back(result.GetOffset(), GetDeserializedData(result.GetData()).GetData(), TInstant::MilliSeconds(result.GetCreateTimestampMS()), result.GetSourceId(), result.GetSourceId(), result.GetSeqNo());
records.emplace_back(result.GetOffset(), GetDeserializedData(result.GetData()).GetData());
}
SentOffset = gotOffset + 1;

Expand Down
8 changes: 4 additions & 4 deletions ydb/core/backup/impl/local_partition_reader_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ Y_UNIT_TEST_SUITE(LocalPartitionReader) {
auto data = runtime.GrabEdgeEventRethrow<TEvWorker::TEvData>(handle);
UNIT_ASSERT_VALUES_EQUAL(data->Source, PARTITION_STR);
UNIT_ASSERT_VALUES_EQUAL(data->Records.size(), 2);
UNIT_ASSERT_VALUES_EQUAL(data->Records[0].Offset, INITIAL_OFFSET + dataPatternCookie * 2);
UNIT_ASSERT_VALUES_EQUAL(data->Records[0].Data, Sprintf("1-%d", dataPatternCookie));
UNIT_ASSERT_VALUES_EQUAL(data->Records[1].Offset, INITIAL_OFFSET + dataPatternCookie * 2 + 1);
UNIT_ASSERT_VALUES_EQUAL(data->Records[1].Data, Sprintf("2-%d", dataPatternCookie));
UNIT_ASSERT_VALUES_EQUAL(data->Records[0].GetOffset(), INITIAL_OFFSET + dataPatternCookie * 2);
UNIT_ASSERT_VALUES_EQUAL(data->Records[0].GetData(), Sprintf("1-%d", dataPatternCookie));
UNIT_ASSERT_VALUES_EQUAL(data->Records[1].GetOffset(), INITIAL_OFFSET + dataPatternCookie * 2 + 1);
UNIT_ASSERT_VALUES_EQUAL(data->Records[1].GetData(), Sprintf("2-%d", dataPatternCookie));
}

TEvPersQueue::TEvResponse* GenerateData(ui32 dataPatternCookie) {
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/tx/replication/service/base_table_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <ydb/core/change_exchange/util.h>
#include <ydb/core/tablet_flat/flat_row_eggs.h>
#include <ydb/core/tx/datashard/datashard.h>
#include <ydb/core/tx/replication/ydb_proxy/topic_message.h>
#include <ydb/core/tx/scheme_cache/helpers.h>
#include <ydb/core/tx/tx_proxy/proxy.h>
#include <ydb/library/actors/core/actor_bootstrapped.h>
Expand Down Expand Up @@ -434,8 +435,8 @@ class TLocalTableWriter
TSet<TRowVersion> versionsWithoutTxId;

for (auto& r : ev->Get()->Records) {
auto offset = r.Offset;
auto& data = r.Data;
auto offset = r.GetOffset();
auto& data = r.GetData();

auto record = Parser->Parse(ev->Get()->Source, offset, std::move(data));

Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tx/replication/service/common_ut.h
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
#pragma once

#include "worker.h"
#include <ydb/core/tx/replication/ydb_proxy/topic_message.h>

namespace NKikimr::NReplication::NService {

struct TRecord: public TEvWorker::TEvData::TRecord {
struct TRecord: public TTopicMessage {
explicit TRecord(ui64 offset, const TString& data)
: TEvWorker::TEvData::TRecord(offset, data, TInstant::Zero(), "MessageGroupId", "ProducerId", 42)
: TTopicMessage(offset, data)
{}
};

Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tx/replication/service/s3_writer_ut.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include "common_ut.h"
#include "s3_writer.h"
#include "worker.h"

Expand Down Expand Up @@ -60,8 +61,7 @@ Y_UNIT_TEST_SUITE(S3Writer) {
UNIT_ASSERT_VALUES_EQUAL(s3Mock.GetData().at("/TEST/writer.AtufpxzetsqaVnEuozdXpD.json"),
R"({"finished":false,"table_name":"/MyRoot/Table","writer_name":"AtufpxzetsqaVnEuozdXpD"})");

using TRecord = TEvWorker::TEvData::TRecord;
env.Send<TEvWorker::TEvPoll>(writer, new TEvWorker::TEvData({
env.Send<TEvWorker::TEvPoll>(writer, new TEvWorker::TEvData(0, "TestSource", {
TRecord(1, R"({"key":[1], "update":{"value":"10"}})"),
TRecord(2, R"({"key":[2], "update":{"value":"20"}})"),
TRecord(3, R"({"key":[3], "update":{"value":"30"}})"),
Expand All @@ -75,7 +75,7 @@ Y_UNIT_TEST_SUITE(S3Writer) {
R"({"key":[2], "update":{"value":"20"}})" "\n"
R"({"key":[3], "update":{"value":"30"}})" "\n");

auto res = env.Send<TEvWorker::TEvGone>(writer, new TEvWorker::TEvData({}));
auto res = env.Send<TEvWorker::TEvGone>(writer, new TEvWorker::TEvData(0, "TestSource", {}));

UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, TEvWorker::TEvGone::DONE);
UNIT_ASSERT_VALUES_EQUAL(s3Mock.GetData().size(), 2);
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/tx/replication/service/table_writer_ut.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "common_ut.h"
#include "service.h"
#include "table_writer.h"
#include "common_ut.h"
#include "worker.h"

#include <ydb/core/tx/datashard/ut_common/datashard_ut_common.h>
#include <ydb/core/tx/replication/ut_helpers/test_env.h>
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/tx/replication/service/topic_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "topic_reader.h"
#include "worker.h"

#include <ydb/core/tx/replication/ydb_proxy/topic_message.h>
#include <ydb/core/tx/replication/ydb_proxy/ydb_proxy.h>
#include <ydb/library/actors/core/actor.h>
#include <ydb/library/actors/core/hfunc.h>
Expand Down Expand Up @@ -54,11 +55,11 @@ class TRemoteTopicReader: public TActor<TRemoteTopicReader> {
LOG_D("Handle " << ev->Get()->ToString());

auto& result = ev->Get()->Result;
TVector<TEvWorker::TEvData::TRecord> records(::Reserve(result.Messages.size()));
TVector<TTopicMessage> records(::Reserve(result.Messages.size()));

for (auto& msg : result.Messages) {
Y_ABORT_UNLESS(msg.GetCodec() == NYdb::NTopic::ECodec::RAW);
records.emplace_back(msg.GetOffset(), std::move(msg.GetData()), msg.GetCreateTime(), std::move(msg.GetMessageGroupId()), std::move(msg.GetProducerId()), msg.GetSeqNo());
records.push_back(std::move(msg));
}

Send(Worker, new TEvWorker::TEvData(result.PartitionId, ToString(result.PartitionId), std::move(records)));
Expand Down
8 changes: 4 additions & 4 deletions ydb/core/tx/replication/service/topic_reader_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ Y_UNIT_TEST_SUITE(RemoteTopicReader) {
UNIT_ASSERT_VALUES_EQUAL(records.size(), 1);

const auto& record = records.at(0);
UNIT_ASSERT_VALUES_EQUAL(record.Offset, 0);
UNIT_ASSERT_VALUES_EQUAL(record.Data, "message-1");
UNIT_ASSERT_VALUES_EQUAL(record.GetOffset(), 0);
UNIT_ASSERT_VALUES_EQUAL(record.GetData(), "message-1");
}

// trigger commit, write new data & kill reader
Expand All @@ -103,8 +103,8 @@ Y_UNIT_TEST_SUITE(RemoteTopicReader) {
UNIT_ASSERT_VALUES_EQUAL(records.size(), 1);

const auto& record = records.at(0);
UNIT_ASSERT_VALUES_EQUAL(record.Offset, 1);
UNIT_ASSERT_VALUES_EQUAL(record.Data, "message-2");
UNIT_ASSERT_VALUES_EQUAL(record.GetOffset(), 1);
UNIT_ASSERT_VALUES_EQUAL(record.GetData(), "message-2");
}
}
}
Expand Down
28 changes: 14 additions & 14 deletions ydb/core/tx/replication/service/transfer_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,17 @@
#include "transfer_writer.h"
#include "worker.h"

#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/actors/core/hfunc.h>
#include <ydb/library/services/services.pb.h>

#include <ydb/public/lib/scheme_types/scheme_type_id.h>
#include <ydb/core/fq/libs/row_dispatcher/events/data_plane.h>
#include <ydb/core/fq/libs/row_dispatcher/purecalc_compilation/compile_service.h>
#include <ydb/core/kqp/runtime/kqp_write_table.h>
#include <ydb/core/persqueue/purecalc/purecalc.h>
#include <ydb/core/tx/replication/ydb_proxy/topic_message.h>
#include <ydb/core/persqueue/purecalc/purecalc.h> // should be after topic_message
#include <ydb/core/tx/scheme_cache/helpers.h>
#include <ydb/core/tx/tx_proxy/upload_rows_common_impl.h>
#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/actors/core/hfunc.h>
#include <ydb/library/services/services.pb.h>
#include <ydb/public/lib/scheme_types/scheme_type_id.h>

#include <yql/essentials/providers/common/schema/parser/yql_type_parser.h>
#include <yql/essentials/public/purecalc/helpers/stream/stream_from_vector.h>
Expand Down Expand Up @@ -593,7 +593,7 @@ class TTransferWriter
ProcessData(ev->Get()->PartitionId, ev->Get()->Records);
}

void ProcessData(const ui32 partitionId, const TVector<TEvWorker::TEvData::TRecord>& records) {
void ProcessData(const ui32 partitionId, const TVector<TTopicMessage>& records) {
if (!records) {
Send(Worker, new TEvWorker::TEvGone(TEvWorker::TEvGone::DONE));
return;
Expand All @@ -603,20 +603,20 @@ class TTransferWriter

for (auto& message : records) {
NYdb::NTopic::NPurecalc::TMessage input;
input.Data = std::move(message.Data);
input.MessageGroupId = std::move(message.MessageGroupId);
input.Data = std::move(message.GetData());
input.MessageGroupId = std::move(message.GetMessageGroupId());
input.Partition = partitionId;
input.ProducerId = std::move(message.ProducerId);
input.Offset = message.Offset;
input.SeqNo = message.SeqNo;
input.ProducerId = std::move(message.GetProducerId());
input.Offset = message.GetOffset();
input.SeqNo = message.GetSeqNo();

try {
auto result = ProgramHolder->GetProgram()->Apply(NYql::NPureCalc::StreamFromVector(TVector{input}));
while (auto* m = result->Fetch()) {
TableState->AddData(m->Data);
}
} catch (const yexception& e) {
ProcessingError = TStringBuilder() << "Error transform message: '" << message.Data << "': " << e.what();
ProcessingError = TStringBuilder() << "Error transform message: " << e.what();
break;
}
}
Expand Down Expand Up @@ -730,7 +730,7 @@ class TTransferWriter

std::optional<TActorId> PendingWorker;
ui32 PendingPartitionId = 0;
std::optional<TVector<TEvWorker::TEvData::TRecord>> PendingRecords;
std::optional<TVector<TTopicMessage>> PendingRecords;

ui32 Attempt = 0;
TDuration Delay = TDuration::Minutes(1);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/replication/service/transfer_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
namespace NKikimr {
struct TPathId;
}

namespace NKikimr::NReplication::NService {

IActor* CreateTransferWriter(const TString& transformLambda, const TPathId& tablePathId,
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/tx/replication/service/transfer_writer_ut.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "common_ut.h"
#include "service.h"
#include "transfer_writer.h"
#include "common_ut.h"
#include "worker.h"

#include <ydb/core/fq/libs/row_dispatcher/purecalc_compilation/compile_service.h>
#include <ydb/core/tx/datashard/ut_common/datashard_ut_common.h>
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/replication/service/ut_s3_writer/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ SIZE(MEDIUM)

PEERDIR(
ydb/core/tx/replication/ut_helpers
ydb/core/tx/replication/ydb_proxy
library/cpp/string_utils/base64
library/cpp/testing/unittest
)
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/replication/service/ut_table_writer/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ SIZE(MEDIUM)
PEERDIR(
ydb/core/tx/datashard/ut_common
ydb/core/tx/replication/ut_helpers
ydb/core/tx/replication/ydb_proxy
library/cpp/string_utils/base64
library/cpp/testing/unittest
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ SIZE(MEDIUM)
PEERDIR(
ydb/core/tx/datashard/ut_common
ydb/core/tx/replication/ut_helpers
ydb/core/tx/replication/ydb_proxy
library/cpp/string_utils/base64
library/cpp/testing/unittest
)
Expand Down
37 changes: 5 additions & 32 deletions ydb/core/tx/replication/service/worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "worker.h"

#include <ydb/core/base/appdata.h>
#include <ydb/core/tx/replication/ydb_proxy/topic_message.h>
#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/actors/core/hfunc.h>
#include <ydb/library/services/services.pb.h>
Expand All @@ -13,48 +14,20 @@

namespace NKikimr::NReplication::NService {

TEvWorker::TEvData::TRecord::TRecord(ui64 offset, const TString& data, TInstant createTime, const TString& messageGroupId, const TString& producerId, ui64 seqNo)
: Offset(offset)
, Data(data)
, CreateTime(createTime)
, MessageGroupId(messageGroupId)
, ProducerId(producerId)
, SeqNo(seqNo)
{
}

TEvWorker::TEvData::TRecord::TRecord(ui64 offset, TString&& data, TInstant createTime, TString&& messageGroupId, TString&& producerId, ui64 seqNo)
: Offset(offset)
, Data(std::move(data))
, CreateTime(createTime)
, MessageGroupId(std::move(messageGroupId))
, ProducerId(std::move(producerId))
, SeqNo(seqNo)
{
}

TEvWorker::TEvData::TEvData(ui32 partitionId, const TString& source, const TVector<TRecord>& records)
TEvWorker::TEvData::TEvData(ui32 partitionId, const TString& source, const TVector<TTopicMessage>& records)
: PartitionId(partitionId)
, Source(source)
, Records(records)
{
}

TEvWorker::TEvData::TEvData(ui32 partitionId, const TString& source, TVector<TRecord>&& records)
TEvWorker::TEvData::TEvData(ui32 partitionId, const TString& source, TVector<TTopicMessage>&& records)
: PartitionId(partitionId)
, Source(source)
, Records(std::move(records))
{
}

void TEvWorker::TEvData::TRecord::Out(IOutputStream& out) const {
out << "{"
<< " Offset: " << Offset
<< " Data: " << Data.size() << "b"
<< " CreateTime: " << CreateTime.ToStringUpToSeconds()
<< " }";
}

TString TEvWorker::TEvData::ToString() const {
return TStringBuilder() << ToStringHeader() << " {"
<< " Source: " << Source
Expand Down Expand Up @@ -189,11 +162,11 @@ class TWorker: public TActorBootstrapped<TWorker> {
if (InFlightData) {
const auto& records = InFlightData->Records;
auto it = MinElementBy(records, [](const auto& record) {
return record.CreateTime;
return record.GetCreateTime();
});

if (it != records.end()) {
Lag = TlsActivationContext->Now() - it->CreateTime;
Lag = TlsActivationContext->Now() - it->GetCreateTime();
}
}

Expand Down
32 changes: 10 additions & 22 deletions ydb/core/tx/replication/service/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@

#include <functional>

namespace NKikimr::NReplication::NService {
namespace NKikimr::NReplication {

class TTopicMessage;

namespace NService {

struct TEvWorker {
enum EEv {
Expand All @@ -30,25 +34,12 @@ struct TEvWorker {
struct TEvPoll: public TEventLocal<TEvPoll, EvPoll> {};

struct TEvData: public TEventLocal<TEvData, EvData> {
struct TRecord {
ui64 Offset;
TString Data;
TInstant CreateTime;
TString MessageGroupId;
TString ProducerId;
ui64 SeqNo;

explicit TRecord(ui64 offset, const TString& data, TInstant createTime, const TString& messageGroupId, const TString& producerId, ui64 seqNo);
explicit TRecord(ui64 offset, TString&& data, TInstant createTime, TString&& messageGroupId, TString&& producerId, ui64 seqNo);
void Out(IOutputStream& out) const;
};

ui32 PartitionId;
TString Source;
TVector<TRecord> Records;
TVector<TTopicMessage> Records;

explicit TEvData(ui32 partitionId, const TString& source, const TVector<TRecord>& records);
explicit TEvData(ui32 partitionId, const TString& source, TVector<TRecord>&& records);
explicit TEvData(ui32 partitionId, const TString& source, const TVector<TTopicMessage>& records);
explicit TEvData(ui32 partitionId, const TString& source, TVector<TTopicMessage>&& records);
TString ToString() const override;
};

Expand Down Expand Up @@ -89,8 +80,5 @@ IActor* CreateWorker(
std::function<IActor*(void)>&& createReaderFn,
std::function<IActor*(void)>&& createWriterFn);

}

Y_DECLARE_OUT_SPEC(inline, NKikimr::NReplication::NService::TEvWorker::TEvData::TRecord, o, x) {
return x.Out(o);
}
} // NService
} // NKikimr::NReplication
Loading
Loading