Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement the new encoding with 64bit size and expire time in milliseconds #1342

Merged
merged 27 commits into from
Mar 23, 2023
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions src/cluster/slot_migrate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -615,10 +615,9 @@ StatusOr<KeyMigrationResult> SlotMigrate::MigrateOneKey(const rocksdb::Slice &ke

Status SlotMigrate::MigrateSimpleKey(const rocksdb::Slice &key, const Metadata &metadata, const std::string &bytes,
std::string *restore_cmds) {
std::vector<std::string> command = {"set", key.ToString(),
bytes.substr(Redis::STRING_HDR_SIZE, bytes.size() - Redis::STRING_HDR_SIZE)};
std::vector<std::string> command = {"SET", key.ToString(), bytes.substr(Metadata::GetOffsetAfterExpire(bytes[0]))};
if (metadata.expire > 0) {
command.emplace_back("EXAT");
command.emplace_back("PXAT");
command.emplace_back(std::to_string(metadata.expire));
}
*restore_cmds += Redis::MultiBulkString(command, false);
Expand Down Expand Up @@ -727,7 +726,7 @@ Status SlotMigrate::MigrateComplexKey(const rocksdb::Slice &key, const Metadata

// Add TTL for complex key
if (metadata.expire > 0) {
*restore_cmds += Redis::MultiBulkString({"EXPIREAT", key.ToString(), std::to_string(metadata.expire)}, false);
*restore_cmds += Redis::MultiBulkString({"PEXPIREAT", key.ToString(), std::to_string(metadata.expire)}, false);
current_pipeline_size_++;
}

Expand Down Expand Up @@ -803,7 +802,7 @@ Status SlotMigrate::MigrateStream(const Slice &key, const StreamMetadata &metada

// Add TTL
if (metadata.expire > 0) {
*restore_cmds += Redis::MultiBulkString({"EXPIREAT", key.ToString(), std::to_string(metadata.expire)}, false);
*restore_cmds += Redis::MultiBulkString({"PEXPIREAT", key.ToString(), std::to_string(metadata.expire)}, false);
current_pipeline_size_++;
}

Expand Down
54 changes: 17 additions & 37 deletions src/commands/cmd_key.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*
*/

#include <cstdint>

#include "commander.h"
#include "commands/ttl_util.h"
#include "error_constants.h"
Expand Down Expand Up @@ -69,10 +71,10 @@ class CommandTTL : public Commander {
public:
Status Execute(Server *svr, Connection *conn, std::string *output) override {
Redis::Database redis(svr->storage_, conn->GetNamespace());
int ttl = 0;
int64_t ttl = 0;
auto s = redis.TTL(args_[1], &ttl);
if (s.ok()) {
*output = Redis::Integer(ttl);
*output = Redis::Integer(ttl > 0 ? ttl / 1000 : ttl);
return Status::OK();
}

Expand All @@ -84,16 +86,11 @@ class CommandPTTL : public Commander {
public:
Status Execute(Server *svr, Connection *conn, std::string *output) override {
Redis::Database redis(svr->storage_, conn->GetNamespace());
int ttl = 0;
int64_t ttl = 0;
auto s = redis.TTL(args_[1], &ttl);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

if (ttl > 0) {
*output = Redis::Integer(ttl * 1000);
} else {
*output = Redis::Integer(ttl);
}

*output = Redis::Integer(ttl);
return Status::OK();
}
};
Expand All @@ -115,25 +112,16 @@ class CommandExists : public Commander {
}
};

StatusOr<int> TTLToTimestamp(int ttl) {
int64_t now = Util::GetTimeStamp();
if (ttl >= INT32_MAX - now) {
return {Status::RedisParseErr, "the expire time was overflow"};
}

return ttl + now;
}

class CommandExpire : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
seconds_ = GET_OR_RET(TTLToTimestamp(GET_OR_RET(ParseInt<int>(args[2], 10))));
seconds_ = GET_OR_RET(ParseInt<int64_t>(args[2], 10)) + Util::GetTimeStamp();
return Status::OK();
}

Status Execute(Server *svr, Connection *conn, std::string *output) override {
Redis::Database redis(svr->storage_, conn->GetNamespace());
auto s = redis.Expire(args_[1], seconds_);
auto s = redis.Expire(args_[1], seconds_ * 1000);
if (s.ok()) {
*output = Redis::Integer(1);
} else {
Expand All @@ -143,13 +131,13 @@ class CommandExpire : public Commander {
}

private:
int seconds_ = 0;
uint64_t seconds_ = 0;
};

class CommandPExpire : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
seconds_ = GET_OR_RET(TTLToTimestamp(TTLMsToS(GET_OR_RET(ParseInt<int64_t>(args[2], 10)))));
seconds_ = GET_OR_RET(ParseInt<int64_t>(args[2], 10)) + Util::GetTimeStampMS();
return Status::OK();
}

Expand All @@ -165,7 +153,7 @@ class CommandPExpire : public Commander {
}

private:
int seconds_ = 0;
uint64_t seconds_ = 0;
};

class CommandExpireAt : public Commander {
Expand All @@ -176,18 +164,14 @@ class CommandExpireAt : public Commander {
return {Status::RedisParseErr, errValueNotInteger};
}

if (*parse_result >= INT32_MAX) {
return {Status::RedisParseErr, "the expire time was overflow"};
}

timestamp_ = static_cast<int>(*parse_result);
timestamp_ = *parse_result;

return Commander::Parse(args);
}

Status Execute(Server *svr, Connection *conn, std::string *output) override {
Redis::Database redis(svr->storage_, conn->GetNamespace());
auto s = redis.Expire(args_[1], timestamp_);
auto s = redis.Expire(args_[1], timestamp_ * 1000);
if (s.ok()) {
*output = Redis::Integer(1);
} else {
Expand All @@ -197,7 +181,7 @@ class CommandExpireAt : public Commander {
}

private:
int timestamp_ = 0;
uint64_t timestamp_ = 0;
};

class CommandPExpireAt : public Commander {
Expand All @@ -208,11 +192,7 @@ class CommandPExpireAt : public Commander {
return {Status::RedisParseErr, errValueNotInteger};
}

if (*parse_result / 1000 >= INT32_MAX) {
return {Status::RedisParseErr, "the expire time was overflow"};
}

timestamp_ = static_cast<int>(*parse_result / 1000);
timestamp_ = *parse_result;

return Commander::Parse(args);
}
Expand All @@ -229,13 +209,13 @@ class CommandPExpireAt : public Commander {
}

private:
int timestamp_ = 0;
uint64_t timestamp_ = 0;
};

class CommandPersist : public Commander {
public:
Status Execute(Server *svr, Connection *conn, std::string *output) override {
int ttl = 0;
int64_t ttl = 0;
Redis::Database redis(svr->storage_, conn->GetNamespace());
auto s = redis.TTL(args_[1], &ttl);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
Expand Down
25 changes: 11 additions & 14 deletions src/commands/cmd_string.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*
*/

#include <cstdint>
#include <optional>

#include "commander.h"
Expand Down Expand Up @@ -95,7 +96,7 @@ class CommandGetEx : public Commander {
}

private:
int ttl_ = 0;
uint64_t ttl_ = 0;
bool persist_ = false;
};

Expand Down Expand Up @@ -311,14 +312,14 @@ class CommandSet : public Commander {
}

private:
int ttl_ = 0;
uint64_t ttl_ = 0;
enum { NONE, NX, XX } set_flag_ = NONE;
};

class CommandSetEX : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
auto parse_result = ParseInt<int>(args[2], 10);
auto parse_result = ParseInt<int64_t>(args[2], 10);
if (!parse_result) {
return {Status::RedisParseErr, errValueNotInteger};
}
Expand All @@ -332,13 +333,13 @@ class CommandSetEX : public Commander {

Status Execute(Server *svr, Connection *conn, std::string *output) override {
Redis::String string_db(svr->storage_, conn->GetNamespace());
auto s = string_db.SetEX(args_[1], args_[3], ttl_);
auto s = string_db.SetEX(args_[1], args_[3], ttl_ * 1000);
*output = Redis::SimpleString("OK");
return Status::OK();
}

private:
int ttl_ = 0;
uint64_t ttl_ = 0;
};

class CommandPSetEX : public Commander {
Expand All @@ -351,11 +352,7 @@ class CommandPSetEX : public Commander {

if (*ttl_ms <= 0) return {Status::RedisParseErr, errInvalidExpireTime};

if (*ttl_ms < 1000) {
ttl_ = 1;
} else {
ttl_ = static_cast<int>(*ttl_ms / 1000);
}
ttl_ = *ttl_ms;

return Commander::Parse(args);
}
Expand All @@ -368,7 +365,7 @@ class CommandPSetEX : public Commander {
}

private:
int ttl_ = 0;
int64_t ttl_ = 0;
};

class CommandMSet : public Commander {
Expand Down Expand Up @@ -552,9 +549,9 @@ class CommandCAS : public Commander {
std::string_view flag;
while (parser.Good()) {
if (parser.EatEqICaseFlag("EX", flag)) {
ttl_ = GET_OR_RET(parser.TakeInt<int>(TTL_RANGE<int>));
ttl_ = GET_OR_RET(parser.TakeInt<int64_t>(TTL_RANGE<int64_t>)) * 1000;
} else if (parser.EatEqICaseFlag("PX", flag)) {
ttl_ = static_cast<int>(TTLMsToS(GET_OR_RET(parser.TakeInt<int64_t>(TTL_RANGE<int64_t>))));
ttl_ = GET_OR_RET(parser.TakeInt<int64_t>(TTL_RANGE<int64_t>));
} else {
return parser.InvalidSyntax();
}
Expand All @@ -575,7 +572,7 @@ class CommandCAS : public Commander {
}

private:
int ttl_ = 0;
uint64_t ttl_ = 0;
};

class CommandCAD : public Commander {
Expand Down
27 changes: 8 additions & 19 deletions src/commands/ttl_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,35 +27,24 @@
#include "status.h"
#include "time_util.h"

template <typename T>
T TTLMsToS(T ttl) {
if (ttl <= 0) {
return ttl;
} else if (ttl < 1000) {
return 1;
} else {
return ttl / 1000;
}
}

inline int ExpireToTTL(int64_t expire) {
int64_t now = Util::GetTimeStamp();
return static_cast<int>(expire - now);
inline uint64_t ExpireToTTL(uint64_t expire) {
uint64_t now = Util::GetTimeStampMS();
return expire - now;
}

template <typename T>
constexpr auto TTL_RANGE = NumericRange<T>{1, std::numeric_limits<T>::max()};

template <typename T>
StatusOr<std::optional<int>> ParseTTL(CommandParser<T> &parser, std::string_view &curr_flag) {
StatusOr<std::optional<int64_t>> ParseTTL(CommandParser<T> &parser, std::string_view &curr_flag) {
if (parser.EatEqICaseFlag("EX", curr_flag)) {
return GET_OR_RET(parser.template TakeInt<int>(TTL_RANGE<int>));
return GET_OR_RET(parser.template TakeInt<int64_t>(TTL_RANGE<int64_t>)) * 1000;
} else if (parser.EatEqICaseFlag("EXAT", curr_flag)) {
return ExpireToTTL(GET_OR_RET(parser.template TakeInt<int64_t>(TTL_RANGE<int64_t>)));
return ExpireToTTL(GET_OR_RET(parser.template TakeInt<int64_t>(TTL_RANGE<int64_t>)) * 1000);
} else if (parser.EatEqICaseFlag("PX", curr_flag)) {
return TTLMsToS(GET_OR_RET(parser.template TakeInt<int64_t>(TTL_RANGE<int64_t>)));
return GET_OR_RET(parser.template TakeInt<int64_t>(TTL_RANGE<int64_t>));
} else if (parser.EatEqICaseFlag("PXAT", curr_flag)) {
return ExpireToTTL(TTLMsToS(GET_OR_RET(parser.template TakeInt<int64_t>(TTL_RANGE<int64_t>))));
return ExpireToTTL(GET_OR_RET(parser.template TakeInt<int64_t>(TTL_RANGE<int64_t>)));
} else {
return std::nullopt;
}
Expand Down
6 changes: 3 additions & 3 deletions src/storage/batch_extractor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const Slic
Metadata metadata(kRedisNone);
metadata.Decode(value.ToString());
if (metadata.Type() == kRedisString) {
command_args = {"SET", user_key, value.ToString().substr(5, value.size() - 5)};
command_args = {"SET", user_key, value.ToString().substr(Metadata::GetOffsetAfterExpire(value[0]))};
resp_commands_[ns].emplace_back(Redis::Command2RESP(command_args));
if (metadata.expire > 0) {
command_args = {"EXPIREAT", user_key, std::to_string(metadata.expire)};
command_args = {"PEXPIREAT", user_key, std::to_string(metadata.expire)};
resp_commands_[ns].emplace_back(Redis::Command2RESP(command_args));
}
} else if (metadata.expire > 0) {
Expand All @@ -74,7 +74,7 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const Slic
}
auto cmd = static_cast<RedisCommand>(*parse_result);
if (cmd == kRedisCmdExpire) {
command_args = {"EXPIREAT", user_key, std::to_string(metadata.expire)};
command_args = {"PEXPIREAT", user_key, std::to_string(metadata.expire)};
resp_commands_[ns].emplace_back(Redis::Command2RESP(command_args));
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/storage/compact_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ bool SubKeyFilter::IsMetadataExpired(const InternalKey &ikey, const Metadata &me
//
// `Util::GetTimeStamp() - 300` means extending 5 minutes for expired items,
PragmaTwice marked this conversation as resolved.
Show resolved Hide resolved
// to prevent them from being recycled once they reach the expiration time.
int64_t lazy_expired_ts = Util::GetTimeStamp() - 300;
uint64_t lazy_expired_ts = Util::GetTimeStampMS() - 300000;
if (metadata.Type() == kRedisString // metadata key was overwrite by set command
|| metadata.ExpireAt(lazy_expired_ts) || ikey.GetVersion() != metadata.version) {
return true;
Expand Down
Loading