diff --git a/ydb/core/tx/replication/service/json_change_record.cpp b/ydb/core/tx/replication/service/json_change_record.cpp index a1afe9b208f0..45fd172d0335 100644 --- a/ydb/core/tx/replication/service/json_change_record.cpp +++ b/ydb/core/tx/replication/service/json_change_record.cpp @@ -33,6 +33,40 @@ NChangeExchange::IChangeRecord::EKind TChangeRecord::GetKind() const { : EKind::CdcDataChange; } +static bool ParseKey(TVector& cells, + const NJson::TJsonValue::TArray& key, TLightweightSchema::TCPtr schema, TMemoryPool& pool, TString& error) +{ + cells.resize(key.size()); + + Y_ABORT_UNLESS(key.size() == schema->KeyColumns.size()); + for (ui32 i = 0; i < key.size(); ++i) { + if (!NFormats::MakeCell(cells[i], key[i], schema->KeyColumns[i], pool, error)) { + return false; + } + } + + return true; +} + +static bool ParseValue(TVector& tags, TVector& cells, + const NJson::TJsonValue::TMapType& value, TLightweightSchema::TCPtr schema, TMemoryPool& pool, TString& error) +{ + tags.reserve(value.size()); + cells.reserve(value.size()); + + for (const auto& [column, value] : value) { + auto it = schema->ValueColumns.find(column); + Y_ABORT_UNLESS(it != schema->ValueColumns.end()); + + tags.push_back(it->second.Tag); + if (!NFormats::MakeCell(cells.emplace_back(), value, it->second.Type, pool, error)) { + return false; + } + } + + return true; +} + void TChangeRecord::Serialize(NKikimrTxDataShard::TEvApplyReplicationChanges::TChange& record) const { record.SetSourceOffset(GetOrder()); // TODO: fill WriteTxId @@ -42,13 +76,10 @@ void TChangeRecord::Serialize(NKikimrTxDataShard::TEvApplyReplicationChanges::TC if (JsonBody.Has("key") && JsonBody["key"].IsArray()) { const auto& key = JsonBody["key"].GetArray(); - Y_ABORT_UNLESS(key.size() == Schema->KeyColumns.size()); + TVector cells; - TVector cells(key.size()); - for (ui32 i = 0; i < key.size(); ++i) { - auto res = NFormats::MakeCell(cells[i], key[i], Schema->KeyColumns[i], pool, error); - Y_ABORT_UNLESS(res); - } + auto res = ParseKey(cells, key, Schema, pool, error); + Y_ABORT_UNLESS(res); record.SetKey(TSerializedCellVec::Serialize(cells)); } else { @@ -57,18 +88,14 @@ void TChangeRecord::Serialize(NKikimrTxDataShard::TEvApplyReplicationChanges::TC if (JsonBody.Has("update") && JsonBody["update"].IsMap()) { const auto& update = JsonBody["update"].GetMap(); - auto& upsert = *record.MutableUpsert(); - - TVector cells(::Reserve(update.size())); - for (const auto& [column, value] : update) { - auto it = Schema->ValueColumns.find(column); - Y_ABORT_UNLESS(it != Schema->ValueColumns.end()); + TVector tags; + TVector cells; - upsert.AddTags(it->second.Tag); - auto res = NFormats::MakeCell(cells.emplace_back(), value, it->second.Type, pool, error); - Y_ABORT_UNLESS(res); - } + auto res = ParseValue(tags, cells, update, Schema, pool, error); + Y_ABORT_UNLESS(res); + auto& upsert = *record.MutableUpsert(); + *upsert.MutableTags() = {tags.begin(), tags.end()}; upsert.SetData(TSerializedCellVec::Serialize(cells)); } else if (JsonBody.Has("erase")) { record.MutableErase(); @@ -77,4 +104,26 @@ void TChangeRecord::Serialize(NKikimrTxDataShard::TEvApplyReplicationChanges::TC } } +TConstArrayRef TChangeRecord::GetKey() const { + if (!Key) { + TMemoryPool pool(256); + TString error; + + if (JsonBody.Has("key") && JsonBody["key"].IsArray()) { + const auto& key = JsonBody["key"].GetArray(); + TVector cells; + + auto res = ParseKey(cells, key, Schema, pool, error); + Y_ABORT_UNLESS(res); + + Key.ConstructInPlace(cells); + } else { + Y_ABORT("Malformed json record"); + } + } + + Y_ABORT_UNLESS(Key); + return *Key; +} + } diff --git a/ydb/core/tx/replication/service/json_change_record.h b/ydb/core/tx/replication/service/json_change_record.h index fa3969c58bd0..0cc272b44fa5 100644 --- a/ydb/core/tx/replication/service/json_change_record.h +++ b/ydb/core/tx/replication/service/json_change_record.h @@ -2,12 +2,14 @@ #include #include +#include #include #include #include #include +#include #include #include @@ -39,10 +41,14 @@ class TChangeRecord: public NChangeExchange::TChangeRecordBase { void Serialize(NKikimrTxDataShard::TEvApplyReplicationChanges::TChange& record) const; + TConstArrayRef GetKey() const; + private: NJson::TJsonValue JsonBody; TLightweightSchema::TCPtr Schema; + mutable TMaybe Key; + }; // TChangeRecord class TChangeRecordBuilder: public NChangeExchange::TChangeRecordBuilder { diff --git a/ydb/core/tx/replication/service/table_writer.cpp b/ydb/core/tx/replication/service/table_writer.cpp index 6bd3cb08bd53..19b16d8c0aea 100644 --- a/ydb/core/tx/replication/service/table_writer.cpp +++ b/ydb/core/tx/replication/service/table_writer.cpp @@ -1,13 +1,132 @@ +#include "json_change_record.h" #include "table_writer.h" #include "worker.h" +#include #include +#include +#include #include +#include #include #include +#include + namespace NKikimr::NReplication::NService { +class TTablePartitionWriter: public TActorBootstrapped { + void GetProxyServices() { + Send(MakeTxProxyID(), new TEvTxUserProxy::TEvGetProxyServicesRequest()); + Become(&TThis::StateGetProxyServices); + } + + STATEFN(StateGetProxyServices) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvTxUserProxy::TEvGetProxyServicesResponse, Handle); + default: + return StateBase(ev); + } + } + + void Handle(TEvTxUserProxy::TEvGetProxyServicesResponse::TPtr& ev) { + LeaderPipeCache = ev->Get()->Services.LeaderPipeCache; + Ready(); + } + + void Ready() { + Send(Parent, new NChangeExchange::TEvChangeExchangePrivate::TEvReady(TabletId)); + Become(&TThis::StateWaitingRecords); + } + + STATEFN(StateWaitingRecords) { + switch (ev->GetTypeRewrite()) { + hFunc(NChangeExchange::TEvChangeExchange::TEvRecords, Handle); + default: + return StateBase(ev); + } + } + + void Handle(NChangeExchange::TEvChangeExchange::TEvRecords::TPtr& ev) { + auto event = MakeHolder(); + + auto& tableId = *event->Record.MutableTableId(); + tableId.SetOwnerId(TablePathId.OwnerId); + tableId.SetTableId(TablePathId.LocalPathId); + // TODO: SetSchemaVersion? + + for (auto recordPtr : ev->Get()->Records) { + const auto& record = *recordPtr->Get(); + record.Serialize(*event->Record.AddChanges()); + // TODO: set WriteTxId, Source + } + + Send(LeaderPipeCache, new TEvPipeCache::TEvForward(event.Release(), TabletId, false)); + Become(&TThis::StateWaitingStatus); + } + + STATEFN(StateWaitingStatus) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvDataShard::TEvApplyReplicationChangesResult, Handle); + default: + return StateBase(ev); + } + } + + void Handle(TEvDataShard::TEvApplyReplicationChangesResult::TPtr&) { + // TODO: handle result + } + + void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) { + if (TabletId == ev->Get()->TabletId) { + Leave(); + } + } + + void Leave() { + Send(Parent, new NChangeExchange::TEvChangeExchangePrivate::TEvGone(TabletId)); + PassAway(); + } + + void Unlink() { + if (LeaderPipeCache) { + Send(LeaderPipeCache, new TEvPipeCache::TEvUnlink(TabletId)); + } + } + + void PassAway() override { + Unlink(); + TActorBootstrapped::PassAway(); + } + +public: + explicit TTablePartitionWriter(const TActorId& parent, ui64 tabletId, const TPathId& tablePathId) + : Parent(parent) + , TabletId(tabletId) + , TablePathId(tablePathId) + { + } + + void Bootstrap() { + GetProxyServices(); + } + + STATEFN(StateBase) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvPipeCache::TEvDeliveryProblem, Handle); + sFunc(TEvents::TEvPoison, PassAway); + } + } + +private: + const TActorId Parent; + const ui64 TabletId; + const TPathId TablePathId; + + TActorId LeaderPipeCache; + +}; // TTablePartitionWriter + class TLocalTableWriter : public TActor , public NChangeExchange::TBaseChangeSender @@ -61,7 +180,7 @@ class TLocalTableWriter } static TVector MakePartitionIds(const TVector& partitions) { - TVector result(Reserve(partitions.size())); + TVector result(::Reserve(partitions.size())); for (const auto& partition : partitions) { result.push_back(partition.ShardId); @@ -124,24 +243,29 @@ class TLocalTableWriter return; } - TVector keyColumnTypes; + auto schema = MakeIntrusive(); for (const auto& [_, column] : entry.Columns) { - if (column.KeyOrder < 0) { - continue; - } - - if (keyColumnTypes.size() <= static_cast(column.KeyOrder)) { - keyColumnTypes.resize(column.KeyOrder + 1); + if (column.KeyOrder >= 0) { + if (schema->KeyColumns.size() <= static_cast(column.KeyOrder)) { + schema->KeyColumns.resize(column.KeyOrder + 1); + } + + schema->KeyColumns[column.KeyOrder] = column.PType; + } else { + auto res = schema->ValueColumns.emplace(column.Name, TLightweightSchema::TColumn{ + .Tag = column.Id, + .Type = column.PType, + }); + Y_ABORT_UNLESS(res.second); } - - keyColumnTypes[column.KeyOrder] = column.PType; } + Schema = schema; KeyDesc = MakeHolder( entry.TableId, - GetFullRange(keyColumnTypes.size()).ToTableRange(), + GetFullRange(schema->KeyColumns.size()).ToTableRange(), TKeyDesc::ERowOperation::Update, - keyColumnTypes, + schema->KeyColumns, TVector() ); @@ -190,31 +314,69 @@ class TLocalTableWriter } IActor* CreateSender(ui64 partitionId) override { - Y_UNUSED(partitionId); - return nullptr; + return new TTablePartitionWriter(SelfId(), partitionId, PathId); } ui64 GetPartitionId(NChangeExchange::IChangeRecord::TPtr record) const override { - Y_UNUSED(record); - return 0; + Y_ABORT_UNLESS(KeyDesc); + Y_ABORT_UNLESS(KeyDesc->GetPartitions()); + + const auto range = TTableRange(record->Get()->GetKey()); + Y_ABORT_UNLESS(range.Point); + + TVector::const_iterator it = LowerBound( + KeyDesc->GetPartitions().begin(), KeyDesc->GetPartitions().end(), true, + [&](const TKeyDesc::TPartitionInfo& partition, bool) { + const int compares = CompareBorders( + partition.Range->EndKeyPrefix.GetCells(), range.From, + partition.Range->IsInclusive || partition.Range->IsPoint, + range.InclusiveFrom || range.Point, KeyDesc->KeyColumnTypes + ); + + return (compares < 0); + } + ); + + Y_ABORT_UNLESS(it != KeyDesc->GetPartitions().end()); + return it->ShardId; } void Handle(TEvWorker::TEvData::TPtr& ev) { - Worker = ev->Sender; - // TODO: enqueue records + Y_ABORT_UNLESS(PendingRecords.empty()); + + TVector records(::Reserve(ev->Get()->Records.size())); + for (auto& record : ev->Get()->Records) { + records.emplace_back(record.Offset, PathId, record.Data.size()); + auto res = PendingRecords.emplace(record.Offset, TChangeRecordBuilder() + .WithBody(std::move(record.Data)) + .WithSchema(Schema) + .Build() + ); + Y_ABORT_UNLESS(res.second); + } + + EnqueueRecords(std::move(records)); } void Handle(NChangeExchange::TEvChangeExchange::TEvRequestRecords::TPtr& ev) { - Y_UNUSED(ev); - // TODO: send records - } + TVector records(::Reserve(ev->Get()->Records.size())); + for (const auto& record : ev->Get()->Records) { + auto it = PendingRecords.find(record.Order); + Y_ABORT_UNLESS(it != PendingRecords.end()); + records.emplace_back(it->second); + } - void Handle(NChangeExchange::TEvChangeExchange::TEvRecords::TPtr& ev) { - ProcessRecords(std::move(ev->Get()->Records)); + ProcessRecords(std::move(records)); } - void Handle(NChangeExchange::TEvChangeExchange::TEvForgetRecords::TPtr& ev) { - ForgetRecords(std::move(ev->Get()->Records)); + void Handle(NChangeExchange::TEvChangeExchange::TEvRemoveRecords::TPtr& ev) { + for (const auto& record : ev->Get()->Records) { + PendingRecords.erase(record); + } + + if (PendingRecords.empty()) { + Send(Worker, new TEvWorker::TEvPoll()); + } } void Handle(NChangeExchange::TEvChangeExchangePrivate::TEvReady::TPtr& ev) { @@ -251,8 +413,7 @@ class TLocalTableWriter hFunc(TEvWorker::TEvHandshake, Handle); hFunc(TEvWorker::TEvData, Handle); hFunc(NChangeExchange::TEvChangeExchange::TEvRequestRecords, Handle); - hFunc(NChangeExchange::TEvChangeExchange::TEvRecords, Handle); - hFunc(NChangeExchange::TEvChangeExchange::TEvForgetRecords, Handle); + hFunc(NChangeExchange::TEvChangeExchange::TEvRemoveRecords, Handle); hFunc(NChangeExchange::TEvChangeExchangePrivate::TEvReady, Handle); hFunc(NChangeExchange::TEvChangeExchangePrivate::TEvGone, Handle); hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); @@ -266,8 +427,11 @@ class TLocalTableWriter TActorId Worker; ui64 TableVersion = 0; THolder KeyDesc; + TLightweightSchema::TCPtr Schema; bool Resolving = false; + TMap PendingRecords; + }; // TLocalTableWriter IActor* CreateLocalTableWriter(const TPathId& tablePathId) {