From 5c5c2ae7593a29ddd99350394c6027a5206d0b17 Mon Sep 17 00:00:00 2001 From: Laurynas Biveinis Date: Tue, 17 Dec 2024 17:17:51 +0200 Subject: [PATCH] Introduce rdb_snapshot_unique_ptr & make rdb_get_rocksdb_db return reference This is a second batch of cleanups from the repeatable read snapshot work. - Make rdb_get_rocksdb_db return reference instead of pointer, update callers and callees. Inline this function in ha_rocksdb_proto.h, move rdb declaration there to support inlining in detail namespace as documentation that it should not be accessed directly. Update ha_rocksdb.cc itself to use rdb_get_rocksdb_db instead of rdb. Remove many redudant rdb != nullptr checks and asserts. - Introduce rdb_snapshot_unique_ptr with a custom deleter and a factory function get_rdb_snapshot to manager global RocksDB snapshots - Make Rdb_explicit_snapshot create the snapshot itself, by calling get_rdb_snapshot, instead of receiving one through parameters, simplify its signature. - Replace rocksdb::ManagedSnapshot uses with rdb_snapshot_unique_ptr ones. - Remove rarely-used explicit transaction snapshot assignment code from acquire_snapshot and move it to the few callers where it may happen. Add new method Rdb_transaction::has_explicit_or_read_only_snapshot to support this. - For Rdb_transaction, make m_insert_count, m_update_count, m_delete_count, m_auto_incr_map, & m_rollback_only fields private instead of protected. Move their reset at the end of committed or rolledback transactions to on_finish method in the base class. Remove redundant m_writes_at_last_savepoint reset in set_initial_savepoint. - Add several asserts. --- storage/rocksdb/clone/donor.cc | 14 +- storage/rocksdb/ha_rocksdb.cc | 496 +++++++++--------- storage/rocksdb/ha_rocksdb.h | 3 +- storage/rocksdb/ha_rocksdb_proto.h | 33 +- storage/rocksdb/rdb_bulk_load.cc | 2 +- storage/rocksdb/rdb_bulk_load.h | 10 +- storage/rocksdb/rdb_cf_manager.cc | 16 +- storage/rocksdb/rdb_cf_manager.h | 10 +- storage/rocksdb/rdb_datadic.cc | 21 +- storage/rocksdb/rdb_datadic.h | 16 +- storage/rocksdb/rdb_i_s.cc | 113 +--- storage/rocksdb/rdb_iterator.cc | 9 +- storage/rocksdb/rdb_iterator.h | 2 +- storage/rocksdb/rdb_sst_info.cc | 18 +- storage/rocksdb/rdb_sst_info.h | 12 +- storage/rocksdb/rdb_sst_partitioner_factory.h | 14 +- 16 files changed, 370 insertions(+), 419 deletions(-) diff --git a/storage/rocksdb/clone/donor.cc b/storage/rocksdb/clone/donor.cc index c701ff0e7e3d..dcccc862929b 100644 --- a/storage/rocksdb/clone/donor.cc +++ b/storage/rocksdb/clone/donor.cc @@ -614,8 +614,8 @@ donor::donor(const myrocks::clone::locator &l, const uchar *&loc, donor::~donor() { if (m_rdb_file_deletes_disabled) { - auto *const rdb = myrocks::rdb_get_rocksdb_db(); - const auto result = rdb->EnableFileDeletions(); + auto &rdb = myrocks::rdb_get_rocksdb_db(); + const auto result = rdb.EnableFileDeletions(); if (!result.ok()) { myrocks::rdb_log_status_error(result, "RocksDB file deletion re-enable failed"); @@ -700,9 +700,9 @@ int donor::next_checkpoint_locked(bool final, std::size_t &total_new_size) { auto err = m_checkpoint.cleanup(); if (err != 0) return save_and_return_error(err, "RocksDB checkpoint error"); - auto *const rdb = final ? myrocks::rdb_get_rocksdb_db() : nullptr; - if (rdb != nullptr) { - const auto dfd_result = rdb->DisableFileDeletions(); + if (final) { + const auto dfd_result = + myrocks::rdb_get_rocksdb_db().DisableFileDeletions(); m_rdb_file_deletes_disabled = dfd_result.ok(); if (!m_rdb_file_deletes_disabled) { myrocks::rdb_log_status_error(dfd_result, @@ -714,7 +714,7 @@ int donor::next_checkpoint_locked(bool final, std::size_t &total_new_size) { err = m_checkpoint.init(); if (err != 0) { - if (rdb) rdb->EnableFileDeletions(); + if (final) myrocks::rdb_get_rocksdb_db().EnableFileDeletions(); return save_and_return_error(err, "RocksDB checkpoint error"); } @@ -726,7 +726,7 @@ int donor::next_checkpoint_locked(bool final, std::size_t &total_new_size) { if (err != 0) { // Ignore the return value because we are already returning an error (void)m_checkpoint.cleanup(); - if (rdb) rdb->EnableFileDeletions(); + if (final) myrocks::rdb_get_rocksdb_db().EnableFileDeletions(); return err; } diff --git a/storage/rocksdb/ha_rocksdb.cc b/storage/rocksdb/ha_rocksdb.cc index da3d824966f0..f53eccc4131a 100644 --- a/storage/rocksdb/ha_rocksdb.cc +++ b/storage/rocksdb/ha_rocksdb.cc @@ -211,7 +211,12 @@ static bool rocksdb_use_default_sk_cf = false; /////////////////////////////////////////////////////////// handlerton *rocksdb_hton; -static rocksdb::TransactionDB *rdb = nullptr; +namespace detail { + +rocksdb::TransactionDB *rdb; + +} // namespace detail + static rocksdb::HistogramImpl *commit_latency_stats = nullptr; static std::shared_ptr rocksdb_stats; @@ -252,7 +257,7 @@ static void rocksdb_flush_all_memtables() { // RocksDB will fail the flush if the CF is deleted, // but here we don't handle return status for (const auto &cf_handle : cf_manager.get_all_cf()) { - rdb->Flush(rocksdb::FlushOptions(), cf_handle.get()); + rdb_get_rocksdb_db().Flush(rocksdb::FlushOptions(), cf_handle.get()); } } @@ -351,7 +356,7 @@ int rocksdb_create_checkpoint(std::string_view checkpoint_dir_raw) { "creating checkpoint in directory: %s\n", checkpoint_dir.c_str()); rocksdb::Checkpoint *checkpoint; - auto status = rocksdb::Checkpoint::Create(rdb, &checkpoint); + auto status = rocksdb::Checkpoint::Create(&rdb_get_rocksdb_db(), &checkpoint); if (status.ok()) { status = checkpoint->CreateCheckpoint(checkpoint_dir.c_str()); delete checkpoint; @@ -449,7 +454,7 @@ static int rocksdb_compact_lzero() { for (const auto &cf_handle : cf_manager.get_all_cf()) { for (i = 0; i < max_attempts; i++) { - rdb->GetColumnFamilyMetaData(cf_handle.get(), &metadata); + rdb_get_rocksdb_db().GetColumnFamilyMetaData(cf_handle.get(), &metadata); cf_handle->GetDescriptor(&cf_descr); c_options.output_file_size_limit = cf_descr.options.target_file_size_base; @@ -457,9 +462,9 @@ static int rocksdb_compact_lzero() { c_options.compression = rocksdb::kDisableCompressionOption; uint64_t base_level; - if (!rdb->GetIntProperty(cf_handle.get(), - rocksdb::DB::Properties::kBaseLevel, - &base_level)) { + if (!rdb_get_rocksdb_db().GetIntProperty( + cf_handle.get(), rocksdb::DB::Properties::kBaseLevel, + &base_level)) { LogPluginErrMsg(ERROR_LEVEL, ER_LOG_PRINTF_MSG, "MyRocks: compact L0 cannot get base level"); break; @@ -484,8 +489,8 @@ static int rocksdb_compact_lzero() { break; } - rocksdb::Status s; - s = rdb->CompactFiles(c_options, cf_handle.get(), file_names, base_level); + const auto s = rdb_get_rocksdb_db().CompactFiles( + c_options, cf_handle.get(), file_names, base_level); if (!s.ok()) { std::shared_ptr cfh = @@ -574,11 +579,11 @@ static int rocksdb_cancel_manual_compactions( // NO_LINT_DEBUG LogPluginErrMsg(INFORMATION_LEVEL, ER_LOG_PRINTF_MSG, "RocksDB: Stopping all Manual Compactions."); - rdb->GetBaseDB()->DisableManualCompaction(); + rdb_get_rocksdb_db().GetBaseDB()->DisableManualCompaction(); // NO_LINT_DEBUG LogPluginErrMsg(INFORMATION_LEVEL, ER_LOG_PRINTF_MSG, "RocksDB: Enabling Manual Compactions."); - rdb->GetBaseDB()->EnableManualCompaction(); + rdb_get_rocksdb_db().GetBaseDB()->EnableManualCompaction(); return HA_EXIT_SUCCESS; } @@ -598,9 +603,9 @@ static void rocksdb_set_pause_background_work( const bool pause_requested = *static_cast(save); if (rocksdb_pause_background_work != pause_requested) { if (pause_requested) { - rdb->PauseBackgroundWork(); + rdb_get_rocksdb_db().PauseBackgroundWork(); } else { - rdb->ContinueBackgroundWork(); + rdb_get_rocksdb_db().ContinueBackgroundWork(); } rocksdb_pause_background_work = pause_requested; } @@ -962,7 +967,7 @@ static int rocksdb_tracing(THD *const thd MY_ATTRIBUTE((__unused__)), int len = 0; const char *const trace_opt_str_raw = value->val_str(value, nullptr, &len); rocksdb::Status s; - if (trace_opt_str_raw == nullptr || rdb == nullptr) { + if (trace_opt_str_raw == nullptr) { return HA_EXIT_FAILURE; } int rc __attribute__((__unused__)); @@ -972,7 +977,8 @@ static int rocksdb_tracing(THD *const thd MY_ATTRIBUTE((__unused__)), // NO_LINT_DEBUG LogPluginErrMsg(INFORMATION_LEVEL, ER_LOG_PRINTF_MSG, "RocksDB: Stop tracing block cache accesses or queries.\n"); - s = trace_block_cache_access ? rdb->EndBlockCacheTrace() : rdb->EndTrace(); + s = trace_block_cache_access ? rdb_get_rocksdb_db().EndBlockCacheTrace() + : rdb_get_rocksdb_db().EndTrace(); if (!s.ok()) { rc = ha_rocksdb::rdb_error_to_mysql(s); @@ -1020,7 +1026,7 @@ static int rocksdb_tracing(THD *const thd MY_ATTRIBUTE((__unused__)), return HA_EXIT_FAILURE; } const std::string trace_dir = std::string(rocksdb_datadir) + trace_folder; - s = rdb->GetEnv()->CreateDirIfMissing(trace_dir); + s = rdb_get_rocksdb_db().GetEnv()->CreateDirIfMissing(trace_dir); if (!s.ok()) { // NO_LINT_DEBUG LogPluginErrMsg( @@ -1031,7 +1037,7 @@ static int rocksdb_tracing(THD *const thd MY_ATTRIBUTE((__unused__)), return HA_EXIT_FAILURE; } const auto trace_file_path = rdb_concat_paths(trace_dir, trace_file_name); - s = rdb->GetEnv()->FileExists(trace_file_path); + s = rdb_get_rocksdb_db().GetEnv()->FileExists(trace_file_path); if (s.ok() || !s.IsNotFound()) { // NO_LINT_DEBUG LogPluginErrMsg( @@ -1044,17 +1050,18 @@ static int rocksdb_tracing(THD *const thd MY_ATTRIBUTE((__unused__)), return HA_EXIT_FAILURE; } std::unique_ptr trace_writer; - const rocksdb::EnvOptions env_option(rdb->GetDBOptions()); - s = rocksdb::NewFileTraceWriter(rdb->GetEnv(), env_option, trace_file_path, - &trace_writer); + const rocksdb::EnvOptions env_option{rdb_get_rocksdb_db().GetDBOptions()}; + s = rocksdb::NewFileTraceWriter(rdb_get_rocksdb_db().GetEnv(), env_option, + trace_file_path, &trace_writer); if (!s.ok()) { rc = ha_rocksdb::rdb_error_to_mysql(s); return HA_EXIT_FAILURE; } if (trace_block_cache_access) { - s = rdb->StartBlockCacheTrace(trace_opt, std::move(trace_writer)); + s = rdb_get_rocksdb_db().StartBlockCacheTrace(trace_opt, + std::move(trace_writer)); } else { - s = rdb->StartTrace(trace_opt, std::move(trace_writer)); + s = rdb_get_rocksdb_db().StartTrace(trace_opt, std::move(trace_writer)); } if (!s.ok()) { rc = ha_rocksdb::rdb_error_to_mysql(s); @@ -1227,13 +1234,12 @@ static void rocksdb_set_reset_stats( my_core::SYS_VAR *const var MY_ATTRIBUTE((__unused__)), void *const var_ptr, const void *const save) { assert(save != nullptr); - assert(rdb != nullptr); assert(rocksdb_stats != nullptr); *static_cast(var_ptr) = *static_cast(save); if (rocksdb_reset_stats) { - rocksdb::Status s = rdb->ResetStats(); + auto s = rdb_get_rocksdb_db().ResetStats(); // RocksDB will always return success. Let's document this assumption here // as well so that we'll get immediately notified when contract changes. @@ -1272,7 +1278,6 @@ static void rocksdb_set_io_write_timeout( my_core::SYS_VAR *const var MY_ATTRIBUTE((__unused__)), void *const var_ptr MY_ATTRIBUTE((__unused__)), const void *const save) { assert(save != nullptr); - assert(rdb != nullptr); const auto new_val = *static_cast(save); rocksdb_io_write_timeout_secs = new_val; @@ -1567,8 +1572,10 @@ static void rocksdb_set_max_bottom_pri_background_compactions_internal( // This creates background threads in rocksdb with BOTTOM priority pool. // Compactions for bottommost level use threads in the BOTTOM pool, and // the threads in the BOTTOM pool run with lower OS priority (19 in Linux). - rdb->GetEnv()->SetBackgroundThreads(val, rocksdb::Env::Priority::BOTTOM); - rdb->GetEnv()->LowerThreadPoolCPUPriority(rocksdb::Env::Priority::BOTTOM); + rdb_get_rocksdb_db().GetEnv()->SetBackgroundThreads( + val, rocksdb::Env::Priority::BOTTOM); + rdb_get_rocksdb_db().GetEnv()->LowerThreadPoolCPUPriority( + rocksdb::Env::Priority::BOTTOM); LogPluginErrMsg(INFORMATION_LEVEL, ER_LOG_PRINTF_MSG, "Set %d compaction thread(s) with " "lower scheduling priority.", @@ -3238,16 +3245,11 @@ static struct SYS_VAR *rocksdb_system_variables[] = { class Rdb_explicit_snapshot : public explicit_snapshot { public: [[nodiscard]] static std::shared_ptr create( - THD *thd, snapshot_info_st &ssinfo, rocksdb::DB *db, - const rocksdb::Snapshot *snapshot) { - auto s = std::unique_ptr( - new rocksdb::ManagedSnapshot(db, snapshot)); - if (!s) { - return nullptr; - } + THD &thd, snapshot_info_st &ssinfo) { + auto s = get_rdb_snapshot(); const uint64_t client_provided_read_filtering_ts = rdb_is_binlog_ttl_enabled() - ? THDVAR(thd, consistent_snapshot_ttl_read_filtering_ts_nsec) / + ? THDVAR(&thd, consistent_snapshot_ttl_read_filtering_ts_nsec) / 1000000000UL : 0; std::lock_guard lock(explicit_snapshot_mutex); @@ -3296,12 +3298,12 @@ class Rdb_explicit_snapshot : public explicit_snapshot { return elem->second.lock(); } - [[nodiscard]] rocksdb::ManagedSnapshot *get_snapshot() noexcept { + [[nodiscard]] const rocksdb::Snapshot *get_snapshot() const noexcept { return snapshot.get(); } Rdb_explicit_snapshot(snapshot_info_st ssinfo, - std::unique_ptr &&snapshot) + rdb_snapshot_unique_ptr &&snapshot) : explicit_snapshot(ssinfo), snapshot(std::move(snapshot)) {} virtual ~Rdb_explicit_snapshot() { @@ -3310,7 +3312,7 @@ class Rdb_explicit_snapshot : public explicit_snapshot { } private: - std::unique_ptr snapshot; + rdb_snapshot_unique_ptr snapshot; static std::mutex explicit_snapshot_mutex; static ulonglong explicit_snapshot_counter; @@ -3355,7 +3357,7 @@ static int rocksdb_compact_column_family( if (cf_name.empty()) cf_name = DEFAULT_CF_NAME; auto cfh = cf_manager.get_cf(cf_name); - if (cfh != nullptr && rdb != nullptr) { + if (cfh != nullptr) { rocksdb::BottommostLevelCompaction bottommost_level_compaction = (rocksdb::BottommostLevelCompaction)THDVAR( thd, manual_compaction_bottommost_level); @@ -3649,15 +3651,16 @@ class Rdb_transaction { Rdb_transaction *next{nullptr}; Rdb_transaction *prev{nullptr}; - protected: - ulonglong m_write_count[2] = {0, 0}; ulonglong m_insert_count = 0; ulonglong m_update_count = 0; ulonglong m_delete_count = 0; - // per row data - ulonglong m_row_lock_count = 0; + std::unordered_map m_auto_incr_map; + protected: + ulonglong m_write_count[2] = {0, 0}; + // per row data + ulonglong m_row_lock_count = 0; bool m_is_two_phase = false; private: @@ -3681,6 +3684,8 @@ class Rdb_transaction { Rdb_transaction_list::mutex. */ std::int64_t m_earliest_snapshot_ts = 0; + bool m_rollback_only = false; + protected: THD *m_thd = nullptr; @@ -3691,12 +3696,10 @@ class Rdb_transaction { /* Maximum number of locks the transaction can have */ ulonglong m_max_row_locks; - bool m_rollback_only = false; - enum class snapshot_type { NONE, - // A snapshot created through Tranaction API for regular transactions and by - // DB::GetSnapshot() for WB ones + // A snapshot created through Transaction API for regular transactions and + // by DB::GetSnapshot() for WB ones CURRENT, CURRENT_DELAYED, // This is used by transactions started with "START TRANSACTION WITH @@ -3728,7 +3731,7 @@ class Rdb_transaction { case snapshot_type::EXPLICIT: assert(m_explicit_snapshot != nullptr); assert(m_read_opts[USER_TABLE].snapshot == - m_explicit_snapshot->get_snapshot()->snapshot()); + m_explicit_snapshot->get_snapshot()); break; } #endif @@ -4130,8 +4133,7 @@ class Rdb_transaction { assert(m_read_opts[USER_TABLE].snapshot == nullptr); break; case snapshot_type::EXPLICIT: - assert(snapshot == - m_explicit_snapshot->get_snapshot()->snapshot()); + assert(snapshot == m_explicit_snapshot->get_snapshot()); break; case snapshot_type::NONE: assert(false); @@ -4170,12 +4172,13 @@ class Rdb_transaction { } void share_explicit_snapshot( - std::shared_ptr snapshot) noexcept { + std::shared_ptr &&snapshot) noexcept { + assert(statement_snapshot_type == snapshot_type::NONE); assert_snapshot_invariants(); m_explicit_snapshot = std::move(snapshot); statement_snapshot_type = snapshot_type::EXPLICIT; - auto *const rdb_snapshot = m_explicit_snapshot->get_snapshot()->snapshot(); + auto *const rdb_snapshot = m_explicit_snapshot->get_snapshot(); assign_snapshot(rdb_snapshot); assert_snapshot_invariants(); @@ -4185,11 +4188,9 @@ class Rdb_transaction { assert(statement_snapshot_type == snapshot_type::NONE); assert_snapshot_invariants(); - auto *const rdb_snapshot = rdb->GetSnapshot(); - m_explicit_snapshot = - Rdb_explicit_snapshot::create(m_thd, ss_info, rdb, rdb_snapshot); + m_explicit_snapshot = Rdb_explicit_snapshot::create(*m_thd, ss_info); statement_snapshot_type = snapshot_type::EXPLICIT; - assign_snapshot(rdb_snapshot); + assign_snapshot(m_explicit_snapshot->get_snapshot()); assert_snapshot_invariants(); } @@ -4199,6 +4200,12 @@ class Rdb_transaction { return statement_snapshot_type == snapshot_type::EXPLICIT; } + [[nodiscard]] bool has_explicit_or_read_only_snapshot() const noexcept { + assert_snapshot_invariants(); + return statement_snapshot_type == snapshot_type::EXPLICIT || + statement_snapshot_type == snapshot_type::READ_ONLY_TRX; + } + [[nodiscard]] snapshot_info_st clone_explicit_snapshot_info() const noexcept { assert(has_explicit_snapshot()); return m_explicit_snapshot->ss_info; @@ -4253,7 +4260,7 @@ class Rdb_transaction { auto ctx = get_bulk_load_ctx(); auto &bulk_load_index_registry = ctx->bulk_load_index_registry(); - rocksdb::Status s = rdb->IngestExternalFiles(args); + auto s = rdb_get_rocksdb_db().IngestExternalFiles(args); if (!s.ok() && bulk_load_index_registry.index_registered_in_sst_partitioner()) { // NO_LINT_DEBUG @@ -4262,7 +4269,7 @@ class Rdb_transaction { "status code = %d, status = %s", s.code(), s.ToString().c_str()); s = bulk_load_index_registry.compact_index_ranges( - rdb, getCompactRangeOptions()); + rdb_get_rocksdb_db(), getCompactRangeOptions()); if (!s.ok()) { // NO_LINT_DEBUG LogPluginErrMsg(WARNING_LEVEL, ER_LOG_PRINTF_MSG, @@ -4272,7 +4279,7 @@ class Rdb_transaction { return s; } // try again after compaction - s = rdb->IngestExternalFiles(args); + s = rdb_get_rocksdb_db().IngestExternalFiles(args); } return s; } @@ -4443,9 +4450,9 @@ class Rdb_transaction { full_name.c_str(), index_name.c_str()); } - auto sst_info = std::make_unique( - rdb, rdb_merge.get_table_name(), index_name, rdb_merge.get_cf(), - *rocksdb_db_options, trace_sst_api, + const auto sst_info = std::make_unique( + rdb_get_rocksdb_db(), rdb_merge.get_table_name(), index_name, + rdb_merge.get_cf(), *rocksdb_db_options, trace_sst_api, THDVAR(get_thd(), bulk_load_compression_parallel_threads)); const auto enable_unique_key_check = @@ -4801,7 +4808,7 @@ class Rdb_transaction { rocksdb::ColumnFamilyHandle &cf, const Rdb_key_def &kd) { assert(!is_ac_nl_ro_rc_transaction()); return get_bulk_load_ctx()->bulk_load_index_registry().add_index( - rdb, cf, kd.get_index_number()); + rdb_get_rocksdb_db(), cf, kd.get_index_number()); } [[nodiscard]] int start_bulk_load(ha_rocksdb *const bulk_load, @@ -4848,8 +4855,8 @@ class Rdb_transaction { } *sst_info = ctx->add_sst_info( - rdb, table_handler->m_table_name, kd, *rocksdb_db_options, - trace_sst_api, + rdb_get_rocksdb_db(), table_handler->m_table_name, kd, + *rocksdb_db_options, trace_sst_api, THDVAR(get_thd(), bulk_load_compression_parallel_threads)); return HA_EXIT_SUCCESS; @@ -5031,6 +5038,16 @@ class Rdb_transaction { void on_finish() noexcept { assert(statement_snapshot_type == snapshot_type::NONE); assert_snapshot_invariants(); + assert(m_auto_incr_map.empty()); + + m_write_count[USER_TABLE] = 0; + m_write_count[INTRINSIC_TMP] = 0; + m_insert_count = 0; + m_update_count = 0; + m_delete_count = 0; + m_row_lock_count = 0; + m_rollback_only = false; + m_writes_at_last_savepoint = 0; modified_tables.clear(); @@ -5052,7 +5069,10 @@ class Rdb_transaction { on_finish(); } - void on_rollback() { on_finish(); } + void on_rollback() { + m_auto_incr_map.clear(); + on_finish(); + } private: std::atomic m_binlog_ttl_read_filtering_ts{0}; @@ -5097,13 +5117,16 @@ class Rdb_transaction { } void set_initial_savepoint() { + assert(statement_snapshot_type == snapshot_type::NONE); + assert(m_write_count[TABLE_TYPE::USER_TABLE] == 0); + assert(m_writes_at_last_savepoint == 0); + /* Set the initial savepoint. If the first statement in the transaction fails, we need something to roll back to, without rolling back the entire transaction. */ do_set_savepoint(); - m_writes_at_last_savepoint = m_write_count[USER_TABLE]; } /* @@ -5298,7 +5321,7 @@ class Rdb_transaction_impl : public Rdb_transaction { assert(m_rocksdb_tx[table_type] == nullptr); // If m_rocksdb_reuse_tx[table_type] is nullptr this will create a new // transaction object. Otherwise it will reuse the existing one. - m_rocksdb_tx[table_type].reset(rdb->BeginTransaction( + m_rocksdb_tx[table_type].reset(rdb_get_rocksdb_db().BeginTransaction( write_opts, tx_opts, m_rocksdb_reuse_tx[table_type].release())); } @@ -5427,21 +5450,16 @@ class Rdb_transaction_impl : public Rdb_transaction { goto error; } - on_commit(table_type); error: - if (table_type == USER_TABLE) { + if (likely(!res)) + on_commit(table_type); + else if (table_type == USER_TABLE) on_rollback(); + + if (table_type == USER_TABLE) { /* Save the transaction object to be reused */ release_tx(wb_size); - m_write_count[USER_TABLE] = 0; - m_write_count[INTRINSIC_TMP] = 0; - m_insert_count = 0; - m_update_count = 0; - m_delete_count = 0; - m_row_lock_count = 0; - m_rollback_only = false; } else { - m_write_count[INTRINSIC_TMP] = 0; // clean up only tmp table tx release_intrinsic_table_tx(); } @@ -5460,19 +5478,10 @@ class Rdb_transaction_impl : public Rdb_transaction { m_rocksdb_tx[TABLE_TYPE::USER_TABLE]->Rollback(); release_tx(wb_size); - - m_rollback_only = false; } else { release_intrinsic_table_tx(); } on_rollback(); - m_write_count[USER_TABLE] = 0; - m_write_count[INTRINSIC_TMP] = 0; - m_insert_count = 0; - m_update_count = 0; - m_delete_count = 0; - m_row_lock_count = 0; - m_auto_incr_map.clear(); reset_flags(); } @@ -5509,25 +5518,27 @@ class Rdb_transaction_impl : public Rdb_transaction { return; } - auto thd_ss = std::static_pointer_cast( +#ifndef NDEBUG + const auto thd_ss = std::static_pointer_cast( m_thd->get_explicit_snapshot()); - if (thd_ss) { - share_explicit_snapshot(std::move(thd_ss)); - } +#endif switch (statement_snapshot_type) { case Rdb_transaction::snapshot_type::NONE: + assert(thd_ss == nullptr); if (acquire_now) acquire_snapshot_now(); else acquire_snapshot_on_next_op(); break; case Rdb_transaction::snapshot_type::READ_ONLY_TRX: - assign_snapshot(rdb->GetSnapshot()); + assign_snapshot(rdb_get_rocksdb_db().GetSnapshot()); break; case Rdb_transaction::snapshot_type::EXPLICIT: + assert(thd_ss != nullptr); break; case Rdb_transaction::snapshot_type::CURRENT_DELAYED: + assert(thd_ss == nullptr); if (acquire_now) { acquire_snapshot_now(); } @@ -5603,7 +5614,7 @@ class Rdb_transaction_impl : public Rdb_transaction { break; } case Rdb_transaction::snapshot_type::READ_ONLY_TRX: - rdb->ReleaseSnapshot(m_read_opts[table_type].snapshot); + rdb_get_rocksdb_db().ReleaseSnapshot(m_read_opts[table_type].snapshot); m_read_opts[table_type].snapshot = nullptr; statement_snapshot_type = snapshot_type::NONE; break; @@ -5814,6 +5825,7 @@ class Rdb_transaction_impl : public Rdb_transaction { m_read_opts[table_type], &column_family, key, value, exclusive, m_read_opts[table_type].snapshot ? do_validate : false); } else { + assert(statement_snapshot_type == snapshot_type::CURRENT); // If snapshot is set, and if skipping validation, // call GetForUpdate without validation and set back old snapshot auto saved_snapshot = m_read_opts[table_type].snapshot; @@ -6012,7 +6024,10 @@ class Rdb_writebatch_impl : public Rdb_transaction { } private: - bool prepare() override { return true; } + bool prepare() override { + assert(statement_snapshot_type == snapshot_type::CURRENT); + return true; + } bool commit_no_binlog(TABLE_TYPE table_type) override { assert(!is_ac_nl_ro_rc_transaction()); @@ -6033,22 +6048,21 @@ class Rdb_writebatch_impl : public Rdb_transaction { } release_snapshot(table_type); - s = rdb->Write(write_opts, optimize, m_batch.GetWriteBatch()); + s = rdb_get_rocksdb_db().Write(write_opts, optimize, + m_batch.GetWriteBatch()); if (!s.ok()) { rdb_handle_io_error(s, RDB_IO_ERROR_TX_COMMIT); res = true; goto error; } - on_commit(table_type); + error: - on_rollback(); + if (likely(!res)) + on_commit(table_type); + else + on_rollback(); reset(); - m_write_count[table_type] = 0; - m_insert_count = 0; - m_update_count = 0; - m_delete_count = 0; - m_rollback_only = false; return res; } @@ -6086,16 +6100,13 @@ class Rdb_writebatch_impl : public Rdb_transaction { } void rollback() override { + assert(statement_snapshot_type == snapshot_type::NONE || + statement_snapshot_type == snapshot_type::CURRENT); + release_snapshot(TABLE_TYPE::USER_TABLE); on_rollback(); - m_write_count[TABLE_TYPE::USER_TABLE] = 0; - m_insert_count = 0; - m_update_count = 0; - m_delete_count = 0; - m_row_lock_count = 0; reset(); - m_rollback_only = false; } void acquire_snapshot(bool acquire_now MY_ATTRIBUTE((unused)), @@ -6108,7 +6119,7 @@ class Rdb_writebatch_impl : public Rdb_transaction { if (!has_snapshot(table_type)) { assert(statement_snapshot_type == snapshot_type::NONE); statement_snapshot_type = snapshot_type::CURRENT; - assign_snapshot(rdb->GetSnapshot()); + assign_snapshot(rdb_get_rocksdb_db().GetSnapshot()); } assert_snapshot_invariants(); } @@ -6122,7 +6133,7 @@ class Rdb_writebatch_impl : public Rdb_transaction { if (has_snapshot(table_type)) { assert(statement_snapshot_type == snapshot_type::CURRENT); statement_snapshot_type = snapshot_type::NONE; - rdb->ReleaseSnapshot(m_read_opts[table_type].snapshot); + rdb_get_rocksdb_db().ReleaseSnapshot(m_read_opts[table_type].snapshot); m_read_opts[table_type].snapshot = nullptr; } assert_snapshot_invariants(); @@ -6140,12 +6151,12 @@ class Rdb_writebatch_impl : public Rdb_transaction { const rocksdb::Slice &value, TABLE_TYPE table_type, bool) override { assert(!is_ac_nl_ro_rc_transaction()); + assert(statement_snapshot_type == snapshot_type::CURRENT); if (table_type == TABLE_TYPE::INTRINSIC_TMP) { return rocksdb::Status::NotSupported( "Not supported for intrinsic tmp tables"); } - assert(!is_ac_nl_ro_rc_transaction()); ++m_write_count[table_type]; m_batch.Put(&column_family, key, value); @@ -6158,6 +6169,7 @@ class Rdb_writebatch_impl : public Rdb_transaction { rocksdb::ColumnFamilyHandle &column_family, const rocksdb::Slice &key, TABLE_TYPE table_type, bool) override { assert(!is_ac_nl_ro_rc_transaction()); + assert(statement_snapshot_type == snapshot_type::CURRENT); if (table_type == TABLE_TYPE::INTRINSIC_TMP) { assert(false); @@ -6174,6 +6186,7 @@ class Rdb_writebatch_impl : public Rdb_transaction { rocksdb::ColumnFamilyHandle &column_family, const rocksdb::Slice &key, TABLE_TYPE table_type, bool) override { assert(!is_ac_nl_ro_rc_transaction()); + assert(statement_snapshot_type == snapshot_type::CURRENT); if (table_type == TABLE_TYPE::INTRINSIC_TMP) { assert(false); @@ -6197,6 +6210,7 @@ class Rdb_writebatch_impl : public Rdb_transaction { TABLE_TYPE table_type) override { assert(table_type != TABLE_TYPE::INTRINSIC_TMP); assert(!is_ac_nl_ro_rc_transaction()); + assert(statement_snapshot_type == snapshot_type::CURRENT); ++m_write_count[table_type]; return m_batch; @@ -6214,8 +6228,9 @@ class Rdb_writebatch_impl : public Rdb_transaction { "Not supported for intrinsic tmp tables"); } value->Reset(); - return m_batch.GetFromBatchAndDB(rdb, m_read_opts[table_type], - &column_family, key, value); + return m_batch.GetFromBatchAndDB(&rdb_get_rocksdb_db(), + m_read_opts[table_type], &column_family, + key, value); } void multi_get(rocksdb::ColumnFamilyHandle &column_family, size_t num_keys, @@ -6228,9 +6243,9 @@ class Rdb_writebatch_impl : public Rdb_transaction { assert(false); return; } - m_batch.MultiGetFromBatchAndDB(rdb, m_read_opts[table_type], &column_family, - num_keys, keys, values, statuses, - sorted_input); + m_batch.MultiGetFromBatchAndDB( + &rdb_get_rocksdb_db(), m_read_opts[table_type], &column_family, + num_keys, keys, values, statuses, sorted_input); } rocksdb::Status get_for_update(const Rdb_key_def &key_descr, @@ -6267,7 +6282,7 @@ class Rdb_writebatch_impl : public Rdb_transaction { assert(false); return nullptr; } - const auto it = rdb->NewIterator(options); + const auto it = rdb_get_rocksdb_db().NewIterator(options); return std::unique_ptr(m_batch.NewIteratorWithBase(it)); } @@ -6806,7 +6821,7 @@ static int rocksdb_close_connection( rocksdb_remove_checkpoint(checkpoint_dir); } if (get_ha_data(thd)->get_disable_file_deletions()) { - rdb->EnableFileDeletions(); + rdb_get_rocksdb_db().EnableFileDeletions(); } destroy_ha_data(thd); return HA_EXIT_SUCCESS; @@ -6815,7 +6830,6 @@ static int rocksdb_close_connection( static int rocksdb_create_temporary_checkpoint_validate( my_core::THD *const thd, my_core::SYS_VAR *const /* unused */, void *const save, my_core::st_mysql_value *const value) { - assert(rdb != nullptr); assert(thd != nullptr); const char *current_checkpoint_dir = THDVAR(thd, create_temporary_checkpoint); @@ -6855,16 +6869,15 @@ static int rocksdb_create_temporary_checkpoint_validate( static void rocksdb_disable_file_deletions_update( my_core::THD *const thd, my_core::SYS_VAR *const /* unused */, void *const var_ptr, const void *const save) { - assert(rdb != nullptr); assert(thd != nullptr); bool val = *static_cast(var_ptr) = *static_cast(save); bool old_val = get_ha_data(thd)->get_disable_file_deletions(); if (val && !old_val) { - rdb->DisableFileDeletions(); + rdb_get_rocksdb_db().DisableFileDeletions(); get_ha_data(thd)->set_disable_file_deletions(true); } else if (!val && old_val) { - rdb->EnableFileDeletions(); + rdb_get_rocksdb_db().EnableFileDeletions(); get_ha_data(thd)->set_disable_file_deletions(false); } } @@ -6878,8 +6891,6 @@ static void rocksdb_disable_file_deletions_update( */ static bool rocksdb_flush_wal(handlerton *const hton MY_ATTRIBUTE((__unused__)), bool binlog_group_flush) { - assert(rdb != nullptr); - rocksdb::Status s; if ((!binlog_group_flush && !rocksdb_db_options->allow_mmap_writes) || rocksdb_flush_log_at_trx_commit != FLUSH_LOG_NEVER) { @@ -6887,7 +6898,7 @@ static bool rocksdb_flush_wal(handlerton *const hton MY_ATTRIBUTE((__unused__)), bool sync = rdb_sync_wal_supported() && (!binlog_group_flush || rocksdb_flush_log_at_trx_commit == FLUSH_LOG_SYNC); - s = rdb->FlushWAL(sync); + s = rdb_get_rocksdb_db().FlushWAL(sync); } if (!s.ok()) { @@ -7021,7 +7032,7 @@ static xa_status_code rocksdb_commit_by_xid( const auto name = rdb_xid_to_string(*xid); assert(!name.empty()); - rocksdb::Transaction *const trx = rdb->GetTransactionByName(name); + auto *const trx = rdb_get_rocksdb_db().GetTransactionByName(name); if (trx == nullptr) { DBUG_RETURN(XAER_NOTA); @@ -7061,11 +7072,9 @@ static xa_status_code rocksdb_rollback_by_xid( assert(hton != nullptr); assert(xid != nullptr); - assert(rdb != nullptr); const auto name = rdb_xid_to_string(*xid); - - rocksdb::Transaction *const trx = rdb->GetTransactionByName(name); + auto *const trx = rdb_get_rocksdb_db().GetTransactionByName(name); if (trx == nullptr) { DBUG_RETURN(XAER_NOTA); @@ -7217,7 +7226,7 @@ static int rocksdb_recover(handlerton *const hton [[maybe_unused]], } std::vector trans_list; - rdb->GetAllPreparedTransactions(&trans_list); + rdb_get_rocksdb_db().GetAllPreparedTransactions(&trans_list); uint count = 0; for (auto &trans : trans_list) { @@ -7472,7 +7481,7 @@ class Rdb_snapshot_status : public Rdb_tx_list_walker { const auto earliest_snapshot_timestamp = tx->get_earliest_snapshot_ts(); if (earliest_snapshot_timestamp != 0) { int64_t curr_time; - rdb->GetEnv()->GetCurrentTime(&curr_time); + rdb_get_rocksdb_db().GetEnv()->GetCurrentTime(&curr_time); const auto earliest_snapshot_age = curr_time - earliest_snapshot_timestamp; @@ -7516,7 +7525,7 @@ class Rdb_snapshot_status : public Rdb_tx_list_walker { } void populate_deadlock_buffer() { - auto dlock_buffer = rdb->GetDeadlockInfoBuffer(); + const auto dlock_buffer = rdb_get_rocksdb_db().GetDeadlockInfoBuffer(); m_data += "----------LATEST DETECTED DEADLOCKS----------\n"; for (const auto &path_entry : dlock_buffer) { @@ -7558,7 +7567,7 @@ class Rdb_snapshot_status : public Rdb_tx_list_walker { std::vector get_deadlock_info() { std::vector deadlock_info; - auto dlock_buffer = rdb->GetDeadlockInfoBuffer(); + const auto dlock_buffer = rdb_get_rocksdb_db().GetDeadlockInfoBuffer(); for (const auto &path_entry : dlock_buffer) { if (!path_entry.limit_exceeded) { deadlock_info.push_back(get_dl_path_trx_info(path_entry)); @@ -7707,10 +7716,9 @@ static uint64_t advance_binlog_ttl_compaction_timestamp(uint64_t ts) { static bool rocksdb_update_binlog_ttl_compaction_ts( handlerton *const hton MY_ATTRIBUTE((__unused__)), THD *thd, uint64_t *timestamp) { - assert(rdb != nullptr); assert(timestamp != nullptr); - if (unlikely(!timestamp || !rdb)) { + if (unlikely(!timestamp)) { return HA_EXIT_FAILURE; } @@ -7809,8 +7817,6 @@ static bool rocksdb_show_status(handlerton *const hton, THD *const thd, char buf[100] = {'\0'}; if (stat_type == HA_ENGINE_STATUS) { - assert(rdb != nullptr); - std::string str; /* Global DB Statistics */ @@ -7840,13 +7846,14 @@ static bool rocksdb_show_status(handlerton *const hton, THD *const thd, // // NB! We're replacing hyphens with underscores in output to better match // the existing naming convention. - if (rdb->GetIntProperty("rocksdb.is-write-stopped", &v)) { + if (rdb_get_rocksdb_db().GetIntProperty("rocksdb.is-write-stopped", &v)) { snprintf(buf, sizeof(buf), "rocksdb.is_write_stopped COUNT : %" PRIu64 "\n", v); str.append(buf); } - if (rdb->GetIntProperty("rocksdb.actual-delayed-write-rate", &v)) { + if (rdb_get_rocksdb_db().GetIntProperty( + "rocksdb.actual-delayed-write-rate", &v)) { snprintf(buf, sizeof(buf), "rocksdb.actual_delayed_write_rate " "COUNT : %" PRIu64 "\n", @@ -7858,7 +7865,7 @@ static bool rocksdb_show_status(handlerton *const hton, THD *const thd, } /* Per DB stats */ - if (rdb->GetProperty("rocksdb.dbstats", &str)) { + if (rdb_get_rocksdb_db().GetProperty("rocksdb.dbstats", &str)) { res |= print_stats(thd, "DBSTATS", "rocksdb", str, stat_print); } @@ -7873,7 +7880,8 @@ static bool rocksdb_show_status(handlerton *const hton, THD *const thd, // Retrieve information from CF handle object. // Even if the CF is removed from CF_manager, the handle object // is valid. - if (!rdb->GetProperty(cfh.get(), "rocksdb.cfstats", &str)) { + if (!rdb_get_rocksdb_db().GetProperty(cfh.get(), "rocksdb.cfstats", + &str)) { continue; } @@ -7886,7 +7894,7 @@ static bool rocksdb_show_status(handlerton *const hton, THD *const thd, size_t internal_cache_count = 0; size_t kDefaultInternalCacheSize = 8 * 1024 * 1024; - dbs.push_back(rdb); + dbs.push_back(&rdb_get_rocksdb_db()); cache_set.insert(rocksdb_tbl_options->block_cache.get()); for (const auto &cf_handle : cf_manager.get_all_cf()) { @@ -7938,7 +7946,7 @@ static bool rocksdb_show_status(handlerton *const hton, THD *const thd, /* Show the background thread status */ std::vector thread_list; - rocksdb::Status s = rdb->GetEnv()->GetThreadList(&thread_list); + const auto s = rdb_get_rocksdb_db().GetEnv()->GetThreadList(&thread_list); // GetThreadList() may return Status::NotSupported when // ROCKSDB_USING_THREAD_STATUS is not defined @@ -7998,10 +8006,8 @@ static bool rocksdb_show_status(handlerton *const hton, THD *const thd, returns false on success */ -static bool rocksdb_lock_hton_log( - handlerton *const MY_ATTRIBUTE((__unused__))) { - assert(rdb != nullptr); - return !rdb->LockWAL().ok(); +static bool rocksdb_lock_hton_log(handlerton *) { + return !rdb_get_rocksdb_db().LockWAL().ok(); } /* @@ -8009,9 +8015,8 @@ static bool rocksdb_lock_hton_log( returns false on success */ -static bool rocksdb_unlock_hton_log(handlerton *const /* unused */) { - assert(rdb != nullptr); - return !rdb->UnlockWAL().ok(); +static bool rocksdb_unlock_hton_log(handlerton *) { + return !rdb_get_rocksdb_db().UnlockWAL().ok(); } /* @@ -8032,7 +8037,7 @@ static bool rocksdb_collect_hton_log_info(handlerton *const /* unused */, Json_dom *json) { bool ret_val = false; rocksdb::VectorLogPtr live_wal_files; - const auto s = rdb->GetSortedWalFiles(live_wal_files); + const auto s = rdb_get_rocksdb_db().GetSortedWalFiles(live_wal_files); if (!s.ok()) { return true; @@ -8067,25 +8072,33 @@ static bool rocksdb_collect_hton_log_info(handlerton *const /* unused */, return ret_val; } -static inline void rocksdb_register_tx( - handlerton *const hton MY_ATTRIBUTE((__unused__)), THD *const thd, - Rdb_transaction *const tx) { - assert(tx != nullptr); +static inline void rocksdb_register_tx(handlerton *hton [[maybe_unused]], + THD &thd, Rdb_transaction &tx) { + assert(hton == rocksdb_hton); - trans_register_ha(thd, false, rocksdb_hton, NULL); + trans_register_ha(&thd, false, rocksdb_hton, nullptr); if (rocksdb_write_policy == rocksdb::TxnDBWritePolicy::WRITE_UNPREPARED) { // Some internal operations will call trans_register_ha, but they do not // go through 2pc. In this case, the xid is set with query_id == 0, which // means that rocksdb will receive transactions with duplicate names. // // Skip setting name in these cases. - if (thd->query_id != 0) { - tx->set_name(); + if (thd.query_id != 0) { + tx.set_name(); + } + } + + if (!tx.has_explicit_or_read_only_snapshot()) { + auto thd_ss = std::static_pointer_cast( + thd.get_explicit_snapshot()); + if (thd_ss) { + tx.share_explicit_snapshot(std::move(thd_ss)); } } - if (!is_autocommit(*thd)) { - tx->start_stmt(); - trans_register_ha(thd, true, rocksdb_hton, NULL); + + if (!is_autocommit(thd)) { + tx.start_stmt(); + trans_register_ha(&thd, true, rocksdb_hton, nullptr); } } @@ -8099,8 +8112,7 @@ static int rocksdb_explicit_snapshot( if (mysql_bin_log_is_open()) { mysql_bin_log_lock_commits(ss_info); } - auto s = - Rdb_explicit_snapshot::create(thd, *ss_info, rdb, rdb->GetSnapshot()); + const auto s = Rdb_explicit_snapshot::create(*thd, *ss_info); if (mysql_bin_log_is_open()) { mysql_bin_log_unlock_commits(ss_info); } @@ -8168,11 +8180,11 @@ static int rocksdb_start_tx_and_assign_read_view( return HA_EXIT_FAILURE; } - Rdb_transaction *const tx = get_or_create_tx(thd, TABLE_TYPE::USER_TABLE); - Rdb_perf_context_guard guard(tx, thd); + auto &tx = *get_or_create_tx(thd, TABLE_TYPE::USER_TABLE); + Rdb_perf_context_guard guard(&tx, thd); - tx->set_tx_read_only(); - rocksdb_register_tx(hton, thd, tx); + tx.set_tx_read_only(); + rocksdb_register_tx(hton, *thd, tx); const uint64_t client_provided_read_filtering_ts = rdb_is_binlog_ttl_enabled() @@ -8192,7 +8204,7 @@ static int rocksdb_start_tx_and_assign_read_view( if (ss_info) { ss_info->read_filtering_ts = read_filtering_ts; } - tx->set_ttl_read_filtering_ts(read_filtering_ts); + tx.set_ttl_read_filtering_ts(read_filtering_ts); return HA_EXIT_SUCCESS; } @@ -8255,7 +8267,7 @@ static int rocksdb_start_tx_with_shared_read_view( } } - rocksdb_register_tx(hton, thd, tx); + rocksdb_register_tx(hton, *thd, *tx); } // case: unlock the binlog @@ -9155,8 +9167,9 @@ static int rocksdb_init_internal(void *const p) { // NO_LINT_DEBUG LogPluginErrMsg(INFORMATION_LEVEL, ER_LOG_PRINTF_MSG, "RocksDB: Opening TransactionDB..."); - status = rocksdb::TransactionDB::Open( - main_opts, tx_db_options, rocksdb_datadir, cf_descr, &cf_handles, &rdb); + status = + rocksdb::TransactionDB::Open(main_opts, tx_db_options, rocksdb_datadir, + cf_descr, &cf_handles, &detail::rdb); DBUG_EXECUTE_IF("rocksdb_init_failure_open_db", { // Simulate opening TransactionDB failure @@ -9176,7 +9189,7 @@ static int rocksdb_init_internal(void *const p) { "Verifying file checksums..."); rocksdb::ReadOptions checksum_read_options; checksum_read_options.readahead_size = 2 * 1024 * 1024; - status = rdb->VerifyFileChecksums(checksum_read_options); + status = rdb_get_rocksdb_db().VerifyFileChecksums(checksum_read_options); if (!status.ok()) { rdb_log_status_error(status, "Instance failed checksum verification"); for (auto cfh_ptr : cf_handles) delete (cfh_ptr); @@ -9189,7 +9202,8 @@ static int rocksdb_init_internal(void *const p) { LogPluginErrMsg(INFORMATION_LEVEL, ER_LOG_PRINTF_MSG, "RocksDB:Init column families..."); if (st_rdb_exec_time.exec("cf_manager::init", [&]() { - return cf_manager.init(rdb, std::move(cf_options_map), &cf_handles); + return cf_manager.init(rdb_get_rocksdb_db(), std::move(cf_options_map), + &cf_handles); })) { // NO_LINT_DEBUG LogPluginErrMsg(ERROR_LEVEL, ER_LOG_PRINTF_MSG, @@ -9201,7 +9215,7 @@ static int rocksdb_init_internal(void *const p) { LogPluginErrMsg(INFORMATION_LEVEL, ER_LOG_PRINTF_MSG, "RocksDB: Initializing data dictionary..."); if (st_rdb_exec_time.exec("Rdb_dict_manager_selector::init", [&]() { - return dict_manager.init(rdb, &cf_manager, + return dict_manager.init(rdb_get_rocksdb_db(), &cf_manager, rocksdb_enable_remove_orphaned_dropped_cfs); })) { // NO_LINT_DEBUG @@ -9242,7 +9256,7 @@ static int rocksdb_init_internal(void *const p) { DBUG_EXECUTE_IF("rocksdb_init_failure_managers", { DBUG_RETURN(HA_EXIT_FAILURE); }); - Rdb_sst_info::init(rdb); + Rdb_sst_info::init(rdb_get_rocksdb_db()); /* Enable auto compaction, things needed for compaction filter are finished @@ -9255,7 +9269,8 @@ static int rocksdb_init_internal(void *const p) { new_compaction_enabled_cf_handles.push_back(cfh_ptr); } } - status = rdb->EnableAutoCompaction(new_compaction_enabled_cf_handles); + status = rdb_get_rocksdb_db().EnableAutoCompaction( + new_compaction_enabled_cf_handles); if (!status.ok()) { rdb_log_status_error(status, "Error enabling compaction"); @@ -9325,7 +9340,7 @@ static int rocksdb_init_internal(void *const p) { { DBUG_RETURN(HA_EXIT_FAILURE); }); if (rocksdb_pause_background_work) { - rdb->PauseBackgroundWork(); + rdb_get_rocksdb_db().PauseBackgroundWork(); } // NO_LINT_DEBUG @@ -9418,8 +9433,8 @@ static int rocksdb_shutdown(bool minimalShutdown) { rocksdb_flush_all_memtables(); // Stop all rocksdb background work - if (rdb && rdb->GetBaseDB()) { - CancelAllBackgroundWork(rdb->GetBaseDB(), true); + if (detail::rdb != nullptr && rdb_get_rocksdb_db().GetBaseDB() != nullptr) { + CancelAllBackgroundWork(rdb_get_rocksdb_db().GetBaseDB(), true); } // Signal the background thread to stop and to persist all stats collected @@ -9526,8 +9541,8 @@ static int rocksdb_shutdown(bool minimalShutdown) { clone::client_shutdown(); clone::donor_shutdown(); - delete rdb; - rdb = nullptr; + delete detail::rdb; + detail::rdb = nullptr; delete commit_latency_stats; commit_latency_stats = nullptr; @@ -10829,7 +10844,7 @@ bool ha_rocksdb::create_cfs( auto local_dict_manager = dict_manager.get_dict_manager_selector_non_const(cf_name); std::lock_guard dm_lock(*local_dict_manager); - cf_handle = cf_manager.get_or_create_cf(rdb, cf_name); + cf_handle = cf_manager.get_or_create_cf(rdb_get_rocksdb_db(), cf_name); if (!cf_handle) { return true; } @@ -15384,7 +15399,7 @@ int ha_rocksdb::external_lock(THD *const thd, int lock_type) { } } tx->m_n_mysql_tables_in_use++; - rocksdb_register_tx(rocksdb_hton, thd, tx); + rocksdb_register_tx(rocksdb_hton, *thd, *tx); tx->io_perf_start(&m_io_perf); } @@ -15413,7 +15428,7 @@ int ha_rocksdb::start_stmt(THD *const thd, Rdb_transaction *const tx = get_or_create_tx(thd, m_tbl_def->get_table_type()); read_thd_vars(thd); - rocksdb_register_tx(ht, thd, tx); + rocksdb_register_tx(ht, *thd, *tx); tx->io_perf_start(&m_io_perf); DBUG_RETURN(HA_EXIT_SUCCESS); @@ -15474,8 +15489,8 @@ static int delete_range(const std::unordered_set &indices) { uchar buf[Rdb_key_def::INDEX_NUMBER_SIZE * 2]; rocksdb::Range range = get_range(d.index_id, buf, is_reverse_cf ? 1 : 0, is_reverse_cf ? 0 : 1); - rocksdb::Status status = DeleteFilesInRange(rdb->GetBaseDB(), cfh.get(), - &range.start, &range.limit); + auto status = DeleteFilesInRange(rdb_get_rocksdb_db().GetBaseDB(), + cfh.get(), &range.start, &range.limit); if (!status.ok()) { // NO_LINT_DEBUG LogPluginErrMsg( @@ -15501,13 +15516,13 @@ static int delete_range(const std::unordered_set &indices) { rocksdb::TransactionDBWriteOptimizations optimize; optimize.skip_concurrency_control = true; optimize.skip_duplicate_key_check = true; - rocksdb::Status status = - rdb->Write(rocksdb::WriteOptions(), optimize, &batch); + const auto status = + rdb_get_rocksdb_db().Write(rocksdb::WriteOptions(), optimize, &batch); if (status.ok()) { if (!rdb_sync_wal_supported()) { // If we don't support SyncWAL, do a flush at least - rdb->FlushWAL(false); + rdb_get_rocksdb_db().FlushWAL(false); } } return HA_EXIT_SUCCESS; @@ -15529,7 +15544,8 @@ static bool is_myrocks_index_empty(rocksdb::ColumnFamilyHandle *cfh, rdb_netbuf_store_uint32(key_buf, index_id); const rocksdb::Slice key = rocksdb::Slice(reinterpret_cast(key_buf), sizeof(key_buf)); - std::unique_ptr it(rdb->NewIterator(read_opts, cfh)); + std::unique_ptr it( + rdb_get_rocksdb_db().NewIterator(read_opts, cfh)); rocksdb_smart_seek(is_reverse_cf, *it, key); if (!it->Valid()) { index_removed = true; @@ -15616,8 +15632,9 @@ void Rdb_drop_index_thread::run() { rocksdb::Range range = get_range( d.index_id, buf, is_reverse_cf ? 1 : 0, is_reverse_cf ? 0 : 1); - rocksdb::Status status = DeleteFilesInRange( - rdb->GetBaseDB(), cfh.get(), &range.start, &range.limit); + auto status = + DeleteFilesInRange(rdb_get_rocksdb_db().GetBaseDB(), cfh.get(), + &range.start, &range.limit); if (!status.ok()) { if (status.IsIncomplete()) { continue; @@ -15627,8 +15644,8 @@ void Rdb_drop_index_thread::run() { rdb_handle_io_error(status, RDB_IO_ERROR_BG_THREAD); } - status = rdb->CompactRange(getCompactRangeOptions(), cfh.get(), - &range.start, &range.limit); + status = rdb_get_rocksdb_db().CompactRange( + getCompactRangeOptions(), cfh.get(), &range.start, &range.limit); if (!status.ok()) { if (status.IsIncomplete()) { continue; @@ -15685,7 +15702,8 @@ void Rdb_drop_index_thread::run() { for (const auto cf_id : dropped_cf_ids) { if (ongoing_drop_cf_ids.find(cf_id) == ongoing_drop_cf_ids.end()) { - cf_manager.remove_dropped_cf(local_dict_manager, rdb, cf_id); + cf_manager.remove_dropped_cf(local_dict_manager, + rdb_get_rocksdb_db(), cf_id); } } } @@ -16123,11 +16141,13 @@ void ha_rocksdb::records_in_range_internal(uint inx, key_range *const min_key, // Getting statistics, including from Memtables rocksdb::DB::SizeApproximationFlags include_flags = rocksdb::DB::SizeApproximationFlags::INCLUDE_FILES; - rdb->GetApproximateSizes(&kd.get_cf(), &r, 1, &sz, include_flags); + rdb_get_rocksdb_db().GetApproximateSizes(&kd.get_cf(), &r, 1, &sz, + include_flags); *row_count = rows * ((double)sz / (double)disk_size); *total_size = sz; uint64_t memTableCount; - rdb->GetApproximateMemTableStats(&kd.get_cf(), r, &memTableCount, &sz); + rdb_get_rocksdb_db().GetApproximateMemTableStats(&kd.get_cf(), r, + &memTableCount, &sz); *row_count += memTableCount; *total_size += sz; DBUG_VOID_RETURN; @@ -16174,9 +16194,9 @@ int ha_rocksdb::optimize(THD *const thd MY_ATTRIBUTE((__unused__)), for (uint i = 0; i < table->s->keys; i++) { uchar buf[Rdb_key_def::INDEX_NUMBER_SIZE * 2]; auto range = get_range(i, buf); - const auto s = rdb->CompactRange(getCompactRangeOptions(), - &m_key_descr_arr[i]->get_cf(), - &range.start, &range.limit); + const auto s = rdb_get_rocksdb_db().CompactRange( + getCompactRangeOptions(), &m_key_descr_arr[i]->get_cf(), &range.start, + &range.limit); if (!s.ok()) { DBUG_RETURN(rdb_error_to_mysql(s)); } @@ -16248,8 +16268,8 @@ static int calculate_cardinality_table_scan( auto r = ha_rocksdb::get_range(*kd, r_buf); uint64_t memtableCount; uint64_t memtableSize; - rdb->GetApproximateMemTableStats(&kd->get_cf(), r, &memtableCount, - &memtableSize); + rdb_get_rocksdb_db().GetApproximateMemTableStats( + &kd->get_cf(), r, &memtableCount, &memtableSize); if (scan_type == SCAN_TYPE_MEMTABLE_ONLY && memtableCount < (uint64_t)stat.m_rows / 10) { @@ -16267,8 +16287,8 @@ static int calculate_cardinality_table_scan( stat.m_actual_disk_size = memtableSize; } - std::unique_ptr it = std::unique_ptr( - rdb->NewIterator(read_opts, &kd->get_cf())); + const auto it = std::unique_ptr( + rdb_get_rocksdb_db().NewIterator(read_opts, &kd->get_cf())); rocksdb::Slice first_index_key((const char *)r_buf, Rdb_key_def::INDEX_NUMBER_SIZE); @@ -16394,7 +16414,7 @@ static int read_stats_from_ssts( rocksdb::TablePropertiesCollection props; for (const auto &it : ranges) { const auto old_size MY_ATTRIBUTE((__unused__)) = props.size(); - const auto status = rdb->GetPropertiesOfTablesInRange( + const auto status = rdb_get_rocksdb_db().GetPropertiesOfTablesInRange( it.first, &it.second[0], it.second.size(), &props); assert(props.size() >= old_size); if (!status.ok()) { @@ -16619,7 +16639,8 @@ int ha_rocksdb::adjust_handler_stats_sst_and_memtable(ha_statistics *ha_stats, uint64_t sz = 0; rocksdb::DB::SizeApproximationFlags include_flags = rocksdb::DB::SizeApproximationFlags::INCLUDE_FILES; - rdb->GetApproximateSizes(&pk_def->get_cf(), &r, 1, &sz, include_flags); + rdb_get_rocksdb_db().GetApproximateSizes(&pk_def->get_cf(), &r, 1, &sz, + include_flags); ha_stats->records += sz / ROCKSDB_ASSUMED_KEY_VALUE_DISK_SIZE; ha_stats->data_file_length += sz; } @@ -16637,8 +16658,8 @@ int ha_rocksdb::adjust_handler_stats_sst_and_memtable(ha_statistics *ha_stats, // it also can return 0 for quite a large tables which means that // cardinality for memtable only indxes will be reported as 0 - rdb->GetApproximateMemTableStats(&pk_def->get_cf(), r, &memtableCount, - &memtableSize); + rdb_get_rocksdb_db().GetApproximateMemTableStats( + &pk_def->get_cf(), r, &memtableCount, &memtableSize); // Atomically update all of these fields at the same time if (cachetime > 0) { @@ -18163,7 +18184,7 @@ static void myrocks_update_status() { static void myrocks_update_memory_status() { std::vector dbs; std::unordered_set cache_set; - dbs.push_back(rdb); + dbs.push_back(&rdb_get_rocksdb_db()); std::map temp_usage_by_type; rocksdb::MemoryUtil::GetApproximateMemoryUsageByType(dbs, cache_set, &temp_usage_by_type); @@ -18255,7 +18276,7 @@ static void update_rocksdb_stall_status() { // Retrieve information from valid CF handle object. It is safe // even if the CF is removed from cf_manager at this point. std::map props; - if (!rdb->GetMapProperty( + if (!rdb_get_rocksdb_db().GetMapProperty( cfh.get(), rocksdb::DB::Properties::kCFWriteStallStats, &props)) { continue; } @@ -18538,10 +18559,10 @@ void Rdb_background_thread::run() { // InnoDB's behavior. For mode never, the wal file isn't even written, // whereas background writes to the wal file, but issues the syncs in a // background thread. - if (rdb && (rocksdb_flush_log_at_trx_commit != FLUSH_LOG_SYNC) && + if (detail::rdb && (rocksdb_flush_log_at_trx_commit != FLUSH_LOG_SYNC) && !rocksdb_db_options->allow_mmap_writes) { bool sync = rdb_sync_wal_supported(); - const rocksdb::Status s = rdb->FlushWAL(sync); + const auto s = rdb_get_rocksdb_db().FlushWAL(sync); if (!s.ok()) { rdb_handle_io_error(s, RDB_IO_ERROR_BG_THREAD); } @@ -18874,8 +18895,8 @@ void Rdb_manual_compaction_thread::run() { // CompactRange may take a very long time. On clean shutdown, // it is cancelled by CancelAllBackgroundWork, then status is // set to shutdownInProgress. - const rocksdb::Status s = - rdb->CompactRange(mcr.option, mcr.cf.get(), mcr.start, mcr.limit); + const auto s = rdb_get_rocksdb_db().CompactRange(mcr.option, mcr.cf.get(), + mcr.start, mcr.limit); rocksdb_manual_compactions_running--; if (s.ok()) { @@ -19199,9 +19220,6 @@ bool ha_rocksdb::can_use_bloom_filter(THD *thd, const Rdb_key_def &kd, return can_use; } -/* For modules that need access to the global data structures */ -rocksdb::TransactionDB *rdb_get_rocksdb_db() { return rdb; } - Rdb_cf_manager &rdb_get_cf_manager() { return cf_manager; } const rocksdb::BlockBasedTableOptions &rdb_get_table_options() { @@ -19475,8 +19493,8 @@ static void rocksdb_set_delayed_write_rate( const uint64_t new_val = *static_cast(save); if (rocksdb_db_options->delayed_write_rate != new_val) { rocksdb_db_options->delayed_write_rate = new_val; - rocksdb::Status s = - rdb->SetDBOptions({{"delayed_write_rate", std::to_string(new_val)}}); + const auto s = rdb_get_rocksdb_db().SetDBOptions( + {{"delayed_write_rate", std::to_string(new_val)}}); if (!s.ok()) { /* NO_LINT_DEBUG */ @@ -19494,7 +19512,8 @@ static void rocksdb_set_max_latest_deadlocks( const uint32_t new_val = *static_cast(save); if (rocksdb_max_latest_deadlocks != new_val) { rocksdb_max_latest_deadlocks = new_val; - rdb->SetDeadlockInfoBufferSize(rocksdb_max_latest_deadlocks); + rdb_get_rocksdb_db().SetDeadlockInfoBufferSize( + rocksdb_max_latest_deadlocks); } } @@ -19606,8 +19625,8 @@ static void rocksdb_set_max_background_jobs( if (rocksdb_db_options->max_background_jobs != new_val) { rocksdb_db_options->max_background_jobs = new_val; - rocksdb::Status s = - rdb->SetDBOptions({{"max_background_jobs", std::to_string(new_val)}}); + const auto s = rdb_get_rocksdb_db().SetDBOptions( + {{"max_background_jobs", std::to_string(new_val)}}); if (!s.ok()) { /* NO_LINT_DEBUG */ @@ -19631,7 +19650,7 @@ static void rocksdb_set_max_background_compactions( if (rocksdb_db_options->max_background_compactions != new_val) { rocksdb_db_options->max_background_compactions = new_val; - rocksdb::Status s = rdb->SetDBOptions( + const auto s = rdb_get_rocksdb_db().SetDBOptions( {{"max_background_compactions", std::to_string(new_val)}}); if (!s.ok()) { @@ -19698,8 +19717,8 @@ static void rocksdb_set_bytes_per_sync( if (rocksdb_db_options->bytes_per_sync != new_val) { rocksdb_db_options->bytes_per_sync = new_val; - rocksdb::Status s = - rdb->SetDBOptions({{"bytes_per_sync", std::to_string(new_val)}}); + const auto s = rdb_get_rocksdb_db().SetDBOptions( + {{"bytes_per_sync", std::to_string(new_val)}}); if (!s.ok()) { /* NO_LINT_DEBUG */ @@ -19723,8 +19742,8 @@ static void rocksdb_set_wal_bytes_per_sync( if (rocksdb_db_options->wal_bytes_per_sync != new_val) { rocksdb_db_options->wal_bytes_per_sync = new_val; - rocksdb::Status s = - rdb->SetDBOptions({{"wal_bytes_per_sync", std::to_string(new_val)}}); + const auto s = rdb_get_rocksdb_db().SetDBOptions( + {{"wal_bytes_per_sync", std::to_string(new_val)}}); if (!s.ok()) { /* NO_LINT_DEBUG */ @@ -19804,7 +19823,8 @@ static int rocksdb_validate_update_cf_options(THD * /* unused */, auto local_dict_manager = dict_manager.get_dict_manager_selector_non_const(cf_name); std::lock_guard dm_lock(*local_dict_manager); - auto cfh = cf_manager.get_or_create_cf(rdb, cf_name); + const auto cfh = + cf_manager.get_or_create_cf(rdb_get_rocksdb_db(), cf_name); if (!cfh) { return HA_EXIT_FAILURE; @@ -19878,13 +19898,11 @@ static void rocksdb_set_update_cf_options(THD *const /* unused */, "family '%s' to a map. %s", cf_name.c_str(), s.ToString().c_str()); } else { - assert(rdb != nullptr); - // Finally we can apply the options. // If cf_manager.drop_cf() has been called at this point, SetOptions() // will still succeed. The options data will only be cleared when // the CF handle object is destroyed. - s = rdb->SetOptions(cfh.get(), opt_map); + s = rdb_get_rocksdb_db().SetOptions(cfh.get(), opt_map); if (s != rocksdb::Status::OK()) { // NO_LINT_DEBUG @@ -19903,7 +19921,7 @@ static void rocksdb_set_update_cf_options(THD *const /* unused */, // the CF options. This is necessary also to make sure that the CF // options will be correctly reflected in the relevant table: // ROCKSDB_CF_OPTIONS in INFORMATION_SCHEMA. - const auto cf_options = rdb->GetOptions(cfh.get()); + const auto cf_options = rdb_get_rocksdb_db().GetOptions(cfh.get()); std::string updated_options; s = rocksdb::GetStringFromColumnFamilyOptions(&updated_options, @@ -20058,8 +20076,6 @@ rocksdb::DBOptions *get_rocksdb_db_options() { static void rocksdb_select_bypass_rejected_query_history_size_update( my_core::THD *const /* unused */, my_core::SYS_VAR *const /* unused */, void *const var_ptr, const void *const save) { - assert(rdb != nullptr); - uint32_t val = *static_cast(var_ptr) = *static_cast(save); @@ -20072,8 +20088,6 @@ static void rocksdb_select_bypass_rejected_query_history_size_update( static void rocksdb_max_compaction_history_update( my_core::THD *const /* unused */, my_core::SYS_VAR *const /* unused */, void *const var_ptr, const void *const save) { - assert(rdb != nullptr); - uint64_t val = *static_cast(var_ptr) = *static_cast(save); compaction_stats.resize_history(val); @@ -20199,22 +20213,22 @@ std::unique_ptr rdb_tx_get_iterator( THD *thd, rocksdb::ColumnFamilyHandle &cf, bool skip_bloom_filter, const rocksdb::Slice &eq_cond_lower_bound, const rocksdb::Slice &eq_cond_upper_bound, - const rocksdb::Snapshot **snapshot, TABLE_TYPE table_type, - bool read_current, bool create_snapshot) { + rdb_snapshot_unique_ptr *snapshot, TABLE_TYPE table_type, bool read_current, + bool create_snapshot) { if (commit_in_the_middle(thd)) { assert(snapshot && *snapshot == nullptr); if (snapshot) { - *snapshot = rdb->GetSnapshot(); + *snapshot = get_rdb_snapshot(); auto read_opts = rocksdb::ReadOptions(); // TODO(mung): set based on WHERE conditions read_opts.total_order_seek = true; - read_opts.snapshot = *snapshot; + read_opts.snapshot = (*snapshot).get(); if (rocksdb_enable_udt_in_mem && is_udt_compatible_cf(cf)) { Rdb_transaction *tx = get_tx_from_thd(thd); read_opts.timestamp = tx->get_tx_read_timestamp_slice(); } return std::unique_ptr( - rdb->NewIterator(read_opts, &cf)); + rdb_get_rocksdb_db().NewIterator(read_opts, &cf)); } else { return nullptr; } diff --git a/storage/rocksdb/ha_rocksdb.h b/storage/rocksdb/ha_rocksdb.h index 28ce89398ad4..a75300c53c13 100644 --- a/storage/rocksdb/ha_rocksdb.h +++ b/storage/rocksdb/ha_rocksdb.h @@ -49,6 +49,7 @@ #include "./rdb_perf_context.h" #include "./rdb_sst_info.h" #include "./rdb_utils.h" +#include "ha_rocksdb_proto.h" #ifndef __APPLE__ #include "./rdb_io_watchdog.h" @@ -1218,7 +1219,7 @@ void rdb_tx_acquire_snapshot(Rdb_transaction &tx); THD *thd, rocksdb::ColumnFamilyHandle &cf, bool skip_bloom_filter, const rocksdb::Slice &eq_cond_lower_bound, const rocksdb::Slice &eq_cond_upper_bound, - const rocksdb::Snapshot **snapshot, TABLE_TYPE table_type, + rdb_snapshot_unique_ptr *snapshot, TABLE_TYPE table_type, bool read_current = false, bool create_snapshot = true); [[nodiscard]] rocksdb::Status rdb_tx_get( diff --git a/storage/rocksdb/ha_rocksdb_proto.h b/storage/rocksdb/ha_rocksdb_proto.h index 3b673f70be15..656506b246ba 100644 --- a/storage/rocksdb/ha_rocksdb_proto.h +++ b/storage/rocksdb/ha_rocksdb_proto.h @@ -72,11 +72,36 @@ void rdb_queue_save_stats_request(); extern const std::string TRUNCATE_TABLE_PREFIX; -/* - Access to singleton objects. -*/ +// Do not use declarations in this namespace outside of ha_rocksdb.cc +namespace detail { + +extern rocksdb::TransactionDB *rdb; + +} // namespace detail + +// Safe to call between successful call to rocksdb_init_internal and +// rocksdb_shutdown +[[nodiscard]] inline rocksdb::TransactionDB &rdb_get_rocksdb_db() { + return *detail::rdb; +} + +namespace detail { +struct rdb_snapshot_deleter { + void operator()(const rocksdb::Snapshot *snapshot) { + rdb_get_rocksdb_db().ReleaseSnapshot(snapshot); + } +}; + +} // namespace detail + +// Similar to rocksdb::ManagedSnapshot but taking less space and supporting move +// semantics too +using rdb_snapshot_unique_ptr = + std::unique_ptr; -rocksdb::TransactionDB *rdb_get_rocksdb_db(); +[[nodiscard]] inline rdb_snapshot_unique_ptr get_rdb_snapshot() { + return rdb_snapshot_unique_ptr{rdb_get_rocksdb_db().GetSnapshot()}; +} class Rdb_cf_manager; Rdb_cf_manager &rdb_get_cf_manager(); diff --git a/storage/rocksdb/rdb_bulk_load.cc b/storage/rocksdb/rdb_bulk_load.cc index 9912dcea87dc..0c23919f20a5 100644 --- a/storage/rocksdb/rdb_bulk_load.cc +++ b/storage/rocksdb/rdb_bulk_load.cc @@ -276,7 +276,7 @@ uint Rdb_bulk_load_context::notify_ddl(std::string_view db_name, } Rdb_sst_info *Rdb_bulk_load_context::add_sst_info( - rocksdb::DB *rdb, const std::string &tablename, const Rdb_key_def &kd, + rocksdb::DB &rdb, const std::string &tablename, const Rdb_key_def &kd, rocksdb::DBOptions &db_option, bool trace_sst_api, bool compression_parallel_threads) { auto sst_info_ptr = std::make_unique( diff --git a/storage/rocksdb/rdb_bulk_load.h b/storage/rocksdb/rdb_bulk_load.h index 9e6f8bb23774..7d0fd38c88f1 100644 --- a/storage/rocksdb/rdb_bulk_load.h +++ b/storage/rocksdb/rdb_bulk_load.h @@ -180,10 +180,12 @@ class Rdb_bulk_load_context { return it->second.get(); } - Rdb_sst_info *add_sst_info(rocksdb::DB *rdb, const std::string &tablename, - const Rdb_key_def &kd, - rocksdb::DBOptions &db_option, bool trace_sst_api, - bool compression_parallel_threads); + [[nodiscard]] Rdb_sst_info *add_sst_info(rocksdb::DB &rdb, + const std::string &tablename, + const Rdb_key_def &kd, + rocksdb::DBOptions &db_option, + bool trace_sst_api, + bool compression_parallel_threads); Rdb_index_merge *find_key_merge(GL_INDEX_ID index_id) { const auto it = m_key_merge.find(index_id); diff --git a/storage/rocksdb/rdb_cf_manager.cc b/storage/rocksdb/rdb_cf_manager.cc index df00c1386ce0..f01fe3eae220 100644 --- a/storage/rocksdb/rdb_cf_manager.cc +++ b/storage/rocksdb/rdb_cf_manager.cc @@ -45,7 +45,7 @@ bool Rdb_cf_manager::is_cf_name_reverse(std::string_view name) { return name.compare(0, 4, "rev:") == 0; } -bool Rdb_cf_manager::init(rocksdb::DB *const rdb, +bool Rdb_cf_manager::init(rocksdb::DB &rdb, std::unique_ptr &&cf_options, std::vector *handles) { mysql_mutex_init(rdb_cfm_mutex_key, &m_mutex, MY_MUTEX_INIT_FAST); @@ -74,7 +74,7 @@ bool Rdb_cf_manager::init(rocksdb::DB *const rdb, "RocksDB: Dropping column family %s with id %u on RocksDB for temp " "table", cf_name.c_str(), cf_id); - auto status = rdb->DropColumnFamily(cfh_ptr); + const auto status = rdb.DropColumnFamily(cfh_ptr); if (status.ok()) { delete (cfh_ptr); continue; @@ -149,8 +149,7 @@ void Rdb_cf_manager::cleanup() { See Rdb_cf_manager::get_cf */ std::shared_ptr Rdb_cf_manager::get_or_create_cf( - rocksdb::DB *const rdb, const std::string &cf_name) { - assert(rdb != nullptr); + rocksdb::DB &rdb, const std::string &cf_name) { assert(!cf_name.empty()); std::shared_ptr cf_handle; @@ -187,8 +186,7 @@ std::shared_ptr Rdb_cf_manager::get_or_create_cf( opts.target_file_size_base); rocksdb::ColumnFamilyHandle *cf_handle_ptr = nullptr; - const rocksdb::Status s = - rdb->CreateColumnFamily(opts, cf_name, &cf_handle_ptr); + const auto s = rdb.CreateColumnFamily(opts, cf_name, &cf_handle_ptr); if (s.ok()) { assert(cf_handle_ptr != nullptr); @@ -280,8 +278,8 @@ Rdb_cf_manager::get_all_cf(void) const { } int Rdb_cf_manager::remove_dropped_cf(Rdb_dict_manager *const dict_manager, - rocksdb::TransactionDB *const rdb, - const uint32 &cf_id) { + rocksdb::TransactionDB &rdb, + uint32 cf_id) { dict_manager->assert_lock_held(); RDB_MUTEX_LOCK_CHECK(m_mutex); auto batch = Rdb_dict_manager::begin(); @@ -314,7 +312,7 @@ int Rdb_cf_manager::remove_dropped_cf(Rdb_dict_manager *const dict_manager, return HA_EXIT_FAILURE; } - auto status = rdb->DropColumnFamily(cf_handle); + const auto status = rdb.DropColumnFamily(cf_handle); if (!status.ok()) { dict_manager->delete_dropped_cf(batch, cf_id); diff --git a/storage/rocksdb/rdb_cf_manager.h b/storage/rocksdb/rdb_cf_manager.h index b5c4b23bfe9a..e62ac42deeb7 100644 --- a/storage/rocksdb/rdb_cf_manager.h +++ b/storage/rocksdb/rdb_cf_manager.h @@ -75,9 +75,9 @@ class Rdb_cf_manager : public Ensure_initialized { @param handles [IN][OUT]: list of all active cf_handles fetched from rdb transaction. */ - bool init(rocksdb::DB *const rdb, - std::unique_ptr &&cf_options, - std::vector *handles); + [[nodiscard]] bool init(rocksdb::DB &rdb, + std::unique_ptr &&cf_options, + std::vector *handles); void cleanup(); /* @@ -85,7 +85,7 @@ class Rdb_cf_manager : public Ensure_initialized { cf_name requires non-empty string */ std::shared_ptr get_or_create_cf( - rocksdb::DB *const rdb, const std::string &cf_name); + rocksdb::DB &rdb, const std::string &cf_name); /* Used by table open */ std::shared_ptr get_cf( @@ -102,7 +102,7 @@ class Rdb_cf_manager : public Ensure_initialized { void) const; int remove_dropped_cf(Rdb_dict_manager *const dict_manager, - rocksdb::TransactionDB *const rdb, const uint32 &cf_id); + rocksdb::TransactionDB &rdb, uint32 cf_id); /* Used to delete cf by name */ int drop_cf(Rdb_ddl_manager *const ddl_manager, diff --git a/storage/rocksdb/rdb_datadic.cc b/storage/rocksdb/rdb_datadic.cc index c1958e58bab5..7da300b3e3f1 100644 --- a/storage/rocksdb/rdb_datadic.cc +++ b/storage/rocksdb/rdb_datadic.cc @@ -566,7 +566,7 @@ uint Rdb_key_def::setup(const TABLE &tbl, const Rdb_tbl_def &tbl_def, m_stats.m_distinct_keys_per_prefix.resize(get_key_parts()); /* Cache prefix extractor for bloom filter usage later */ - const auto opt = rdb_get_rocksdb_db()->GetOptions(&get_cf()); + const auto opt = rdb_get_rocksdb_db().GetOptions(&get_cf()); m_prefix_extractor = opt.prefix_extractor; uint rtn = setup_vector_index(tbl, tbl_def, cmd_srv_helper); @@ -5491,24 +5491,23 @@ bool Rdb_binlog_manager::unpack_value(const std::string &value_str, return false; } -bool Rdb_dict_manager::init(rocksdb::TransactionDB *const rdb_dict, - Rdb_cf_manager *const cf_manager, - const bool enable_remove_orphaned_dropped_cfs, +bool Rdb_dict_manager::init(rocksdb::TransactionDB &rdb_dict, + Rdb_cf_manager *cf_manager, + bool enable_remove_orphaned_dropped_cfs, const std::string &system_cf_name, const std::string &default_cf_name) { - assert(rdb_dict != nullptr); assert(cf_manager != nullptr); mysql_mutex_init(0, &m_mutex, MY_MUTEX_INIT_FAST); - m_db = rdb_dict; + m_db = &rdb_dict; // It is safe to get raw pointers here since: // 1. System CF and default CF cannot be dropped // 2. cf_manager outlives dict_manager - m_system_cfh = cf_manager->get_or_create_cf(m_db, system_cf_name).get(); + m_system_cfh = cf_manager->get_or_create_cf(*m_db, system_cf_name).get(); rocksdb::ColumnFamilyHandle *default_cfh = - cf_manager->get_or_create_cf(m_db, default_cf_name).get(); + cf_manager->get_or_create_cf(*m_db, default_cf_name).get(); // System CF and default CF should be initialized if (m_system_cfh == nullptr || default_cfh == nullptr) { return HA_EXIT_FAILURE; @@ -6431,9 +6430,9 @@ Rdb_dict_manager_selector::get_dict_manager_selector_const( return &m_user_table_dict_manager; } -bool Rdb_dict_manager_selector::init( - rocksdb::TransactionDB *const rdb_dict, Rdb_cf_manager *const cf_manager, - const bool enable_remove_orphaned_cf_flags) { +bool Rdb_dict_manager_selector::init(rocksdb::TransactionDB &rdb_dict, + Rdb_cf_manager *cf_manager, + bool enable_remove_orphaned_cf_flags) { m_cf_manager = cf_manager; bool ret = m_user_table_dict_manager.init( rdb_dict, cf_manager, enable_remove_orphaned_cf_flags, diff --git a/storage/rocksdb/rdb_datadic.h b/storage/rocksdb/rdb_datadic.h index a67ff500d0a4..a44ad71a45e8 100644 --- a/storage/rocksdb/rdb_datadic.h +++ b/storage/rocksdb/rdb_datadic.h @@ -1687,11 +1687,11 @@ class Rdb_dict_manager : public Ensure_initialized { Rdb_dict_manager &operator=(const Rdb_dict_manager &) = delete; Rdb_dict_manager() = default; - bool init(rocksdb::TransactionDB *const rdb_dict, - Rdb_cf_manager *const cf_manager, - const bool enable_remove_orphaned_cf_flags, - const std::string &system_cf_name, - const std::string &default_cf_name); + [[nodiscard]] bool init(rocksdb::TransactionDB &rdb_dict, + Rdb_cf_manager *cf_manager, + bool enable_remove_orphaned_cf_flags, + const std::string &system_cf_name, + const std::string &default_cf_name); inline void cleanup() { if (!initialized) return; @@ -1983,9 +1983,9 @@ class Rdb_dict_manager_selector { const Rdb_dict_manager *get_dict_manager_selector_const( bool fetch_tmp_dict_manager) const; - bool init(rocksdb::TransactionDB *const rdb_dict, - Rdb_cf_manager *const cf_manager, - const bool enable_remove_orphaned_cf_flags); + [[nodiscard]] bool init(rocksdb::TransactionDB &rdb_dict, + Rdb_cf_manager *cf_manager, + bool enable_remove_orphaned_cf_flags); void cleanup(); }; diff --git a/storage/rocksdb/rdb_i_s.cc b/storage/rocksdb/rdb_i_s.cc index acb947310619..0b6d70de36bc 100644 --- a/storage/rocksdb/rdb_i_s.cc +++ b/storage/rocksdb/rdb_i_s.cc @@ -118,11 +118,7 @@ static int rdb_i_s_cfstats_fill_table( {rocksdb::DB::Properties::kEstimatePendingCompactionBytes, "ESTIMATE_PENDING_COMPACTION_BYTES"}}; - rocksdb::DB *const rdb = rdb_get_rocksdb_db(); - - if (!rdb) { - DBUG_RETURN(ret); - } + auto &rdb = rdb_get_rocksdb_db(); const Rdb_cf_manager &cf_manager = rdb_get_cf_manager(); @@ -137,7 +133,7 @@ static int rdb_i_s_cfstats_fill_table( // It is safe if the CF is removed from cf_manager at // this point. The CF handle object is valid and sufficient here. for (const auto &property : cf_properties) { - if (!rdb->GetIntProperty(cfh.get(), property.first, &val)) { + if (!rdb.GetIntProperty(cfh.get(), property.first, &val)) { continue; } @@ -204,17 +200,13 @@ static int rdb_i_s_dbstats_fill_table( {rocksdb::DB::Properties::kOldestSnapshotTime, "DB_OLDEST_SNAPSHOT_TIME"}}; - rocksdb::DB *const rdb = rdb_get_rocksdb_db(); - - if (!rdb) { - DBUG_RETURN(ret); - } + auto &rdb = rdb_get_rocksdb_db(); const rocksdb::BlockBasedTableOptions &table_options = rdb_get_table_options(); for (const auto &property : db_properties) { - if (!rdb->GetIntProperty(property.first, &val)) { + if (!rdb.GetIntProperty(property.first, &val)) { continue; } @@ -296,12 +288,6 @@ static int rdb_i_s_perf_context_fill_table( Field **field = tables->table->field; assert(field != nullptr); - rocksdb::DB *const rdb = rdb_get_rocksdb_db(); - - if (!rdb) { - DBUG_RETURN(ret); - } - const std::vector tablenames = rdb_get_open_table_names(); for (const auto &it : tablenames) { @@ -392,12 +378,6 @@ static int rdb_i_s_perf_context_global_fill_table( int ret = 0; - rocksdb::DB *const rdb = rdb_get_rocksdb_db(); - - if (!rdb) { - DBUG_RETURN(ret); - } - // Get a copy of the global perf counters. Rdb_perf_counters global_counters; rdb_get_global_perf_counters(&global_counters); @@ -458,12 +438,6 @@ static int rdb_i_s_cfoptions_fill_table( int ret = 0; - rocksdb::DB *const rdb = rdb_get_rocksdb_db(); - - if (!rdb) { - DBUG_RETURN(ret); - } - Rdb_cf_manager &cf_manager = rdb_get_cf_manager(); for (const auto &cf_name : cf_manager.get_cf_names()) { @@ -748,12 +722,6 @@ static int rdb_i_s_global_info_fill_table( int ret = 0; - rocksdb::DB *const rdb = rdb_get_rocksdb_db(); - - if (!rdb) { - DBUG_RETURN(ret); - } - /* binlog info */ Rdb_binlog_manager *const blm = rdb_get_binlog_manager(); assert(blm != nullptr); @@ -871,11 +839,7 @@ static int rdb_i_s_compact_stats_fill_table( DBUG_ENTER_FUNC(); int ret = 0; - rocksdb::DB *rdb = rdb_get_rocksdb_db(); - - if (!rdb) { - DBUG_RETURN(ret); - } + auto &rdb = rdb_get_rocksdb_db(); Rdb_cf_manager &cf_manager = rdb_get_cf_manager(); @@ -891,7 +855,7 @@ static int rdb_i_s_compact_stats_fill_table( // this point. The CF handle object is valid and sufficient here. std::map props; bool bool_ret MY_ATTRIBUTE((__unused__)); - bool_ret = rdb->GetMapProperty(cfh.get(), "rocksdb.cfstats", &props); + bool_ret = rdb.GetMapProperty(cfh.get(), "rocksdb.cfstats", &props); assert(bool_ret); @@ -1223,14 +1187,10 @@ static int rdb_i_s_live_files_metadata_fill_table( DBUG_ENTER_FUNC(); int ret = 0; - rocksdb::DB *rdb = rdb_get_rocksdb_db(); - - if (!rdb) { - DBUG_RETURN(ret); - } + auto &rdb = rdb_get_rocksdb_db(); std::vector metadata; - rdb->GetLiveFilesMetaData(&metadata); + rdb.GetLiveFilesMetaData(&metadata); for (const auto &file : metadata) { Field **field = tables->table->field; @@ -1300,10 +1260,6 @@ static int rdb_i_s_live_files_metadata_fill_table( } } - if (!rdb) { - DBUG_RETURN(ret); - } - DBUG_RETURN(ret); } @@ -1524,14 +1480,8 @@ static int rdb_i_s_ddl_fill_table( assert(tables->table != nullptr); int ret = 0; - rocksdb::DB *const rdb = rdb_get_rocksdb_db(); - - if (!rdb) { - DBUG_RETURN(ret); - } Rdb_ddl_scanner ddl_arg; - ddl_arg.m_thd = thd; ddl_arg.m_table = tables->table; @@ -1778,11 +1728,6 @@ static int rdb_i_s_vector_index_config_fill_table( assert(tables->table != nullptr); int ret = HA_EXIT_SUCCESS; - rocksdb::DB *const rdb = rdb_get_rocksdb_db(); - - if (!rdb) { - DBUG_RETURN(ret); - } Rdb_vector_index_scanner ddl_arg(thd, tables->table); Rdb_ddl_manager *ddl_manager = rdb_get_ddl_manager(); @@ -1892,19 +1837,14 @@ static int rdb_i_s_sst_props_fill_table( assert(field != nullptr); /* Iterate over all the column families */ - rocksdb::DB *const rdb = rdb_get_rocksdb_db(); - - if (!rdb) { - DBUG_RETURN(ret); - } - + auto &rdb = rdb_get_rocksdb_db(); const Rdb_cf_manager &cf_manager = rdb_get_cf_manager(); for (const auto &cf_handle : cf_manager.get_all_cf()) { /* Grab the the properties of all the tables in the column family */ rocksdb::TablePropertiesCollection table_props_collection; const rocksdb::Status s = - rdb->GetPropertiesOfAllTables(cf_handle.get(), &table_props_collection); + rdb.GetPropertiesOfAllTables(cf_handle.get(), &table_props_collection); if (!s.ok()) { continue; @@ -2057,13 +1997,8 @@ static int rdb_i_s_index_file_map_fill_table( assert(field != nullptr); /* Iterate over all the column families */ - rocksdb::DB *const rdb = rdb_get_rocksdb_db(); - - if (!rdb) { - DBUG_RETURN(ret); - } - - const Rdb_cf_manager &cf_manager = rdb_get_cf_manager(); + auto &rdb = rdb_get_rocksdb_db(); + const auto &cf_manager = rdb_get_cf_manager(); for (const auto &cf_handle : cf_manager.get_all_cf()) { /* Grab the the properties of all the tables in the column family */ @@ -2072,7 +2007,7 @@ static int rdb_i_s_index_file_map_fill_table( // It is safe if the CF is removed from cf_manager at // this point. The CF handle object is valid and sufficient here. const rocksdb::Status s = - rdb->GetPropertiesOfAllTables(cf_handle.get(), &table_props_collection); + rdb.GetPropertiesOfAllTables(cf_handle.get(), &table_props_collection); if (!s.ok()) { continue; @@ -2195,15 +2130,11 @@ static int rdb_i_s_lock_info_fill_table( int ret = 0; - rocksdb::TransactionDB *const rdb = rdb_get_rocksdb_db(); - - if (!rdb) { - DBUG_RETURN(ret); - } + auto &rdb = rdb_get_rocksdb_db(); /* cf id -> rocksdb::KeyLockInfo */ std::unordered_multimap lock_info = - rdb->GetLockStatusData(); + rdb.GetLockStatusData(); for (const auto &lock : lock_info) { const uint32_t cf_id = lock.first; @@ -2308,12 +2239,6 @@ static int rdb_i_s_trx_info_fill_table( assert(tables->table->field != nullptr); int ret = 0; - rocksdb::DB *const rdb = rdb_get_rocksdb_db(); - - if (!rdb) { - DBUG_RETURN(ret); - } - const std::vector &all_trx_info = rdb_get_all_trx_info(); for (const auto &info : all_trx_info) { @@ -2427,13 +2352,7 @@ static int rdb_i_s_deadlock_info_fill_table( static const std::string str_shared("SHARED"); int ret = 0; - rocksdb::DB *const rdb = rdb_get_rocksdb_db(); - - if (!rdb) { - DBUG_RETURN(ret); - } - - const std::vector &all_dl_info = rdb_get_deadlock_info(); + const auto &all_dl_info = rdb_get_deadlock_info(); ulonglong id = 0; for (const auto &info : all_dl_info) { diff --git a/storage/rocksdb/rdb_iterator.cc b/storage/rocksdb/rdb_iterator.cc index 0aca21b377df..d710341a7de8 100644 --- a/storage/rocksdb/rdb_iterator.cc +++ b/storage/rocksdb/rdb_iterator.cc @@ -143,12 +143,7 @@ int Rdb_iterator_base::read_after_key(const rocksdb::Slice &key_slice) { void Rdb_iterator_base::release_scan_iterator() { m_scan_it.reset(); - - if (m_scan_it_snapshot) { - auto rdb = rdb_get_rocksdb_db(); - rdb->ReleaseSnapshot(m_scan_it_snapshot); - m_scan_it_snapshot = nullptr; - } + m_scan_it_snapshot.reset(); } void Rdb_iterator_base::setup_scan_iterator( @@ -864,7 +859,7 @@ int Rdb_iterator_partial::materialize_prefix() { } } - s = rdb_get_rocksdb_db()->GetBaseDB()->Write(options, wb.get()); + s = rdb_get_rocksdb_db().GetBaseDB()->Write(options, wb.get()); if (!s.ok()) { rc = rdb_tx_set_status_error(*tx, s, m_kd, m_tbl_def); goto exit; diff --git a/storage/rocksdb/rdb_iterator.h b/storage/rocksdb/rdb_iterator.h index 3729ddbeedb6..4917bf5d2779 100644 --- a/storage/rocksdb/rdb_iterator.h +++ b/storage/rocksdb/rdb_iterator.h @@ -161,7 +161,7 @@ class Rdb_iterator_base : public Rdb_iterator { /* Whether m_scan_it was created with skip_bloom=true */ bool m_scan_it_skips_bloom; - const rocksdb::Snapshot *m_scan_it_snapshot; + rdb_snapshot_unique_ptr m_scan_it_snapshot; /* Buffers used for upper/lower bounds for m_scan_it. */ uchar *m_scan_it_lower_bound; diff --git a/storage/rocksdb/rdb_sst_info.cc b/storage/rocksdb/rdb_sst_info.cc index 3244a62565e4..b8354d4c6049 100644 --- a/storage/rocksdb/rdb_sst_info.cc +++ b/storage/rocksdb/rdb_sst_info.cc @@ -43,7 +43,7 @@ namespace myrocks { // don't assign timestamp to bulk-loaded key. GetRootComparator() can return us // a non-timestamp aware one when UDT-IN-MEM is enabled or disabled. Rdb_sst_file_ordered::Rdb_sst_file::Rdb_sst_file( - rocksdb::DB *db, rocksdb::ColumnFamilyHandle &cf, + rocksdb::DB &db, rocksdb::ColumnFamilyHandle &cf, const rocksdb::DBOptions &db_options, const std::string &name, bool tracing, uint32_t compression_parallel_threads) : m_db(db), @@ -53,9 +53,7 @@ Rdb_sst_file_ordered::Rdb_sst_file::Rdb_sst_file( m_name(name), m_tracing(tracing), m_comparator(cf.GetComparator()->GetRootComparator()), - m_compression_parallel_threads(compression_parallel_threads) { - assert(db != nullptr); -} + m_compression_parallel_threads(compression_parallel_threads) {} rocksdb::Status Rdb_sst_file_ordered::Rdb_sst_file::open() { assert(m_sst_file_writer == nullptr); @@ -189,7 +187,7 @@ Rdb_sst_file_ordered::Rdb_sst_stack::top() { } Rdb_sst_file_ordered::Rdb_sst_file_ordered( - rocksdb::DB *db, rocksdb::ColumnFamilyHandle &cf, + rocksdb::DB &db, rocksdb::ColumnFamilyHandle &cf, const rocksdb::DBOptions &db_options, const std::string &name, bool tracing, size_t max_size, uint32_t compression_parallel_threads) : m_use_stack(false), @@ -297,7 +295,7 @@ rocksdb::Status Rdb_sst_file_ordered::commit() { return m_file.commit(); } -Rdb_sst_info::Rdb_sst_info(rocksdb::DB *db, const std::string &tablename, +Rdb_sst_info::Rdb_sst_info(rocksdb::DB &db, const std::string &tablename, const std::string &indexname, rocksdb::ColumnFamilyHandle &cf, const rocksdb::DBOptions &db_options, bool tracing, @@ -313,7 +311,7 @@ Rdb_sst_info::Rdb_sst_info(rocksdb::DB *db, const std::string &tablename, m_tracing(tracing), m_print_client_error(true), m_compression_parallel_threads(compression_parallel_threads) { - m_prefix = db->GetName() + '/'; + m_prefix = db.GetName() + '/'; std::string normalized_table; if (rdb_normalize_tablename(tablename.c_str(), &normalized_table)) { @@ -521,9 +519,9 @@ void Rdb_sst_info::report_error_msg(const rocksdb::Status &s, } } -void Rdb_sst_info::init(const rocksdb::DB *const db) { - const std::string dir = db->GetName(); - const auto &fs = db->GetEnv()->GetFileSystem(); +void Rdb_sst_info::init(const rocksdb::DB &db) { + const auto &dir = db.GetName(); + const auto &fs = db.GetEnv()->GetFileSystem(); std::vector files_in_dir; // Get the files in the specified directory diff --git a/storage/rocksdb/rdb_sst_info.h b/storage/rocksdb/rdb_sst_info.h index 08b39b84ae95..ab22f1d6e4f0 100644 --- a/storage/rocksdb/rdb_sst_info.h +++ b/storage/rocksdb/rdb_sst_info.h @@ -40,7 +40,7 @@ class Rdb_sst_file_ordered { Rdb_sst_file(Rdb_sst_file &&) = delete; Rdb_sst_file &operator=(Rdb_sst_file &&) = delete; - rocksdb::DB *const m_db; + rocksdb::DB &m_db; rocksdb::ColumnFamilyHandle &m_cf; const rocksdb::DBOptions &m_db_options; std::unique_ptr m_sst_file_writer; @@ -51,7 +51,7 @@ class Rdb_sst_file_ordered { std::string generateKey(const std::string &key); public: - Rdb_sst_file(rocksdb::DB *db, rocksdb::ColumnFamilyHandle &cf, + Rdb_sst_file(rocksdb::DB &db, rocksdb::ColumnFamilyHandle &cf, const rocksdb::DBOptions &db_options, const std::string &name, bool tracing, uint32_t compression_parallel_threads); @@ -95,7 +95,7 @@ class Rdb_sst_file_ordered { rocksdb::Status apply_first(); public: - Rdb_sst_file_ordered(rocksdb::DB *db, rocksdb::ColumnFamilyHandle &cf, + Rdb_sst_file_ordered(rocksdb::DB &db, rocksdb::ColumnFamilyHandle &cf, const rocksdb::DBOptions &db_options, const std::string &name, bool tracing, size_t max_size, uint32_t compression_parallel_threads); @@ -112,7 +112,7 @@ class Rdb_sst_info { Rdb_sst_info(Rdb_sst_info &&) = delete; Rdb_sst_info &operator=(Rdb_sst_info &&) = delete; - rocksdb::DB *const m_db; + rocksdb::DB &m_db; rocksdb::ColumnFamilyHandle &m_cf; const rocksdb::DBOptions &m_db_options; uint64_t m_curr_size; @@ -141,7 +141,7 @@ class Rdb_sst_info { const rocksdb::Status &s); public: - Rdb_sst_info(rocksdb::DB *db, const std::string &tablename, + Rdb_sst_info(rocksdb::DB &db, const std::string &tablename, const std::string &indexname, rocksdb::ColumnFamilyHandle &cf, const rocksdb::DBOptions &db_options, bool tracing, uint32_t compression_parallel_threads); @@ -252,7 +252,7 @@ class Rdb_sst_info { const rocksdb::ColumnFamilyHandle &get_cf() const { return m_cf; } - static void init(const rocksdb::DB *const db); + static void init(const rocksdb::DB &db); static void report_error_msg(const rocksdb::Status &s, const char *sst_file_name); diff --git a/storage/rocksdb/rdb_sst_partitioner_factory.h b/storage/rocksdb/rdb_sst_partitioner_factory.h index 309362c1f859..eca5f0ec6697 100644 --- a/storage/rocksdb/rdb_sst_partitioner_factory.h +++ b/storage/rocksdb/rdb_sst_partitioner_factory.h @@ -264,7 +264,7 @@ class Rdb_bulk_load_index_registry { * not already registered. * returns true when success. */ - [[nodiscard]] bool add_index(rocksdb::TransactionDB *rdb, + [[nodiscard]] bool add_index(rocksdb::TransactionDB &rdb, rocksdb::ColumnFamilyHandle &cf, Index_id index_id) { if (m_partitioner_factories.count(index_id) != 0) { @@ -273,7 +273,7 @@ class Rdb_bulk_load_index_registry { } auto *const sst_partitioner_factory = - rdb->GetOptions(&cf).sst_partitioner_factory.get(); + rdb.GetOptions(&cf).sst_partitioner_factory.get(); auto *const rdb_sst_partitioner_factory = dynamic_cast(sst_partitioner_factory); if (rdb_sst_partitioner_factory == nullptr) { @@ -328,9 +328,9 @@ class Rdb_bulk_load_index_registry { * trigger compaction that covers all indexes registered in * this object */ - rocksdb::Status compact_index_ranges( - rocksdb::TransactionDB *rdb, - const rocksdb::CompactRangeOptions compact_range_options) { + [[nodiscard]] rocksdb::Status compact_index_ranges( + rocksdb::TransactionDB &rdb, + const rocksdb::CompactRangeOptions &compact_range_options) { rocksdb::Status status; for (auto &entry : m_cf_indexes) { auto cf = entry.first; @@ -358,8 +358,8 @@ class Rdb_bulk_load_index_registry { compact_begin_key.ToString(/*hex*/ true).c_str(), compact_end_key.ToString(/*hex*/ true).c_str()); - status = rdb->CompactRange(compact_range_options, cf, &compact_begin_key, - &compact_end_key); + status = rdb.CompactRange(compact_range_options, cf, &compact_begin_key, + &compact_end_key); if (!status.ok()) { break; }