diff --git a/ydb/core/base/events.h b/ydb/core/base/events.h index f9bf6ad31f60..97f89dc978a2 100644 --- a/ydb/core/base/events.h +++ b/ydb/core/base/events.h @@ -135,7 +135,7 @@ struct TKikimrEvents : TEvents { ES_HEALTH_CHECK, ES_DQ = NYql::NDq::TDqEvents::ES_DQ_COMPUTE, // 4212 ES_YQ, // 4213 - ES_CHANGE_EXCHANGE, + ES_CHANGE_EXCHANGE_DATASHARD, ES_DATABASE_SERVICE, //4215 ES_SEQUENCESHARD, // 4216 ES_SEQUENCEPROXY, // 4217 @@ -172,6 +172,7 @@ struct TKikimrEvents : TEvents { ES_PQ_PARTITION_CHOOSER, ES_GRAPH, ES_REPLICATION_SERVICE, + ES_CHANGE_EXCHANGE, }; }; diff --git a/ydb/core/change_exchange/change_exchange.cpp b/ydb/core/change_exchange/change_exchange.cpp new file mode 100644 index 000000000000..92df9b3cea72 --- /dev/null +++ b/ydb/core/change_exchange/change_exchange.cpp @@ -0,0 +1,125 @@ +#include "change_exchange.h" + +#include +#include + +namespace NKikimr::NChangeExchange { + +/// TEvEnqueueRecords +TEvChangeExchange::TEvEnqueueRecords::TEvEnqueueRecords(const TVector& records) + : Records(records) +{ +} + +TEvChangeExchange::TEvEnqueueRecords::TEvEnqueueRecords(TVector&& records) + : Records(std::move(records)) +{ +} + +TString TEvChangeExchange::TEvEnqueueRecords::ToString() const { + return TStringBuilder() << ToStringHeader() << " {" + << " Records [" << JoinSeq(",", Records) << "]" + << " }"; +} + +TEvChangeExchange::TEvEnqueueRecords::TRecordInfo::TRecordInfo(ui64 order, const TPathId& pathId, ui64 bodySize) + : Order(order) + , PathId(pathId) + , BodySize(bodySize) +{ +} + +void TEvChangeExchange::TEvEnqueueRecords::TRecordInfo::Out(IOutputStream& out) const { + out << "{" + << " Order: " << Order + << " PathId: " << PathId + << " BodySize: " << BodySize + << " }"; +} + +/// TEvRequestRecords +TEvChangeExchange::TEvRequestRecords::TEvRequestRecords(const TVector& records) + : Records(records) +{ +} + +TEvChangeExchange::TEvRequestRecords::TEvRequestRecords(TVector&& records) + : Records(std::move(records)) +{ +} + +TString TEvChangeExchange::TEvRequestRecords::ToString() const { + return TStringBuilder() << ToStringHeader() << " {" + << " Records [" << JoinSeq(",", Records) << "]" + << " }"; +} + +TEvChangeExchange::TEvRequestRecords::TRecordInfo::TRecordInfo(ui64 order, ui64 bodySize) + : Order(order) + , BodySize(bodySize) +{ +} + +bool TEvChangeExchange::TEvRequestRecords::TRecordInfo::operator<(const TRecordInfo& rhs) const { + return Order < rhs.Order; +} + +void TEvChangeExchange::TEvRequestRecords::TRecordInfo::Out(IOutputStream& out) const { + out << "{" + << " Order: " << Order + << " BodySize: " << BodySize + << " }"; +} + +/// TEvRemoveRecords +TEvChangeExchange::TEvRemoveRecords::TEvRemoveRecords(const TVector& records) + : Records(records) +{ +} + +TEvChangeExchange::TEvRemoveRecords::TEvRemoveRecords(TVector&& records) + : Records(std::move(records)) +{ +} + +TString TEvChangeExchange::TEvRemoveRecords::ToString() const { + return TStringBuilder() << ToStringHeader() << " {" + << " Records [" << JoinSeq(",", Records) << "]" + << " }"; +} + +/// TEvRecords +TEvChangeExchange::TEvRecords::TEvRecords(const TVector& records) + : Records(records) +{ +} + +TEvChangeExchange::TEvRecords::TEvRecords(TVector&& records) + : Records(std::move(records)) +{ +} + +TString TEvChangeExchange::TEvRecords::ToString() const { + return TStringBuilder() << ToStringHeader() << " {" + << " Records [" << JoinSeq(",", Records) << "]" + << " }"; +} + +/// TEvForgetRecords +TEvChangeExchange::TEvForgetRecords::TEvForgetRecords(const TVector& records) + : Records(records) +{ +} + +TEvChangeExchange::TEvForgetRecords::TEvForgetRecords(TVector&& records) + : Records(std::move(records)) +{ +} + +TString TEvChangeExchange::TEvForgetRecords::ToString() const { + return TStringBuilder() << ToStringHeader() << " {" + << " Records [" << JoinSeq(",", Records) << "]" + << " }"; +} + +} diff --git a/ydb/core/change_exchange/change_exchange.h b/ydb/core/change_exchange/change_exchange.h new file mode 100644 index 000000000000..9cd000b4c23d --- /dev/null +++ b/ydb/core/change_exchange/change_exchange.h @@ -0,0 +1,101 @@ +#pragma once + +#include "change_record.h" + +#include +#include +#include + +#include + +namespace NKikimr::NChangeExchange { + +struct TEvChangeExchange { + enum EEv { + // Enqueue for sending + EvEnqueueRecords = EventSpaceBegin(TKikimrEvents::ES_CHANGE_EXCHANGE), + // Request change record(s) by id + EvRequestRecords, + // Change record(s) + EvRecords, + // Remove change record(s) from local database + EvRemoveRecords, + // Already removed records that the sender should forget about + EvForgetRecods, + + EvEnd, + }; + + static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_CHANGE_EXCHANGE)); + + struct TEvEnqueueRecords: public TEventLocal { + struct TRecordInfo { + ui64 Order; + TPathId PathId; + ui64 BodySize; + + TRecordInfo(ui64 order, const TPathId& pathId, ui64 bodySize); + + void Out(IOutputStream& out) const; + }; + + TVector Records; + + explicit TEvEnqueueRecords(const TVector& records); + explicit TEvEnqueueRecords(TVector&& records); + TString ToString() const override; + }; + + struct TEvRequestRecords: public TEventLocal { + struct TRecordInfo { + ui64 Order; + ui64 BodySize; + + TRecordInfo(ui64 order, ui64 bodySize = 0); + + bool operator<(const TRecordInfo& rhs) const; + void Out(IOutputStream& out) const; + }; + + TVector Records; + + explicit TEvRequestRecords(const TVector& records); + explicit TEvRequestRecords(TVector&& records); + TString ToString() const override; + }; + + struct TEvRemoveRecords: public TEventLocal { + TVector Records; + + explicit TEvRemoveRecords(const TVector& records); + explicit TEvRemoveRecords(TVector&& records); + TString ToString() const override; + }; + + struct TEvRecords: public TEventLocal { + TVector Records; + + explicit TEvRecords(const TVector& records); + explicit TEvRecords(TVector&& records); + TString ToString() const override; + }; + + struct TEvForgetRecords: public TEventLocal { + TVector Records; + + explicit TEvForgetRecords(const TVector& records); + explicit TEvForgetRecords(TVector&& records); + TString ToString() const override; + }; + +}; // TEvChangeExchange + +} + +Y_DECLARE_OUT_SPEC(inline, NKikimr::NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo, o, x) { + return x.Out(o); +} + +Y_DECLARE_OUT_SPEC(inline, NKikimr::NChangeExchange::TEvChangeExchange::TEvRequestRecords::TRecordInfo, o, x) { + return x.Out(o); +} diff --git a/ydb/core/change_exchange/ya.make b/ydb/core/change_exchange/ya.make index 1ba88203fef7..a00055ba545c 100644 --- a/ydb/core/change_exchange/ya.make +++ b/ydb/core/change_exchange/ya.make @@ -1,11 +1,17 @@ LIBRARY() SRCS( + change_exchange.cpp change_record.cpp ) GENERATE_ENUM_SERIALIZATION(change_record.h) +PEERDIR( + ydb/core/base + ydb/core/scheme +) + YQL_LAST_ABI_VERSION() END() diff --git a/ydb/core/tx/datashard/change_exchange.cpp b/ydb/core/tx/datashard/change_exchange.cpp index 99c7f6cec7cf..752667384f3f 100644 --- a/ydb/core/tx/datashard/change_exchange.cpp +++ b/ydb/core/tx/datashard/change_exchange.cpp @@ -1,127 +1,8 @@ #include "change_exchange.h" #include -#include -namespace NKikimr { -namespace NDataShard { - -/// TEvEnqueueRecords -TEvChangeExchange::TEvEnqueueRecords::TEvEnqueueRecords(const TVector& records) - : Records(records) -{ -} - -TEvChangeExchange::TEvEnqueueRecords::TEvEnqueueRecords(TVector&& records) - : Records(std::move(records)) -{ -} - -TString TEvChangeExchange::TEvEnqueueRecords::ToString() const { - return TStringBuilder() << ToStringHeader() << " {" - << " Records [" << JoinSeq(",", Records) << "]" - << " }"; -} - -TEvChangeExchange::TEvEnqueueRecords::TRecordInfo::TRecordInfo(ui64 order, const TPathId& pathId, ui64 bodySize) - : Order(order) - , PathId(pathId) - , BodySize(bodySize) -{ -} - -void TEvChangeExchange::TEvEnqueueRecords::TRecordInfo::Out(IOutputStream& out) const { - out << "{" - << " Order: " << Order - << " PathId: " << PathId - << " BodySize: " << BodySize - << " }"; -} - -/// TEvRequestRecords -TEvChangeExchange::TEvRequestRecords::TEvRequestRecords(const TVector& records) - : Records(records) -{ -} - -TEvChangeExchange::TEvRequestRecords::TEvRequestRecords(TVector&& records) - : Records(std::move(records)) -{ -} - -TString TEvChangeExchange::TEvRequestRecords::ToString() const { - return TStringBuilder() << ToStringHeader() << " {" - << " Records [" << JoinSeq(",", Records) << "]" - << " }"; -} - -TEvChangeExchange::TEvRequestRecords::TRecordInfo::TRecordInfo(ui64 order, ui64 bodySize) - : Order(order) - , BodySize(bodySize) -{ -} - -bool TEvChangeExchange::TEvRequestRecords::TRecordInfo::operator<(const TRecordInfo& rhs) const { - return Order < rhs.Order; -} - -void TEvChangeExchange::TEvRequestRecords::TRecordInfo::Out(IOutputStream& out) const { - out << "{" - << " Order: " << Order - << " BodySize: " << BodySize - << " }"; -} - -/// TEvRemoveRecords -TEvChangeExchange::TEvRemoveRecords::TEvRemoveRecords(const TVector& records) - : Records(records) -{ -} - -TEvChangeExchange::TEvRemoveRecords::TEvRemoveRecords(TVector&& records) - : Records(std::move(records)) -{ -} - -TString TEvChangeExchange::TEvRemoveRecords::ToString() const { - return TStringBuilder() << ToStringHeader() << " {" - << " Records [" << JoinSeq(",", Records) << "]" - << " }"; -} - -/// TEvRecords -TEvChangeExchange::TEvRecords::TEvRecords(const TVector& records) - : Records(records) -{ -} - -TEvChangeExchange::TEvRecords::TEvRecords(TVector&& records) - : Records(std::move(records)) -{ -} - -TString TEvChangeExchange::TEvRecords::ToString() const { - return TStringBuilder() << ToStringHeader() << " {" - << " Records [" << JoinSeq(",", Records) << "]" - << " }"; -} - -/// TEvForgetRecords -TEvChangeExchange::TEvForgetRecords::TEvForgetRecords(const TVector& records) - : Records(records) -{ -} - -TEvChangeExchange::TEvForgetRecords::TEvForgetRecords(TVector&& records) - : Records(std::move(records)) -{ -} - -TString TEvChangeExchange::TEvForgetRecords::ToString() const { - return TStringBuilder() << ToStringHeader() << " {" - << " Records [" << JoinSeq(",", Records) << "]" - << " }"; -} +namespace NKikimr::NDataShard { /// TEvAddSender TEvChangeExchange::TEvAddSender::TEvAddSender(const TTableId& userTableId, TEvChangeExchange::ESenderType type, const TPathId& pathId) @@ -151,5 +32,4 @@ TString TEvChangeExchange::TEvRemoveSender::ToString() const { << " }"; } -} // NDataShard -} // NKikimr +} diff --git a/ydb/core/tx/datashard/change_exchange.h b/ydb/core/tx/datashard/change_exchange.h index 226a5125f5d8..548fe14e3163 100644 --- a/ydb/core/tx/datashard/change_exchange.h +++ b/ydb/core/tx/datashard/change_exchange.h @@ -2,14 +2,13 @@ #include "defs.h" -#include #include +#include #include #include -namespace NKikimr { -namespace NDataShard { +namespace NKikimr::NDataShard { class TDataShard; @@ -17,7 +16,7 @@ struct TEvChangeExchange { enum EEv { /// Network exchange protocol // Handshake between sender & receiver - EvHandshake = EventSpaceBegin(TKikimrEvents::ES_CHANGE_EXCHANGE), + EvHandshake = EventSpaceBegin(TKikimrEvents::ES_CHANGE_EXCHANGE_DATASHARD), // Apply change record(s) on receiver EvApplyRecords, // Handshake & application status @@ -27,30 +26,17 @@ struct TEvChangeExchange { EvActivateSenderAck, /// Local exchange (mostly using change's id) - // Enqueue for sending - EvEnqueueRecords, - // Request change record(s) by id - EvRequestRecords, - // Change record(s) - EvRecords, - // Remove change record(s) from local database - EvRemoveRecords, - // Add new change sender EvAddSender, // Remove existing change sender EvRemoveSender, - - // Already removed records that the sender should forget about - EvForgetRecods, - // Split/merge EvSplitAck, EvEnd, }; - static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_CHANGE_EXCHANGE)); + static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_CHANGE_EXCHANGE_DATASHARD)); /// Network events struct TEvHandshake: public TEventPB {}; @@ -60,66 +46,6 @@ struct TEvChangeExchange { struct TEvActivateSenderAck: public TEventPB {}; /// Local events - struct TEvEnqueueRecords: public TEventLocal { - struct TRecordInfo { - ui64 Order; - TPathId PathId; - ui64 BodySize; - - TRecordInfo(ui64 order, const TPathId& pathId, ui64 bodySize); - - void Out(IOutputStream& out) const; - }; - - TVector Records; - - explicit TEvEnqueueRecords(const TVector& records); - explicit TEvEnqueueRecords(TVector&& records); - TString ToString() const override; - }; - - struct TEvRequestRecords: public TEventLocal { - struct TRecordInfo { - ui64 Order; - ui64 BodySize; - - TRecordInfo(ui64 order, ui64 bodySize = 0); - - bool operator<(const TRecordInfo& rhs) const; - void Out(IOutputStream& out) const; - }; - - TVector Records; - - explicit TEvRequestRecords(const TVector& records); - explicit TEvRequestRecords(TVector&& records); - TString ToString() const override; - }; - - struct TEvRemoveRecords: public TEventLocal { - TVector Records; - - explicit TEvRemoveRecords(const TVector& records); - explicit TEvRemoveRecords(TVector&& records); - TString ToString() const override; - }; - - struct TEvRecords: public TEventLocal { - TVector Records; - - explicit TEvRecords(const TVector& records); - explicit TEvRecords(TVector&& records); - TString ToString() const override; - }; - - struct TEvForgetRecords: public TEventLocal { - TVector Records; - - explicit TEvForgetRecords(const TVector& records); - explicit TEvForgetRecords(TVector&& records); - TString ToString() const override; - }; - enum class ESenderType { AsyncIndex, CdcStream, @@ -149,13 +75,4 @@ struct TEvChangeExchange { IActor* CreateChangeSender(const TDataShard* self); IActor* CreateChangeExchangeSplit(const TDataShard* self, const TVector& dstDataShards); -} // NDataShard -} // NKikimr - -Y_DECLARE_OUT_SPEC(inline, NKikimr::NDataShard::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo, o, x) { - return x.Out(o); -} - -Y_DECLARE_OUT_SPEC(inline, NKikimr::NDataShard::TEvChangeExchange::TEvRequestRecords::TRecordInfo, o, x) { - return x.Out(o); } diff --git a/ydb/core/tx/datashard/change_sender.cpp b/ydb/core/tx/datashard/change_sender.cpp index 825ffe90806d..1f617d252e40 100644 --- a/ydb/core/tx/datashard/change_sender.cpp +++ b/ydb/core/tx/datashard/change_sender.cpp @@ -3,12 +3,12 @@ #include "change_sender_monitoring.h" #include "datashard_impl.h" -#include - +#include #include #include #include #include +#include #include #include @@ -19,7 +19,7 @@ namespace NKikimr::NDataShard { class TChangeSender: public TActor { using ESenderType = TEvChangeExchange::ESenderType; - using TEnqueuedRecord = TEvChangeExchange::TEvEnqueueRecords::TRecordInfo; + using TEnqueuedRecord = NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo; struct TSender { TTableId UserTableId; @@ -67,7 +67,7 @@ class TChangeSender: public TActor { sender.ActorId = RegisterChangeSender(pathId, sender.UserTableId, sender.Type); } - void Handle(TEvChangeExchange::TEvEnqueueRecords::TPtr& ev) { + void Handle(NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TPtr& ev) { LOG_D("Handle " << ev->Get()->ToString()); auto& records = ev->Get()->Records; @@ -92,11 +92,11 @@ class TChangeSender: public TActor { } for (auto& [to, records] : forward) { - Send(to, new TEvChangeExchange::TEvEnqueueRecords(std::move(records))); + Send(to, new NChangeExchange::TEvChangeExchange::TEvEnqueueRecords(std::move(records))); } if (remove) { - Send(DataShard.ActorId, new TEvChangeExchange::TEvRemoveRecords(std::move(remove))); + Send(DataShard.ActorId, new NChangeExchange::TEvChangeExchange::TEvRemoveRecords(std::move(remove))); } } @@ -289,7 +289,7 @@ class TChangeSender: public TActor { STFUNC(StateBase) { switch (ev->GetTypeRewrite()) { - hFunc(TEvChangeExchange::TEvEnqueueRecords, Handle); + hFunc(NChangeExchange::TEvChangeExchange::TEvEnqueueRecords, Handle); hFunc(TEvChangeExchange::TEvAddSender, Handle); hFunc(TEvChangeExchange::TEvRemoveSender, Handle); HFunc(NMon::TEvRemoteHttpInfo, Handle); diff --git a/ydb/core/tx/datashard/change_sender_async_index.cpp b/ydb/core/tx/datashard/change_sender_async_index.cpp index 9ba3b7c03782..d8356013456e 100644 --- a/ydb/core/tx/datashard/change_sender_async_index.cpp +++ b/ydb/core/tx/datashard/change_sender_async_index.cpp @@ -115,13 +115,13 @@ class TAsyncIndexChangeSenderShard: public TActorBootstrappedGetTypeRewrite()) { - hFunc(TEvChangeExchange::TEvRecords, Handle); + hFunc(NChangeExchange::TEvChangeExchange::TEvRecords, Handle); default: return StateBase(ev); } } - void Handle(TEvChangeExchange::TEvRecords::TPtr& ev) { + void Handle(NChangeExchange::TEvChangeExchange::TEvRecords::TPtr& ev) { LOG_D("Handle " << ev->Get()->ToString()); auto records = MakeHolder(); @@ -738,17 +738,17 @@ class TAsyncIndexChangeSenderMain return new TAsyncIndexChangeSenderShard(SelfId(), DataShard, partitionId, IndexTablePathId, TagMap); } - void Handle(TEvChangeExchange::TEvEnqueueRecords::TPtr& ev) { + void Handle(NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TPtr& ev) { LOG_D("Handle " << ev->Get()->ToString()); EnqueueRecords(std::move(ev->Get()->Records)); } - void Handle(TEvChangeExchange::TEvRecords::TPtr& ev) { + void Handle(NChangeExchange::TEvChangeExchange::TEvRecords::TPtr& ev) { LOG_D("Handle " << ev->Get()->ToString()); ProcessRecords(std::move(ev->Get()->Records)); } - void Handle(TEvChangeExchange::TEvForgetRecords::TPtr& ev) { + void Handle(NChangeExchange::TEvChangeExchange::TEvForgetRecords::TPtr& ev) { LOG_D("Handle " << ev->Get()->ToString()); ForgetRecords(std::move(ev->Get()->Records)); } @@ -771,7 +771,7 @@ class TAsyncIndexChangeSenderMain PassAway(); } - void AutoRemove(TEvChangeExchange::TEvEnqueueRecords::TPtr& ev) { + void AutoRemove(NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TPtr& ev) { LOG_D("Handle " << ev->Get()->ToString()); RemoveRecords(std::move(ev->Get()->Records)); } @@ -804,9 +804,9 @@ class TAsyncIndexChangeSenderMain STFUNC(StateBase) { switch (ev->GetTypeRewrite()) { - hFunc(TEvChangeExchange::TEvEnqueueRecords, Handle); - hFunc(TEvChangeExchange::TEvRecords, Handle); - hFunc(TEvChangeExchange::TEvForgetRecords, Handle); + hFunc(NChangeExchange::TEvChangeExchange::TEvEnqueueRecords, Handle); + hFunc(NChangeExchange::TEvChangeExchange::TEvRecords, Handle); + hFunc(NChangeExchange::TEvChangeExchange::TEvForgetRecords, Handle); hFunc(TEvChangeExchange::TEvRemoveSender, Handle); hFunc(TEvChangeExchangePrivate::TEvReady, Handle); hFunc(TEvChangeExchangePrivate::TEvGone, Handle); @@ -817,7 +817,7 @@ class TAsyncIndexChangeSenderMain STFUNC(StatePendingRemove) { switch (ev->GetTypeRewrite()) { - hFunc(TEvChangeExchange::TEvEnqueueRecords, AutoRemove); + hFunc(NChangeExchange::TEvChangeExchange::TEvEnqueueRecords, AutoRemove); hFunc(TEvChangeExchange::TEvRemoveSender, Handle); HFunc(NMon::TEvRemoteHttpInfo, Handle); sFunc(TEvents::TEvPoison, PassAway); diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp index 0d82b1eedf0c..60e107569e6d 100644 --- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp +++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp @@ -83,14 +83,14 @@ class TCdcChangeSenderPartition: public TActorBootstrappedGetTypeRewrite()) { - hFunc(TEvChangeExchange::TEvRecords, Handle); + hFunc(NChangeExchange::TEvChangeExchange::TEvRecords, Handle); sFunc(TEvPartitionWriter::TEvWriteResponse, Lost); default: return StateBase(ev); } } - void Handle(TEvChangeExchange::TEvRecords::TPtr& ev) { + void Handle(NChangeExchange::TEvChangeExchange::TEvRecords::TPtr& ev) { LOG_D("Handle " << ev->Get()->ToString()); NKikimrClient::TPersQueueRequest request; @@ -703,17 +703,17 @@ class TCdcChangeSenderMain return new TCdcChangeSenderPartition(SelfId(), DataShard, partitionId, shardId, Stream); } - void Handle(TEvChangeExchange::TEvEnqueueRecords::TPtr& ev) { + void Handle(NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TPtr& ev) { LOG_D("Handle " << ev->Get()->ToString()); EnqueueRecords(std::move(ev->Get()->Records)); } - void Handle(TEvChangeExchange::TEvRecords::TPtr& ev) { + void Handle(NChangeExchange::TEvChangeExchange::TEvRecords::TPtr& ev) { LOG_D("Handle " << ev->Get()->ToString()); ProcessRecords(std::move(ev->Get()->Records)); } - void Handle(TEvChangeExchange::TEvForgetRecords::TPtr& ev) { + void Handle(NChangeExchange::TEvChangeExchange::TEvForgetRecords::TPtr& ev) { LOG_D("Handle " << ev->Get()->ToString()); ForgetRecords(std::move(ev->Get()->Records)); } @@ -736,7 +736,7 @@ class TCdcChangeSenderMain PassAway(); } - void AutoRemove(TEvChangeExchange::TEvEnqueueRecords::TPtr& ev) { + void AutoRemove(NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TPtr& ev) { LOG_D("Handle " << ev->Get()->ToString()); RemoveRecords(std::move(ev->Get()->Records)); } @@ -768,9 +768,9 @@ class TCdcChangeSenderMain STFUNC(StateBase) { switch (ev->GetTypeRewrite()) { - hFunc(TEvChangeExchange::TEvEnqueueRecords, Handle); - hFunc(TEvChangeExchange::TEvRecords, Handle); - hFunc(TEvChangeExchange::TEvForgetRecords, Handle); + hFunc(NChangeExchange::TEvChangeExchange::TEvEnqueueRecords, Handle); + hFunc(NChangeExchange::TEvChangeExchange::TEvRecords, Handle); + hFunc(NChangeExchange::TEvChangeExchange::TEvForgetRecords, Handle); hFunc(TEvChangeExchange::TEvRemoveSender, Handle); hFunc(TEvChangeExchangePrivate::TEvReady, Handle); hFunc(TEvChangeExchangePrivate::TEvGone, Handle); @@ -781,7 +781,7 @@ class TCdcChangeSenderMain STFUNC(StatePendingRemove) { switch (ev->GetTypeRewrite()) { - hFunc(TEvChangeExchange::TEvEnqueueRecords, AutoRemove); + hFunc(NChangeExchange::TEvChangeExchange::TEvEnqueueRecords, AutoRemove); hFunc(TEvChangeExchange::TEvRemoveSender, Handle); HFunc(NMon::TEvRemoteHttpInfo, Handle); sFunc(TEvents::TEvPoison, PassAway); diff --git a/ydb/core/tx/datashard/change_sender_common_ops.cpp b/ydb/core/tx/datashard/change_sender_common_ops.cpp index d46d467daaa1..1c57d3c17a7a 100644 --- a/ydb/core/tx/datashard/change_sender_common_ops.cpp +++ b/ydb/core/tx/datashard/change_sender_common_ops.cpp @@ -82,7 +82,7 @@ void TBaseChangeSender::KillSenders() { } } -void TBaseChangeSender::EnqueueRecords(TVector&& records) { +void TBaseChangeSender::EnqueueRecords(TVector&& records) { for (auto& record : records) { Y_VERIFY_S(PathId == record.PathId, "Unexpected record's path id" << ": expected# " << PathId @@ -117,7 +117,7 @@ bool TBaseChangeSender::RequestRecords() { return false; } - ActorOps->Send(DataShard.ActorId, new TEvChangeExchange::TEvRequestRecords(std::move(records))); + ActorOps->Send(DataShard.ActorId, new NChangeExchange::TEvChangeExchange::TEvRequestRecords(std::move(records))); return true; } @@ -290,7 +290,7 @@ void TBaseChangeSender::SendPreparedRecords(ui64 partitionId) { } Y_ABORT_UNLESS(sender.ActorId); - ActorOps->Send(sender.ActorId, new TEvChangeExchange::TEvRecords(std::exchange(sender.Prepared, {}))); + ActorOps->Send(sender.ActorId, new NChangeExchange::TEvChangeExchange::TEvRecords(std::exchange(sender.Prepared, {}))); } void TBaseChangeSender::ReEnqueueRecords(const TSender& sender) { diff --git a/ydb/core/tx/datashard/change_sender_common_ops.h b/ydb/core/tx/datashard/change_sender_common_ops.h index 45351ee5af33..6b418ebf7d89 100644 --- a/ydb/core/tx/datashard/change_sender_common_ops.h +++ b/ydb/core/tx/datashard/change_sender_common_ops.h @@ -4,7 +4,7 @@ #include "change_exchange_helpers.h" #include - +#include #include #include #include @@ -61,7 +61,7 @@ class IChangeSender { virtual IActor* CreateSender(ui64 partitionId) = 0; virtual void RemoveRecords() = 0; - virtual void EnqueueRecords(TVector&& records) = 0; + virtual void EnqueueRecords(TVector&& records) = 0; virtual void ProcessRecords(TVector&& records) = 0; virtual void ForgetRecords(TVector&& records) = 0; virtual void OnReady(ui64 partitionId) = 0; @@ -79,8 +79,8 @@ class IChangeSenderResolver { }; class TBaseChangeSender: public IChangeSender { - using TEnqueuedRecord = TEvChangeExchange::TEvRequestRecords::TRecordInfo; - using TRequestedRecord = TEvChangeExchange::TEvRequestRecords::TRecordInfo; + using TEnqueuedRecord = NChangeExchange::TEvChangeExchange::TEvRequestRecords::TRecordInfo; + using TRequestedRecord = NChangeExchange::TEvChangeExchange::TEvRequestRecords::TRecordInfo; struct TSender { TActorId ActorId; @@ -124,19 +124,19 @@ class TBaseChangeSender: public IChangeSender { remove.push_back(record.Order); } - ActorOps->Send(DataShard.ActorId, new TEvChangeExchange::TEvRemoveRecords(std::move(remove))); + ActorOps->Send(DataShard.ActorId, new NChangeExchange::TEvChangeExchange::TEvRemoveRecords(std::move(remove))); } template <> void RemoveRecords(TVector&& records) { - ActorOps->Send(DataShard.ActorId, new TEvChangeExchange::TEvRemoveRecords(std::move(records))); + ActorOps->Send(DataShard.ActorId, new NChangeExchange::TEvChangeExchange::TEvRemoveRecords(std::move(records))); } void CreateSenders(const TVector& partitionIds, bool partitioningChanged = true) override; void KillSenders() override; void RemoveRecords() override; - void EnqueueRecords(TVector&& records) override; + void EnqueueRecords(TVector&& records) override; void ProcessRecords(TVector&& records) override; void ForgetRecords(TVector&& records) override; void OnReady(ui64 partitionId) override; diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index c251597f7ce7..dcc2cec7abba 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -939,7 +939,7 @@ void TDataShard::EnqueueChangeRecords(TVectorTimeProvider->Now(); - TVector forward(Reserve(records.size())); + TVector forward(Reserve(records.size())); for (const auto& record : records) { forward.emplace_back(record.Order, record.PathId, record.BodySize); @@ -966,7 +966,7 @@ void TDataShard::EnqueueChangeRecords(TVector { LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Send " << records.size() << " change records" << ": to# " << to << ", at tablet# " << Self->TabletID()); - ctx.Send(to, new TEvChangeExchange::TEvRecords(std::move(records))); + ctx.Send(to, new NChangeExchange::TEvChangeExchange::TEvRecords(std::move(records))); } size_t forgotten = 0; @@ -246,7 +246,7 @@ class TDataShard::TTxRequestChangeRecords: public TTransactionBase { LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Forget " << records.size() << " change records" << ": to# " << to << ", at tablet# " << Self->TabletID()); - ctx.Send(to, new TEvChangeExchange::TEvForgetRecords(std::move(records))); + ctx.Send(to, new NChangeExchange::TEvChangeExchange::TEvForgetRecords(std::move(records))); } size_t left = Accumulate(Self->ChangeRecordsRequested, (size_t)0, [](size_t sum, const auto& kv) { @@ -408,7 +408,7 @@ class TDataShard::TTxChangeExchangeSplitAck: public TTransactionBase }; // TTxChangeExchangeSplitAck /// Request -void TDataShard::Handle(TEvChangeExchange::TEvRequestRecords::TPtr& ev, const TActorContext& ctx) { +void TDataShard::Handle(NChangeExchange::TEvChangeExchange::TEvRequestRecords::TPtr& ev, const TActorContext& ctx) { ChangeRecordsRequested[ev->Sender].insert(ev->Get()->Records.begin(), ev->Get()->Records.end()); SetCounter(COUNTER_CHANGE_QUEUE_SIZE, Accumulate(ChangeRecordsRequested, (size_t)0, [](size_t sum, const auto& kv) { return sum + kv.second.size(); @@ -428,7 +428,7 @@ void TDataShard::Handle(TEvPrivate::TEvRequestChangeRecords::TPtr&, const TActor } /// Remove -void TDataShard::Handle(TEvChangeExchange::TEvRemoveRecords::TPtr& ev, const TActorContext& ctx) { +void TDataShard::Handle(NChangeExchange::TEvChangeExchange::TEvRemoveRecords::TPtr& ev, const TActorContext& ctx) { ChangeRecordsToRemove.insert(ev->Get()->Records.begin(), ev->Get()->Records.end()); ScheduleRemoveChangeRecords(ctx); } diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index 2d5ff957122f..872410ac2c18 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -33,6 +33,7 @@ #include #include #include +#include #include #include #include @@ -1304,8 +1305,8 @@ class TDataShard void Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TPtr& ev, const TActorContext& ctx); // change sending - void Handle(TEvChangeExchange::TEvRequestRecords::TPtr& ev, const TActorContext& ctx); - void Handle(TEvChangeExchange::TEvRemoveRecords::TPtr& ev, const TActorContext& ctx); + void Handle(NChangeExchange::TEvChangeExchange::TEvRequestRecords::TPtr& ev, const TActorContext& ctx); + void Handle(NChangeExchange::TEvChangeExchange::TEvRemoveRecords::TPtr& ev, const TActorContext& ctx); void ScheduleRequestChangeRecords(const TActorContext& ctx); void ScheduleRemoveChangeRecords(const TActorContext& ctx); void Handle(TEvPrivate::TEvRequestChangeRecords::TPtr& ev, const TActorContext& ctx); @@ -2729,7 +2730,7 @@ class TDataShard } }; - using TRequestedRecord = TEvChangeExchange::TEvRequestRecords::TRecordInfo; + using TRequestedRecord = NChangeExchange::TEvChangeExchange::TEvRequestRecords::TRecordInfo; // split/merge TChangeSenderActivator ChangeSenderActivator; @@ -2972,8 +2973,8 @@ class TDataShard HFunc(TEvTxProxySchemeCache::TEvWatchNotifyUpdated, Handle); IgnoreFunc(TEvTxProxySchemeCache::TEvWatchNotifyDeleted); IgnoreFunc(TEvTxProxySchemeCache::TEvWatchNotifyUnavailable); - HFunc(TEvChangeExchange::TEvRequestRecords, Handle); - HFunc(TEvChangeExchange::TEvRemoveRecords, Handle); + HFunc(NChangeExchange::TEvChangeExchange::TEvRequestRecords, Handle); + HFunc(NChangeExchange::TEvChangeExchange::TEvRemoveRecords, Handle); HFunc(TEvPrivate::TEvRequestChangeRecords, Handle); HFunc(TEvPrivate::TEvRemoveChangeRecords, Handle); HFunc(TEvChangeExchange::TEvHandshake, Handle); diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp index e370a9b9a4dc..a744dbfeb5f0 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp @@ -218,20 +218,20 @@ Y_UNIT_TEST_SUITE(AsyncIndexChangeExchange) { runtime.SetObserverFunc([&](TAutoPtr& ev) { switch (ev->GetTypeRewrite()) { - case TEvChangeExchange::EvEnqueueRecords: - for (const auto& record : ev->Get()->Records) { + case NChangeExchange::TEvChangeExchange::EvEnqueueRecords: + for (const auto& record : ev->Get()->Records) { enqueued.insert(record.Order); } break; - case TEvChangeExchange::EvRequestRecords: - for (const auto& record : ev->Get()->Records) { + case NChangeExchange::TEvChangeExchange::EvRequestRecords: + for (const auto& record : ev->Get()->Records) { requested.insert(record.Order); } break; - case TEvChangeExchange::EvRemoveRecords: - for (const auto& record : ev->Get()->Records) { + case NChangeExchange::TEvChangeExchange::EvRemoveRecords: + for (const auto& record : ev->Get()->Records) { removed.insert(record); } break; @@ -307,14 +307,14 @@ Y_UNIT_TEST_SUITE(AsyncIndexChangeExchange) { return TTestActorRuntime::EEventAction::PROCESS; } - case TEvChangeExchange::EvEnqueueRecords: - for (const auto& record : ev->Get()->Records) { + case NChangeExchange::TEvChangeExchange::EvEnqueueRecords: + for (const auto& record : ev->Get()->Records) { enqueued.insert(record.Order); } break; - case TEvChangeExchange::EvRemoveRecords: - for (const auto& record : ev->Get()->Records) { + case NChangeExchange::TEvChangeExchange::EvRemoveRecords: + for (const auto& record : ev->Get()->Records) { removed.insert(record); } break; @@ -371,7 +371,7 @@ Y_UNIT_TEST_SUITE(AsyncIndexChangeExchange) { bool inited = false; runtime.SetObserverFunc([&](TAutoPtr& ev) { switch (ev->GetTypeRewrite()) { - case TEvChangeExchange::EvEnqueueRecords: + case NChangeExchange::TEvChangeExchange::EvEnqueueRecords: delayed.emplace_back(ev.Release()); return TTestActorRuntime::EEventAction::DROP; @@ -416,14 +416,14 @@ Y_UNIT_TEST_SUITE(AsyncIndexChangeExchange) { THashSet removed; runtime.SetObserverFunc([&](TAutoPtr& ev) { switch (ev->GetTypeRewrite()) { - case TEvChangeExchange::EvEnqueueRecords: - for (const auto& record : ev->Get()->Records) { + case NChangeExchange::TEvChangeExchange::EvEnqueueRecords: + for (const auto& record : ev->Get()->Records) { enqueued.insert(record.Order); } break; - case TEvChangeExchange::EvRemoveRecords: - for (const auto& record : ev->Get()->Records) { + case NChangeExchange::TEvChangeExchange::EvRemoveRecords: + for (const auto& record : ev->Get()->Records) { removed.insert(record); } break; @@ -481,7 +481,7 @@ Y_UNIT_TEST_SUITE(AsyncIndexChangeExchange) { runtime.SetObserverFunc([&](TAutoPtr& ev) { switch (ev->GetTypeRewrite()) { - case TEvChangeExchange::EvEnqueueRecords: + case NChangeExchange::TEvChangeExchange::EvEnqueueRecords: if (preventEnqueueing) { enqueued.emplace_back(ev.Release()); return TTestActorRuntime::EEventAction::DROP; @@ -636,7 +636,7 @@ Y_UNIT_TEST_SUITE(AsyncIndexChangeExchange) { runtime.SetObserverFunc([&](TAutoPtr& ev) { switch (ev->GetTypeRewrite()) { - case TEvChangeExchange::EvEnqueueRecords: + case NChangeExchange::TEvChangeExchange::EvEnqueueRecords: if (preventEnqueueing) { enqueued.emplace_back(ev.Release()); return TTestActorRuntime::EEventAction::DROP; @@ -1886,7 +1886,7 @@ Y_UNIT_TEST_SUITE(Cdc) { env.GetServer()->GetRuntime()->SetObserverFunc([&](TAutoPtr& ev) { switch (ev->GetTypeRewrite()) { - case TEvChangeExchange::EvEnqueueRecords: + case NChangeExchange::TEvChangeExchange::EvEnqueueRecords: if (preventEnqueueing) { enqueued.emplace_back(ev.Release()); return TTestActorRuntime::EEventAction::DROP; @@ -2143,7 +2143,7 @@ Y_UNIT_TEST_SUITE(Cdc) { env.GetServer()->GetRuntime()->SetObserverFunc([&](TAutoPtr& ev) { switch (ev->GetTypeRewrite()) { - case TEvChangeExchange::EvEnqueueRecords: + case NChangeExchange::TEvChangeExchange::EvEnqueueRecords: if (preventEnqueueing) { enqueued.emplace_back(ev.Release()); return TTestActorRuntime::EEventAction::DROP; @@ -2240,7 +2240,7 @@ Y_UNIT_TEST_SUITE(Cdc) { env.GetServer()->GetRuntime()->SetObserverFunc([&](TAutoPtr& ev) { switch (ev->GetTypeRewrite()) { - case TEvChangeExchange::EvEnqueueRecords: + case NChangeExchange::TEvChangeExchange::EvEnqueueRecords: if (preventEnqueueing || (preventEnqueueingOnSpecificSender && *preventEnqueueingOnSpecificSender == ev->Recipient)) { enqueued.emplace_back(ev.Release()); return TTestActorRuntime::EEventAction::DROP; @@ -2432,7 +2432,7 @@ Y_UNIT_TEST_SUITE(Cdc) { TVector> enqueued; auto prevObserver = runtime.SetObserverFunc([&](TAutoPtr& ev) { - if (ev->GetTypeRewrite() == TEvChangeExchange::EvEnqueueRecords) { + if (ev->GetTypeRewrite() == NChangeExchange::TEvChangeExchange::EvEnqueueRecords) { enqueued.emplace_back(ev.Release()); return TTestActorRuntime::EEventAction::DROP; } @@ -2474,8 +2474,8 @@ Y_UNIT_TEST_SUITE(Cdc) { THashSet enqueued; runtime.SetObserverFunc([&](TAutoPtr& ev) { - if (ev->GetTypeRewrite() == TEvChangeExchange::EvEnqueueRecords) { - for (const auto& record : ev->Get()->Records) { + if (ev->GetTypeRewrite() == NChangeExchange::TEvChangeExchange::EvEnqueueRecords) { + for (const auto& record : ev->Get()->Records) { enqueued.insert(record.Order); } @@ -2506,8 +2506,8 @@ Y_UNIT_TEST_SUITE(Cdc) { THashSet removed; runtime.SetObserverFunc([&](TAutoPtr& ev) { - if (ev->GetTypeRewrite() == TEvChangeExchange::EvRemoveRecords) { - for (const auto& record : ev->Get()->Records) { + if (ev->GetTypeRewrite() == NChangeExchange::TEvChangeExchange::EvRemoveRecords) { + for (const auto& record : ev->Get()->Records) { removed.insert(record); } } @@ -2743,7 +2743,7 @@ Y_UNIT_TEST_SUITE(Cdc) { } break; - case TEvChangeExchange::EvEnqueueRecords: + case NChangeExchange::TEvChangeExchange::EvEnqueueRecords: delayed.emplace_back(ev.Release()); return TTestActorRuntime::EEventAction::DROP; diff --git a/ydb/core/tx/schemeshard/ut_index/ut_async_index.cpp b/ydb/core/tx/schemeshard/ut_index/ut_async_index.cpp index 0d05b12d68ce..18bd1d0a45db 100644 --- a/ydb/core/tx/schemeshard/ut_index/ut_async_index.cpp +++ b/ydb/core/tx/schemeshard/ut_index/ut_async_index.cpp @@ -1,6 +1,6 @@ #include +#include #include -#include #include #include #include @@ -371,7 +371,7 @@ Y_UNIT_TEST_SUITE(TAsyncIndexTests) { TVector> enqueued; runtime.SetObserverFunc([&](TAutoPtr& ev) { - if (ev->GetTypeRewrite() == NDataShard::TEvChangeExchange::EvEnqueueRecords) { + if (ev->GetTypeRewrite() == NChangeExchange::TEvChangeExchange::EvEnqueueRecords) { enqueued.emplace_back(ev.Release()); return TTestActorRuntime::EEventAction::DROP; }