Skip to content

Commit

Permalink
Merge branch 'unstable' into limit_maxmemory_clients
Browse files Browse the repository at this point in the history
  • Loading branch information
AntiTopQuark committed Sep 8, 2024
2 parents 5bfd3ce + a29df09 commit 104f354
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 4 deletions.
7 changes: 7 additions & 0 deletions src/cluster/replication.cc
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,13 @@ ReplicationThread::CBState ReplicationThread::fullSyncReadCB(bufferevent *bev) {
LOG(INFO) << "[replication] Succeeded restoring the backup, fullsync was finish";
post_fullsync_cb_();

// It needs to reload namespaces from DB after the full sync is done,
// or namespaces are not visible in the replica.
s = srv_->GetNamespace()->LoadAndRewrite();
if (!s.IsOK()) {
LOG(ERROR) << "[replication] Failed to load and rewrite namespace: " << s.Msg();
}

// Switch to psync state machine again
psync_steps_.Start();
return CBState::QUIT;
Expand Down
3 changes: 2 additions & 1 deletion src/server/namespace.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ bool Namespace::IsAllowModify() const {
Status Namespace::loadFromDB(std::map<std::string, std::string>* db_tokens) const {
std::string value;
engine::Context ctx(storage_);
auto s = storage_->Get(ctx, ctx.GetReadOptions(), cf_, kNamespaceDBKey, &value);
auto cf = storage_->GetCFHandle(ColumnFamilyID::Propagate);
auto s = storage_->Get(ctx, ctx.GetReadOptions(), cf, kNamespaceDBKey, &value);
if (!s.ok()) {
if (s.IsNotFound()) return Status::OK();
return {Status::NotOK, s.ToString()};
Expand Down
4 changes: 1 addition & 3 deletions src/server/namespace.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ constexpr const char *kNamespaceDBKey = "__namespace_keys__";

class Namespace {
public:
explicit Namespace(engine::Storage *storage)
: storage_(storage), cf_(storage_->GetCFHandle(ColumnFamilyID::Propagate)) {}
explicit Namespace(engine::Storage *storage) : storage_(storage) {}

~Namespace() = default;
Namespace(const Namespace &) = delete;
Expand All @@ -45,7 +44,6 @@ class Namespace {

private:
engine::Storage *storage_;
rocksdb::ColumnFamilyHandle *cf_ = nullptr;

std::shared_mutex tokens_mu_;
// mapping from token to namespace name
Expand Down
39 changes: 39 additions & 0 deletions tests/gocase/unit/namespace/namespace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ package namespace

import (
"context"
"fmt"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -252,6 +254,43 @@ func TestNamespaceReplicate(t *testing.T) {
})
}

func TestNamespaceReplicateWithFullSync(t *testing.T) {
config := map[string]string{
"rocksdb.write_buffer_size": "4",
"rocksdb.target_file_size_base": "16",
"rocksdb.max_write_buffer_number": "1",
"rocksdb.wal_ttl_seconds": "0",
"rocksdb.wal_size_limit_mb": "0",
"repl-namespace-enabled": "yes",
"requirepass": "123",
"masterauth": "123",
}
master := util.StartServer(t, config)
defer master.Close()
masterClient := master.NewClientWithOption(&redis.Options{Password: "123"})
defer func() { require.NoError(t, masterClient.Close()) }()

slave := util.StartServer(t, config)
defer slave.Close()
slaveClient := slave.NewClientWithOption(&redis.Options{Password: "123"})
defer func() { require.NoError(t, slaveClient.Close()) }()

ctx := context.Background()
value := strings.Repeat("a", 128*1024)
for i := 0; i < 1024; i++ {
require.NoError(t, masterClient.Set(ctx, fmt.Sprintf("key%d", i), value, 0).Err())
}
require.NoError(t, masterClient.Do(ctx, "NAMESPACE", "ADD", "foo", "bar").Err())

util.SlaveOf(t, slaveClient, master)
util.WaitForOffsetSync(t, masterClient, slaveClient, 60*time.Second)

// Namespaces should be replicated after the full sync
token, err := slaveClient.Do(ctx, "NAMESPACE", "GET", "foo").Result()
require.NoError(t, err)
require.EqualValues(t, "bar", token)
}

func TestNamespaceRewrite(t *testing.T) {
password := "pwd"
srv := util.StartServerWithCLIOptions(t, false, map[string]string{
Expand Down

0 comments on commit 104f354

Please sign in to comment.