Skip to content

Commit

Permalink
Implement CAS and CAD commands for string type (#415)
Browse files Browse the repository at this point in the history
CAS, set its value to new value when the current value of the string corresponding to key is equal to old value.
- syntax: CAS key old_value new_value [EX seconds | PX milliseconds]
- return value: returns 1 if the value is successfully updated, -1 if the key does not exist, and 0 if the value fails to be updated

CAD, delete the key when the value is equal to the value in the db.
- syntax: CAD key value
- return value: returns 1 if the value is successfully updated, -1 if the key does not exist, and 0 if the value fails to be updated

More details, please see #415
  • Loading branch information
caipengbo authored Dec 3, 2021
1 parent f395386 commit 1c021ae
Show file tree
Hide file tree
Showing 6 changed files with 286 additions and 2 deletions.
64 changes: 64 additions & 0 deletions src/redis_cmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,68 @@ class CommandDecrBy : public Commander {
int64_t increment_ = 0;
};

class CommandCAS : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
bool last_arg;
for (size_t i = 4; i < args.size(); i++) {
last_arg = (i == args.size()-1);
std::string opt = Util::ToLower(args[i]);
if (opt == "ex") {
if (last_arg) return Status(Status::NotOK, errWrongNumOfArguments);
ttl_ = atoi(args_[++i].c_str());
if (ttl_ <= 0) return Status(Status::RedisParseErr, errInvalidExpireTime);
} else if (opt == "px") {
if (last_arg) return Status(Status::NotOK, errWrongNumOfArguments);
auto ttl_ms = atol(args[++i].c_str());
if (ttl_ms <= 0) return Status(Status::RedisParseErr, errInvalidExpireTime);
if (ttl_ms > 0 && ttl_ms < 1000) {
// round up the pttl to second
ttl_ = 1;
} else {
ttl_ = static_cast<int>(ttl_ms/1000);
}
} else {
return Status(Status::NotOK, errInvalidSyntax);
}
}
return Commander::Parse(args);
}

Status Execute(Server *svr, Connection *conn, std::string *output) override {
Redis::String string_db(svr->storage_, conn->GetNamespace());
rocksdb::Status s;
int ret = 0;
s = string_db.CAS(args_[1], args_[2], args_[3], ttl_, &ret);
if (!s.ok()) {
return Status(Status::RedisExecErr, s.ToString());
}
*output = Redis::Integer(ret);
return Status::OK();
}

private:
int ttl_ = 0;
};

class CommandCAD : public Commander {
public:
Status Execute(Server *svr, Connection *conn, std::string *output) override {
Redis::String string_db(svr->storage_, conn->GetNamespace());
rocksdb::Status s;
int ret = 0;
s = string_db.CAD(args_[1], args_[2], &ret);
if (!s.ok()) {
return Status(Status::RedisExecErr, s.ToString());
}
*output = Redis::Integer(ret);
return Status::OK();
}

private:
int ttl_ = 0;
};

class CommandDel : public Commander {
public:
Status Execute(Server *svr, Connection *conn, std::string *output) override {
Expand Down Expand Up @@ -4424,6 +4486,8 @@ CommandAttributes redisCommandTable[] = {
ADD_CMD("incr", 2, "write", 1, 1, 1, CommandIncr),
ADD_CMD("decrby", 3, "write", 1, 1, 1, CommandDecrBy),
ADD_CMD("decr", 2, "write", 1, 1, 1, CommandDecr),
ADD_CMD("cas", -4, "write", 1, 1, 1, CommandCAS),
ADD_CMD("cad", 3, "write", 1, 1, 1, CommandCAD),

ADD_CMD("getbit", 3, "read-only", 1, 1, 1, CommandGetBit),
ADD_CMD("setbit", 4, "write", 1, 1, 1, CommandSetBit),
Expand Down
80 changes: 80 additions & 0 deletions src/redis_string.cc
Original file line number Diff line number Diff line change
Expand Up @@ -372,4 +372,84 @@ rocksdb::Status String::MSetNX(const std::vector<StringPair> &pairs, int ttl, in
*ret = 1;
return rocksdb::Status::OK();
}

// Change the value of user_key to a new_value if the current value of the key matches old_value.
// ret will be:
// 1 if the operation is successful
// -1 if the user_key does not exist
// 0 if the operation fails
rocksdb::Status String::CAS(const std::string &user_key, const std::string &old_value,
const std::string &new_value, int ttl, int *ret) {
*ret = 0;

std::string ns_key, current_value;
AppendNamespacePrefix(user_key, &ns_key);

LockGuard guard(storage_->GetLockManager(), ns_key);
rocksdb::Status s = getValue(ns_key, &current_value);

if (!s.ok() && !s.IsNotFound()) {
return s;
}

if (s.IsNotFound()) {
*ret = -1;
return rocksdb::Status::OK();
}

if (old_value == current_value) {
std::string raw_value;
uint32_t expire = 0;
Metadata metadata(kRedisString, false);
if (ttl > 0) {
int64_t now;
rocksdb::Env::Default()->GetCurrentTime(&now);
expire = uint32_t(now) + ttl;
}
metadata.expire = expire;
metadata.Encode(&raw_value);
raw_value.append(new_value);
auto write_status = updateRawValue(ns_key, raw_value);
if (!write_status.ok()) {
return s;
}
*ret = 1;
}

return rocksdb::Status::OK();
}

// Delete a specified user_key if the current value of the user_key matches a specified value.
// For ret, same as CAS.
rocksdb::Status String::CAD(const std::string &user_key, const std::string &value, int *ret) {
*ret = 0;

std::string ns_key, current_value;
AppendNamespacePrefix(user_key, &ns_key);

LockGuard guard(storage_->GetLockManager(), ns_key);
rocksdb::Status s = getValue(ns_key, &current_value);

if (!s.ok() && !s.IsNotFound()) {
return s;
}

if (s.IsNotFound()) {
*ret = -1;
return rocksdb::Status::OK();
}

if (value == current_value) {
auto delete_status = storage_->Delete(rocksdb::WriteOptions(),
storage_->GetCFHandle(Engine::kMetadataColumnFamilyName),
ns_key);
if (!delete_status.ok()) {
return s;
}
*ret = 1;
}

return rocksdb::Status::OK();
}

} // namespace Redis
3 changes: 3 additions & 0 deletions src/redis_string.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ class String : public Database {
std::vector<rocksdb::Status> MGet(const std::vector<Slice> &keys, std::vector<std::string> *values);
rocksdb::Status MSet(const std::vector<StringPair> &pairs, int ttl = 0);
rocksdb::Status MSetNX(const std::vector<StringPair> &pairs, int ttl, int *ret);
rocksdb::Status CAS(const std::string &user_key, const std::string &old_value,
const std::string &new_value, int ttl, int *ret);
rocksdb::Status CAD(const std::string &user_key, const std::string &value, int *ret);

private:
rocksdb::Status getValue(const std::string &ns_key, std::string *value);
Expand Down
57 changes: 57 additions & 0 deletions tests/t_string_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -221,4 +221,61 @@ TEST_F(RedisStringTest, SetRange) {
string->Get(key_, &value);
EXPECT_EQ(16, value.size());
string->Del(key_);
}

TEST_F(RedisStringTest, CAS) {
int ret;
std::string key = "cas_key", value = "cas_value", new_value = "new_value";

auto status = string->Set(key, value);
ASSERT_TRUE(status.ok());

status = string->CAS("non_exist_key", value, new_value, 10, &ret);
ASSERT_TRUE(status.ok());
EXPECT_EQ(-1, ret);

status = string->CAS(key, "cas_value_err", new_value, 10, &ret);
ASSERT_TRUE(status.ok());
EXPECT_EQ(0, ret);

status = string->CAS(key, value, new_value, 10, &ret);
ASSERT_TRUE(status.ok());
EXPECT_EQ(1, ret);

std::string current_value;
status = string->Get(key, &current_value);
ASSERT_TRUE(status.ok());
EXPECT_EQ(new_value, current_value);

int ttl;
string->TTL(key, &ttl);
EXPECT_TRUE(ttl >= 9 && ttl <= 10);

string->Del(key);
}

TEST_F(RedisStringTest, CAD) {
int ret;
std::string key = "cas_key", value = "cas_value";

auto status = string->Set(key, value);
ASSERT_TRUE(status.ok());

status = string->CAD("non_exist_key", value, &ret);
ASSERT_TRUE(status.ok());
EXPECT_EQ(-1, ret);

status = string->CAD(key, "cas_value_err", &ret);
ASSERT_TRUE(status.ok());
EXPECT_EQ(0, ret);

status = string->CAD(key, value, &ret);
ASSERT_TRUE(status.ok());
EXPECT_EQ(1, ret);

std::string current_value;
status = string->Get(key, &current_value);
ASSERT_TRUE(status.IsNotFound());

string->Del(key);
}
4 changes: 2 additions & 2 deletions tests/tcl/tests/unit/command.tcl
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
start_server {tags {"command"}} {
test {kvrocks has 164 commands currently} {
test {kvrocks has 166 commands currently} {
r command count
} {164}
} {166}

test {acquire GET command info by COMMAND INFO} {
set e [lindex [r command info get] 0]
Expand Down
80 changes: 80 additions & 0 deletions tests/tcl/tests/unit/type/string.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -575,4 +575,84 @@ start_server {tags {"string"}} {
# test {LCS indexes with match len and minimum match len} {
# dict get [r STRALGO LCS IDX KEYS virus1 virus2 WITHMATCHLEN MINMATCHLEN 5] matches
# } {{{1 222} {13 234} 222}}

# CAS/CAD
test {CAS normal case} {
r del cas_key

set res [r cas cas_key old_value new_value]
assert_equal $res -1

set res [r exists cas_key]
assert_equal $res 0

set res [r set cas_key old_value]
assert_equal $res "OK"

set res [r cas cas_key old_val new_value]
assert_equal $res 0

set res [r cas cas_key old_value new_value]
assert_equal $res 1
}

test {CAS wrong key type} {
r del a_list_key
r lpush a_list_key 123

catch {r cas a_list_key 123 234} err
assert_match {*WRONGTYPE*} $err
}

test {CAS invalid param num} {
r del cas_key
r set cas_key 123

catch {r cas cas_key 123} err
assert_match {*ERR*wrong*number*of*arguments*} $err

catch {r cas cas_key 123 234 ex} err
assert_match {*ERR*wrong*number*of*arguments*} $err
}

test {CAS expire} {
r del cas_key
r set cas_key 123

set res [r cas cas_key 123 234 ex 1]
assert_equal $res 1

set res [r get cas_key]
assert_equal $res "234"

after 2000

set res [r get cas_key]
assert_equal $res ""
}

test {CAD normal case} {
set res [r cad cad_key 123]
assert_equal $res -1

r set cad_key 123

set res [r cad cad_key 234]
assert_equal $res 0

set res [r cad cad_key 123]
assert_equal $res 1

set res [r get cad_key]
assert_equal $res ""
}

test {CAD invalid param num} {
r set cad_key 123
catch {r cad cad_key} err
assert_match {*ERR*wrong*number*of*arguments*} $err

catch {r cad cad_key 123 234} err
assert_match {*ERR*wrong*number*of*arguments*} $err
}
}

0 comments on commit 1c021ae

Please sign in to comment.