From 0f2515d69efee1b6074fc30f6a134750b1fcff61 Mon Sep 17 00:00:00 2001 From: Ildar Khisambeev Date: Mon, 18 Dec 2023 10:51:09 +0000 Subject: [PATCH 1/6] Propose read policy settings for fed sdk api --- .../ydb_federated_topic/federated_topic.h | 27 ++++++++++++------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h index 121097e39d03..a748ec32a749 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h @@ -368,18 +368,25 @@ struct TFederatedReadSessionSettings: public NTopic::TReadSessionSettings { //! See description in TFederatedEventHandlers class. FLUENT_SETTING(TFederatedEventHandlers, FederatedEventHandlers); - enum class EReadPolicy { - READ_ALL = 0, - READ_ORIGINAL, - READ_MIRRORED + //! Default variant. + //! Read original topics specified in NTopic::TReadSessionSettings::Topics from all specified databases. + struct TReadOriginal { + //! Empty vector means read from all available databases; + //! Vector {"local"} means read only from local database, which is determined by client location; + //! Otherwise read from specified databases if available. + std::vector Databases; }; - //! Policy for federated reading. - //! - //! READ_ALL: read will be done from all topic instances from all databases. - //! READ_ORIGINAL: - //! READ_MIRRORED: - FLUENT_SETTING_DEFAULT(EReadPolicy, ReadPolicy, EReadPolicy::READ_ALL); + //! Read original topics specified in NTopic::TReadSessionSettings::Topics and their mirrors from other databases + //! from one specified database. + struct TReadMirrored { + TString Database; + }; + + using TReadPolicy = std::variant; + + //! Policy for reading original and mirrored topics, see variants above. 
+ FLUENT_SETTING_DEFAULT(TReadPolicy, ReadPolicy, TReadOriginal{}); }; From 3351f283ceb7e1c0eaf3ffb9d0a088d6d7e7e2b3 Mon Sep 17 00:00:00 2001 From: Ildar Khisambeev Date: Tue, 19 Dec 2023 07:00:52 +0000 Subject: [PATCH 2/6] better api --- .../ydb_federated_topic/federated_topic.h | 44 ++++++++++++------- .../impl/federated_topic.cpp | 34 ++++++++++++++ 2 files changed, 62 insertions(+), 16 deletions(-) diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h index a748ec32a749..8e41e1461a5a 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h @@ -273,7 +273,6 @@ struct TFederatedReadSessionSettings: public NTopic::TReadSessionSettings { bool GracefulStopAfterCommit; }; - //! Set simple handler with data processing and also //! set other handlers with default behaviour. //! They automatically commit data after processing @@ -290,7 +289,6 @@ struct TFederatedReadSessionSettings: public NTopic::TReadSessionSettings { //! commitDataAfterProcessing: automatically commit data after calling of dataHandler. //! gracefulReleaseAfterCommit: wait for commit acknowledgements for all inflight data before confirming //! partition session destroy. - TSimpleDataHandlers SimpleDataHandlers_; TSelf& SimpleDataHandlers(std::function dataHandler, @@ -368,25 +366,39 @@ struct TFederatedReadSessionSettings: public NTopic::TReadSessionSettings { //! See description in TFederatedEventHandlers class. FLUENT_SETTING(TFederatedEventHandlers, FederatedEventHandlers); - //! Default variant. - //! Read original topics specified in NTopic::TReadSessionSettings::Topics from all specified databases. - struct TReadOriginal { - //! Empty vector means read from all available databases; - //! Vector {"local"} means read only from local database, which is determined by client location; - //! 
Otherwise read from specified databases if available. + + //! Read policy settings + + //! Databases to read from. + //! Default (empty) value means reading from all available databases. + //! Adding duplicates or unavailable databases is okay, they will be ignored. + struct TReadOriginalSettings { + //! Add reading from specified database if it's available. + TReadOriginalSettings& AddDatabase(TString database); + + //! Add reading from several specified databases, if available. + TReadOriginalSettings& AddDatabases(std::vector databases); + + //! Add reading from database(s) with the same location as client. + TReadOriginalSettings& AddLocal(); + std::vector Databases; }; - //! Read original topics specified in NTopic::TReadSessionSettings::Topics and their mirrors from other databases - //! from one specified database. - struct TReadMirrored { - TString Database; - }; + //! Default variant. + //! Read original topics specified in NTopic::TReadSessionSettings::Topics from databases, specified in settings. + //! Discards previously set ReadOriginal and ReadMirrored settings. + TSelf& ReadOriginal(TReadOriginalSettings settings); - using TReadPolicy = std::variant; + //! Read original and mirrored topics specified in NTopic::TReadSessionSettings::Topics + //! from one specified database. + //! Discards previously set ReadOriginal and ReadMirrored settings. + TSelf& ReadMirrored(TString database); - //! Policy for reading original and mirrored topics, see variants above. 
- FLUENT_SETTING_DEFAULT(TReadPolicy, ReadPolicy, TReadOriginal{}); +private: + // Read policy settings, set via helpers above + bool ReadMirroredEnabled = false; + std::vector Databases; }; diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp index ca076d602d12..4fc8b9e7f1a2 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp @@ -3,6 +3,40 @@ namespace NYdb::NFederatedTopic { +// TFederatedReadSessionSettings +// Read policy settings + +using TReadOriginalSettings = TFederatedReadSessionSettings::TReadOriginalSettings; +TReadOriginalSettings& TReadOriginalSettings::AddDatabase(TString database) { + Databases.push_back(std::move(database)); + return *this; +} + +TReadOriginalSettings& TReadOriginalSettings::AddDatabases(std::vector databases) { + std::move(std::begin(databases), std::end(databases), std::back_inserter(Databases)); + return *this; +} + +TReadOriginalSettings& TReadOriginalSettings::AddLocal() { + Databases.push_back("_local"); + return *this; +} + +TFederatedReadSessionSettings& TFederatedReadSessionSettings::ReadOriginal(TReadOriginalSettings settings) { + std::swap(Databases, settings.Databases); + ReadMirroredEnabled = false; + return *this; +} + +TFederatedReadSessionSettings& TFederatedReadSessionSettings::ReadMirrored(TString database) { + Databases.clear(); + Databases.push_back(std::move(database)); + ReadMirroredEnabled = true; + return *this; +} + +// TFederatedTopicClient + NTopic::TTopicClientSettings FromFederated(const TFederatedTopicClientSettings& settings) { return NTopic::TTopicClientSettings() .DefaultCompressionExecutor(settings.DefaultCompressionExecutor_) From c068d172453bac71cefaaca4a0b146b3ff052029 Mon Sep 17 00:00:00 2001 From: Ildar Khisambeev Date: Tue, 19 Dec 2023 18:43:37 +0000 Subject: [PATCH 3/6] 
implementation --- .../ydb_federated_topic/federated_topic.h | 7 +- .../impl/federated_read_session.cpp | 64 ++++++++++++++++--- .../impl/federated_read_session.h | 4 +- .../impl/federated_topic.cpp | 8 +-- 4 files changed, 67 insertions(+), 16 deletions(-) diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h index 8e41e1461a5a..bf83964065bc 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h @@ -4,6 +4,8 @@ #include +#include + namespace NYdb::NFederatedTopic { using NTopic::TPrintable; @@ -382,7 +384,7 @@ struct TFederatedReadSessionSettings: public NTopic::TReadSessionSettings { //! Add reading from database(s) with the same location as client. TReadOriginalSettings& AddLocal(); - std::vector Databases; + std::unordered_set Databases; }; //! Default variant. @@ -395,10 +397,9 @@ struct TFederatedReadSessionSettings: public NTopic::TReadSessionSettings { //! Discards previously set ReadOriginal and ReadMirrored settings. 
TSelf& ReadMirrored(TString database); -private: // Read policy settings, set via helpers above bool ReadMirroredEnabled = false; - std::vector Databases; + std::unordered_set Databases; }; diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp index 7563f2c85ae7..aaf8fc949bbc 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp @@ -83,9 +83,8 @@ void TFederatedReadSessionImpl::Start() { }); } -void TFederatedReadSessionImpl::OpenSubSessionsImpl() { - for (const auto& db : FederationState->DbInfos) { - // TODO check if available +void TFederatedReadSessionImpl::OpenSubSessionsImpl(const std::vector>& dbInfos) { + for (const auto& db : dbInfos) { NTopic::TTopicClientSettings settings = SubClientSetttings; settings .Database(db->path()) @@ -102,11 +101,60 @@ void TFederatedReadSessionImpl::OnFederatedStateUpdateImpl() { CloseImpl(); return; } - // 1) compare old info and new info; - // result: list of subsessions to open + list of subsessions to close - // 2) OpenSubSessionsImpl, CloseSubSessionsImpl - OpenSubSessionsImpl(); - // 3) TODO LATER reschedule OnFederatedStateUpdate + if (Settings.ReadMirroredEnabled) { + Y_ABORT_UNLESS(Settings.Databases.size() == 1); + // add -mirrored-from- topics to Settings + + // how to get mirrors in general case??? 
+ std::vector dcNames = {"sas", "vla", "klg", "vlx"}; + auto topics = Settings.Topics_; + for (const auto& topic : topics) { + for (const auto& dc : dcNames) { + auto mirroredTopic = topic; + mirroredTopic.PartitionIds_.clear(); + mirroredTopic.Path(topic.Path_ + "-mirrored-from-" + dc); + Settings.AppendTopics(mirroredTopic); + } + } + } + + std::vector> databases; + + for (const auto& db : FederationState->DbInfos) { + if (IsDatabaseEligibleForRead(db)) { + databases.push_back(db); + } + } + + if (databases.empty()) { + CloseImpl(); + return; + } + + OpenSubSessionsImpl(databases); +} + +bool TFederatedReadSessionImpl::IsDatabaseEligibleForRead(const std::shared_ptr& db) { + if (db->status() != TDbInfo::Status::DatabaseInfo_Status_AVAILABLE && + db->status() != TDbInfo::Status::DatabaseInfo_Status_READ_ONLY) { + return false; + } + + if (Settings.Databases.empty()) { + return true; + } + + for (const auto& dbFromSettings : Settings.Databases) { + if (AsciiEqualsIgnoreCase(db->name(), dbFromSettings) || + AsciiEqualsIgnoreCase(db->id(), dbFromSettings)) { + return true; + } + if (dbFromSettings == "_local" && + AsciiEqualsIgnoreCase(FederationState->SelfLocation, db->location())) { + return true; + } + } + return false; } NThreading::TFuture TFederatedReadSessionImpl::WaitEvent() { diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h index c0444f15a2b1..791219b5b583 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h @@ -51,7 +51,9 @@ class TFederatedReadSessionImpl : public NPersQueue::TEnableSelfContext>& dbInfos); + + bool IsDatabaseEligibleForRead(const std::shared_ptr& db); void OnFederatedStateUpdateImpl(); diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp 
b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp index 4fc8b9e7f1a2..15e2f972c40d 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp @@ -8,17 +8,17 @@ namespace NYdb::NFederatedTopic { using TReadOriginalSettings = TFederatedReadSessionSettings::TReadOriginalSettings; TReadOriginalSettings& TReadOriginalSettings::AddDatabase(TString database) { - Databases.push_back(std::move(database)); + Databases.insert(std::move(database)); return *this; } TReadOriginalSettings& TReadOriginalSettings::AddDatabases(std::vector databases) { - std::move(std::begin(databases), std::end(databases), std::back_inserter(Databases)); + std::move(std::begin(databases), std::end(databases), std::inserter(Databases, Databases.end())); return *this; } TReadOriginalSettings& TReadOriginalSettings::AddLocal() { - Databases.push_back("_local"); + Databases.insert("_local"); return *this; } @@ -30,7 +30,7 @@ TFederatedReadSessionSettings& TFederatedReadSessionSettings::ReadOriginal(TRead TFederatedReadSessionSettings& TFederatedReadSessionSettings::ReadMirrored(TString database) { Databases.clear(); - Databases.push_back(std::move(database)); + Databases.insert(std::move(database)); ReadMirroredEnabled = true; return *this; } From d9a994f0f8f8a133bbb9061c15e453d6a3f0d24c Mon Sep 17 00:00:00 2001 From: Ildar Khisambeev Date: Tue, 9 Jan 2024 16:28:05 +0000 Subject: [PATCH 4/6] issues (except logging) --- .../ydb_federated_topic/federated_topic.h | 13 +++++++++- .../impl/federated_read_session.cpp | 25 +++++++++++++------ .../impl/federated_read_session.h | 6 ++--- .../impl/federated_topic.cpp | 9 ++++--- 4 files changed, 39 insertions(+), 14 deletions(-) diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h index bf83964065bc..3ed90dd2cac9 100644 --- 
a/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h @@ -4,6 +4,8 @@ #include +#include + #include namespace NYdb::NFederatedTopic { @@ -397,9 +399,18 @@ struct TFederatedReadSessionSettings: public NTopic::TReadSessionSettings { //! Discards previously set ReadOriginal and ReadMirrored settings. TSelf& ReadMirrored(TString database); + bool IsReadMirroredEnabled() { + return ReadMirroredEnabled; + } + + auto GetDatabasesToReadFrom() { + return DatabasesToReadFrom; + } + +private: // Read policy settings, set via helpers above bool ReadMirroredEnabled = false; - std::unordered_set Databases; + std::unordered_set DatabasesToReadFrom; }; diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp index aaf8fc949bbc..70761e76cd8f 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp @@ -64,6 +64,7 @@ TFederatedReadSessionImpl::TFederatedReadSessionImpl(const TFederatedReadSession , Observer(std::move(observer)) , AsyncInit(Observer->WaitForFirstState()) , FederationState(nullptr) + , Log(Connections->GetLog()) , SessionId(CreateGuidAsString()) { } @@ -101,15 +102,17 @@ void TFederatedReadSessionImpl::OnFederatedStateUpdateImpl() { CloseImpl(); return; } - if (Settings.ReadMirroredEnabled) { - Y_ABORT_UNLESS(Settings.Databases.size() == 1); - // add -mirrored-from- topics to Settings + if (Settings.IsReadMirroredEnabled()) { + Y_ABORT_UNLESS(Settings.GetDatabasesToReadFrom().size() == 1); + auto dbToReadFrom = *Settings.GetDatabasesToReadFrom().begin(); - // how to get mirrors in general case??? 
- std::vector dcNames = {"sas", "vla", "klg", "vlx"}; + std::vector dcNames = GetAllFederationLocations(); auto topics = Settings.Topics_; for (const auto& topic : topics) { for (const auto& dc : dcNames) { + if (AsciiEqualsIgnoreCase(dc, dbToReadFrom)) { + continue; + } auto mirroredTopic = topic; mirroredTopic.PartitionIds_.clear(); mirroredTopic.Path(topic.Path_ + "-mirrored-from-" + dc); @@ -134,17 +137,25 @@ void TFederatedReadSessionImpl::OnFederatedStateUpdateImpl() { OpenSubSessionsImpl(databases); } +std::vector TFederatedReadSessionImpl::GetAllFederationLocations() { + std::vector result; + for (const auto& db : FederationState->DbInfos) { + result.push_back(db->location()); + } + return result; +} + bool TFederatedReadSessionImpl::IsDatabaseEligibleForRead(const std::shared_ptr& db) { if (db->status() != TDbInfo::Status::DatabaseInfo_Status_AVAILABLE && db->status() != TDbInfo::Status::DatabaseInfo_Status_READ_ONLY) { return false; } - if (Settings.Databases.empty()) { + if (Settings.GetDatabasesToReadFrom().empty()) { return true; } - for (const auto& dbFromSettings : Settings.Databases) { + for (const auto& dbFromSettings : Settings.GetDatabasesToReadFrom()) { if (AsciiEqualsIgnoreCase(db->name(), dbFromSettings) || AsciiEqualsIgnoreCase(db->id(), dbFromSettings)) { return true; diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h index 791219b5b583..f1630d396dbb 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h @@ -46,13 +46,14 @@ class TFederatedReadSessionImpl : public NPersQueue::TEnableSelfContext>& dbInfos); + std::vector GetAllFederationLocations(); + bool IsDatabaseEligibleForRead(const std::shared_ptr& db); void OnFederatedStateUpdateImpl(); @@ -70,8 +71,7 @@ class TFederatedReadSessionImpl : public 
NPersQueue::TEnableSelfContext AsyncInit; std::shared_ptr FederationState; - // TODO - // TLog Log; + TLog Log; const TString SessionId; const TInstant StartSessionTime = TInstant::Now(); diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp index 15e2f972c40d..75d7abb173bb 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp @@ -23,14 +23,17 @@ TReadOriginalSettings& TReadOriginalSettings::AddLocal() { } TFederatedReadSessionSettings& TFederatedReadSessionSettings::ReadOriginal(TReadOriginalSettings settings) { - std::swap(Databases, settings.Databases); + std::swap(DatabasesToReadFrom, settings.Databases); ReadMirroredEnabled = false; return *this; } TFederatedReadSessionSettings& TFederatedReadSessionSettings::ReadMirrored(TString database) { - Databases.clear(); - Databases.insert(std::move(database)); + if (database == "_local") { + ythrow TContractViolation("Reading from local database not supported, use specific database"); + } + DatabasesToReadFrom.clear(); + DatabasesToReadFrom.insert(std::move(database)); ReadMirroredEnabled = true; return *this; } From 9e62913184227d3164aadb07fa4581c2f1629942 Mon Sep 17 00:00:00 2001 From: Ildar Khisambeev Date: Tue, 23 Jan 2024 23:14:58 +0000 Subject: [PATCH 5/6] provide topic origin info --- .../ydb_federated_topic/federated_topic.h | 76 +++++++----- .../impl/federated_read_session.cpp | 46 +++++--- .../impl/federated_read_session.h | 108 +++++++++++++++++- .../impl/federated_read_session_event.cpp | 79 +++++-------- .../impl/federation_observer.h | 13 ++- 5 files changed, 228 insertions(+), 94 deletions(-) diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h index 3ed90dd2cac9..59575527d996 100644 --- 
a/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h @@ -20,10 +20,17 @@ struct TFederatedPartitionSession : public TThrRefBase, public TPrintable; public: - TFederatedPartitionSession(const NTopic::TPartitionSession::TPtr& partitionSession, std::shared_ptr db) + TFederatedPartitionSession(const NTopic::TPartitionSession::TPtr& partitionSession, + std::shared_ptr db, + std::shared_ptr originDb = nullptr, + TString originPath = "") : PartitionSession(partitionSession) - , Db(std::move(db)) - {} + , ReadSourceDatabase(std::move(db)) + , TopicOriginDatabase(originDb ? std::move(originDb) : ReadSourceDatabase) + , TopicOriginPath(originPath ? std::move(originPath) : PartitionSession->GetTopicPath()) + { + Y_ABORT_UNLESS(ReadSourceDatabase); + } //! Request partition session status. //! Result will come to TPartitionSessionStatusEvent. @@ -43,7 +50,7 @@ struct TFederatedPartitionSession : public TThrRefBase, public TPrintableGetTopicPath(); + return TopicOriginPath; } //! Partition id. 
@@ -52,33 +59,59 @@ struct TFederatedPartitionSession : public TThrRefBase, public TPrintablename(); + return GetTopicOriginDatabaseName(); } const TString& GetDatabasePath() const { - return Db->path(); + return GetTopicOriginDatabasePath(); } const TString& GetDatabaseId() const { - return Db->id(); + return GetTopicOriginDatabaseId(); + } + + const TString& GetReadSourceDatabaseName() const { + return ReadSourceDatabase->name(); + } + + const TString& GetReadSourceDatabasePath() const { + return ReadSourceDatabase->path(); + } + + const TString& GetReadSourceDatabaseId() const { + return ReadSourceDatabase->id(); + } + + const TString& GetTopicOriginDatabaseName() const { + return TopicOriginDatabase->name(); + } + + const TString& GetTopicOriginDatabasePath() const { + return TopicOriginDatabase->path(); + } + + const TString& GetTopicOriginDatabaseId() const { + return TopicOriginDatabase->id(); } private: NTopic::TPartitionSession::TPtr PartitionSession; - std::shared_ptr Db; + std::shared_ptr ReadSourceDatabase; + std::shared_ptr TopicOriginDatabase; + TString TopicOriginPath; }; //! Events for read session. 
struct TReadSessionEvent { class TFederatedPartitionSessionAccessor { public: - TFederatedPartitionSessionAccessor(TFederatedPartitionSession::TPtr partitionSession) + explicit TFederatedPartitionSessionAccessor(TFederatedPartitionSession::TPtr partitionSession) : FederatedPartitionSession(std::move(partitionSession)) {} - TFederatedPartitionSessionAccessor(NTopic::TPartitionSession::TPtr partitionSession, std::shared_ptr db) - : FederatedPartitionSession(MakeIntrusive(partitionSession, std::move(db))) - {} + // TFederatedPartitionSessionAccessor(NTopic::TPartitionSession::TPtr partitionSession, std::shared_ptr db) + // : FederatedPartitionSession(MakeIntrusive(partitionSession, std::move(db))) + // {} inline const TFederatedPartitionSession::TPtr GetFederatedPartitionSession() const { return FederatedPartitionSession; @@ -92,8 +125,8 @@ struct TReadSessionEvent { struct TFederated : public TFederatedPartitionSessionAccessor, public TEvent, public TPrintable> { using TPrintable>::DebugString; - TFederated(TEvent event, std::shared_ptr db) - : TFederatedPartitionSessionAccessor(event.GetPartitionSession(), db) + TFederated(TEvent event, TFederatedPartitionSession::TPtr federatedPartitionSession) + : TFederatedPartitionSessionAccessor(std::move(federatedPartitionSession)) , TEvent(std::move(event)) {} @@ -113,10 +146,10 @@ struct TReadSessionEvent { using TCompressedMessage = TFederated; public: - TDataReceivedEvent(NTopic::TReadSessionEvent::TDataReceivedEvent event, std::shared_ptr db); + TDataReceivedEvent(NTopic::TReadSessionEvent::TDataReceivedEvent event, TFederatedPartitionSession::TPtr federatedPartitionSession); - TDataReceivedEvent(TVector messages, TVector compressedMessages, - NTopic::TPartitionSession::TPtr partitionSession, std::shared_ptr db); + // TDataReceivedEvent(TVector messages, TVector compressedMessages, + // NTopic::TPartitionSession::TPtr partitionSession, std::shared_ptr db); const NTopic::TPartitionSession::TPtr& GetPartitionSession() const 
override { ythrow yexception() << "GetPartitionSession method unavailable for federated objects, use GetFederatedPartitionSession instead"; @@ -181,15 +214,6 @@ struct TReadSessionEvent { TSessionClosedEvent>; }; -template -TReadSessionEvent::TFederated Federate(TEvent event, std::shared_ptr db) { - return {std::move(event), std::move(db)}; -} - -TReadSessionEvent::TDataReceivedEvent Federate(NTopic::TReadSessionEvent::TDataReceivedEvent event, std::shared_ptr db); - -TReadSessionEvent::TEvent Federate(NTopic::TReadSessionEvent::TEvent event, std::shared_ptr db); - //! Set of offsets to commit. //! Class that could store offsets in order to commit them later. //! This class is not thread safe. diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp index 70761e76cd8f..b662e46628b1 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp @@ -3,6 +3,10 @@ #include #include +#define INCLUDE_YDB_INTERNAL_H +#include +#undef INCLUDE_YDB_INTERNAL_H + #include #include @@ -11,24 +15,24 @@ namespace NYdb::NFederatedTopic { NTopic::TTopicClientSettings FromFederated(const TFederatedTopicClientSettings& settings); template -typename std::function WrapFederatedHandler(std::function outerHandler, std::shared_ptr db) { +typename std::function WrapFederatedHandler(std::function outerHandler, std::shared_ptr db, std::shared_ptr federator) { if (outerHandler) { - return [outerHandler, db = std::move(db)](TEvent& ev) { - auto fev = Federate(std::move(ev), db); + return [outerHandler, db = std::move(db), federator = std::move(federator)](TEvent& ev) { + auto fev = federator->LocateFederate(ev, std::move(db)); return outerHandler(fev); }; } return {}; } -NTopic::TReadSessionSettings FromFederated(const TFederatedReadSessionSettings& settings, const std::shared_ptr& db) { 
+NTopic::TReadSessionSettings FromFederated(const TFederatedReadSessionSettings& settings, const std::shared_ptr& db, std::shared_ptr federator) { NTopic::TReadSessionSettings SubsessionSettings = settings; SubsessionSettings.EventHandlers_.MaxMessagesBytes(settings.EventHandlers_.MaxMessagesBytes_); SubsessionSettings.EventHandlers_.HandlersExecutor(settings.EventHandlers_.HandlersExecutor_); #define MAYBE_CONVERT_HANDLER(type, name) \ SubsessionSettings.EventHandlers_.name( \ - WrapFederatedHandler(settings.FederatedEventHandlers_.name##_, db) \ + WrapFederatedHandler(settings.FederatedEventHandlers_.name##_, db, federator) \ ); MAYBE_CONVERT_HANDLER(TReadSessionEvent::TDataReceivedEvent, DataReceivedHandler); @@ -46,7 +50,7 @@ NTopic::TReadSessionSettings FromFederated(const TFederatedReadSessionSettings& if (settings.FederatedEventHandlers_.SimpleDataHandlers_.DataHandler) { SubsessionSettings.EventHandlers_.SimpleDataHandlers( WrapFederatedHandler( - settings.FederatedEventHandlers_.SimpleDataHandlers_.DataHandler, db), + settings.FederatedEventHandlers_.SimpleDataHandlers_.DataHandler, db, federator), settings.FederatedEventHandlers_.SimpleDataHandlers_.CommitDataAfterProcessing, settings.FederatedEventHandlers_.SimpleDataHandlers_.GracefulStopAfterCommit); } @@ -56,19 +60,24 @@ NTopic::TReadSessionSettings FromFederated(const TFederatedReadSessionSettings& TFederatedReadSessionImpl::TFederatedReadSessionImpl(const TFederatedReadSessionSettings& settings, std::shared_ptr connections, - const TFederatedTopicClientSettings& clientSetttings, + const TFederatedTopicClientSettings& clientSettings, std::shared_ptr observer) : Settings(settings) , Connections(std::move(connections)) - , SubClientSetttings(FromFederated(clientSetttings)) + , SubClientSetttings(FromFederated(clientSettings)) , Observer(std::move(observer)) , AsyncInit(Observer->WaitForFirstState()) , FederationState(nullptr) + , EventFederator(std::make_shared()) , Log(Connections->GetLog()) , 
SessionId(CreateGuidAsString()) { } +TStringBuilder TFederatedReadSessionImpl::GetLogPrefix() const { + return TStringBuilder() << GetDatabaseLogPrefix(SubClientSetttings.Database_.GetOrElse("")) << "[" << SessionId << "] "; +} + void TFederatedReadSessionImpl::Start() { AsyncInit.Subscribe([selfCtx = SelfContext](const auto& f){ Y_UNUSED(f); @@ -91,7 +100,7 @@ void TFederatedReadSessionImpl::OpenSubSessionsImpl(const std::vectorpath()) .DiscoveryEndpoint(db->endpoint()); auto subclient = make_shared(Connections, settings); - auto subsession = subclient->CreateReadSession(FromFederated(Settings, db)); + auto subsession = subclient->CreateReadSession(FromFederated(Settings, db, EventFederator)); SubSessions.emplace_back(subsession, db); } SubsessionIndex = 0; @@ -99,23 +108,27 @@ void TFederatedReadSessionImpl::OpenSubSessionsImpl(const std::vectorStatus.IsSuccess()) { + LOG_LAZY(Log, TLOG_ERR, GetLogPrefix() << "Federated state update failed."); CloseImpl(); return; } + + EventFederator->SetFederationState(FederationState); + if (Settings.IsReadMirroredEnabled()) { Y_ABORT_UNLESS(Settings.GetDatabasesToReadFrom().size() == 1); auto dbToReadFrom = *Settings.GetDatabasesToReadFrom().begin(); - std::vector dcNames = GetAllFederationLocations(); + std::vector dbNames = GetAllFederationDatabaseNames(); auto topics = Settings.Topics_; for (const auto& topic : topics) { - for (const auto& dc : dcNames) { - if (AsciiEqualsIgnoreCase(dc, dbToReadFrom)) { + for (const auto& dbName : dbNames) { + if (AsciiEqualsIgnoreCase(dbName, dbToReadFrom)) { continue; } auto mirroredTopic = topic; mirroredTopic.PartitionIds_.clear(); - mirroredTopic.Path(topic.Path_ + "-mirrored-from-" + dc); + mirroredTopic.Path(topic.Path_ + "-mirrored-from-" + dbName); Settings.AppendTopics(mirroredTopic); } } @@ -130,6 +143,7 @@ void TFederatedReadSessionImpl::OnFederatedStateUpdateImpl() { } if (databases.empty()) { + LOG_LAZY(Log, TLOG_ERR, GetLogPrefix() << "No available databases to read."); 
CloseImpl(); return; } @@ -137,10 +151,10 @@ void TFederatedReadSessionImpl::OnFederatedStateUpdateImpl() { OpenSubSessionsImpl(databases); } -std::vector TFederatedReadSessionImpl::GetAllFederationLocations() { +std::vector TFederatedReadSessionImpl::GetAllFederationDatabaseNames() { std::vector result; for (const auto& db : FederationState->DbInfos) { - result.push_back(db->location()); + result.push_back(db->name()); } return result; } @@ -210,7 +224,7 @@ TVector TFederatedReadSessionImpl::GetEvents(bool blo do { auto sub = SubSessions[SubsessionIndex]; for (auto&& ev : sub.Session->GetEvents(false, maxEventsCount, maxByteSize)) { - result.push_back(Federate(std::move(ev), sub.DbInfo)); + result.push_back(EventFederator->LocateFederate(std::move(ev), sub.DbInfo)); } SubsessionIndex = (SubsessionIndex + 1) % SubSessions.size(); } diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h index f1630d396dbb..286060b1db8e 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h @@ -9,6 +9,111 @@ namespace NYdb::NFederatedTopic { +class TEventFederator { +public: + auto LocateTopicOrigin(const NTopic::TReadSessionEvent::TEvent& event) { + std::shared_ptr topicOriginDbInfo; + TString topicOriginPath = ""; + + auto topicPath = std::visit([](auto&& arg) -> TStringBuf { + using T = std::decay_t; + if constexpr (std::is_same_v) { + return ""; + } else { + return arg.GetPartitionSession()->GetTopicPath(); + } + }, event); + + if (topicPath.Contains("-mirrored-from-")) { + TStringBuf leftPart, rightPart; + auto res = topicPath.TryRSplit("-mirrored-from-", leftPart, rightPart); + Y_ABORT_UNLESS(res); + + // no additional validation required: TryGetDbInfo just returns nullptr for any bad input + topicOriginDbInfo = 
FederationState->TryGetDbInfo(TString(rightPart)); + if (topicOriginDbInfo) { + topicOriginPath = leftPart; + } + } + + return std::make_tuple(topicOriginDbInfo, topicOriginPath); + } + + template + auto LocateFederate(TEvent&& event, std::shared_ptr db) { + NTopic::TPartitionSession::TPtr psPtr; + TFederatedPartitionSession::TPtr fps; + + using T = std::decay_t; + if constexpr (std::is_same_v) { + return Federate(std::move(event), std::move(fps)); + } else if constexpr (std::is_same_v) { + psPtr = std::visit([](auto&& arg) -> NTopic::TPartitionSession::TPtr { + using T = std::decay_t; + if constexpr (std::is_same_v) { + return nullptr; + } else { + return arg.GetPartitionSession(); + } + }, event); + + if (!psPtr) { // TSessionClosedEvent + return Federate(std::move(event), std::move(fps)); + } + } else { + psPtr = event.GetPartitionSession(); + } + + with_lock(Lock) { + if (!FederatedPartitionSessions.contains(psPtr.Get())) { + auto [topicOriginDbInfo, topicOriginPath] = LocateTopicOrigin(event); + FederatedPartitionSessions[psPtr.Get()] = MakeIntrusive(psPtr, std::move(db), std::move(topicOriginDbInfo), std::move(topicOriginPath)); + } + fps = FederatedPartitionSessions[psPtr.Get()]; + } + + return Federate(std::move(event), std::move(fps)); + } + + template + TReadSessionEvent::TFederated Federate(TEvent event, TFederatedPartitionSession::TPtr federatedPartitionSession) { + return {std::move(event), std::move(federatedPartitionSession)}; + } + + TReadSessionEvent::TDataReceivedEvent Federate(NTopic::TReadSessionEvent::TDataReceivedEvent event, + TFederatedPartitionSession::TPtr federatedPartitionSession) { + return {std::move(event), std::move(federatedPartitionSession)}; + } + + TReadSessionEvent::TEvent Federate(NTopic::TReadSessionEvent::TEvent event, + TFederatedPartitionSession::TPtr federatedPartitionSession) { + return std::visit([fps = std::move(federatedPartitionSession)](auto&& arg) { + using T = std::decay_t; + std::optional ev; + if constexpr 
(std::is_same_v) { + ev = TReadSessionEvent::TDataReceivedEvent(std::move(arg), std::move(fps)); + } else if constexpr (std::is_same_v) { + ev = std::move(arg); + } else { + ev = TReadSessionEvent::TFederated(std::move(arg), std::move(fps)); + } + return *ev; + }, + event); + } + + void SetFederationState(std::shared_ptr state) { + with_lock(Lock) { + FederationState = std::move(state); + } + } + +private: + TAdaptiveLock Lock; + std::unordered_map FederatedPartitionSessions; + std::shared_ptr FederationState; +}; + class TFederatedReadSessionImpl : public NPersQueue::TEnableSelfContext { friend class TFederatedTopicClient::TImpl; friend class TFederatedReadSession; @@ -52,7 +157,7 @@ class TFederatedReadSessionImpl : public NPersQueue::TEnableSelfContext>& dbInfos); - std::vector GetAllFederationLocations(); + std::vector GetAllFederationDatabaseNames(); bool IsDatabaseEligibleForRead(const std::shared_ptr& db); @@ -70,6 +175,7 @@ class TFederatedReadSessionImpl : public NPersQueue::TEnableSelfContext Observer; NThreading::TFuture AsyncInit; std::shared_ptr FederationState; + std::shared_ptr EventFederator; TLog Log; diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session_event.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session_event.cpp index 240fa38a80d1..bc20dab22b80 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session_event.cpp +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session_event.cpp @@ -1,40 +1,19 @@ #include #include -#include - namespace NYdb::NFederatedTopic { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Helpers -std::pair GetMessageOffsetRange(const TReadSessionEvent::TDataReceivedEvent& dataReceivedEvent, ui64 index) { - if (dataReceivedEvent.HasCompressedMessages()) { - const auto& msg = dataReceivedEvent.GetCompressedMessages()[index]; - return 
{msg.GetOffset(), msg.GetOffset() + 1}; - } - const auto& msg = dataReceivedEvent.GetMessages()[index]; - return {msg.GetOffset(), msg.GetOffset() + 1}; -} - -TReadSessionEvent::TDataReceivedEvent Federate(NTopic::TReadSessionEvent::TDataReceivedEvent event, std::shared_ptr db) { - return {std::move(event), std::move(db)}; -} - -TReadSessionEvent::TEvent Federate(NTopic::TReadSessionEvent::TEvent event, std::shared_ptr db) { - return std::visit([db = std::move(db)](auto&& arg) { - using T = std::decay_t; - std::optional ev; - if constexpr (std::is_same_v) { - ev = TReadSessionEvent::TDataReceivedEvent(std::move(arg), std::move(db)); - } else if constexpr (std::is_same_v) { - ev = std::move(arg); - } else { - ev = TReadSessionEvent::TFederated(std::move(arg), std::move(db)); - } - return *ev; - }, event); -} +// std::pair GetMessageOffsetRange(const TReadSessionEvent::TDataReceivedEvent& dataReceivedEvent, ui64 index) { +// if (dataReceivedEvent.HasCompressedMessages()) { +// const auto& msg = dataReceivedEvent.GetCompressedMessages()[index]; +// return {msg.GetOffset(), msg.GetOffset() + 1}; +// } +// const auto& msg = dataReceivedEvent.GetMessages()[index]; +// return {msg.GetOffset(), msg.GetOffset() + 1}; +// } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Printable specializations @@ -163,38 +142,38 @@ namespace NYdb::NFederatedTopic { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // NFederatedTopic::TReadSessionEvent::TDataReceivedEvent -TReadSessionEvent::TDataReceivedEvent::TDataReceivedEvent(NTopic::TReadSessionEvent::TDataReceivedEvent event, std::shared_ptr db) +TReadSessionEvent::TDataReceivedEvent::TDataReceivedEvent(NTopic::TReadSessionEvent::TDataReceivedEvent event, TFederatedPartitionSession::TPtr federatedPartitionSession) : 
NTopic::TReadSessionEvent::TPartitionSessionAccessor(event.GetPartitionSession()) - , TFederatedPartitionSessionAccessor(event.GetPartitionSession(), db) + , TFederatedPartitionSessionAccessor(federatedPartitionSession) { if (event.HasCompressedMessages()) { for (auto& msg : event.GetCompressedMessages()) { - CompressedMessages.emplace_back(std::move(msg), db); + CompressedMessages.emplace_back(std::move(msg), federatedPartitionSession); } } else { for (auto& msg : event.GetMessages()) { - Messages.emplace_back(std::move(msg), db); + Messages.emplace_back(std::move(msg), federatedPartitionSession); } } } -TReadSessionEvent::TDataReceivedEvent::TDataReceivedEvent( - TVector messages, TVector compressedMessages, - NTopic::TPartitionSession::TPtr partitionSession, std::shared_ptr db) - : NTopic::TReadSessionEvent::TPartitionSessionAccessor(partitionSession) - , TFederatedPartitionSessionAccessor(partitionSession, db) - , Messages(std::move(messages)) - , CompressedMessages(std::move(compressedMessages)) -{ - for (size_t i = 0; i < GetMessagesCount(); ++i) { - auto [from, to] = GetMessageOffsetRange(*this, i); - if (OffsetRanges.empty() || OffsetRanges.back().second != from) { - OffsetRanges.emplace_back(from, to); - } else { - OffsetRanges.back().second = to; - } - } -} +// TReadSessionEvent::TDataReceivedEvent::TDataReceivedEvent( +// TVector messages, TVector compressedMessages, +// NTopic::TPartitionSession::TPtr partitionSession, std::shared_ptr db) +// : NTopic::TReadSessionEvent::TPartitionSessionAccessor(partitionSession) +// , TFederatedPartitionSessionAccessor(partitionSession, db) +// , Messages(std::move(messages)) +// , CompressedMessages(std::move(compressedMessages)) +// { +// for (size_t i = 0; i < GetMessagesCount(); ++i) { +// auto [from, to] = GetMessageOffsetRange(*this, i); +// if (OffsetRanges.empty() || OffsetRanges.back().second != from) { +// OffsetRanges.emplace_back(from, to); +// } else { +// OffsetRanges.back().second = to; +// } +// } +// 
} void TReadSessionEvent::TDataReceivedEvent::Commit() { for (auto [from, to] : OffsetRanges) { diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.h index 6fc2baa80d9b..bf4ad2a16378 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.h +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.h @@ -35,11 +35,22 @@ struct TFederatedDbState { , ControlPlaneEndpoint(result.control_plane_endpoint()) , SelfLocation(result.self_location()) { - // TODO remove copy + // TODO ensure that all databases have unique names? for (const auto& db : result.federation_databases()) { DbInfos.push_back(std::make_shared(db)); } } + + std::shared_ptr TryGetDbInfo(const TString& name) const noexcept { + // There are few databases per federation usually, so the linear search is probably ok. + // TODO better profile this + for (const auto& dbInfo : DbInfos) { + if (AsciiEqualsIgnoreCase(dbInfo->name(), name)) { + return dbInfo; + } + } + return nullptr; + } }; From dca01cee510aa39268f2e0b7b3a910f57ff786b2 Mon Sep 17 00:00:00 2001 From: Ildar Khisambeev Date: Thu, 25 Jan 2024 13:22:24 +0000 Subject: [PATCH 6/6] more issues, add basic test --- .../ydb_federated_topic/federated_topic.h | 7 - .../impl/federated_read_session.cpp | 28 ++-- .../impl/federated_read_session.h | 4 + .../impl/federated_read_session_event.cpp | 34 ---- .../ydb_federated_topic/ut/basic_usage_ut.cpp | 158 ++++++++++++++++++ .../client/ydb_federated_topic/ut/fds_mock.h | 8 + 6 files changed, 186 insertions(+), 53 deletions(-) diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h index 59575527d996..534193d35bf0 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h @@ -109,10 +109,6 @@ 
struct TReadSessionEvent { : FederatedPartitionSession(std::move(partitionSession)) {} - // TFederatedPartitionSessionAccessor(NTopic::TPartitionSession::TPtr partitionSession, std::shared_ptr db) - // : FederatedPartitionSession(MakeIntrusive(partitionSession, std::move(db))) - // {} - inline const TFederatedPartitionSession::TPtr GetFederatedPartitionSession() const { return FederatedPartitionSession; } @@ -148,9 +144,6 @@ struct TReadSessionEvent { public: TDataReceivedEvent(NTopic::TReadSessionEvent::TDataReceivedEvent event, TFederatedPartitionSession::TPtr federatedPartitionSession); - // TDataReceivedEvent(TVector messages, TVector compressedMessages, - // NTopic::TPartitionSession::TPtr partitionSession, std::shared_ptr db); - const NTopic::TPartitionSession::TPtr& GetPartitionSession() const override { ythrow yexception() << "GetPartitionSession method unavailable for federated objects, use GetFederatedPartitionSession instead"; } diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp index b662e46628b1..df0bf1c8d079 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp @@ -30,10 +30,20 @@ NTopic::TReadSessionSettings FromFederated(const TFederatedReadSessionSettings& SubsessionSettings.EventHandlers_.MaxMessagesBytes(settings.EventHandlers_.MaxMessagesBytes_); SubsessionSettings.EventHandlers_.HandlersExecutor(settings.EventHandlers_.HandlersExecutor_); -#define MAYBE_CONVERT_HANDLER(type, name) \ - SubsessionSettings.EventHandlers_.name( \ - WrapFederatedHandler(settings.FederatedEventHandlers_.name##_, db, federator) \ - ); + if (settings.FederatedEventHandlers_.SimpleDataHandlers_.DataHandler) { + SubsessionSettings.EventHandlers_.SimpleDataHandlers( + WrapFederatedHandler( + 
settings.FederatedEventHandlers_.SimpleDataHandlers_.DataHandler, db, federator), + settings.FederatedEventHandlers_.SimpleDataHandlers_.CommitDataAfterProcessing, + settings.FederatedEventHandlers_.SimpleDataHandlers_.GracefulStopAfterCommit); + } + +#define MAYBE_CONVERT_HANDLER(type, name) \ + if (settings.FederatedEventHandlers_.name##_) { \ + SubsessionSettings.EventHandlers_.name( \ + WrapFederatedHandler(settings.FederatedEventHandlers_.name##_, db, federator) \ + ); \ + } MAYBE_CONVERT_HANDLER(TReadSessionEvent::TDataReceivedEvent, DataReceivedHandler); MAYBE_CONVERT_HANDLER(TReadSessionEvent::TCommitOffsetAcknowledgementEvent, CommitOffsetAcknowledgementHandler); @@ -47,14 +57,6 @@ NTopic::TReadSessionSettings FromFederated(const TFederatedReadSessionSettings& SubsessionSettings.EventHandlers_.SessionClosedHandler(settings.FederatedEventHandlers_.SessionClosedHandler_); - if (settings.FederatedEventHandlers_.SimpleDataHandlers_.DataHandler) { - SubsessionSettings.EventHandlers_.SimpleDataHandlers( - WrapFederatedHandler( - settings.FederatedEventHandlers_.SimpleDataHandlers_.DataHandler, db, federator), - settings.FederatedEventHandlers_.SimpleDataHandlers_.CommitDataAfterProcessing, - settings.FederatedEventHandlers_.SimpleDataHandlers_.GracefulStopAfterCommit); - } - return SubsessionSettings; } @@ -143,6 +145,8 @@ void TFederatedReadSessionImpl::OnFederatedStateUpdateImpl() { } if (databases.empty()) { + // TODO: investigate here, why empty list? + // Reason (and returned status) could be BAD_REQUEST or UNAVAILABLE. 
LOG_LAZY(Log, TLOG_ERR, GetLogPrefix() << "No available databases to read."); CloseImpl(); return; diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h index 286060b1db8e..84c208258e37 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h @@ -70,6 +70,10 @@ class TEventFederator { FederatedPartitionSessions[psPtr.Get()] = MakeIntrusive(psPtr, std::move(db), std::move(topicOriginDbInfo), std::move(topicOriginPath)); } fps = FederatedPartitionSessions[psPtr.Get()]; + + if constexpr (std::is_same_v) { + FederatedPartitionSessions.erase(psPtr.Get()); + } } return Federate(std::move(event), std::move(fps)); diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session_event.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session_event.cpp index bc20dab22b80..f88836c33f7c 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session_event.cpp +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session_event.cpp @@ -1,25 +1,10 @@ #include #include -namespace NYdb::NFederatedTopic { - -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -// Helpers - -// std::pair GetMessageOffsetRange(const TReadSessionEvent::TDataReceivedEvent& dataReceivedEvent, ui64 index) { -// if (dataReceivedEvent.HasCompressedMessages()) { -// const auto& msg = dataReceivedEvent.GetCompressedMessages()[index]; -// return {msg.GetOffset(), msg.GetOffset() + 1}; -// } -// const auto& msg = dataReceivedEvent.GetMessages()[index]; -// return {msg.GetOffset(), msg.GetOffset() + 1}; -// } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // 
Printable specializations -} - namespace NYdb::NTopic { using namespace NFederatedTopic; @@ -134,7 +119,6 @@ void TPrintable::DebugString(TStringBuilder& ret, bool print ret << " }"; } - } namespace NYdb::NFederatedTopic { @@ -157,24 +141,6 @@ TReadSessionEvent::TDataReceivedEvent::TDataReceivedEvent(NTopic::TReadSessionEv } } -// TReadSessionEvent::TDataReceivedEvent::TDataReceivedEvent( -// TVector messages, TVector compressedMessages, -// NTopic::TPartitionSession::TPtr partitionSession, std::shared_ptr db) -// : NTopic::TReadSessionEvent::TPartitionSessionAccessor(partitionSession) -// , TFederatedPartitionSessionAccessor(partitionSession, db) -// , Messages(std::move(messages)) -// , CompressedMessages(std::move(compressedMessages)) -// { -// for (size_t i = 0; i < GetMessagesCount(); ++i) { -// auto [from, to] = GetMessageOffsetRange(*this, i); -// if (OffsetRanges.empty() || OffsetRanges.back().second != from) { -// OffsetRanges.emplace_back(from, to); -// } else { -// OffsetRanges.back().second = to; -// } -// } -// } - void TReadSessionEvent::TDataReceivedEvent::Commit() { for (auto [from, to] : OffsetRanges) { static_cast*>(PartitionSession.Get())->Commit(from, to); diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp index a2780101c8a6..ad884e1ce7e7 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp @@ -520,6 +520,164 @@ Y_UNIT_TEST_SUITE(BasicUsage) { AtomicSet(check, 0); } + Y_UNIT_TEST(ReadMirrored) { + auto setup = std::make_shared(TEST_CASE_NAME, false); + setup->Start(true, true); + setup->CreateTopic(setup->GetTestTopic() + "-mirrored-from-dc2", setup->GetLocalCluster()); + setup->CreateTopic(setup->GetTestTopic() + "-mirrored-from-dc3", setup->GetLocalCluster()); + + TFederationDiscoveryServiceMock fdsMock; + fdsMock.Port = setup->GetGrpcPort(); + 
+ ui16 newServicePort = setup->GetPortManager()->GetPort(4285); + auto grpcServer = setup->StartGrpcService(newServicePort, &fdsMock); + + std::shared_ptr ReadSession; + + // Create topic client. + NYdb::TDriverConfig cfg; + cfg.SetEndpoint(TStringBuilder() << "localhost:" << newServicePort); + cfg.SetDatabase("/Root"); + cfg.SetLog(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG)); + NYdb::TDriver driver(cfg); + auto clientSettings = TFederatedTopicClientSettings() + .RetryPolicy(NTopic::IRetryPolicy::GetFixedIntervalPolicy( + TDuration::Seconds(10), + TDuration::Seconds(10) + )); + NYdb::NFederatedTopic::TFederatedTopicClient topicClient(driver, clientSettings); + + ui64 count = 5u; + + TString messageBase = "message----"; + TVector sentMessages; + std::unordered_set sentSet; + + for (auto i = 0u; i < count; i++) { + sentMessages.emplace_back(messageBase * (10 * i + 1)); + sentSet.emplace(sentMessages.back() + "-from-dc1"); + sentSet.emplace(sentMessages.back() + "-from-dc2"); + sentSet.emplace(sentMessages.back() + "-from-dc3"); + } + + NThreading::TPromise checkedPromise = NThreading::NewPromise(); + auto totalReceived = 0u; + + auto f = checkedPromise.GetFuture(); + TAtomic check = 1; + + // Create read session. 
+ NYdb::NFederatedTopic::TFederatedReadSessionSettings readSettings; + readSettings + .ReadMirrored("dc1") + .ConsumerName("shared/user") + .MaxMemoryUsageBytes(16_MB) + .AppendTopics(setup->GetTestTopic()); + + readSettings.FederatedEventHandlers_.SimpleDataHandlers([&](TReadSessionEvent::TDataReceivedEvent& ev) mutable { + Cerr << ">>> event from dataHandler: " << DebugString(ev) << Endl; + Y_VERIFY_S(AtomicGet(check) != 0, "check is false"); + auto& messages = ev.GetMessages(); + Cerr << ">>> get " << messages.size() << " messages in this event" << Endl; + for (size_t i = 0u; i < messages.size(); ++i) { + auto& message = messages[i]; + UNIT_ASSERT(message.GetFederatedPartitionSession()->GetReadSourceDatabaseName() == "dc1"); + UNIT_ASSERT(message.GetFederatedPartitionSession()->GetTopicPath() == setup->GetTestTopic()); + UNIT_ASSERT(message.GetData().EndsWith(message.GetFederatedPartitionSession()->GetTopicOriginDatabaseName())); + + UNIT_ASSERT(!sentSet.empty()); + UNIT_ASSERT_C(sentSet.erase(message.GetData()), "no such element is sentSet: " + message.GetData()); + totalReceived++; + } + if (totalReceived == 3 * sentMessages.size()) { + UNIT_ASSERT(sentSet.empty()); + checkedPromise.SetValue(); + } + }); + + ReadSession = topicClient.CreateReadSession(readSettings); + Cerr << ">>> Session was created" << Endl; + + Sleep(TDuration::MilliSeconds(50)); + + auto events = ReadSession->GetEvents(false); + UNIT_ASSERT(events.empty()); + + std::optional fdsRequest; + do { + fdsRequest = fdsMock.GetNextPendingRequest(); + if (!fdsRequest.has_value()) { + Sleep(TDuration::MilliSeconds(50)); + } + } while (!fdsRequest.has_value()); + + fdsRequest->Result.SetValue(fdsMock.ComposeOkResult()); + + { + NPersQueue::TWriteSessionSettings writeSettings; + writeSettings.Path(setup->GetTestTopic()).MessageGroupId("src_id"); + writeSettings.Codec(NPersQueue::ECodec::RAW); + NPersQueue::IExecutor::TPtr executor = new NPersQueue::TSyncExecutor(); + 
writeSettings.CompressionExecutor(executor); + + auto& client = setup->GetPersQueueClient(); + auto session = client.CreateSimpleBlockingWriteSession(writeSettings); + + for (auto i = 0u; i < count; i++) { + auto res = session->Write(sentMessages[i] + "-from-dc1"); + UNIT_ASSERT(res); + } + + session->Close(); + + Cerr << ">>> Writes to test-topic successful" << Endl; + } + + { + NPersQueue::TWriteSessionSettings writeSettings; + writeSettings.Path(setup->GetTestTopic() + "-mirrored-from-dc2").MessageGroupId("src_id"); + writeSettings.Codec(NPersQueue::ECodec::RAW); + NPersQueue::IExecutor::TPtr executor = new NPersQueue::TSyncExecutor(); + writeSettings.CompressionExecutor(executor); + + auto& client = setup->GetPersQueueClient(); + auto session = client.CreateSimpleBlockingWriteSession(writeSettings); + + for (auto i = 0u; i < count; i++) { + auto res = session->Write(sentMessages[i] + "-from-dc2"); + UNIT_ASSERT(res); + } + + session->Close(); + + Cerr << ">>> Writes to test-topic-mirrored-from-dc2 successful" << Endl; + } + + { + NPersQueue::TWriteSessionSettings writeSettings; + writeSettings.Path(setup->GetTestTopic() + "-mirrored-from-dc3").MessageGroupId("src_id"); + writeSettings.Codec(NPersQueue::ECodec::RAW); + NPersQueue::IExecutor::TPtr executor = new NPersQueue::TSyncExecutor(); + writeSettings.CompressionExecutor(executor); + + auto& client = setup->GetPersQueueClient(); + auto session = client.CreateSimpleBlockingWriteSession(writeSettings); + + for (auto i = 0u; i < count; i++) { + auto res = session->Write(sentMessages[i] + "-from-dc3"); + UNIT_ASSERT(res); + } + + session->Close(); + + Cerr << ">>> Writes to test-topic-mirrored-from-dc3 successful" << Endl; + } + + f.GetValueSync(); + ReadSession->Close(); + AtomicSet(check, 0); + } + Y_UNIT_TEST(BasicWriteSession) { auto setup = std::make_shared( TEST_CASE_NAME, false, ::NPersQueue::TTestServer::LOGGED_SERVICES, NActors::NLog::PRI_DEBUG, 2); diff --git 
a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/fds_mock.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/fds_mock.h index b0746d14b700..5e48d4c662d1 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/fds_mock.h +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/fds_mock.h @@ -96,6 +96,14 @@ class TFederationDiscoveryServiceMock: public Ydb::FederationDiscovery::V1::Fede c2->set_location("dc2"); c2->set_status(::Ydb::FederationDiscovery::DatabaseInfo::Status::DatabaseInfo_Status_AVAILABLE); c2->set_weight(500); + auto c3 = mockResult.add_federation_databases(); + c3->set_name("dc3"); + c3->set_path("/Root"); + c3->set_id("account-dc3"); + c3->set_endpoint("localhost:" + ToString(Port)); + c3->set_location("dc3"); + c3->set_status(::Ydb::FederationDiscovery::DatabaseInfo::Status::DatabaseInfo_Status_AVAILABLE); + c3->set_weight(500); op->mutable_result()->PackFrom(mockResult);