From d32ec45396c6fd309a89ce4a14199f2274078cd2 Mon Sep 17 00:00:00 2001 From: wangshaoyi Date: Mon, 11 Mar 2024 16:42:13 +0800 Subject: [PATCH 1/2] change config about rocksdb-cloud, add storage ut --- src/storage/src/redis.cc | 32 ++++- src/storage/src/redis.h | 20 +++ src/storage/tests/cloud_test.cc | 246 ++++++++++++++++++++++++++++++++ 3 files changed, 295 insertions(+), 3 deletions(-) create mode 100644 src/storage/tests/cloud_test.cc diff --git a/src/storage/src/redis.cc b/src/storage/src/redis.cc index 23c22b3c48..772fb58abd 100644 --- a/src/storage/src/redis.cc +++ b/src/storage/src/redis.cc @@ -35,9 +35,12 @@ Redis::Redis(Storage* const s, int32_t index) spop_counts_store_ = std::make_unique>(); default_compact_range_options_.exclusive_manual_compaction = false; default_compact_range_options_.change_level = true; + default_write_options_.disableWAL = true; spop_counts_store_->SetCapacity(1000); scan_cursors_store_->SetCapacity(5000); //env_ = rocksdb::Env::Instance(); + + listener_ = std::make_shared(index_, this); handles_.clear(); } @@ -54,9 +57,26 @@ Redis::~Redis() { if (default_compact_range_options_.canceled) { delete default_compact_range_options_.canceled; } + opened_ = false; } -Status Redis::Open(const StorageOptions& storage_options, const std::string& db_path) { +Status Redis::Open(const StorageOptions& tmp_storage_options, const std::string& db_path) { + + StorageOptions storage_options(tmp_storage_options); +#ifdef USE_S3 + storage_options.cloud_fs_options.roll_cloud_manifest_on_open = true; + storage_options.cloud_fs_options.resync_on_open = true; + storage_options.cloud_fs_options.resync_manifest_on_open = true; + storage_options.cloud_fs_options.skip_dbid_verification = true; + if (tmp_storage_options.cloud_fs_options.is_master) { + storage_options.options.replication_log_listener = listener_; + } else { + storage_options.options.disable_auto_flush = true; + storage_options.options.disable_auto_compactions = true; + } + storage_options.options.atomic_flush = true; +#endif + statistics_store_->SetCapacity(storage_options.statistics_max_size); small_compaction_threshold_ = storage_options.small_compaction_threshold; @@ -186,7 +206,9 @@ Status Redis::Open(const StorageOptions& storage_options, const std::string& db_ db_ops.env = cloud_env_.get(); return rocksdb::DBCloud::Open(db_ops, db_path, column_families, "", 0, &handles_, &db_); #else - return rocksdb::DB::Open(db_ops, db_path, column_families, &handles_, &db_); + auto s = rocksdb::DB::Open(db_ops, db_path, column_families, &handles_, &db_); + opened_ = true; + return s; #endif } @@ -529,7 +551,11 @@ Status Redis::SwitchMaster(bool is_old_master, bool is_new_master) { db_options["disable_auto_flush"] = "false"; cfs_options["is_master"] = "true"; // compare manifest_sequence - uint64_t local_manifest_sequence = db_->GetManifestUpdateSequence(); + uint64_t local_manifest_sequence = 0; + auto s = db_->GetManifestUpdateSequence(&local_manifest_sequence); + if (!s.ok()) { + LOG(ERROR) << "get manifestupdatesequence error: " << s.ToString(); + } uint64_t remote_manifest_sequence = 0; cfs_->GetMaxManifestSequenceFromCurrentManifest(db_->GetName(), &remote_manifest_sequence); // local version behind remote, directly reopen diff --git a/src/storage/src/redis.h b/src/storage/src/redis.h index 87320f31cc..e6ae3f30d8 100644 --- a/src/storage/src/redis.h +++ b/src/storage/src/redis.h @@ -37,6 +37,7 @@ namespace storage { using Status = rocksdb::Status; using Slice = rocksdb::Slice; +class Listener; class Redis { public: Redis(Storage* storage, int32_t index); @@ -393,6 +394,9 @@ class Redis { #ifdef USE_S3 Status SwitchMaster(bool is_old_master, bool is_new_master); + void ResetListener(std::shared_ptr handle) { + listener_ = handle; + } #endif private: @@ -429,11 +433,13 @@ class Redis { private: + bool opened_ = false; int32_t index_ = 0; Storage* const storage_; std::shared_ptr lock_mgr_; #ifdef USE_S3 rocksdb::DBCloud* db_ = nullptr; + std::shared_ptr listener_; #else rocksdb::DB* db_ = nullptr; #endif @@ -471,5 +477,19 @@ class Redis { #endif }; +// TODO(wangshaoyi): implement details +class Listener : public rocksdb::ReplicationLogListener { +public: + Listener(int rocksdb_id, void* inst) : rocksdb_id_(rocksdb_id), counter_(0), inst_(inst) {} + std::string OnReplicationLogRecord(rocksdb::ReplicationLogRecord record) override { + auto id = counter_.fetch_add(1); + return std::to_string(id); + } +private: + int rocksdb_id_ = 0; + std::atomic counter_ = {0}; + void* inst_; +}; + } // namespace storage #endif // SRC_REDIS_H_ diff --git a/src/storage/tests/cloud_test.cc b/src/storage/tests/cloud_test.cc new file mode 100644 index 0000000000..65b43164a6 --- /dev/null +++ b/src/storage/tests/cloud_test.cc @@ -0,0 +1,246 @@ +#include +#include +#include +#include +#include +#include "glog/logging.h" + +#include "pstd/include/env.h" +#include "storage/storage.h" +#include "src/redis.h" +#include "storage/util.h" + +using namespace storage; + +std::queue> items; + +struct MockReplicationListener : public rocksdb::ReplicationLogListener{ + MockReplicationListener() = default; + ~MockReplicationListener() = default; + std::string OnReplicationLogRecord(rocksdb::ReplicationLogRecord record) override { + std::string cnt = std::to_string(counter_.fetch_add(1)); + items.push(std::make_pair(cnt, record)); + LOG(WARNING) << "write binlog, replication_sequence: " << cnt << " type: " << record.type << " items count:" << items.size(); + return cnt; + } + std::atomic counter_ = {0}; +}; + +class CloudTest : public ::testing::Test { +public: + CloudTest() = default; + ~CloudTest() override = default; + + void SetUp() override { + storage_options.options.create_if_missing = true; + storage_options.options.avoid_flush_during_shutdown = true; + auto& cloud_fs_opts = storage_options.cloud_fs_options; + cloud_fs_opts.endpoint_override = "http://127.0.0.1:9000"; + cloud_fs_opts.credentials.InitializeSimple("minioadmin", "minioadmin"); + ASSERT_TRUE(cloud_fs_opts.credentials.HasValid().ok()); + cloud_fs_opts.src_bucket.SetBucketName("database.unit.test", "pika."); + cloud_fs_opts.dest_bucket.SetBucketName("database.unit.test", "pika."); + storage_options.options.max_log_file_size = 0; + } + + void TearDown() override { + } + + static void SetUpTestSuite() {} + static void TearDownTestSuite() {} + + StorageOptions storage_options; + storage::Status s; + std::string path; +}; + +Status OpenMaster(storage::Redis*& inst, StorageOptions storage_options) { + storage::Storage str; + while (!items.empty()) + { + items.pop(); + } + + inst = new storage::Redis(&str, 0); + auto listener = std::make_shared(); + inst->ResetListener(listener); + storage_options.cloud_fs_options.is_master = true; + auto s = inst->Open(storage_options, "cloud_test"); + return s; +} + +Status OpenSlave(storage::Redis*& inst, StorageOptions storage_options) { + storage::Storage str; + inst = new storage::Redis(&str, 0); + storage_options.cloud_fs_options.is_master = false; + auto s = inst->Open(storage_options, "cloud_test"); + return s; +} + +TEST_F(CloudTest, simple_master) { + storage::Redis* inst; + auto s = OpenMaster(inst, storage_options); + ASSERT_TRUE(s.ok()); + for (int i = 0; i < 10000; i++) { + if (i + 1 % 100 == 0) { + sleep(1); + } + s = inst->Set(std::to_string(i), std::to_string(i)); + ASSERT_TRUE(s.ok()); + } + rocksdb::FlushOptions fo; + fo.wait = true; + inst->GetDB()->Flush(fo); + delete inst; + inst = nullptr; +} + +Status SlaveCatchUp(storage::Redis* slave) { + Status s; + LOG(WARNING) << "SlaveCatchUp, items.size: " << items.size(); + while (!items.empty()) { + std::string replication_sequence = items.front().first; + auto record = items.front().second; + items.pop(); + LOG(WARNING) << "replication_sequence: " << replication_sequence << " type: " << record.type; + // slave catchup + rocksdb::DB::ApplyReplicationLogRecordInfo info; + s = slave->GetDB()->ApplyReplicationLogRecord(record, replication_sequence, nullptr, true, &info, rocksdb::DB::AR_EVICT_OBSOLETE_FILES); + if (!s.ok()) { + LOG(WARNING) << "reapply log error: " << s.ToString(); + return s; + } + } + return s; +} + +TEST_F(CloudTest, master_slave) { + storage::Redis* inst_master, *inst_slave; + auto s = OpenMaster(inst_master, storage_options); + ASSERT_TRUE(s.ok()); + // master write + for (int i = 0; i < 20000; i++) { + if (i + 1 % 100 == 0) { + sleep(1); + } + s = inst_master->Set(std::to_string(i), std::to_string(i)); + ASSERT_TRUE(s.ok()); + } + + rocksdb::FlushOptions fo; + fo.wait = true; + inst_master->GetDB()->Flush(fo); + delete inst_master; + inst_master = nullptr; + + std::vector children; + pstd::GetChildren("cloud_test", children); + std::for_each(children.begin(), children.end(), [](auto& file) { + if (file.find("sst") != std::string::npos) { + std::string path = "cloud_test/"; + path = path + file; + pstd::DeleteFile(path); + } + }); + + s = OpenSlave(inst_slave, storage_options); + ASSERT_TRUE(s.ok()); + for (int i = 0; i < 20000; i++) { + std::string val; + s = inst_slave->Get(std::to_string(i), &val); + ASSERT_TRUE(s.ok()); + ASSERT_EQ(val, std::to_string(i)); + } + SlaveCatchUp(inst_slave); + + delete inst_slave; + inst_slave = nullptr; + + s = OpenMaster(inst_master, storage_options); + ASSERT_TRUE(s.ok()); + for (int i = 0; i < 20000; i++) { + std::string val; + s = inst_master->Get(std::to_string(i), &val); + ASSERT_TRUE(s.ok()); + ASSERT_EQ(val, std::to_string(i)); + } + delete inst_master; + inst_master = nullptr; +} + +TEST_F(CloudTest, switch_master) { + storage::Redis* inst_master, *inst_slave; + auto s = OpenMaster(inst_master, storage_options); + ASSERT_TRUE(s.ok()); + // master write + for (int i = 0; i < 20000; i++) { + if (i + 1 % 100 == 0) { + sleep(1); + } + s = inst_master->Set(std::to_string(i), std::to_string(i)); + ASSERT_TRUE(s.ok()); + } + + delete inst_master; + inst_master = nullptr; + LOG(WARNING) << "close master already"; + sleep(20); + + std::vector children; + pstd::GetChildren("cloud_test", children); + std::for_each(children.begin(), children.end(), [](auto& file) { + if (file.find("sst") != std::string::npos) { + std::string path = "cloud_test/"; + path = path + file; + pstd::DeleteFile(path); + } + }); + + s = OpenSlave(inst_slave, storage_options); + ASSERT_TRUE(s.ok()); + s = SlaveCatchUp(inst_slave); + ASSERT_TRUE(s.ok()); + for (int i = 0; i < 20000; i++) { + std::string val; + s = inst_slave->Get(std::to_string(i), &val); + ASSERT_TRUE(s.ok()); + ASSERT_EQ(val, std::to_string(i)); + } + s = inst_slave->SwitchMaster(false, true); + ASSERT_TRUE(s.ok()); + delete inst_slave; + inst_slave = nullptr; + + pstd::GetChildren("cloud_test", children); + std::for_each(children.begin(), children.end(), [](auto& file) { + if (file.find("sst") != std::string::npos) { + std::string path = "cloud_test/"; + path = path + file; + pstd::DeleteFile(path); + } + }); + + s = OpenMaster(inst_master, storage_options); + ASSERT_TRUE(s.ok()); + for (int i = 0; i < 20000; i++) { + std::string val; + s = inst_master->Get(std::to_string(i), &val); + ASSERT_TRUE(s.ok()); + ASSERT_EQ(val, std::to_string(i)); + } + delete inst_master; + inst_master = nullptr; +} + +int main(int argc, char** argv) { + if (!pstd::FileExists("./log")) { + pstd::CreatePath("./log"); + } + FLAGS_log_dir = "./log"; + FLAGS_minloglevel = 0; + FLAGS_max_log_size = 1800; + FLAGS_logbufsecs = 0; + ::google::InitGoogleLogging("cloud_test"); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} From 55e6bc678df3edb6972de8c5df47cc4c3eab8109 Mon Sep 17 00:00:00 2001 From: wangshaoyi Date: Wed, 20 Mar 2024 18:01:09 +0800 Subject: [PATCH 2/2] fix by review comments --- src/storage/src/redis.cc | 4 ++-- src/storage/src/redis.h | 14 +++++++------- src/storage/tests/cloud_test.cc | 2 +- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/storage/src/redis.cc b/src/storage/src/redis.cc index 772fb58abd..aa938389bc 100644 --- a/src/storage/src/redis.cc +++ b/src/storage/src/redis.cc @@ -40,7 +40,7 @@ Redis::Redis(Storage* const s, int32_t index) scan_cursors_store_->SetCapacity(5000); //env_ = rocksdb::Env::Instance(); - listener_ = std::make_shared(index_, this); + log_listener_ = std::make_shared(index_, this); handles_.clear(); } @@ -69,7 +69,7 @@ Status Redis::Open(const StorageOptions& tmp_storage_options, const std::string& storage_options.cloud_fs_options.resync_manifest_on_open = true; storage_options.cloud_fs_options.skip_dbid_verification = true; if (tmp_storage_options.cloud_fs_options.is_master) { - storage_options.options.replication_log_listener = listener_; + storage_options.options.replication_log_listener = log_listener_; } else { storage_options.options.disable_auto_flush = true; storage_options.options.disable_auto_compactions = true; diff --git a/src/storage/src/redis.h b/src/storage/src/redis.h index e6ae3f30d8..1870ec618e 100644 --- a/src/storage/src/redis.h +++ b/src/storage/src/redis.h @@ -37,7 +37,7 @@ namespace storage { using Status = rocksdb::Status; using Slice = rocksdb::Slice; -class Listener; +class LogListener; class Redis { public: Redis(Storage* storage, int32_t index); @@ -394,8 +394,8 @@ class Redis { #ifdef USE_S3 Status SwitchMaster(bool is_old_master, bool is_new_master); - void ResetListener(std::shared_ptr handle) { - listener_ = handle; + void ResetLogListener(std::shared_ptr handle) { + log_listener_ = handle; } #endif @@ -439,7 +439,7 @@ class Redis { std::shared_ptr lock_mgr_; #ifdef USE_S3 rocksdb::DBCloud* db_ = nullptr; - std::shared_ptr listener_; + std::shared_ptr log_listener_; #else rocksdb::DB* db_ = nullptr; #endif @@ -478,9 +478,9 @@ class Redis { }; // TODO(wangshaoyi): implement details -class Listener : public rocksdb::ReplicationLogListener { +class LogListener : public rocksdb::ReplicationLogListener { public: - Listener(int rocksdb_id, void* inst) : rocksdb_id_(rocksdb_id), counter_(0), inst_(inst) {} + LogListener(int rocksdb_id, void* inst) : rocksdb_id_(rocksdb_id), counter_(0), inst_(inst) {} std::string OnReplicationLogRecord(rocksdb::ReplicationLogRecord record) override { auto id = counter_.fetch_add(1); return std::to_string(id); @@ -488,7 +488,7 @@ class Listener : public rocksdb::ReplicationLogListener { private: int rocksdb_id_ = 0; std::atomic counter_ = {0}; - void* inst_; + void* inst_ = nullptr; }; } // namespace storage diff --git a/src/storage/tests/cloud_test.cc b/src/storage/tests/cloud_test.cc index 65b43164a6..32f29d4e72 100644 --- a/src/storage/tests/cloud_test.cc +++ b/src/storage/tests/cloud_test.cc @@ -63,7 +63,7 @@ Status OpenMaster(storage::Redis*& inst, StorageOptions storage_options) { inst = new storage::Redis(&str, 0); auto listener = std::make_shared(); - inst->ResetListener(listener); + inst->ResetLogListener(listener); storage_options.cloud_fs_options.is_master = true; auto s = inst->Open(storage_options, "cloud_test"); return s;