Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce MDel do the batch delete and use MultiLockGuard guarantee atomicity #1712

Merged
merged 14 commits into from
Sep 4, 2023
Merged
14 changes: 10 additions & 4 deletions src/commands/cmd_key.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ class CommandExists : public Commander {
public:
Status Execute(Server *svr, Connection *conn, std::string *output) override {
std::vector<rocksdb::Slice> keys;
keys.reserve(args_.size() - 1);
for (size_t i = 1; i < args_.size(); i++) {
keys.emplace_back(args_[i]);
}
Expand Down Expand Up @@ -254,12 +255,17 @@ class CommandPersist : public Commander {
class CommandDel : public Commander {
public:
Status Execute(Server *svr, Connection *conn, std::string *output) override {
int cnt = 0;
redis::Database redis(svr->storage, conn->GetNamespace());
std::vector<rocksdb::Slice> keys;
keys.reserve(args_.size() - 1);
for (size_t i = 1; i < args_.size(); i++) {
auto s = redis.Del(args_[i]);
if (s.ok()) cnt++;
keys.emplace_back(args_[i]);
}

uint64_t cnt = 0;
redis::Database redis(svr->storage, conn->GetNamespace());
auto s = redis.MDel(keys, &cnt);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

*output = redis::Integer(cnt);
return Status::OK();
}
Expand Down
47 changes: 47 additions & 0 deletions src/storage/redis_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,52 @@ rocksdb::Status Database::Del(const Slice &user_key) {
return storage_->Delete(storage_->DefaultWriteOptions(), metadata_cf_handle_, ns_key);
}

rocksdb::Status Database::MDel(const std::vector<Slice> &keys, uint64_t *deleted_cnt) {
*deleted_cnt = 0;

std::vector<std::string> lock_keys;
lock_keys.reserve(keys.size());
for (const auto &key : keys) {
std::string ns_key = AppendNamespacePrefix(key);
lock_keys.emplace_back(std::move(ns_key));
}
MultiLockGuard guard(storage_->GetLockManager(), lock_keys);

auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisNone);
batch->PutLogData(log_data.Encode());

std::vector<Slice> slice_keys;
slice_keys.reserve(lock_keys.size());
for (const auto &ns_key : lock_keys) {
slice_keys.emplace_back(ns_key);
}

LatestSnapShot ss(storage_);
rocksdb::ReadOptions read_options = storage_->DefaultMultiGetOptions();
read_options.snapshot = ss.GetSnapShot();
std::vector<rocksdb::Status> statuses(slice_keys.size());
std::vector<rocksdb::PinnableSlice> pin_values(slice_keys.size());
storage_->MultiGet(read_options, metadata_cf_handle_, slice_keys.size(), slice_keys.data(), pin_values.data(),
statuses.data());

for (size_t i = 0; i < slice_keys.size(); i++) {
if (!statuses[i].ok() && !statuses[i].IsNotFound()) return statuses[i];
if (!statuses[i].IsNotFound()) continue;

Metadata metadata(kRedisNone, false);
metadata.Decode(pin_values[i]);
if (metadata.Expired()) continue;

batch->Delete(metadata_cf_handle_, lock_keys[i]);
*deleted_cnt += 1;
}

if (*deleted_cnt == 0) return rocksdb::Status::OK();

return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}

rocksdb::Status Database::Exists(const std::vector<Slice> &keys, int *ret) {
*ret = 0;
LatestSnapShot ss(storage_);
Expand All @@ -132,6 +178,7 @@ rocksdb::Status Database::Exists(const std::vector<Slice> &keys, int *ret) {
for (const auto &key : keys) {
std::string ns_key = AppendNamespacePrefix(key);
s = storage_->Get(read_options, metadata_cf_handle_, ns_key, &value);
if (!s.ok() && !s.IsNotFound()) return s;
if (s.ok()) {
Metadata metadata(kRedisNone, false);
metadata.Decode(value);
Expand Down
1 change: 1 addition & 0 deletions src/storage/redis_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class Database {
rocksdb::Status GetRawMetadataByUserKey(const Slice &user_key, std::string *bytes);
rocksdb::Status Expire(const Slice &user_key, uint64_t timestamp);
rocksdb::Status Del(const Slice &user_key);
rocksdb::Status MDel(const std::vector<Slice> &keys, uint64_t *deleted_cnt);
rocksdb::Status Exists(const std::vector<Slice> &keys, int *ret);
rocksdb::Status TTL(const Slice &user_key, int64_t *ttl);
rocksdb::Status Type(const Slice &user_key, RedisType *type);
Expand Down
50 changes: 50 additions & 0 deletions tests/gocase/unit/introspection/introspection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,56 @@ func TestIntrospection(t *testing.T) {
rdb := srv.NewClient()
defer func() { require.NoError(t, rdb.Close()) }()

for _, command := range []string{"DEL", "UNLINK"} {
t.Run(fmt.Sprintf("%s can remove the specified keys", command), func(t *testing.T) {
// string type
rdb.Do(ctx, "mset", "string_key1", "v1", "string_key2", "v2", "string_key3", "v3")
require.EqualValues(t, 1, rdb.Do(ctx, command, "string_key1").Val())
require.EqualValues(t, 2, rdb.Do(ctx, command, "string_key2", "string_key3").Val())
require.EqualValues(t, 0, rdb.Do(ctx, "exists", "string_key1", "string_key2", "string_key3").Val())

// list type
rdb.Do(ctx, "lpush", "list_key1", "v1")
rdb.Do(ctx, "lpush", "list_key2", "v1", "v2")
rdb.Do(ctx, "lpush", "list_key3", "v1", "v2", "v3")
require.EqualValues(t, 1, rdb.Do(ctx, command, "list_key1").Val())
require.EqualValues(t, 2, rdb.Do(ctx, command, "list_key2", "list_key3").Val())
require.EqualValues(t, 0, rdb.Do(ctx, "exists", "list_key1", "list_key2", "list_key3").Val())

// set type
rdb.Do(ctx, "sadd", "set_key1", "v1")
rdb.Do(ctx, "sadd", "set_key2", "v2", "v2")
rdb.Do(ctx, "sadd", "set_key3", "v3", "v3", "v3")
require.EqualValues(t, 1, rdb.Do(ctx, command, "set_key1").Val())
require.EqualValues(t, 2, rdb.Do(ctx, command, "set_key2", "set_key3").Val())
require.EqualValues(t, 0, rdb.Do(ctx, "exists", "set_key1", "set_key2", "set_key3").Val())

// hash type
rdb.Do(ctx, "hset", "hash_key1", "k1", "v1")
rdb.Do(ctx, "hset", "hash_key2", "k1", "v1", "k2", "v2")
rdb.Do(ctx, "hset", "hash_key3", "k1", "v1", "k2", "v2", "k3", "v3")
require.EqualValues(t, 1, rdb.Do(ctx, command, "hash_key1").Val())
require.EqualValues(t, 2, rdb.Do(ctx, command, "hash_key2", "hash_key3").Val())
require.EqualValues(t, 0, rdb.Do(ctx, "exists", "hash_key1", "hash_key2", "hash_key3").Val())

// zset type
rdb.Do(ctx, "zadd", "zset_key1", "10", "m1")
rdb.Do(ctx, "zadd", "zset_key2", "10", "m1", "20", "m2")
rdb.Do(ctx, "zadd", "zset_key3", "10", "m1", "20", "m2", "30", "m3")
require.EqualValues(t, 1, rdb.Do(ctx, command, "zset_key1").Val())
require.EqualValues(t, 2, rdb.Do(ctx, command, "zset_key2", "zset_key3").Val())
require.EqualValues(t, 0, rdb.Do(ctx, "exists", "zset_key1", "zset_key2", "zset_key3").Val())

// sortint type
rdb.Do(ctx, "siadd", "si_key1", "1")
rdb.Do(ctx, "siadd", "si_key2", "1", "2")
rdb.Do(ctx, "siadd", "si_key3", "1", "2", "3")
require.EqualValues(t, 1, rdb.Do(ctx, command, "si_key1").Val())
require.EqualValues(t, 2, rdb.Do(ctx, command, "si_key2", "si_key3").Val())
require.EqualValues(t, 0, rdb.Do(ctx, "exists", "si_key1", "si_key2", "si_key3").Val())
})
}

t.Run("TIME", func(t *testing.T) {
nowUnix := int(time.Now().Unix())

Expand Down