From 76d2521a8f7865066ef3d1e8c1f0f96895f18b50 Mon Sep 17 00:00:00 2001 From: Aleksei Borzenkov Date: Thu, 25 Jan 2024 14:43:51 +0000 Subject: [PATCH 1/2] Support table changes observer KIKIMR-20853 --- .../tablet_flat/CMakeLists.darwin-arm64.txt | 1 + .../tablet_flat/CMakeLists.darwin-x86_64.txt | 1 + .../tablet_flat/CMakeLists.linux-aarch64.txt | 1 + .../tablet_flat/CMakeLists.linux-x86_64.txt | 1 + .../tablet_flat/CMakeLists.windows-x86_64.txt | 1 + ydb/core/tablet_flat/flat_database.cpp | 5 ++ ydb/core/tablet_flat/flat_database.h | 3 + ydb/core/tablet_flat/flat_executor.cpp | 17 +++++- ydb/core/tablet_flat/flat_table.cpp | 8 +++ ydb/core/tablet_flat/flat_table.h | 4 +- ydb/core/tablet_flat/flat_table_observer.cpp | 1 + ydb/core/tablet_flat/flat_table_observer.h | 61 +++++++++++++++++++ ydb/core/tablet_flat/tablet_flat_executor.cpp | 8 +++ ydb/core/tablet_flat/tablet_flat_executor.h | 3 + ydb/core/tablet_flat/ya.make | 2 + 15 files changed, 115 insertions(+), 2 deletions(-) create mode 100644 ydb/core/tablet_flat/flat_table_observer.cpp create mode 100644 ydb/core/tablet_flat/flat_table_observer.h diff --git a/ydb/core/tablet_flat/CMakeLists.darwin-arm64.txt b/ydb/core/tablet_flat/CMakeLists.darwin-arm64.txt index c6d0640e5ff9..1ec29730846e 100644 --- a/ydb/core/tablet_flat/CMakeLists.darwin-arm64.txt +++ b/ydb/core/tablet_flat/CMakeLists.darwin-arm64.txt @@ -124,6 +124,7 @@ target_sources(ydb-core-tablet_flat PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_table.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_table_part.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_table_misc.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_table_observer.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/probes.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/shared_handle.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/shared_sausagecache.cpp diff --git a/ydb/core/tablet_flat/CMakeLists.darwin-x86_64.txt b/ydb/core/tablet_flat/CMakeLists.darwin-x86_64.txt index c6d0640e5ff9..1ec29730846e 100644 --- a/ydb/core/tablet_flat/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tablet_flat/CMakeLists.darwin-x86_64.txt @@ -124,6 +124,7 @@ target_sources(ydb-core-tablet_flat PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_table.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_table_part.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_table_misc.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_table_observer.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/probes.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/shared_handle.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/shared_sausagecache.cpp diff --git a/ydb/core/tablet_flat/CMakeLists.linux-aarch64.txt b/ydb/core/tablet_flat/CMakeLists.linux-aarch64.txt index 8dd469d19707..6a78f839ed8d 100644 --- a/ydb/core/tablet_flat/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tablet_flat/CMakeLists.linux-aarch64.txt @@ -125,6 +125,7 @@ target_sources(ydb-core-tablet_flat PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_table.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_table_part.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_table_misc.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_table_observer.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/probes.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/shared_handle.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/shared_sausagecache.cpp diff --git a/ydb/core/tablet_flat/CMakeLists.linux-x86_64.txt b/ydb/core/tablet_flat/CMakeLists.linux-x86_64.txt index 8dd469d19707..6a78f839ed8d 100644 --- a/ydb/core/tablet_flat/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tablet_flat/CMakeLists.linux-x86_64.txt @@ -125,6 +125,7 @@ target_sources(ydb-core-tablet_flat PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_table.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_table_part.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_table_misc.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_table_observer.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/probes.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/shared_handle.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/shared_sausagecache.cpp diff --git a/ydb/core/tablet_flat/CMakeLists.windows-x86_64.txt b/ydb/core/tablet_flat/CMakeLists.windows-x86_64.txt index c6d0640e5ff9..1ec29730846e 100644 --- a/ydb/core/tablet_flat/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tablet_flat/CMakeLists.windows-x86_64.txt @@ -124,6 +124,7 @@ target_sources(ydb-core-tablet_flat PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_table.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_table_part.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_table_misc.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_table_observer.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/probes.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/shared_handle.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/shared_sausagecache.cpp diff --git a/ydb/core/tablet_flat/flat_database.cpp b/ydb/core/tablet_flat/flat_database.cpp index ab46be9de032..8eb780705d39 100644 --- a/ydb/core/tablet_flat/flat_database.cpp +++ b/ydb/core/tablet_flat/flat_database.cpp @@ -495,6 +495,11 @@ const TDbStats& TDatabase::Counters() const noexcept return DatabaseImpl->Stats; } +void TDatabase::SetTableObserver(ui32 table, ITableObserverPtr ptr) noexcept +{ + Require(table)->SetTableObserver(std::move(ptr)); +} + TDatabase::TChg TDatabase::Head(ui32 table) const noexcept { if (table == Max()) { diff --git a/ydb/core/tablet_flat/flat_database.h b/ydb/core/tablet_flat/flat_database.h index 85f81af257f2..50e486e6320e 100644 --- a/ydb/core/tablet_flat/flat_database.h +++ b/ydb/core/tablet_flat/flat_database.h @@ -6,6 +6,7 @@ #include "flat_dbase_change.h" #include "flat_dbase_misc.h" #include "flat_iterator.h" +#include "flat_table_observer.h" #include "util_basics.h" namespace NKikimr { @@ -57,6 +58,8 @@ class TDatabase { TDatabase(TDatabaseImpl *databaseImpl = nullptr) noexcept; ~TDatabase(); + void SetTableObserver(ui32 table, ITableObserverPtr ptr) noexcept; + /* Returns durable monotonic change number for table or entire database on default (table = Max()). Serial is incremented for each successful Commit(). AHTUNG: Serial may go to the past in case of diff --git a/ydb/core/tablet_flat/flat_executor.cpp b/ydb/core/tablet_flat/flat_executor.cpp index 9f559a8374c5..d023f995a8fe 100644 --- a/ydb/core/tablet_flat/flat_executor.cpp +++ b/ydb/core/tablet_flat/flat_executor.cpp @@ -876,6 +876,10 @@ void TExecutor::ApplyFollowerUpdate(THolder update) { if (update->IsSnapshot) // do nothing over snapshot after initial one return; + // Protect against recursive transactions in callbacks + Y_DEBUG_ABORT_UNLESS(!ActiveTransaction); + ActiveTransaction = true; + TString schemeUpdate; TString dataUpdate; TStackVec partSwitches; @@ -940,6 +944,11 @@ void TExecutor::ApplyFollowerUpdate(THolder update) { if (schemeUpdate) { ReadResourceProfile(); ReflectSchemeSettings(); + Owner->OnFollowerSchemaUpdated(); + } + + if (dataUpdate) { + Owner->OnFollowerDataUpdated(); } } @@ -1000,14 +1009,20 @@ void TExecutor::ApplyFollowerUpdate(THolder update) { } else if (update->NeedFollowerGcAck) { Send(Owner->Tablet(), new TEvTablet::TEvFGcAck(Owner->TabletID(), Generation(), Step0)); } + + ActiveTransaction = false; } void TExecutor::ApplyFollowerAuxUpdate(const TString &auxBody) { const TString aux = NPageCollection::TSlicer::Lz4()->Decode(auxBody); TProtoBox proto(aux); - if (proto.HasUserAuxUpdate()) + if (proto.HasUserAuxUpdate()) { + Y_DEBUG_ABORT_UNLESS(!ActiveTransaction); + ActiveTransaction = true; Owner->OnLeaderUserAuxUpdate(std::move(proto.GetUserAuxUpdate())); + ActiveTransaction = false; + } } void TExecutor::RequestFromSharedCache(TAutoPtr fetch, diff --git a/ydb/core/tablet_flat/flat_table.cpp b/ydb/core/tablet_flat/flat_table.cpp index e3feb7e3ea39..c44990da748e 100644 --- a/ydb/core/tablet_flat/flat_table.cpp +++ b/ydb/core/tablet_flat/flat_table.cpp @@ -830,6 +830,7 @@ void TTable::Update(ERowOp rop, TRawVals key, TOpsRef ops, TArrayRef 0); } + + TableObserver.OnUpdateTx(rop, key, ops, txId); } void TTable::CommitTx(ui64 txId, TRowVersion rowVersion) @@ -1338,6 +1341,11 @@ TCompactionStats TTable::GetCompactionStats() const return stats; } +void TTable::SetTableObserver(ITableObserverPtr ptr) noexcept +{ + TableObserver = std::move(ptr); +} + void TPartStats::Add(const TPartView& partView) { PartsCount += 1; diff --git a/ydb/core/tablet_flat/flat_table.h b/ydb/core/tablet_flat/flat_table.h index 0b0cf7963933..8b4f68fae6cf 100644 --- a/ydb/core/tablet_flat/flat_table.h +++ b/ydb/core/tablet_flat/flat_table.h @@ -13,6 +13,7 @@ #include "flat_table_stats.h" #include "flat_table_subset.h" #include "flat_table_misc.h" +#include "flat_table_observer.h" #include "flat_sausage_solid.h" #include "util_basics.h" @@ -322,7 +323,7 @@ class TTable: public TAtomicRefCount { TCompactionStats GetCompactionStats() const; - void FillTxStatusCache(THashMap& cache) const noexcept; + void SetTableObserver(ITableObserverPtr ptr) noexcept; private: TMemTable& MemTable(); @@ -358,6 +359,7 @@ class TTable: public TAtomicRefCount { absl::flat_hash_set CheckTransactions; TTransactionMap CommittedTransactions; TTransactionSet RemovedTransactions; + ITableObserverPtr TableObserver; private: struct TRollbackRemoveTxRef { diff --git a/ydb/core/tablet_flat/flat_table_observer.cpp b/ydb/core/tablet_flat/flat_table_observer.cpp new file mode 100644 index 000000000000..5adda17187de --- /dev/null +++ b/ydb/core/tablet_flat/flat_table_observer.cpp @@ -0,0 +1 @@ +#include "flat_table_observer.h" diff --git a/ydb/core/tablet_flat/flat_table_observer.h b/ydb/core/tablet_flat/flat_table_observer.h new file mode 100644 index 000000000000..72f8f3083c2d --- /dev/null +++ b/ydb/core/tablet_flat/flat_table_observer.h @@ -0,0 +1,61 @@ +#pragma once +#include "defs.h" +#include "flat_row_eggs.h" +#include "flat_update_op.h" + +#include + +namespace NKikimr::NTable { + + class ITableObserver : public TThrRefBase { + public: + /** + * Called when a new update is applied to the table + */ + virtual void OnUpdate( + ERowOp rop, + TArrayRef key, + TArrayRef ops, + TRowVersion rowVersion) = 0; + + /** + * Called when an uncommitted update is applied to the table + */ + virtual void OnUpdateTx( + ERowOp rop, + TArrayRef key, + TArrayRef ops, + ui64 txId) = 0; + }; + + /** + * Smart pointer that can safely be used to call methods even when it's nullptr + */ + class ITableObserverPtr : public TIntrusivePtr { + public: + using TIntrusivePtr::TIntrusivePtr; + + void OnUpdate( + ERowOp rop, + TArrayRef key, + TArrayRef ops, + TRowVersion rowVersion) + { + if (ITableObserver* p = Get()) { + p->OnUpdate(rop, key, ops, rowVersion); + } + } + + void OnUpdateTx( + ERowOp rop, + TArrayRef key, + TArrayRef ops, + ui64 txId) + { + if (ITableObserver* p = Get()) { + p->OnUpdateTx(rop, key, ops, txId); + } + } + }; + +} // namespace NKikimr::NTable diff --git a/ydb/core/tablet_flat/tablet_flat_executor.cpp b/ydb/core/tablet_flat/tablet_flat_executor.cpp index 6b1ccd28e4f2..d075515a1e3c 100644 --- a/ydb/core/tablet_flat/tablet_flat_executor.cpp +++ b/ydb/core/tablet_flat/tablet_flat_executor.cpp @@ -68,6 +68,14 @@ namespace NFlatExecutorSetup { void ITablet::OnFollowersCountChanged() { // nothing by default } + + void ITablet::OnFollowerSchemaUpdated() { + // nothing by default + } + + void ITablet::OnFollowerDataUpdated() { + // nothing by default + } } }} diff --git a/ydb/core/tablet_flat/tablet_flat_executor.h b/ydb/core/tablet_flat/tablet_flat_executor.h index 33ff77f951d6..edd006ed266f 100644 --- a/ydb/core/tablet_flat/tablet_flat_executor.h +++ b/ydb/core/tablet_flat/tablet_flat_executor.h @@ -505,6 +505,9 @@ namespace NFlatExecutorSetup { virtual void OnFollowersCountChanged(); + virtual void OnFollowerSchemaUpdated(); + virtual void OnFollowerDataUpdated(); + // create transaction? protected: ITablet(TTabletStorageInfo *info, const TActorId &tablet) diff --git a/ydb/core/tablet_flat/ya.make b/ydb/core/tablet_flat/ya.make index 8a6604935ebf..7c97ebd5045b 100644 --- a/ydb/core/tablet_flat/ya.make +++ b/ydb/core/tablet_flat/ya.make @@ -62,6 +62,8 @@ SRCS( flat_table_part.cpp flat_table_part.h flat_table_misc.cpp + flat_table_observer.cpp + flat_table_observer.h flat_update_op.h probes.cpp shared_handle.cpp From 4f66a2354c482772b2ab826b9633310dd96e9e96 Mon Sep 17 00:00:00 2001 From: Aleksei Borzenkov Date: Thu, 25 Jan 2024 15:48:31 +0000 Subject: [PATCH 2/2] Fix code review suggestions --- ydb/core/tablet_flat/flat_database.cpp | 2 +- ydb/core/tablet_flat/flat_database.h | 2 +- ydb/core/tablet_flat/flat_executor.cpp | 49 +++++++++++++++------- ydb/core/tablet_flat/flat_executor.h | 2 + ydb/core/tablet_flat/flat_table.cpp | 10 +++-- ydb/core/tablet_flat/flat_table.h | 4 +- ydb/core/tablet_flat/flat_table_observer.h | 30 ------------- 7 files changed, 47 insertions(+), 52 deletions(-) diff --git a/ydb/core/tablet_flat/flat_database.cpp b/ydb/core/tablet_flat/flat_database.cpp index 8eb780705d39..5a47f8c31d14 100644 --- a/ydb/core/tablet_flat/flat_database.cpp +++ b/ydb/core/tablet_flat/flat_database.cpp @@ -495,7 +495,7 @@ const TDbStats& TDatabase::Counters() const noexcept return DatabaseImpl->Stats; } -void TDatabase::SetTableObserver(ui32 table, ITableObserverPtr ptr) noexcept +void TDatabase::SetTableObserver(ui32 table, TIntrusivePtr ptr) noexcept { Require(table)->SetTableObserver(std::move(ptr)); } diff --git a/ydb/core/tablet_flat/flat_database.h b/ydb/core/tablet_flat/flat_database.h index 50e486e6320e..e35f58fbd326 100644 --- a/ydb/core/tablet_flat/flat_database.h +++ b/ydb/core/tablet_flat/flat_database.h @@ -58,7 +58,7 @@ class TDatabase { TDatabase(TDatabaseImpl *databaseImpl = nullptr) noexcept; ~TDatabase(); - void SetTableObserver(ui32 table, ITableObserverPtr ptr) noexcept; + void SetTableObserver(ui32 table, TIntrusivePtr ptr) noexcept; /* Returns durable monotonic change number for table or entire database on default (table = Max()). Serial is incremented for each diff --git a/ydb/core/tablet_flat/flat_executor.cpp b/ydb/core/tablet_flat/flat_executor.cpp index d023f995a8fe..13cdb2d535e2 100644 --- a/ydb/core/tablet_flat/flat_executor.cpp +++ b/ydb/core/tablet_flat/flat_executor.cpp @@ -97,6 +97,32 @@ TTableSnapshotContext::~TTableSnapshotContext() = default; using namespace NResourceBroker; +class TExecutor::TActiveTransactionZone { +public: + explicit TActiveTransactionZone(TExecutor* self) noexcept + : Self(self) + { + Y_DEBUG_ABORT_UNLESS(!Self->ActiveTransaction); + Self->ActiveTransaction = true; + Active = true; + } + + ~TActiveTransactionZone() noexcept { + Done(); + } + + void Done() noexcept { + if (Active) { + Self->ActiveTransaction = false; + Active = false; + } + } + +private: + TExecutor* Self; + bool Active = false; +}; + TExecutor::TExecutor( NFlatExecutorSetup::ITablet* owner, const TActorId& ownerActorId) @@ -877,8 +903,7 @@ void TExecutor::ApplyFollowerUpdate(THolder update) { return; // Protect against recursive transactions in callbacks - Y_DEBUG_ABORT_UNLESS(!ActiveTransaction); - ActiveTransaction = true; + TActiveTransactionZone activeTransaction(this); TString schemeUpdate; TString dataUpdate; @@ -1009,8 +1034,6 @@ void TExecutor::ApplyFollowerUpdate(THolder update) { } else if (update->NeedFollowerGcAck) { Send(Owner->Tablet(), new TEvTablet::TEvFGcAck(Owner->TabletID(), Generation(), Step0)); } - - ActiveTransaction = false; } void TExecutor::ApplyFollowerAuxUpdate(const TString &auxBody) { @@ -1018,10 +1041,8 @@ void TExecutor::ApplyFollowerAuxUpdate(const TString &auxBody) { TProtoBox proto(aux); if (proto.HasUserAuxUpdate()) { - Y_DEBUG_ABORT_UNLESS(!ActiveTransaction); - ActiveTransaction = true; + TActiveTransactionZone activeTransaction(this); Owner->OnLeaderUserAuxUpdate(std::move(proto.GetUserAuxUpdate())); - ActiveTransaction = false; } } @@ -1659,9 +1680,7 @@ void TExecutor::Enqueue(TAutoPtr self, const TActorContext &ctx) { } void TExecutor::ExecuteTransaction(TAutoPtr seat, const TActorContext &ctx) { - Y_DEBUG_ABORT_UNLESS(!ActiveTransaction); - - ActiveTransaction = true; + TActiveTransactionZone activeTransaction(this); ++seat->Retries; THPTimer cpuTimer; @@ -1751,7 +1770,7 @@ void TExecutor::ExecuteTransaction(TAutoPtr seat, const TActorContext &ct } PrivatePageCache->ResetTouchesAndToLoad(false); - ActiveTransaction = false; + activeTransaction.Done(); PlanTransactionActivation(); } @@ -2828,7 +2847,7 @@ void TExecutor::Handle(TEvTablet::TEvCommitResult::TPtr &ev, const TActorContext Y_ABORT_UNLESS(msg->Generation == Generation()); const ui32 step = msg->Step; - ActiveTransaction = true; + TActiveTransactionZone activeTransaction(this); GcLogic->OnCommitLog(step, msg->ConfirmedOnSend, ctx); CommitManager->Confirm(step); @@ -2922,7 +2941,7 @@ void TExecutor::Handle(TEvTablet::TEvCommitResult::TPtr &ev, const TActorContext std::move(msg->GroupWrittenOps), ctx); - ActiveTransaction = false; + activeTransaction.Done(); PlanTransactionActivation(); MaybeRelaxRejectProbability(); @@ -3251,7 +3270,7 @@ void TExecutor::Handle(NOps::TEvResult *ops, TProdCompact *msg, bool cancelled) return Broken(); } - ActiveTransaction = true; + TActiveTransactionZone activeTransaction(this); const ui64 snapStamp = msg->Params->Edge.TxStamp ? msg->Params->Edge.TxStamp : MakeGenStepPair(Generation(), msg->Step); @@ -3484,7 +3503,7 @@ void TExecutor::Handle(NOps::TEvResult *ops, TProdCompact *msg, bool cancelled) Owner->CompactionComplete(tableId, OwnerCtx()); MaybeRelaxRejectProbability(); - ActiveTransaction = false; + activeTransaction.Done(); if (LogicSnap->MayFlush(false)) { MakeLogSnapshot(); diff --git a/ydb/core/tablet_flat/flat_executor.h b/ydb/core/tablet_flat/flat_executor.h index 594ccb0dbecd..a8feffc9ddad 100644 --- a/ydb/core/tablet_flat/flat_executor.h +++ b/ydb/core/tablet_flat/flat_executor.h @@ -413,6 +413,8 @@ class TExecutor THashMap> ScanSnapshots; ui64 ScanSnapshotId = 1; + class TActiveTransactionZone; + bool ActiveTransaction = false; bool BrokenTransaction = false; ui32 ActivateTransactionWaiting = 0; diff --git a/ydb/core/tablet_flat/flat_table.cpp b/ydb/core/tablet_flat/flat_table.cpp index c44990da748e..fb5f8d46ba7b 100644 --- a/ydb/core/tablet_flat/flat_table.cpp +++ b/ydb/core/tablet_flat/flat_table.cpp @@ -830,7 +830,9 @@ void TTable::Update(ERowOp rop, TRawVals key, TOpsRef ops, TArrayRefOnUpdate(rop, key, ops, rowVersion); + } } void TTable::AddTxRef(ui64 txId) @@ -865,7 +867,9 @@ void TTable::UpdateTx(ERowOp rop, TRawVals key, TOpsRef ops, TArrayRef 0); } - TableObserver.OnUpdateTx(rop, key, ops, txId); + if (TableObserver) { + TableObserver->OnUpdateTx(rop, key, ops, txId); + } } void TTable::CommitTx(ui64 txId, TRowVersion rowVersion) @@ -1341,7 +1345,7 @@ TCompactionStats TTable::GetCompactionStats() const return stats; } -void TTable::SetTableObserver(ITableObserverPtr ptr) noexcept +void TTable::SetTableObserver(TIntrusivePtr ptr) noexcept { TableObserver = std::move(ptr); } diff --git a/ydb/core/tablet_flat/flat_table.h b/ydb/core/tablet_flat/flat_table.h index 8b4f68fae6cf..cd718f4be509 100644 --- a/ydb/core/tablet_flat/flat_table.h +++ b/ydb/core/tablet_flat/flat_table.h @@ -323,7 +323,7 @@ class TTable: public TAtomicRefCount { TCompactionStats GetCompactionStats() const; - void SetTableObserver(ITableObserverPtr ptr) noexcept; + void SetTableObserver(TIntrusivePtr ptr) noexcept; private: TMemTable& MemTable(); @@ -359,7 +359,7 @@ class TTable: public TAtomicRefCount { absl::flat_hash_set CheckTransactions; TTransactionMap CommittedTransactions; TTransactionSet RemovedTransactions; - ITableObserverPtr TableObserver; + TIntrusivePtr TableObserver; private: struct TRollbackRemoveTxRef { diff --git a/ydb/core/tablet_flat/flat_table_observer.h b/ydb/core/tablet_flat/flat_table_observer.h index 72f8f3083c2d..374a0a0152d3 100644 --- a/ydb/core/tablet_flat/flat_table_observer.h +++ b/ydb/core/tablet_flat/flat_table_observer.h @@ -28,34 +28,4 @@ namespace NKikimr::NTable { ui64 txId) = 0; }; - /** - * Smart pointer that can safely be used to call methods even when it's nullptr - */ - class ITableObserverPtr : public TIntrusivePtr { - public: - using TIntrusivePtr::TIntrusivePtr; - - void OnUpdate( - ERowOp rop, - TArrayRef key, - TArrayRef ops, - TRowVersion rowVersion) - { - if (ITableObserver* p = Get()) { - p->OnUpdate(rop, key, ops, rowVersion); - } - } - - void OnUpdateTx( - ERowOp rop, - TArrayRef key, - TArrayRef ops, - ui64 txId) - { - if (ITableObserver* p = Get()) { - p->OnUpdateTx(rop, key, ops, txId); - } - } - }; - } // namespace NKikimr::NTable