Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support of the command RENAME&RENAMENX #2026

Merged
merged 13 commits into from
Jan 23, 2024
34 changes: 33 additions & 1 deletion src/commands/cmd_key.cc
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,36 @@ class CommandDel : public Commander {
}
};

class CommandRename : public Commander {
public:
Status Execute(Server *srv, Connection *conn, std::string *output) override {
redis::Database redis(srv->storage, conn->GetNamespace());
bool ret = true;

auto s = redis.Rename(args_[1], args_[2], false, &ret);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

*output = redis::SimpleString("OK");
return Status::OK();
}
};

class CommandRenameNX : public Commander {
public:
Status Execute(Server *srv, Connection *conn, std::string *output) override {
redis::Database redis(srv->storage, conn->GetNamespace());
bool ret = true;
auto s = redis.Rename(args_[1], args_[2], true, &ret);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
if (ret) {
*output = redis::Integer(1);
} else {
*output = redis::Integer(0);
}
return Status::OK();
}
};

REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandTTL>("ttl", 2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandPTTL>("pttl", 2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandType>("type", 2, "read-only", 1, 1, 1),
Expand All @@ -321,6 +351,8 @@ REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandTTL>("ttl", 2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandExpireTime>("expiretime", 2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandPExpireTime>("pexpiretime", 2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandDel>("del", -2, "write", 1, -1, 1),
MakeCmdAttr<CommandDel>("unlink", -2, "write", 1, -1, 1), )
MakeCmdAttr<CommandDel>("unlink", -2, "write", 1, -1, 1),
MakeCmdAttr<CommandRename>("rename", 3, "write", 1, 2, 1),
MakeCmdAttr<CommandRenameNX>("renamenx", 3, "write", 1, 2, 1), )

} // namespace redis
2 changes: 2 additions & 0 deletions src/storage/iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ Slice SubKeyIterator::UserKey() const {
return internal_key.GetSubKey();
}

rocksdb::ColumnFamilyHandle* SubKeyIterator::ColumnFamilyHandle() const { return Valid() ? this->cf_handle_ : nullptr; }

Slice SubKeyIterator::Value() const { return Valid() ? iter_->value() : Slice(); }

void SubKeyIterator::Seek() {
Expand Down
1 change: 1 addition & 0 deletions src/storage/iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class SubKeyIterator {
Slice Key() const;
// return the user key without prefix
Slice UserKey() const;
rocksdb::ColumnFamilyHandle *ColumnFamilyHandle() const;
Slice Value() const;
void Reset();

Expand Down
60 changes: 60 additions & 0 deletions src/storage/redis_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,11 @@
#include "db_util.h"
#include "parse_util.h"
#include "rocksdb/iterator.h"
#include "rocksdb/status.h"
#include "server/server.h"
#include "storage/iterator.h"
#include "storage/redis_metadata.h"
#include "storage/storage.h"
#include "time_util.h"

namespace redis {
Expand Down Expand Up @@ -690,4 +693,61 @@ Status WriteBatchLogData::Decode(const rocksdb::Slice &blob) {

return Status::OK();
}

rocksdb::Status Database::Rename(const std::string &key, const std::string &new_key, bool nx, bool *ret) {
*ret = true;
RedisType type = kRedisNone;
std::string ns_key = AppendNamespacePrefix(key);
std::string new_ns_key = AppendNamespacePrefix(new_key);
std::vector<std::string> lock_keys = {ns_key, new_ns_key};
MultiLockGuard guard(storage_->GetLockManager(), lock_keys);
auto s = Type(key, &type);
if (!s.ok()) return s;
if (type == kRedisNone) return rocksdb::Status::InvalidArgument("ERR no such key");

if (nx) {
int exist = 0;
if (s = Exists({new_key}, &exist), !s.ok()) return s;
if (exist > 0) {
*ret = false;
return rocksdb::Status::OK();
}
}

if (key == new_key) return rocksdb::Status::OK();

auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(type);
batch->PutLogData(log_data.Encode());
git-hulk marked this conversation as resolved.
Show resolved Hide resolved

engine::DBIterator iter(storage_, rocksdb::ReadOptions());
iter.Seek(ns_key);

// copy metadata
batch->Delete(metadata_cf_handle_, ns_key);
batch->Put(metadata_cf_handle_, new_ns_key, iter.Value());

auto subkey_iter = iter.GetSubKeyIterator();

if (subkey_iter != nullptr) {
for (subkey_iter->Seek(); subkey_iter->Valid(); subkey_iter->Next()) {
InternalKey from_ikey(subkey_iter->Key(), storage_->IsSlotIdEncoded());
std::string to_ikey =
InternalKey(new_ns_key, from_ikey.GetSubKey(), from_ikey.GetVersion(), storage_->IsSlotIdEncoded()).Encode();
// copy sub key
batch->Put(subkey_iter->ColumnFamilyHandle(), to_ikey, subkey_iter->Value());

if (type == kRedisZSet) {
jihuayu marked this conversation as resolved.
Show resolved Hide resolved
std::string score_bytes = subkey_iter->Value().ToString();
score_bytes.append(from_ikey.GetSubKey().ToString());
// copy score key
std::string score_key =
InternalKey(new_ns_key, score_bytes, from_ikey.GetVersion(), storage_->IsSlotIdEncoded()).Encode();
batch->Put(storage_->GetCFHandle(engine::kZSetScoreColumnFamilyName), score_key, Slice());
jihuayu marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}
} // namespace redis
1 change: 1 addition & 0 deletions src/storage/redis_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class Database {
rocksdb::ColumnFamilyHandle *cf_handle = nullptr);
[[nodiscard]] rocksdb::Status ClearKeysOfSlot(const rocksdb::Slice &ns, int slot);
[[nodiscard]] rocksdb::Status KeyExist(const std::string &key);
[[nodiscard]] rocksdb::Status Rename(const std::string &key, const std::string &new_key, bool nx, bool *ret);

protected:
engine::Storage *storage_;
Expand Down
Loading