diff --git a/ydb/core/tx/datashard/change_sender_common_ops.cpp b/ydb/core/change_exchange/change_sender_common_ops.cpp similarity index 95% rename from ydb/core/tx/datashard/change_sender_common_ops.cpp rename to ydb/core/change_exchange/change_sender_common_ops.cpp index 1c57d3c17a7a..c3887dfec623 100644 --- a/ydb/core/tx/datashard/change_sender_common_ops.cpp +++ b/ydb/core/change_exchange/change_sender_common_ops.cpp @@ -1,13 +1,15 @@ #include "change_sender_common_ops.h" #include "change_sender_monitoring.h" +#include + #include #include #include #include -namespace NKikimr::NDataShard { +namespace NKikimr::NChangeExchange { void TBaseChangeSender::LazyCreateSender(THashMap& senders, ui64 partitionId) { auto res = senders.emplace(partitionId, TSender{}); @@ -82,7 +84,7 @@ void TBaseChangeSender::KillSenders() { } } -void TBaseChangeSender::EnqueueRecords(TVector&& records) { +void TBaseChangeSender::EnqueueRecords(TVector&& records) { for (auto& record : records) { Y_VERIFY_S(PathId == record.PathId, "Unexpected record's path id" << ": expected# " << PathId @@ -117,11 +119,11 @@ bool TBaseChangeSender::RequestRecords() { return false; } - ActorOps->Send(DataShard.ActorId, new NChangeExchange::TEvChangeExchange::TEvRequestRecords(std::move(records))); + ActorOps->Send(ChangeServer, new TEvChangeExchange::TEvRequestRecords(std::move(records))); return true; } -void TBaseChangeSender::ProcessRecords(TVector&& records) { +void TBaseChangeSender::ProcessRecords(TVector&& records) { for (auto& record : records) { auto it = PendingBody.find(record->GetOrder()); if (it == PendingBody.end()) { @@ -290,7 +292,7 @@ void TBaseChangeSender::SendPreparedRecords(ui64 partitionId) { } Y_ABORT_UNLESS(sender.ActorId); - ActorOps->Send(sender.ActorId, new NChangeExchange::TEvChangeExchange::TEvRecords(std::exchange(sender.Prepared, {}))); + ActorOps->Send(sender.ActorId, new TEvChangeExchange::TEvRecords(std::exchange(sender.Prepared, {}))); } void TBaseChangeSender::ReEnqueueRecords(const TSender& sender) { @@ -306,7 +308,7 @@ void TBaseChangeSender::ReEnqueueRecords(const TSender& sender) { } } -TBaseChangeSender::TBroadcast& TBaseChangeSender::EnsureBroadcast(NChangeExchange::IChangeRecord::TPtr record) { +TBaseChangeSender::TBroadcast& TBaseChangeSender::EnsureBroadcast(IChangeRecord::TPtr record) { Y_ABORT_UNLESS(record->IsBroadcast()); auto it = Broadcasting.find(record->GetOrder()); @@ -430,17 +432,17 @@ void TBaseChangeSender::RemoveRecords() { } TBaseChangeSender::TBaseChangeSender(IActorOps* actorOps, IChangeSenderResolver* resolver, - const TDataShardId& dataShard, const TPathId& pathId) + const TActorId& changeServer, const TPathId& pathId) : ActorOps(actorOps) , Resolver(resolver) - , DataShard(dataShard) + , ChangeServer(changeServer) , PathId(pathId) , MemLimit(192_KB) , MemUsage(0) { } -void TBaseChangeSender::RenderHtmlPage(TEvChangeExchange::ESenderType type, NMon::TEvRemoteHttpInfo::TPtr& ev, +void TBaseChangeSender::RenderHtmlPage(ui64 tabletId, NMon::TEvRemoteHttpInfo::TPtr& ev, const TActorContext& ctx) { const auto& cgi = ev->Get()->Cgi(); @@ -468,7 +470,7 @@ void TBaseChangeSender::RenderHtmlPage(TEvChangeExchange::ESenderType type, NMon TStringStream html; HTML(html) { - Header(html, TStringBuilder() << type << " change sender", DataShard.TabletId); + Header(html, "Change sender", tabletId); SimplePanel(html, "Info", [this](IOutputStream& html) { HTML(html) { @@ -479,7 +481,7 @@ void TBaseChangeSender::RenderHtmlPage(TEvChangeExchange::ESenderType type, NMon } }); - SimplePanel(html, "Partition senders", [this](IOutputStream& html) { + SimplePanel(html, "Partition senders", [this, tabletId](IOutputStream& html) { HTML(html) { TABLE_CLASS("table table-hover") { TABLEHEAD() { @@ -503,7 +505,7 @@ void TBaseChangeSender::RenderHtmlPage(TEvChangeExchange::ESenderType type, NMon TABLED() { html << sender.Pending.size(); } TABLED() { html << sender.Prepared.size(); } TABLED() { html << sender.Broadcasting.size(); } - TABLED() { ActorLink(html, DataShard.TabletId, PathId, partitionId); } + TABLED() { ActorLink(html, tabletId, PathId, partitionId); } } } } diff --git a/ydb/core/tx/datashard/change_sender_common_ops.h b/ydb/core/change_exchange/change_sender_common_ops.h similarity index 72% rename from ydb/core/tx/datashard/change_sender_common_ops.h rename to ydb/core/change_exchange/change_sender_common_ops.h index 6b418ebf7d89..fe516176a80f 100644 --- a/ydb/core/tx/datashard/change_sender_common_ops.h +++ b/ydb/core/change_exchange/change_sender_common_ops.h @@ -1,10 +1,7 @@ #pragma once #include "change_exchange.h" -#include "change_exchange_helpers.h" -#include -#include #include #include #include @@ -14,8 +11,7 @@ #include #include -namespace NKikimr { -namespace NDataShard { +namespace NKikimr::NChangeExchange { struct TEvChangeExchangePrivate { enum EEv { @@ -61,8 +57,8 @@ class IChangeSender { virtual IActor* CreateSender(ui64 partitionId) = 0; virtual void RemoveRecords() = 0; - virtual void EnqueueRecords(TVector&& records) = 0; - virtual void ProcessRecords(TVector&& records) = 0; + virtual void EnqueueRecords(TVector&& records) = 0; + virtual void ProcessRecords(TVector&& records) = 0; virtual void ForgetRecords(TVector&& records) = 0; virtual void OnReady(ui64 partitionId) = 0; virtual void OnGone(ui64 partitionId) = 0; @@ -75,18 +71,18 @@ class IChangeSenderResolver { virtual void Resolve() = 0; virtual bool IsResolving() const = 0; virtual bool IsResolved() const = 0; - virtual ui64 GetPartitionId(NChangeExchange::IChangeRecord::TPtr record) const = 0; + virtual ui64 GetPartitionId(IChangeRecord::TPtr record) const = 0; }; class TBaseChangeSender: public IChangeSender { - using TEnqueuedRecord = NChangeExchange::TEvChangeExchange::TEvRequestRecords::TRecordInfo; - using TRequestedRecord = NChangeExchange::TEvChangeExchange::TEvRequestRecords::TRecordInfo; + using TEnqueuedRecord = TEvChangeExchange::TEvRequestRecords::TRecordInfo; + using TRequestedRecord = TEvChangeExchange::TEvRequestRecords::TRecordInfo; struct TSender { TActorId ActorId; bool Ready = false; TVector Pending; - TVector Prepared; + TVector Prepared; TVector Broadcasting; }; @@ -108,7 +104,7 @@ class TBaseChangeSender: public IChangeSender { void SendPreparedRecords(ui64 partitionId); void ReEnqueueRecords(const TSender& sender); - TBroadcast& EnsureBroadcast(NChangeExchange::IChangeRecord::TPtr record); + TBroadcast& EnsureBroadcast(IChangeRecord::TPtr record); bool AddBroadcastPartition(ui64 order, ui64 partitionId); bool RemoveBroadcastPartition(ui64 order, ui64 partitionId); bool CompleteBroadcastPartition(ui64 order, ui64 partitionId); @@ -124,35 +120,35 @@ class TBaseChangeSender: public IChangeSender { remove.push_back(record.Order); } - ActorOps->Send(DataShard.ActorId, new NChangeExchange::TEvChangeExchange::TEvRemoveRecords(std::move(remove))); + ActorOps->Send(ChangeServer, new TEvChangeExchange::TEvRemoveRecords(std::move(remove))); } template <> void RemoveRecords(TVector&& records) { - ActorOps->Send(DataShard.ActorId, new NChangeExchange::TEvChangeExchange::TEvRemoveRecords(std::move(records))); + ActorOps->Send(ChangeServer, new TEvChangeExchange::TEvRemoveRecords(std::move(records))); } void CreateSenders(const TVector& partitionIds, bool partitioningChanged = true) override; void KillSenders() override; void RemoveRecords() override; - void EnqueueRecords(TVector&& records) override; - void ProcessRecords(TVector&& records) override; + void EnqueueRecords(TVector&& records) override; + void ProcessRecords(TVector&& records) override; void ForgetRecords(TVector&& records) override; void OnReady(ui64 partitionId) override; void OnGone(ui64 partitionId) override; explicit TBaseChangeSender(IActorOps* actorOps, IChangeSenderResolver* resolver, - const TDataShardId& dataShard, const TPathId& pathId); + const TActorId& changeServer, const TPathId& pathId); - void RenderHtmlPage(TEvChangeExchange::ESenderType type, NMon::TEvRemoteHttpInfo::TPtr& ev, const TActorContext& ctx); + void RenderHtmlPage(ui64 tabletId, NMon::TEvRemoteHttpInfo::TPtr& ev, const TActorContext& ctx); private: IActorOps* const ActorOps; IChangeSenderResolver* const Resolver; protected: - const TDataShardId DataShard; + const TActorId ChangeServer; const TPathId PathId; private: @@ -162,12 +158,11 @@ class TBaseChangeSender: public IChangeSender { THashMap Senders; // ui64 is partition id TSet Enqueued; TSet PendingBody; - TMap PendingSent; // ui64 is order + TMap PendingSent; // ui64 is order THashMap Broadcasting; // ui64 is order TVector GonePartitions; }; // TBaseChangeSender -} // NDataShard -} // NKikimr +} diff --git a/ydb/core/tx/datashard/change_sender_monitoring.cpp b/ydb/core/change_exchange/change_sender_monitoring.cpp similarity index 98% rename from ydb/core/tx/datashard/change_sender_monitoring.cpp rename to ydb/core/change_exchange/change_sender_monitoring.cpp index 9cc4ba1b6bc3..9671bb787b57 100644 --- a/ydb/core/tx/datashard/change_sender_monitoring.cpp +++ b/ydb/core/change_exchange/change_sender_monitoring.cpp @@ -5,7 +5,7 @@ #include #include -namespace NKikimr::NDataShard { +namespace NKikimr::NChangeExchange { void Panel(IOutputStream& str, std::function title, std::function body) { HTML(str) { diff --git a/ydb/core/tx/datashard/change_sender_monitoring.h b/ydb/core/change_exchange/change_sender_monitoring.h similarity index 97% rename from ydb/core/tx/datashard/change_sender_monitoring.h rename to ydb/core/change_exchange/change_sender_monitoring.h index fe9bf40b9a91..83205bd0e70a 100644 --- a/ydb/core/tx/datashard/change_sender_monitoring.h +++ b/ydb/core/change_exchange/change_sender_monitoring.h @@ -6,7 +6,7 @@ #include -namespace NKikimr::NDataShard { +namespace NKikimr::NChangeExchange { template static void Link(IOutputStream& str, const TStringBuf path, const T& title) { diff --git a/ydb/core/change_exchange/ya.make b/ydb/core/change_exchange/ya.make index a00055ba545c..6e82179d832c 100644 --- a/ydb/core/change_exchange/ya.make +++ b/ydb/core/change_exchange/ya.make @@ -3,6 +3,8 @@ LIBRARY() SRCS( change_exchange.cpp change_record.cpp + change_sender_common_ops.cpp + change_sender_monitoring.cpp ) GENERATE_ENUM_SERIALIZATION(change_record.h) @@ -10,6 +12,9 @@ GENERATE_ENUM_SERIALIZATION(change_record.h) PEERDIR( ydb/core/base ydb/core/scheme + ydb/library/actors/core + ydb/library/yverify_stream + library/cpp/monlib/service/pages ) YQL_LAST_ABI_VERSION() diff --git a/ydb/core/tx/datashard/change_exchange_split.cpp b/ydb/core/tx/datashard/change_exchange_split.cpp index 0bcd44390edf..93e6d68a3b8d 100644 --- a/ydb/core/tx/datashard/change_exchange_split.cpp +++ b/ydb/core/tx/datashard/change_exchange_split.cpp @@ -1,6 +1,5 @@ #include "change_exchange.h" #include "change_exchange_helpers.h" -#include "change_sender_common_ops.h" #include "datashard_impl.h" #include diff --git a/ydb/core/tx/datashard/change_sender.cpp b/ydb/core/tx/datashard/change_sender.cpp index 1f617d252e40..6150c00d0be6 100644 --- a/ydb/core/tx/datashard/change_sender.cpp +++ b/ydb/core/tx/datashard/change_sender.cpp @@ -1,9 +1,9 @@ #include "change_exchange.h" #include "change_exchange_impl.h" -#include "change_sender_monitoring.h" #include "datashard_impl.h" #include +#include #include #include #include @@ -165,6 +165,8 @@ class TChangeSender: public TActor { } void Handle(NMon::TEvRemoteHttpInfo::TPtr& ev, const TActorContext& ctx) { + using namespace NChangeExchange; + const auto& cgi = ev->Get()->Cgi(); if (const auto& str = cgi.Get("pathId")) { if (const auto& pathId = ParsePathId(str)) { diff --git a/ydb/core/tx/datashard/change_sender_async_index.cpp b/ydb/core/tx/datashard/change_sender_async_index.cpp index d8356013456e..98a225f55434 100644 --- a/ydb/core/tx/datashard/change_sender_async_index.cpp +++ b/ydb/core/tx/datashard/change_sender_async_index.cpp @@ -1,16 +1,16 @@ #include "change_exchange.h" #include "change_exchange_impl.h" #include "change_record.h" -#include "change_sender_common_ops.h" -#include "change_sender_monitoring.h" #include "datashard_impl.h" #include -#include +#include +#include #include #include #include #include +#include #include #include @@ -107,7 +107,7 @@ class TAsyncIndexChangeSenderShard: public TActorBootstrapped - , public TBaseChangeSender - , public IChangeSenderResolver + , public NChangeExchange::TBaseChangeSender + , public NChangeExchange::IChangeSenderResolver , private NSchemeCache::TSchemeCacheHelpers { TStringBuf GetLogPrefix() const { @@ -753,12 +755,12 @@ class TAsyncIndexChangeSenderMain ForgetRecords(std::move(ev->Get()->Records)); } - void Handle(TEvChangeExchangePrivate::TEvReady::TPtr& ev) { + void Handle(NChangeExchange::TEvChangeExchangePrivate::TEvReady::TPtr& ev) { LOG_D("Handle " << ev->Get()->ToString()); OnReady(ev->Get()->PartitionId); } - void Handle(TEvChangeExchangePrivate::TEvGone::TPtr& ev) { + void Handle(NChangeExchange::TEvChangeExchangePrivate::TEvGone::TPtr& ev) { LOG_D("Handle " << ev->Get()->ToString()); OnGone(ev->Get()->PartitionId); } @@ -777,7 +779,7 @@ class TAsyncIndexChangeSenderMain } void Handle(NMon::TEvRemoteHttpInfo::TPtr& ev, const TActorContext& ctx) { - RenderHtmlPage(ESenderType::AsyncIndex, ev, ctx); + RenderHtmlPage(DataShard.TabletId, ev, ctx); } void PassAway() override { @@ -792,7 +794,8 @@ class TAsyncIndexChangeSenderMain explicit TAsyncIndexChangeSenderMain(const TDataShardId& dataShard, const TTableId& userTableId, const TPathId& indexPathId) : TActorBootstrapped() - , TBaseChangeSender(this, this, dataShard, indexPathId) + , TBaseChangeSender(this, this, dataShard.ActorId, indexPathId) + , DataShard(dataShard) , UserTableId(userTableId) , IndexTableVersion(0) { @@ -808,8 +811,8 @@ class TAsyncIndexChangeSenderMain hFunc(NChangeExchange::TEvChangeExchange::TEvRecords, Handle); hFunc(NChangeExchange::TEvChangeExchange::TEvForgetRecords, Handle); hFunc(TEvChangeExchange::TEvRemoveSender, Handle); - hFunc(TEvChangeExchangePrivate::TEvReady, Handle); - hFunc(TEvChangeExchangePrivate::TEvGone, Handle); + hFunc(NChangeExchange::TEvChangeExchangePrivate::TEvReady, Handle); + hFunc(NChangeExchange::TEvChangeExchangePrivate::TEvGone, Handle); HFunc(NMon::TEvRemoteHttpInfo, Handle); sFunc(TEvents::TEvPoison, PassAway); } @@ -825,6 +828,7 @@ class TAsyncIndexChangeSenderMain } private: + const TDataShardId DataShard; const TTableId UserTableId; mutable TMaybe LogPrefix; diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp index 60e107569e6d..111f1c74c690 100644 --- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp +++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp @@ -2,10 +2,10 @@ #include "change_exchange_impl.h" #include "change_record.h" #include "change_record_cdc_serializer.h" -#include "change_sender_common_ops.h" -#include "change_sender_monitoring.h" #include "datashard_user_table.h" +#include +#include #include #include #include @@ -16,6 +16,7 @@ #include #include #include + #include namespace NKikimr::NDataShard { @@ -75,7 +76,7 @@ class TCdcChangeSenderPartition: public TActorBootstrapped - , public TBaseChangeSender - , public IChangeSenderResolver + , public NChangeExchange::TBaseChangeSender + , public NChangeExchange::IChangeSenderResolver , private NSchemeCache::TSchemeCacheHelpers { struct TPQPartitionInfo { @@ -718,12 +721,12 @@ class TCdcChangeSenderMain ForgetRecords(std::move(ev->Get()->Records)); } - void Handle(TEvChangeExchangePrivate::TEvReady::TPtr& ev) { + void Handle(NChangeExchange::TEvChangeExchangePrivate::TEvReady::TPtr& ev) { LOG_D("Handle " << ev->Get()->ToString()); OnReady(ev->Get()->PartitionId); } - void Handle(TEvChangeExchangePrivate::TEvGone::TPtr& ev) { + void Handle(NChangeExchange::TEvChangeExchangePrivate::TEvGone::TPtr& ev) { LOG_D("Handle " << ev->Get()->ToString()); OnGone(ev->Get()->PartitionId); } @@ -742,7 +745,7 @@ class TCdcChangeSenderMain } void Handle(NMon::TEvRemoteHttpInfo::TPtr& ev, const TActorContext& ctx) { - RenderHtmlPage(ESenderType::CdcStream, ev, ctx); + RenderHtmlPage(DataShard.TabletId, ev, ctx); } void PassAway() override { @@ -757,7 +760,8 @@ class TCdcChangeSenderMain explicit TCdcChangeSenderMain(const TDataShardId& dataShard, const TPathId& streamPathId) : TActorBootstrapped() - , TBaseChangeSender(this, this, dataShard, streamPathId) + , TBaseChangeSender(this, this, dataShard.ActorId, streamPathId) + , DataShard(dataShard) , TopicVersion(0) { } @@ -772,8 +776,8 @@ class TCdcChangeSenderMain hFunc(NChangeExchange::TEvChangeExchange::TEvRecords, Handle); hFunc(NChangeExchange::TEvChangeExchange::TEvForgetRecords, Handle); hFunc(TEvChangeExchange::TEvRemoveSender, Handle); - hFunc(TEvChangeExchangePrivate::TEvReady, Handle); - hFunc(TEvChangeExchangePrivate::TEvGone, Handle); + hFunc(NChangeExchange::TEvChangeExchangePrivate::TEvReady, Handle); + hFunc(NChangeExchange::TEvChangeExchangePrivate::TEvGone, Handle); HFunc(NMon::TEvRemoteHttpInfo, Handle); sFunc(TEvents::TEvPoison, PassAway); } @@ -789,6 +793,7 @@ class TCdcChangeSenderMain } private: + const TDataShardId DataShard; mutable TMaybe LogPrefix; TUserTable::TCdcStream Stream; diff --git a/ydb/core/tx/datashard/ya.make b/ydb/core/tx/datashard/ya.make index cefaa714b0fc..5cc688f117c8 100644 --- a/ydb/core/tx/datashard/ya.make +++ b/ydb/core/tx/datashard/ya.make @@ -26,8 +26,6 @@ SRCS( change_sender.cpp change_sender_async_index.cpp change_sender_cdc_stream.cpp - change_sender_common_ops.cpp - change_sender_monitoring.cpp check_commit_writes_tx_unit.cpp check_data_tx_unit.cpp check_distributed_erase_tx_unit.cpp