Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement the new encoding with 64bit size and expire time in milliseconds #1342

Merged
merged 27 commits into from
Mar 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion .github/workflows/kvrocks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,14 @@ jobs:
os: ubuntu-20.04
without_luajit: -DUSE_LUAJIT=OFF
compiler: clang
- name: Ubuntu GCC with new encoding
os: ubuntu-20.04
compiler: gcc
new_encoding: -DENABLE_NEW_ENCODING=TRUE
- name: Ubuntu Clang with new encoding
os: ubuntu-20.04
compiler: clang
new_encoding: -DENABLE_NEW_ENCODING=TRUE

runs-on: ${{ matrix.os }}
steps:
Expand Down Expand Up @@ -182,7 +190,7 @@ jobs:
- name: Build Kvrocks
run: |
./x.py build -j$NPROC --unittest --compiler ${{ matrix.compiler }} ${{ matrix.without_jemalloc }} ${{ matrix.without_luajit }} \
${{ matrix.with_ninja }} ${{ matrix.with_sanitizer }} ${{ matrix.with_openssl }} ${{ env.CMAKE_EXTRA_DEFS }}
${{ matrix.with_ninja }} ${{ matrix.with_sanitizer }} ${{ matrix.with_openssl }} ${{ matrix.new_encoding }} ${{ env.CMAKE_EXTRA_DEFS }}

- name: Setup Coredump
if: ${{ startsWith(matrix.os, 'ubuntu') }}
Expand Down
5 changes: 5 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ option(ENABLE_OPENSSL "enable openssl to support tls connection" OFF)
option(ENABLE_IPO "enable interprocedural optimization" ON)
option(ENABLE_UNWIND "enable libunwind in glog" ON)
option(PORTABLE "build a portable binary (disable arch-specific optimizations)" OFF)
# TODO: set ENABLE_NEW_ENCODING to ON when we are ready
option(ENABLE_NEW_ENCODING "enable new encoding (#1033) for storing 64bit size and expire time in milliseconds" OFF)

if (CMAKE_VERSION VERSION_GREATER_EQUAL "3.24.0")
cmake_policy(SET CMP0135 NEW)
Expand Down Expand Up @@ -194,6 +196,9 @@ target_link_libraries(kvrocks_objs PUBLIC ${EXTERNAL_LIBS})
if(ENABLE_OPENSSL)
target_compile_definitions(kvrocks_objs PUBLIC ENABLE_OPENSSL)
endif()
if(ENABLE_NEW_ENCODING)
target_compile_definitions(kvrocks_objs PUBLIC ENABLE_NEW_ENCODING)
endif()

if(ENABLE_IPO)
include(CheckIPOSupported)
Expand Down
9 changes: 4 additions & 5 deletions src/cluster/slot_migrate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -615,10 +615,9 @@ StatusOr<KeyMigrationResult> SlotMigrate::MigrateOneKey(const rocksdb::Slice &ke

Status SlotMigrate::MigrateSimpleKey(const rocksdb::Slice &key, const Metadata &metadata, const std::string &bytes,
std::string *restore_cmds) {
std::vector<std::string> command = {"set", key.ToString(),
bytes.substr(Redis::STRING_HDR_SIZE, bytes.size() - Redis::STRING_HDR_SIZE)};
std::vector<std::string> command = {"SET", key.ToString(), bytes.substr(Metadata::GetOffsetAfterExpire(bytes[0]))};
if (metadata.expire > 0) {
command.emplace_back("EXAT");
command.emplace_back("PXAT");
command.emplace_back(std::to_string(metadata.expire));
}
*restore_cmds += Redis::MultiBulkString(command, false);
Expand Down Expand Up @@ -727,7 +726,7 @@ Status SlotMigrate::MigrateComplexKey(const rocksdb::Slice &key, const Metadata

// Add TTL for complex key
if (metadata.expire > 0) {
*restore_cmds += Redis::MultiBulkString({"EXPIREAT", key.ToString(), std::to_string(metadata.expire)}, false);
*restore_cmds += Redis::MultiBulkString({"PEXPIREAT", key.ToString(), std::to_string(metadata.expire)}, false);
current_pipeline_size_++;
}

Expand Down Expand Up @@ -803,7 +802,7 @@ Status SlotMigrate::MigrateStream(const Slice &key, const StreamMetadata &metada

// Add TTL
if (metadata.expire > 0) {
*restore_cmds += Redis::MultiBulkString({"EXPIREAT", key.ToString(), std::to_string(metadata.expire)}, false);
*restore_cmds += Redis::MultiBulkString({"PEXPIREAT", key.ToString(), std::to_string(metadata.expire)}, false);
current_pipeline_size_++;
}

Expand Down
54 changes: 17 additions & 37 deletions src/commands/cmd_key.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*
*/

#include <cstdint>

#include "commander.h"
#include "commands/ttl_util.h"
#include "error_constants.h"
Expand Down Expand Up @@ -69,10 +71,10 @@ class CommandTTL : public Commander {
public:
Status Execute(Server *svr, Connection *conn, std::string *output) override {
Redis::Database redis(svr->storage_, conn->GetNamespace());
int ttl = 0;
int64_t ttl = 0;
auto s = redis.TTL(args_[1], &ttl);
if (s.ok()) {
*output = Redis::Integer(ttl);
*output = Redis::Integer(ttl > 0 ? ttl / 1000 : ttl);
return Status::OK();
}

Expand All @@ -84,16 +86,11 @@ class CommandPTTL : public Commander {
public:
Status Execute(Server *svr, Connection *conn, std::string *output) override {
Redis::Database redis(svr->storage_, conn->GetNamespace());
int ttl = 0;
int64_t ttl = 0;
auto s = redis.TTL(args_[1], &ttl);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

if (ttl > 0) {
*output = Redis::Integer(ttl * 1000);
} else {
*output = Redis::Integer(ttl);
}

*output = Redis::Integer(ttl);
return Status::OK();
}
};
Expand All @@ -115,25 +112,16 @@ class CommandExists : public Commander {
}
};

StatusOr<int> TTLToTimestamp(int ttl) {
int64_t now = Util::GetTimeStamp();
if (ttl >= INT32_MAX - now) {
return {Status::RedisParseErr, "the expire time was overflow"};
}

return ttl + now;
}

class CommandExpire : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
seconds_ = GET_OR_RET(TTLToTimestamp(GET_OR_RET(ParseInt<int>(args[2], 10))));
seconds_ = GET_OR_RET(ParseInt<int64_t>(args[2], 10)) + Util::GetTimeStamp();
return Status::OK();
}

Status Execute(Server *svr, Connection *conn, std::string *output) override {
Redis::Database redis(svr->storage_, conn->GetNamespace());
auto s = redis.Expire(args_[1], seconds_);
auto s = redis.Expire(args_[1], seconds_ * 1000);
if (s.ok()) {
*output = Redis::Integer(1);
} else {
Expand All @@ -143,13 +131,13 @@ class CommandExpire : public Commander {
}

private:
int seconds_ = 0;
uint64_t seconds_ = 0;
};

class CommandPExpire : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
seconds_ = GET_OR_RET(TTLToTimestamp(TTLMsToS(GET_OR_RET(ParseInt<int64_t>(args[2], 10)))));
seconds_ = GET_OR_RET(ParseInt<int64_t>(args[2], 10)) + Util::GetTimeStampMS();
return Status::OK();
}

Expand All @@ -165,7 +153,7 @@ class CommandPExpire : public Commander {
}

private:
int seconds_ = 0;
uint64_t seconds_ = 0;
};

class CommandExpireAt : public Commander {
Expand All @@ -176,18 +164,14 @@ class CommandExpireAt : public Commander {
return {Status::RedisParseErr, errValueNotInteger};
}

if (*parse_result >= INT32_MAX) {
return {Status::RedisParseErr, "the expire time was overflow"};
}

timestamp_ = static_cast<int>(*parse_result);
timestamp_ = *parse_result;

return Commander::Parse(args);
}

Status Execute(Server *svr, Connection *conn, std::string *output) override {
Redis::Database redis(svr->storage_, conn->GetNamespace());
auto s = redis.Expire(args_[1], timestamp_);
auto s = redis.Expire(args_[1], timestamp_ * 1000);
if (s.ok()) {
*output = Redis::Integer(1);
} else {
Expand All @@ -197,7 +181,7 @@ class CommandExpireAt : public Commander {
}

private:
int timestamp_ = 0;
uint64_t timestamp_ = 0;
};

class CommandPExpireAt : public Commander {
Expand All @@ -208,11 +192,7 @@ class CommandPExpireAt : public Commander {
return {Status::RedisParseErr, errValueNotInteger};
}

if (*parse_result / 1000 >= INT32_MAX) {
return {Status::RedisParseErr, "the expire time was overflow"};
}

timestamp_ = static_cast<int>(*parse_result / 1000);
timestamp_ = *parse_result;

return Commander::Parse(args);
}
Expand All @@ -229,13 +209,13 @@ class CommandPExpireAt : public Commander {
}

private:
int timestamp_ = 0;
uint64_t timestamp_ = 0;
};

class CommandPersist : public Commander {
public:
Status Execute(Server *svr, Connection *conn, std::string *output) override {
int ttl = 0;
int64_t ttl = 0;
Redis::Database redis(svr->storage_, conn->GetNamespace());
auto s = redis.TTL(args_[1], &ttl);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
Expand Down
25 changes: 11 additions & 14 deletions src/commands/cmd_string.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*
*/

#include <cstdint>
#include <optional>

#include "commander.h"
Expand Down Expand Up @@ -95,7 +96,7 @@ class CommandGetEx : public Commander {
}

private:
int ttl_ = 0;
uint64_t ttl_ = 0;
bool persist_ = false;
};

Expand Down Expand Up @@ -311,14 +312,14 @@ class CommandSet : public Commander {
}

private:
int ttl_ = 0;
uint64_t ttl_ = 0;
enum { NONE, NX, XX } set_flag_ = NONE;
};

class CommandSetEX : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
auto parse_result = ParseInt<int>(args[2], 10);
auto parse_result = ParseInt<int64_t>(args[2], 10);
if (!parse_result) {
return {Status::RedisParseErr, errValueNotInteger};
}
Expand All @@ -332,13 +333,13 @@ class CommandSetEX : public Commander {

Status Execute(Server *svr, Connection *conn, std::string *output) override {
Redis::String string_db(svr->storage_, conn->GetNamespace());
auto s = string_db.SetEX(args_[1], args_[3], ttl_);
auto s = string_db.SetEX(args_[1], args_[3], ttl_ * 1000);
*output = Redis::SimpleString("OK");
return Status::OK();
}

private:
int ttl_ = 0;
uint64_t ttl_ = 0;
};

class CommandPSetEX : public Commander {
Expand All @@ -351,11 +352,7 @@ class CommandPSetEX : public Commander {

if (*ttl_ms <= 0) return {Status::RedisParseErr, errInvalidExpireTime};

if (*ttl_ms < 1000) {
ttl_ = 1;
} else {
ttl_ = static_cast<int>(*ttl_ms / 1000);
}
ttl_ = *ttl_ms;

return Commander::Parse(args);
}
Expand All @@ -368,7 +365,7 @@ class CommandPSetEX : public Commander {
}

private:
int ttl_ = 0;
int64_t ttl_ = 0;
};

class CommandMSet : public Commander {
Expand Down Expand Up @@ -552,9 +549,9 @@ class CommandCAS : public Commander {
std::string_view flag;
while (parser.Good()) {
if (parser.EatEqICaseFlag("EX", flag)) {
ttl_ = GET_OR_RET(parser.TakeInt<int>(TTL_RANGE<int>));
ttl_ = GET_OR_RET(parser.TakeInt<int64_t>(TTL_RANGE<int64_t>)) * 1000;
} else if (parser.EatEqICaseFlag("PX", flag)) {
ttl_ = static_cast<int>(TTLMsToS(GET_OR_RET(parser.TakeInt<int64_t>(TTL_RANGE<int64_t>))));
ttl_ = GET_OR_RET(parser.TakeInt<int64_t>(TTL_RANGE<int64_t>));
} else {
return parser.InvalidSyntax();
}
Expand All @@ -575,7 +572,7 @@ class CommandCAS : public Commander {
}

private:
int ttl_ = 0;
uint64_t ttl_ = 0;
};

class CommandCAD : public Commander {
Expand Down
26 changes: 5 additions & 21 deletions src/commands/ttl_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,35 +27,19 @@
#include "status.h"
#include "time_util.h"

template <typename T>
T TTLMsToS(T ttl) {
if (ttl <= 0) {
return ttl;
} else if (ttl < 1000) {
return 1;
} else {
return ttl / 1000;
}
}

inline int ExpireToTTL(int64_t expire) {
int64_t now = Util::GetTimeStamp();
return static_cast<int>(expire - now);
}

template <typename T>
constexpr auto TTL_RANGE = NumericRange<T>{1, std::numeric_limits<T>::max()};

template <typename T>
StatusOr<std::optional<int>> ParseTTL(CommandParser<T> &parser, std::string_view &curr_flag) {
StatusOr<std::optional<int64_t>> ParseTTL(CommandParser<T> &parser, std::string_view &curr_flag) {
if (parser.EatEqICaseFlag("EX", curr_flag)) {
return GET_OR_RET(parser.template TakeInt<int>(TTL_RANGE<int>));
return GET_OR_RET(parser.template TakeInt<int64_t>(TTL_RANGE<int64_t>)) * 1000;
} else if (parser.EatEqICaseFlag("EXAT", curr_flag)) {
return ExpireToTTL(GET_OR_RET(parser.template TakeInt<int64_t>(TTL_RANGE<int64_t>)));
return GET_OR_RET(parser.template TakeInt<int64_t>(TTL_RANGE<int64_t>)) * 1000 - Util::GetTimeStampMS();
} else if (parser.EatEqICaseFlag("PX", curr_flag)) {
return TTLMsToS(GET_OR_RET(parser.template TakeInt<int64_t>(TTL_RANGE<int64_t>)));
return GET_OR_RET(parser.template TakeInt<int64_t>(TTL_RANGE<int64_t>));
} else if (parser.EatEqICaseFlag("PXAT", curr_flag)) {
return ExpireToTTL(TTLMsToS(GET_OR_RET(parser.template TakeInt<int64_t>(TTL_RANGE<int64_t>))));
return GET_OR_RET(parser.template TakeInt<int64_t>(TTL_RANGE<int64_t>)) - Util::GetTimeStampMS();
} else {
return std::nullopt;
}
Expand Down
6 changes: 3 additions & 3 deletions src/storage/batch_extractor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const Slic
Metadata metadata(kRedisNone);
metadata.Decode(value.ToString());
if (metadata.Type() == kRedisString) {
command_args = {"SET", user_key, value.ToString().substr(5, value.size() - 5)};
command_args = {"SET", user_key, value.ToString().substr(Metadata::GetOffsetAfterExpire(value[0]))};
resp_commands_[ns].emplace_back(Redis::Command2RESP(command_args));
if (metadata.expire > 0) {
command_args = {"EXPIREAT", user_key, std::to_string(metadata.expire)};
command_args = {"PEXPIREAT", user_key, std::to_string(metadata.expire)};
resp_commands_[ns].emplace_back(Redis::Command2RESP(command_args));
}
} else if (metadata.expire > 0) {
Expand All @@ -74,7 +74,7 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const Slic
}
auto cmd = static_cast<RedisCommand>(*parse_result);
if (cmd == kRedisCmdExpire) {
command_args = {"EXPIREAT", user_key, std::to_string(metadata.expire)};
command_args = {"PEXPIREAT", user_key, std::to_string(metadata.expire)};
resp_commands_[ns].emplace_back(Redis::Command2RESP(command_args));
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/storage/compact_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,9 @@ bool SubKeyFilter::IsMetadataExpired(const InternalKey &ikey, const Metadata &me
// lazy delete to avoid race condition between command Expire and subkey Compaction
// Related issue:https://github.com/apache/incubator-kvrocks/issues/1298
//
// `Util::GetTimeStamp() - 300` means extending 5 minutes for expired items,
// `Util::GetTimeStampMS() - 300000` means extending 5 minutes for expired items,
// to prevent them from being recycled once they reach the expiration time.
int64_t lazy_expired_ts = Util::GetTimeStamp() - 300;
uint64_t lazy_expired_ts = Util::GetTimeStampMS() - 300000;
if (metadata.Type() == kRedisString // metadata key was overwrite by set command
|| metadata.ExpireAt(lazy_expired_ts) || ikey.GetVersion() != metadata.version) {
return true;
Expand Down
Loading