diff --git a/src/commands/cmd_key.cc b/src/commands/cmd_key.cc index efcf69f4ef5..505c4aa7be1 100644 --- a/src/commands/cmd_key.cc +++ b/src/commands/cmd_key.cc @@ -117,6 +117,7 @@ class CommandExists : public Commander { public: Status Execute(Server *svr, Connection *conn, std::string *output) override { std::vector keys; + keys.reserve(args_.size() - 1); for (size_t i = 1; i < args_.size(); i++) { keys.emplace_back(args_[i]); } @@ -254,12 +255,17 @@ class CommandPersist : public Commander { class CommandDel : public Commander { public: Status Execute(Server *svr, Connection *conn, std::string *output) override { - int cnt = 0; - redis::Database redis(svr->storage, conn->GetNamespace()); + std::vector keys; + keys.reserve(args_.size() - 1); for (size_t i = 1; i < args_.size(); i++) { - auto s = redis.Del(args_[i]); - if (s.ok()) cnt++; + keys.emplace_back(args_[i]); } + + uint64_t cnt = 0; + redis::Database redis(svr->storage, conn->GetNamespace()); + auto s = redis.MDel(keys, &cnt); + if (!s.ok()) return {Status::RedisExecErr, s.ToString()}; + *output = redis::Integer(cnt); return Status::OK(); } diff --git a/src/storage/redis_db.cc b/src/storage/redis_db.cc index b4904949614..948cbf5764a 100644 --- a/src/storage/redis_db.cc +++ b/src/storage/redis_db.cc @@ -121,6 +121,52 @@ rocksdb::Status Database::Del(const Slice &user_key) { return storage_->Delete(storage_->DefaultWriteOptions(), metadata_cf_handle_, ns_key); } +rocksdb::Status Database::MDel(const std::vector &keys, uint64_t *deleted_cnt) { + *deleted_cnt = 0; + + std::vector lock_keys; + lock_keys.reserve(keys.size()); + for (const auto &key : keys) { + std::string ns_key = AppendNamespacePrefix(key); + lock_keys.emplace_back(std::move(ns_key)); + } + MultiLockGuard guard(storage_->GetLockManager(), lock_keys); + + auto batch = storage_->GetWriteBatchBase(); + WriteBatchLogData log_data(kRedisNone); + batch->PutLogData(log_data.Encode()); + + std::vector slice_keys; + slice_keys.reserve(lock_keys.size()); + for (const auto &ns_key : lock_keys) { + slice_keys.emplace_back(ns_key); + } + + LatestSnapShot ss(storage_); + rocksdb::ReadOptions read_options = storage_->DefaultMultiGetOptions(); + read_options.snapshot = ss.GetSnapShot(); + std::vector statuses(slice_keys.size()); + std::vector pin_values(slice_keys.size()); + storage_->MultiGet(read_options, metadata_cf_handle_, slice_keys.size(), slice_keys.data(), pin_values.data(), + statuses.data()); + + for (size_t i = 0; i < slice_keys.size(); i++) { + if (!statuses[i].ok() && !statuses[i].IsNotFound()) return statuses[i]; + if (statuses[i].IsNotFound()) continue; + + Metadata metadata(kRedisNone, false); + metadata.Decode(pin_values[i]); + if (metadata.Expired()) continue; + + batch->Delete(metadata_cf_handle_, lock_keys[i]); + *deleted_cnt += 1; + } + + if (*deleted_cnt == 0) return rocksdb::Status::OK(); + + return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch()); +} + rocksdb::Status Database::Exists(const std::vector &keys, int *ret) { *ret = 0; LatestSnapShot ss(storage_); @@ -132,6 +178,7 @@ rocksdb::Status Database::Exists(const std::vector &keys, int *ret) { for (const auto &key : keys) { std::string ns_key = AppendNamespacePrefix(key); s = storage_->Get(read_options, metadata_cf_handle_, ns_key, &value); + if (!s.ok() && !s.IsNotFound()) return s; if (s.ok()) { Metadata metadata(kRedisNone, false); metadata.Decode(value); diff --git a/src/storage/redis_db.h b/src/storage/redis_db.h index 51adc6cd164..86b852f8905 100644 --- a/src/storage/redis_db.h +++ b/src/storage/redis_db.h @@ -39,6 +39,7 @@ class Database { rocksdb::Status GetRawMetadataByUserKey(const Slice &user_key, std::string *bytes); rocksdb::Status Expire(const Slice &user_key, uint64_t timestamp); rocksdb::Status Del(const Slice &user_key); + rocksdb::Status MDel(const std::vector &keys, uint64_t *deleted_cnt); rocksdb::Status Exists(const std::vector &keys, int *ret); rocksdb::Status TTL(const Slice &user_key, int64_t *ttl); rocksdb::Status Type(const Slice &user_key, RedisType *type); diff --git a/tests/gocase/unit/introspection/introspection_test.go b/tests/gocase/unit/introspection/introspection_test.go index 74bf66b8920..af15117dfbf 100644 --- a/tests/gocase/unit/introspection/introspection_test.go +++ b/tests/gocase/unit/introspection/introspection_test.go @@ -40,6 +40,56 @@ func TestIntrospection(t *testing.T) { rdb := srv.NewClient() defer func() { require.NoError(t, rdb.Close()) }() + for _, command := range []string{"DEL", "UNLINK"} { + t.Run(fmt.Sprintf("%s can remove the specified keys", command), func(t *testing.T) { + // string type + rdb.Do(ctx, "mset", "string_key1", "v1", "string_key2", "v2", "string_key3", "v3") + require.EqualValues(t, 1, rdb.Do(ctx, command, "string_key1").Val()) + require.EqualValues(t, 2, rdb.Do(ctx, command, "string_key2", "string_key3").Val()) + require.EqualValues(t, 0, rdb.Do(ctx, "exists", "string_key1", "string_key2", "string_key3").Val()) + + // list type + rdb.Do(ctx, "lpush", "list_key1", "v1") + rdb.Do(ctx, "lpush", "list_key2", "v1", "v2") + rdb.Do(ctx, "lpush", "list_key3", "v1", "v2", "v3") + require.EqualValues(t, 1, rdb.Do(ctx, command, "list_key1").Val()) + require.EqualValues(t, 2, rdb.Do(ctx, command, "list_key2", "list_key3").Val()) + require.EqualValues(t, 0, rdb.Do(ctx, "exists", "list_key1", "list_key2", "list_key3").Val()) + + // set type + rdb.Do(ctx, "sadd", "set_key1", "v1") + rdb.Do(ctx, "sadd", "set_key2", "v2", "v2") + rdb.Do(ctx, "sadd", "set_key3", "v3", "v3", "v3") + require.EqualValues(t, 1, rdb.Do(ctx, command, "set_key1").Val()) + require.EqualValues(t, 2, rdb.Do(ctx, command, "set_key2", "set_key3").Val()) + require.EqualValues(t, 0, rdb.Do(ctx, "exists", "set_key1", "set_key2", "set_key3").Val()) + + // hash type + rdb.Do(ctx, "hset", "hash_key1", "k1", "v1") + rdb.Do(ctx, "hset", "hash_key2", "k1", "v1", "k2", "v2") + rdb.Do(ctx, "hset", "hash_key3", "k1", "v1", "k2", "v2", "k3", "v3") + require.EqualValues(t, 1, rdb.Do(ctx, command, "hash_key1").Val()) + require.EqualValues(t, 2, rdb.Do(ctx, command, "hash_key2", "hash_key3").Val()) + require.EqualValues(t, 0, rdb.Do(ctx, "exists", "hash_key1", "hash_key2", "hash_key3").Val()) + + // zset type + rdb.Do(ctx, "zadd", "zset_key1", "10", "m1") + rdb.Do(ctx, "zadd", "zset_key2", "10", "m1", "20", "m2") + rdb.Do(ctx, "zadd", "zset_key3", "10", "m1", "20", "m2", "30", "m3") + require.EqualValues(t, 1, rdb.Do(ctx, command, "zset_key1").Val()) + require.EqualValues(t, 2, rdb.Do(ctx, command, "zset_key2", "zset_key3").Val()) + require.EqualValues(t, 0, rdb.Do(ctx, "exists", "zset_key1", "zset_key2", "zset_key3").Val()) + + // sortint type + rdb.Do(ctx, "siadd", "si_key1", "1") + rdb.Do(ctx, "siadd", "si_key2", "1", "2") + rdb.Do(ctx, "siadd", "si_key3", "1", "2", "3") + require.EqualValues(t, 1, rdb.Do(ctx, command, "si_key1").Val()) + require.EqualValues(t, 2, rdb.Do(ctx, command, "si_key2", "si_key3").Val()) + require.EqualValues(t, 0, rdb.Do(ctx, "exists", "si_key1", "si_key2", "si_key3").Val()) + }) + } + t.Run("TIME", func(t *testing.T) { nowUnix := int(time.Now().Unix())