Skip to content

Commit

Permalink
Add support of EXPIRETIME and PEXPIRETIME (#1965)
Browse files Browse the repository at this point in the history
  • Loading branch information
kay011 authored Dec 26, 2023
1 parent 3d83f2e commit 7a08dfd
Show file tree
Hide file tree
Showing 14 changed files with 201 additions and 25 deletions.
36 changes: 36 additions & 0 deletions src/commands/cmd_key.cc
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,40 @@ class CommandPExpireAt : public Commander {
uint64_t timestamp_ = 0;
};

class CommandExpireTime : public Commander {
public:
Status Execute(Server *srv, Connection *conn, std::string *output) override {
redis::Database redis(srv->storage, conn->GetNamespace());
uint64_t timestamp = 0;
auto s = redis.GetExpireTime(args_[1], &timestamp);
if (s.ok()) {
*output = timestamp > 0 ? redis::Integer(timestamp / 1000) : redis::Integer(-1);
} else if (s.IsNotFound()) {
*output = redis::Integer(-2);
} else {
return {Status::RedisExecErr, s.ToString()};
}
return Status::OK();
}
};

class CommandPExpireTime : public Commander {
public:
Status Execute(Server *srv, Connection *conn, std::string *output) override {
redis::Database redis(srv->storage, conn->GetNamespace());
uint64_t timestamp = 0;
auto s = redis.GetExpireTime(args_[1], &timestamp);
if (s.ok()) {
*output = timestamp > 0 ? redis::Integer(timestamp) : redis::Integer(-1);
} else if (s.IsNotFound()) {
*output = redis::Integer(-2);
} else {
return {Status::RedisExecErr, s.ToString()};
}
return Status::OK();
}
};

class CommandPersist : public Commander {
public:
Status Execute(Server *srv, Connection *conn, std::string *output) override {
Expand Down Expand Up @@ -284,6 +318,8 @@ REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandTTL>("ttl", 2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandPExpire>("pexpire", 3, "write", 1, 1, 1),
MakeCmdAttr<CommandExpireAt>("expireat", 3, "write", 1, 1, 1),
MakeCmdAttr<CommandPExpireAt>("pexpireat", 3, "write", 1, 1, 1),
MakeCmdAttr<CommandExpireTime>("expiretime", 2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandPExpireTime>("pexpiretime", 2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandDel>("del", -2, "write", 1, -1, 1),
MakeCmdAttr<CommandDel>("unlink", -2, "write", 1, -1, 1), )

Expand Down
14 changes: 7 additions & 7 deletions src/stats/disk_stats.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,21 +77,21 @@ rocksdb::Status Disk::GetStringSize(const Slice &ns_key, uint64_t *key_size) {

rocksdb::Status Disk::GetHashSize(const Slice &ns_key, uint64_t *key_size) {
HashMetadata metadata(false);
rocksdb::Status s = Database::GetMetadata(kRedisHash, ns_key, &metadata);
rocksdb::Status s = Database::GetMetadata({kRedisHash}, ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
return GetApproximateSizes(metadata, ns_key, storage_->GetCFHandle(engine::kSubkeyColumnFamilyName), key_size);
}

rocksdb::Status Disk::GetSetSize(const Slice &ns_key, uint64_t *key_size) {
SetMetadata metadata(false);
rocksdb::Status s = Database::GetMetadata(kRedisSet, ns_key, &metadata);
rocksdb::Status s = Database::GetMetadata({kRedisSet}, ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
return GetApproximateSizes(metadata, ns_key, storage_->GetCFHandle(engine::kSubkeyColumnFamilyName), key_size);
}

rocksdb::Status Disk::GetListSize(const Slice &ns_key, uint64_t *key_size) {
ListMetadata metadata(false);
rocksdb::Status s = Database::GetMetadata(kRedisList, ns_key, &metadata);
rocksdb::Status s = Database::GetMetadata({kRedisList}, ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
std::string buf;
PutFixed64(&buf, metadata.head);
Expand All @@ -100,7 +100,7 @@ rocksdb::Status Disk::GetListSize(const Slice &ns_key, uint64_t *key_size) {

rocksdb::Status Disk::GetZsetSize(const Slice &ns_key, uint64_t *key_size) {
ZSetMetadata metadata(false);
rocksdb::Status s = Database::GetMetadata(kRedisZSet, ns_key, &metadata);
rocksdb::Status s = Database::GetMetadata({kRedisZSet}, ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
std::string score_bytes;
PutDouble(&score_bytes, kMinScore);
Expand All @@ -112,15 +112,15 @@ rocksdb::Status Disk::GetZsetSize(const Slice &ns_key, uint64_t *key_size) {

rocksdb::Status Disk::GetBitmapSize(const Slice &ns_key, uint64_t *key_size) {
BitmapMetadata metadata(false);
rocksdb::Status s = Database::GetMetadata(kRedisBitmap, ns_key, &metadata);
rocksdb::Status s = Database::GetMetadata({kRedisBitmap}, ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
return GetApproximateSizes(metadata, ns_key, storage_->GetCFHandle(engine::kSubkeyColumnFamilyName), key_size,
std::to_string(0), std::to_string(0));
}

rocksdb::Status Disk::GetSortedintSize(const Slice &ns_key, uint64_t *key_size) {
SortedintMetadata metadata(false);
rocksdb::Status s = Database::GetMetadata(kRedisSortedint, ns_key, &metadata);
rocksdb::Status s = Database::GetMetadata({kRedisSortedint}, ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
std::string start_buf;
PutFixed64(&start_buf, 0);
Expand All @@ -130,7 +130,7 @@ rocksdb::Status Disk::GetSortedintSize(const Slice &ns_key, uint64_t *key_size)

rocksdb::Status Disk::GetStreamSize(const Slice &ns_key, uint64_t *key_size) {
StreamMetadata metadata(false);
rocksdb::Status s = Database::GetMetadata(kRedisStream, ns_key, &metadata);
rocksdb::Status s = Database::GetMetadata({kRedisStream}, ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
return GetApproximateSizes(metadata, ns_key, storage_->GetCFHandle(engine::kStreamColumnFamilyName), key_size);
}
Expand Down
22 changes: 16 additions & 6 deletions src/storage/redis_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,18 +67,18 @@ rocksdb::Status Database::ParseMetadata(RedisTypes types, Slice *bytes, Metadata
return s;
}

rocksdb::Status Database::GetMetadata(RedisType type, const Slice &ns_key, Metadata *metadata) {
rocksdb::Status Database::GetMetadata(RedisTypes types, const Slice &ns_key, Metadata *metadata) {
std::string raw_value;
Slice rest;
return GetMetadata(type, ns_key, &raw_value, metadata, &rest);
return GetMetadata(types, ns_key, &raw_value, metadata, &rest);
}

rocksdb::Status Database::GetMetadata(RedisType type, const Slice &ns_key, std::string *raw_value, Metadata *metadata,
rocksdb::Status Database::GetMetadata(RedisTypes types, const Slice &ns_key, std::string *raw_value, Metadata *metadata,
Slice *rest) {
auto s = GetRawMetadata(ns_key, raw_value);
*rest = *raw_value;
if (!s.ok()) return s;
return ParseMetadata({type}, rest, metadata);
return ParseMetadata(types, rest, metadata);
}

rocksdb::Status Database::GetRawMetadata(const Slice &ns_key, std::string *bytes) {
Expand Down Expand Up @@ -225,6 +225,16 @@ rocksdb::Status Database::TTL(const Slice &user_key, int64_t *ttl) {
return rocksdb::Status::OK();
}

rocksdb::Status Database::GetExpireTime(const Slice &user_key, uint64_t *timestamp) {
std::string ns_key = AppendNamespacePrefix(user_key);
Metadata metadata(kRedisNone, false);
auto s = GetMetadata(RedisTypes::All(), ns_key, &metadata);
if (!s.ok()) return s;
*timestamp = metadata.expire;

return rocksdb::Status::OK();
}

rocksdb::Status Database::GetKeyNumStats(const std::string &prefix, KeyNumStats *stats) {
return Keys(prefix, nullptr, stats);
}
Expand Down Expand Up @@ -488,7 +498,7 @@ rocksdb::Status Database::Dump(const Slice &user_key, std::vector<std::string> *

if (metadata.Type() == kRedisList) {
ListMetadata list_metadata(false);
s = GetMetadata(kRedisList, ns_key, &list_metadata);
s = GetMetadata({kRedisList}, ns_key, &list_metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
infos->emplace_back("head");
infos->emplace_back(std::to_string(list_metadata.head));
Expand Down Expand Up @@ -644,7 +654,7 @@ rocksdb::Status SubKeyScanner::Scan(RedisType type, const Slice &user_key, const
uint64_t cnt = 0;
std::string ns_key = AppendNamespacePrefix(user_key);
Metadata metadata(type, false);
rocksdb::Status s = GetMetadata(type, ns_key, &metadata);
rocksdb::Status s = GetMetadata({type}, ns_key, &metadata);
if (!s.ok()) return s;

LatestSnapShot ss(storage_);
Expand Down
5 changes: 3 additions & 2 deletions src/storage/redis_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,16 @@ class Database {

explicit Database(engine::Storage *storage, std::string ns = "");
[[nodiscard]] static rocksdb::Status ParseMetadata(RedisTypes types, Slice *bytes, Metadata *metadata);
[[nodiscard]] rocksdb::Status GetMetadata(RedisType type, const Slice &ns_key, Metadata *metadata);
[[nodiscard]] rocksdb::Status GetMetadata(RedisType type, const Slice &ns_key, std::string *raw_value,
[[nodiscard]] rocksdb::Status GetMetadata(RedisTypes types, const Slice &ns_key, Metadata *metadata);
[[nodiscard]] rocksdb::Status GetMetadata(RedisTypes types, const Slice &ns_key, std::string *raw_value,
Metadata *metadata, Slice *rest);
[[nodiscard]] rocksdb::Status GetRawMetadata(const Slice &ns_key, std::string *bytes);
[[nodiscard]] rocksdb::Status Expire(const Slice &user_key, uint64_t timestamp);
[[nodiscard]] rocksdb::Status Del(const Slice &user_key);
[[nodiscard]] rocksdb::Status MDel(const std::vector<Slice> &keys, uint64_t *deleted_cnt);
[[nodiscard]] rocksdb::Status Exists(const std::vector<Slice> &keys, int *ret);
[[nodiscard]] rocksdb::Status TTL(const Slice &user_key, int64_t *ttl);
[[nodiscard]] rocksdb::Status GetExpireTime(const Slice &user_key, uint64_t *timestamp);
[[nodiscard]] rocksdb::Status Type(const Slice &user_key, RedisType *type);
[[nodiscard]] rocksdb::Status Dump(const Slice &user_key, std::vector<std::string> *infos);
[[nodiscard]] rocksdb::Status FlushDB();
Expand Down
2 changes: 1 addition & 1 deletion src/types/redis_bloom_chain.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
namespace redis {

rocksdb::Status BloomChain::getBloomChainMetadata(const Slice &ns_key, BloomChainMetadata *metadata) {
return Database::GetMetadata(kRedisBloomFilter, ns_key, metadata);
return Database::GetMetadata({kRedisBloomFilter}, ns_key, metadata);
}

std::string BloomChain::getBFKey(const Slice &ns_key, const BloomChainMetadata &metadata, uint16_t filters_index) {
Expand Down
2 changes: 1 addition & 1 deletion src/types/redis_hash.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
namespace redis {

rocksdb::Status Hash::GetMetadata(const Slice &ns_key, HashMetadata *metadata) {
return Database::GetMetadata(kRedisHash, ns_key, metadata);
return Database::GetMetadata({kRedisHash}, ns_key, metadata);
}

rocksdb::Status Hash::Size(const Slice &user_key, uint64_t *size) {
Expand Down
4 changes: 2 additions & 2 deletions src/types/redis_json.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ rocksdb::Status Json::read(const Slice &ns_key, JsonMetadata *metadata, JsonValu
std::string bytes;
Slice rest;

auto s = GetMetadata(kRedisJson, ns_key, &bytes, metadata, &rest);
auto s = GetMetadata({kRedisJson}, ns_key, &bytes, metadata, &rest);
if (!s.ok()) return s;

return parse(*metadata, rest, value);
Expand Down Expand Up @@ -105,7 +105,7 @@ rocksdb::Status Json::Info(const std::string &user_key, JsonStorageFormat *stora
Slice rest;
JsonMetadata metadata;

auto s = GetMetadata(kRedisJson, ns_key, &bytes, &metadata, &rest);
auto s = GetMetadata({kRedisJson}, ns_key, &bytes, &metadata, &rest);
if (!s.ok()) return s;

*storage_format = metadata.format;
Expand Down
2 changes: 1 addition & 1 deletion src/types/redis_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
namespace redis {

rocksdb::Status List::GetMetadata(const Slice &ns_key, ListMetadata *metadata) {
return Database::GetMetadata(kRedisList, ns_key, metadata);
return Database::GetMetadata({kRedisList}, ns_key, metadata);
}

rocksdb::Status List::Size(const Slice &user_key, uint64_t *size) {
Expand Down
2 changes: 1 addition & 1 deletion src/types/redis_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
namespace redis {

rocksdb::Status Set::GetMetadata(const Slice &ns_key, SetMetadata *metadata) {
return Database::GetMetadata(kRedisSet, ns_key, metadata);
return Database::GetMetadata({kRedisSet}, ns_key, metadata);
}

// Make sure members are uniq before use Overwrite
Expand Down
2 changes: 1 addition & 1 deletion src/types/redis_sortedint.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
namespace redis {

rocksdb::Status Sortedint::GetMetadata(const Slice &ns_key, SortedintMetadata *metadata) {
return Database::GetMetadata(kRedisSortedint, ns_key, metadata);
return Database::GetMetadata({kRedisSortedint}, ns_key, metadata);
}

rocksdb::Status Sortedint::Add(const Slice &user_key, const std::vector<uint64_t> &ids, uint64_t *added_cnt) {
Expand Down
2 changes: 1 addition & 1 deletion src/types/redis_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ const char *errXGroupSubcommandRequiresKeyExist =
Note that for CREATE you may want to use the MKSTREAM option to create an empty stream automatically.";

rocksdb::Status Stream::GetMetadata(const Slice &stream_name, StreamMetadata *metadata) {
return Database::GetMetadata(kRedisStream, stream_name, metadata);
return Database::GetMetadata({kRedisStream}, stream_name, metadata);
}

rocksdb::Status Stream::GetLastGeneratedID(const Slice &stream_name, StreamEntryID *id) {
Expand Down
2 changes: 1 addition & 1 deletion src/types/redis_zset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
namespace redis {

rocksdb::Status ZSet::GetMetadata(const Slice &ns_key, ZSetMetadata *metadata) {
return Database::GetMetadata(kRedisZSet, ns_key, metadata);
return Database::GetMetadata({kRedisZSet}, ns_key, metadata);
}

rocksdb::Status ZSet::Add(const Slice &user_key, ZAddFlags flags, MemberScores *mscores, uint64_t *added_cnt) {
Expand Down
79 changes: 78 additions & 1 deletion tests/cppunit/metadata_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

#include <gtest/gtest.h>

#include <chrono>
#include <memory>
#include <thread>

#include "storage/redis_metadata.h"
#include "test_base.h"
Expand Down Expand Up @@ -90,7 +92,7 @@ TEST_F(RedisTypeTest, GetMetadata) {
EXPECT_TRUE(s.ok() && fvs.size() == ret);
HashMetadata metadata;
std::string ns_key = redis_->AppendNamespacePrefix(key_);
s = redis_->GetMetadata(kRedisHash, ns_key, &metadata);
s = redis_->GetMetadata({kRedisHash}, ns_key, &metadata);
EXPECT_EQ(fvs.size(), metadata.size);
s = redis_->Del(key_);
EXPECT_TRUE(s.ok());
Expand All @@ -114,6 +116,81 @@ TEST_F(RedisTypeTest, Expire) {
s = redis_->Del(key_);
}

TEST_F(RedisTypeTest, ExpireTime) {
uint64_t ret = 0;
std::vector<FieldValue> fvs;
for (size_t i = 0; i < fields_.size(); i++) {
fvs.emplace_back(fields_[i].ToString(), values_[i].ToString());
}
rocksdb::Status s = hash_->MSet(key_, fvs, false, &ret);
EXPECT_TRUE(s.ok() && fvs.size() == ret);
int64_t now = 0;
rocksdb::Env::Default()->GetCurrentTime(&now);
uint64_t ms_offset = 2314;
uint64_t expire_timestamp_ms = now * 1000 + ms_offset;
s = redis_->Expire(key_, expire_timestamp_ms);
EXPECT_TRUE(s.ok());
uint64_t timestamp = 0;
s = redis_->GetExpireTime(key_, &timestamp);
EXPECT_TRUE(s.ok() && timestamp != 0);
if (METADATA_ENCODING_VERSION != 0) {
EXPECT_EQ(timestamp, expire_timestamp_ms);
} else {
EXPECT_EQ(timestamp, Metadata::ExpireMsToS(expire_timestamp_ms) * 1000);
}
s = redis_->Del(key_);
}

TEST_F(RedisTypeTest, ExpireTimeKeyNoExpireTime) {
uint64_t ret = 0;
std::vector<FieldValue> fvs;
for (size_t i = 0; i < fields_.size(); i++) {
fvs.emplace_back(fields_[i].ToString(), values_[i].ToString());
}
rocksdb::Status s = hash_->MSet(key_, fvs, false, &ret);
EXPECT_TRUE(s.ok() && fvs.size() == ret);
uint64_t timestamp = 0;
s = redis_->GetExpireTime(key_, &timestamp);
EXPECT_TRUE(s.ok() && timestamp == 0);
s = redis_->Del(key_);
}

TEST_F(RedisTypeTest, ExpireTimeKeyExpired) {
uint64_t ret = 0;
std::vector<FieldValue> fvs;
for (size_t i = 0; i < fields_.size(); i++) {
fvs.emplace_back(fields_[i].ToString(), values_[i].ToString());
}
rocksdb::Status s = hash_->MSet(key_, fvs, false, &ret);
EXPECT_TRUE(s.ok() && fvs.size() == ret);
int64_t now = 0;
rocksdb::Env::Default()->GetCurrentTime(&now);
uint64_t ms_offset = 1120;
uint64_t expire_timestamp_ms = now * 1000 + ms_offset;
s = redis_->Expire(key_, expire_timestamp_ms);
EXPECT_TRUE(s.ok());
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
uint64_t timestamp = 0;
s = redis_->GetExpireTime(key_, &timestamp);
EXPECT_TRUE(s.IsNotFound() && timestamp == 0);
s = redis_->Del(key_);
}

TEST_F(RedisTypeTest, ExpireTimeKeyNotExisted) {
uint64_t ret = 0;
std::vector<FieldValue> fvs;
for (size_t i = 0; i < fields_.size(); i++) {
fvs.emplace_back(fields_[i].ToString(), values_[i].ToString());
}
rocksdb::Status s = hash_->MSet(key_, fvs, false, &ret);
EXPECT_TRUE(s.ok() && fvs.size() == ret);
uint64_t timestamp = 0;
s = redis_->GetExpireTime(key_ + "test", &timestamp);
EXPECT_TRUE(s.IsNotFound() && timestamp == 0);

s = redis_->Del(key_);
}

TEST(Metadata, MetadataDecodingBackwardCompatibleSimpleKey) {
auto expire_at = (util::GetTimeStamp() + 10) * 1000;
Metadata md_old(kRedisString, true, false);
Expand Down
Loading

0 comments on commit 7a08dfd

Please sign in to comment.