Skip to content

Commit

Permalink
Merge pull request OpenAtomFoundation#5 from pikiwidb/feature/wsy_cha…
Browse files Browse the repository at this point in the history
…nge_config

feat: 修改rocksdb-cloud相关参数,增加storage层单测
  • Loading branch information
baixin01 authored Mar 20, 2024
2 parents 978a151 + 55e6bc6 commit d147e2f
Show file tree
Hide file tree
Showing 3 changed files with 295 additions and 3 deletions.
32 changes: 29 additions & 3 deletions src/storage/src/redis.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,12 @@ Redis::Redis(Storage* const s, int32_t index)
spop_counts_store_ = std::make_unique<LRUCache<std::string, size_t>>();
default_compact_range_options_.exclusive_manual_compaction = false;
default_compact_range_options_.change_level = true;
default_write_options_.disableWAL = true;
spop_counts_store_->SetCapacity(1000);
scan_cursors_store_->SetCapacity(5000);
//env_ = rocksdb::Env::Instance();

log_listener_ = std::make_shared<LogListener>(index_, this);
handles_.clear();
}

Expand All @@ -54,9 +57,26 @@ Redis::~Redis() {
if (default_compact_range_options_.canceled) {
delete default_compact_range_options_.canceled;
}
opened_ = false;
}

Status Redis::Open(const StorageOptions& storage_options, const std::string& db_path) {
Status Redis::Open(const StorageOptions& tmp_storage_options, const std::string& db_path) {

StorageOptions storage_options(tmp_storage_options);
#ifdef USE_S3
storage_options.cloud_fs_options.roll_cloud_manifest_on_open = true;
storage_options.cloud_fs_options.resync_on_open = true;
storage_options.cloud_fs_options.resync_manifest_on_open = true;
storage_options.cloud_fs_options.skip_dbid_verification = true;
if (tmp_storage_options.cloud_fs_options.is_master) {
storage_options.options.replication_log_listener = log_listener_;
} else {
storage_options.options.disable_auto_flush = true;
storage_options.options.disable_auto_compactions = true;
}
storage_options.options.atomic_flush = true;
#endif

statistics_store_->SetCapacity(storage_options.statistics_max_size);
small_compaction_threshold_ = storage_options.small_compaction_threshold;

Expand Down Expand Up @@ -186,7 +206,9 @@ Status Redis::Open(const StorageOptions& storage_options, const std::string& db_
db_ops.env = cloud_env_.get();
return rocksdb::DBCloud::Open(db_ops, db_path, column_families, "", 0, &handles_, &db_);
#else
return rocksdb::DB::Open(db_ops, db_path, column_families, &handles_, &db_);
auto s = rocksdb::DB::Open(db_ops, db_path, column_families, &handles_, &db_);
opened_ = true;
return s;
#endif
}

Expand Down Expand Up @@ -529,7 +551,11 @@ Status Redis::SwitchMaster(bool is_old_master, bool is_new_master) {
db_options["disable_auto_flush"] = "false";
cfs_options["is_master"] = "true";
// compare manifest_sequence
uint64_t local_manifest_sequence = db_->GetManifestUpdateSequence();
uint64_t local_manifest_sequence = 0;
auto s = db_->GetManifestUpdateSequence(&local_manifest_sequence);
if (!s.ok()) {
LOG(ERROR) << "get manifestupdatesequence error: " << s.ToString();
}
uint64_t remote_manifest_sequence = 0;
cfs_->GetMaxManifestSequenceFromCurrentManifest(db_->GetName(), &remote_manifest_sequence);
// local version behind remote, directly reopen
Expand Down
20 changes: 20 additions & 0 deletions src/storage/src/redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ namespace storage {
using Status = rocksdb::Status;
using Slice = rocksdb::Slice;

class LogListener;
class Redis {
public:
Redis(Storage* storage, int32_t index);
Expand Down Expand Up @@ -393,6 +394,9 @@ class Redis {

#ifdef USE_S3
Status SwitchMaster(bool is_old_master, bool is_new_master);
void ResetLogListener(std::shared_ptr<rocksdb::ReplicationLogListener> handle) {
log_listener_ = handle;
}
#endif

private:
Expand Down Expand Up @@ -429,11 +433,13 @@ class Redis {


private:
bool opened_ = false;
int32_t index_ = 0;
Storage* const storage_;
std::shared_ptr<LockMgr> lock_mgr_;
#ifdef USE_S3
rocksdb::DBCloud* db_ = nullptr;
std::shared_ptr<rocksdb::ReplicationLogListener> log_listener_;
#else
rocksdb::DB* db_ = nullptr;
#endif
Expand Down Expand Up @@ -471,5 +477,19 @@ class Redis {
#endif
};

// TODO(wangshaoyi): implement details
class LogListener : public rocksdb::ReplicationLogListener {
public:
LogListener(int rocksdb_id, void* inst) : rocksdb_id_(rocksdb_id), counter_(0), inst_(inst) {}
std::string OnReplicationLogRecord(rocksdb::ReplicationLogRecord record) override {
auto id = counter_.fetch_add(1);
return std::to_string(id);
}
private:
int rocksdb_id_ = 0;
std::atomic<int> counter_ = {0};
void* inst_ = nullptr;
};

} // namespace storage
#endif // SRC_REDIS_H_
246 changes: 246 additions & 0 deletions src/storage/tests/cloud_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
#include <thread>
#include <iostream>
#include <queue>
#include <atomic>
#include <gtest/gtest.h>
#include "glog/logging.h"

#include "pstd/include/env.h"
#include "storage/storage.h"
#include "src/redis.h"
#include "storage/util.h"

using namespace storage;

std::queue<std::pair<std::string, rocksdb::ReplicationLogRecord>> items;

struct MockReplicationListener : public rocksdb::ReplicationLogListener{
MockReplicationListener() = default;
~MockReplicationListener() = default;
std::string OnReplicationLogRecord(rocksdb::ReplicationLogRecord record) override {
std::string cnt = std::to_string(counter_.fetch_add(1));
items.push(std::make_pair(cnt, record));
LOG(WARNING) << "write binlog, replication_sequence: " << cnt << " type: " << record.type << " items count:" << items.size();
return cnt;
}
std::atomic<int> counter_ = {0};
};

class CloudTest : public ::testing::Test {
public:
CloudTest() = default;
~CloudTest() override = default;

void SetUp() override {
storage_options.options.create_if_missing = true;
storage_options.options.avoid_flush_during_shutdown = true;
auto& cloud_fs_opts = storage_options.cloud_fs_options;
cloud_fs_opts.endpoint_override = "http://127.0.0.1:9000";
cloud_fs_opts.credentials.InitializeSimple("minioadmin", "minioadmin");
ASSERT_TRUE(cloud_fs_opts.credentials.HasValid().ok());
cloud_fs_opts.src_bucket.SetBucketName("database.unit.test", "pika.");
cloud_fs_opts.dest_bucket.SetBucketName("database.unit.test", "pika.");
storage_options.options.max_log_file_size = 0;
}

void TearDown() override {
}

static void SetUpTestSuite() {}
static void TearDownTestSuite() {}

StorageOptions storage_options;
storage::Status s;
std::string path;
};

Status OpenMaster(storage::Redis*& inst, StorageOptions storage_options) {
storage::Storage str;
while (!items.empty())
{
items.pop();
}

inst = new storage::Redis(&str, 0);
auto listener = std::make_shared<MockReplicationListener>();
inst->ResetLogListener(listener);
storage_options.cloud_fs_options.is_master = true;
auto s = inst->Open(storage_options, "cloud_test");
return s;
}

Status OpenSlave(storage::Redis*& inst, StorageOptions storage_options) {
storage::Storage str;
inst = new storage::Redis(&str, 0);
storage_options.cloud_fs_options.is_master = false;
auto s = inst->Open(storage_options, "cloud_test");
return s;
}

TEST_F(CloudTest, simple_master) {
storage::Redis* inst;
auto s = OpenMaster(inst, storage_options);
ASSERT_TRUE(s.ok());
for (int i = 0; i < 10000; i++) {
if (i + 1 % 100 == 0) {
sleep(1);
}
s = inst->Set(std::to_string(i), std::to_string(i));
ASSERT_TRUE(s.ok());
}
rocksdb::FlushOptions fo;
fo.wait = true;
inst->GetDB()->Flush(fo);
delete inst;
inst = nullptr;
}

Status SlaveCatchUp(storage::Redis* slave) {
Status s;
LOG(WARNING) << "SlaveCatchUp, items.size: " << items.size();
while (!items.empty()) {
std::string replication_sequence = items.front().first;
auto record = items.front().second;
items.pop();
LOG(WARNING) << "replication_sequence: " << replication_sequence << " type: " << record.type;
// slave catchup
rocksdb::DB::ApplyReplicationLogRecordInfo info;
s = slave->GetDB()->ApplyReplicationLogRecord(record, replication_sequence, nullptr, true, &info, rocksdb::DB::AR_EVICT_OBSOLETE_FILES);
if (!s.ok()) {
LOG(WARNING) << "reapply log error: " << s.ToString();
return s;
}
}
return s;
}

TEST_F(CloudTest, master_slave) {
storage::Redis* inst_master, *inst_slave;
auto s = OpenMaster(inst_master, storage_options);
ASSERT_TRUE(s.ok());
// master write
for (int i = 0; i < 20000; i++) {
if (i + 1 % 100 == 0) {
sleep(1);
}
s = inst_master->Set(std::to_string(i), std::to_string(i));
ASSERT_TRUE(s.ok());
}

rocksdb::FlushOptions fo;
fo.wait = true;
inst_master->GetDB()->Flush(fo);
delete inst_master;
inst_master = nullptr;

std::vector<std::string> children;
pstd::GetChildren("cloud_test", children);
std::for_each(children.begin(), children.end(), [](auto& file) {
if (file.find("sst") != std::string::npos) {
std::string path = "cloud_test/";
path = path + file;
pstd::DeleteFile(path);
}
});

s = OpenSlave(inst_slave, storage_options);
ASSERT_TRUE(s.ok());
for (int i = 0; i < 20000; i++) {
std::string val;
s = inst_slave->Get(std::to_string(i), &val);
ASSERT_TRUE(s.ok());
ASSERT_EQ(val, std::to_string(i));
}
SlaveCatchUp(inst_slave);

delete inst_slave;
inst_slave = nullptr;

s = OpenMaster(inst_master, storage_options);
ASSERT_TRUE(s.ok());
for (int i = 0; i < 20000; i++) {
std::string val;
s = inst_master->Get(std::to_string(i), &val);
ASSERT_TRUE(s.ok());
ASSERT_EQ(val, std::to_string(i));
}
delete inst_master;
inst_master = nullptr;
}

TEST_F(CloudTest, switch_master) {
storage::Redis* inst_master, *inst_slave;
auto s = OpenMaster(inst_master, storage_options);
ASSERT_TRUE(s.ok());
// master write
for (int i = 0; i < 20000; i++) {
if (i + 1 % 100 == 0) {
sleep(1);
}
s = inst_master->Set(std::to_string(i), std::to_string(i));
ASSERT_TRUE(s.ok());
}

delete inst_master;
inst_master = nullptr;
LOG(WARNING) << "close master already";
sleep(20);

std::vector<std::string> children;
pstd::GetChildren("cloud_test", children);
std::for_each(children.begin(), children.end(), [](auto& file) {
if (file.find("sst") != std::string::npos) {
std::string path = "cloud_test/";
path = path + file;
pstd::DeleteFile(path);
}
});

s = OpenSlave(inst_slave, storage_options);
ASSERT_TRUE(s.ok());
s = SlaveCatchUp(inst_slave);
ASSERT_TRUE(s.ok());
for (int i = 0; i < 20000; i++) {
std::string val;
s = inst_slave->Get(std::to_string(i), &val);
ASSERT_TRUE(s.ok());
ASSERT_EQ(val, std::to_string(i));
}
s = inst_slave->SwitchMaster(false, true);
ASSERT_TRUE(s.ok());
delete inst_slave;
inst_slave = nullptr;

pstd::GetChildren("cloud_test", children);
std::for_each(children.begin(), children.end(), [](auto& file) {
if (file.find("sst") != std::string::npos) {
std::string path = "cloud_test/";
path = path + file;
pstd::DeleteFile(path);
}
});

s = OpenMaster(inst_master, storage_options);
ASSERT_TRUE(s.ok());
for (int i = 0; i < 20000; i++) {
std::string val;
s = inst_master->Get(std::to_string(i), &val);
ASSERT_TRUE(s.ok());
ASSERT_EQ(val, std::to_string(i));
}
delete inst_master;
inst_master = nullptr;
}

int main(int argc, char** argv) {
if (!pstd::FileExists("./log")) {
pstd::CreatePath("./log");
}
FLAGS_log_dir = "./log";
FLAGS_minloglevel = 0;
FLAGS_max_log_size = 1800;
FLAGS_logbufsecs = 0;
::google::InitGoogleLogging("cloud_test");
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

0 comments on commit d147e2f

Please sign in to comment.