diff --git a/.github/workflows/kvrocks.yaml b/.github/workflows/kvrocks.yaml index fdd173e045f..e98a766f6bb 100644 --- a/.github/workflows/kvrocks.yaml +++ b/.github/workflows/kvrocks.yaml @@ -140,6 +140,14 @@ jobs: os: ubuntu-20.04 without_luajit: -DUSE_LUAJIT=OFF compiler: clang + - name: Ubuntu GCC with new encoding + os: ubuntu-20.04 + compiler: gcc + new_encoding: -DENABLE_NEW_ENCODING=TRUE + - name: Ubuntu Clang with new encoding + os: ubuntu-20.04 + compiler: clang + new_encoding: -DENABLE_NEW_ENCODING=TRUE runs-on: ${{ matrix.os }} steps: @@ -182,7 +190,7 @@ jobs: - name: Build Kvrocks run: | ./x.py build -j$NPROC --unittest --compiler ${{ matrix.compiler }} ${{ matrix.without_jemalloc }} ${{ matrix.without_luajit }} \ - ${{ matrix.with_ninja }} ${{ matrix.with_sanitizer }} ${{ matrix.with_openssl }} ${{ env.CMAKE_EXTRA_DEFS }} + ${{ matrix.with_ninja }} ${{ matrix.with_sanitizer }} ${{ matrix.with_openssl }} ${{ matrix.new_encoding }} ${{ env.CMAKE_EXTRA_DEFS }} - name: Setup Coredump if: ${{ startsWith(matrix.os, 'ubuntu') }} diff --git a/CMakeLists.txt b/CMakeLists.txt index 30352e3db86..0c522074da1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -30,6 +30,8 @@ option(ENABLE_OPENSSL "enable openssl to support tls connection" OFF) option(ENABLE_IPO "enable interprocedural optimization" ON) option(ENABLE_UNWIND "enable libunwind in glog" ON) option(PORTABLE "build a portable binary (disable arch-specific optimizations)" OFF) +# TODO: set ENABLE_NEW_ENCODING to ON when we are ready +option(ENABLE_NEW_ENCODING "enable new encoding (#1033) for storing 64bit size and expire time in milliseconds" OFF) if (CMAKE_VERSION VERSION_GREATER_EQUAL "3.24.0") cmake_policy(SET CMP0135 NEW) @@ -194,6 +196,9 @@ target_link_libraries(kvrocks_objs PUBLIC ${EXTERNAL_LIBS}) if(ENABLE_OPENSSL) target_compile_definitions(kvrocks_objs PUBLIC ENABLE_OPENSSL) endif() +if(ENABLE_NEW_ENCODING) + target_compile_definitions(kvrocks_objs PUBLIC ENABLE_NEW_ENCODING) +endif() if(ENABLE_IPO) include(CheckIPOSupported) diff --git a/src/cluster/slot_migrate.cc b/src/cluster/slot_migrate.cc index b62562aceaf..23e187188ee 100644 --- a/src/cluster/slot_migrate.cc +++ b/src/cluster/slot_migrate.cc @@ -615,10 +615,9 @@ StatusOr SlotMigrate::MigrateOneKey(const rocksdb::Slice &ke Status SlotMigrate::MigrateSimpleKey(const rocksdb::Slice &key, const Metadata &metadata, const std::string &bytes, std::string *restore_cmds) { - std::vector command = {"set", key.ToString(), - bytes.substr(Redis::STRING_HDR_SIZE, bytes.size() - Redis::STRING_HDR_SIZE)}; + std::vector command = {"SET", key.ToString(), bytes.substr(Metadata::GetOffsetAfterExpire(bytes[0]))}; if (metadata.expire > 0) { - command.emplace_back("EXAT"); + command.emplace_back("PXAT"); command.emplace_back(std::to_string(metadata.expire)); } *restore_cmds += Redis::MultiBulkString(command, false); @@ -727,7 +726,7 @@ Status SlotMigrate::MigrateComplexKey(const rocksdb::Slice &key, const Metadata // Add TTL for complex key if (metadata.expire > 0) { - *restore_cmds += Redis::MultiBulkString({"EXPIREAT", key.ToString(), std::to_string(metadata.expire)}, false); + *restore_cmds += Redis::MultiBulkString({"PEXPIREAT", key.ToString(), std::to_string(metadata.expire)}, false); current_pipeline_size_++; } @@ -803,7 +802,7 @@ Status SlotMigrate::MigrateStream(const Slice &key, const StreamMetadata &metada // Add TTL if (metadata.expire > 0) { - *restore_cmds += Redis::MultiBulkString({"EXPIREAT", key.ToString(), std::to_string(metadata.expire)}, false); + *restore_cmds += Redis::MultiBulkString({"PEXPIREAT", key.ToString(), std::to_string(metadata.expire)}, false); current_pipeline_size_++; } diff --git a/src/commands/cmd_key.cc b/src/commands/cmd_key.cc index 0df304c0998..8a75b2db616 100644 --- a/src/commands/cmd_key.cc +++ b/src/commands/cmd_key.cc @@ -18,6 +18,8 @@ * */ +#include + #include "commander.h" #include "commands/ttl_util.h" #include "error_constants.h" @@ -69,10 +71,10 @@ class CommandTTL : public Commander { public: Status Execute(Server *svr, Connection *conn, std::string *output) override { Redis::Database redis(svr->storage_, conn->GetNamespace()); - int ttl = 0; + int64_t ttl = 0; auto s = redis.TTL(args_[1], &ttl); if (s.ok()) { - *output = Redis::Integer(ttl); + *output = Redis::Integer(ttl > 0 ? ttl / 1000 : ttl); return Status::OK(); } @@ -84,16 +86,11 @@ class CommandPTTL : public Commander { public: Status Execute(Server *svr, Connection *conn, std::string *output) override { Redis::Database redis(svr->storage_, conn->GetNamespace()); - int ttl = 0; + int64_t ttl = 0; auto s = redis.TTL(args_[1], &ttl); if (!s.ok()) return {Status::RedisExecErr, s.ToString()}; - if (ttl > 0) { - *output = Redis::Integer(ttl * 1000); - } else { - *output = Redis::Integer(ttl); - } - + *output = Redis::Integer(ttl); return Status::OK(); } }; @@ -115,25 +112,16 @@ class CommandExists : public Commander { } }; -StatusOr TTLToTimestamp(int ttl) { - int64_t now = Util::GetTimeStamp(); - if (ttl >= INT32_MAX - now) { - return {Status::RedisParseErr, "the expire time was overflow"}; - } - - return ttl + now; -} - class CommandExpire : public Commander { public: Status Parse(const std::vector &args) override { - seconds_ = GET_OR_RET(TTLToTimestamp(GET_OR_RET(ParseInt(args[2], 10)))); + seconds_ = GET_OR_RET(ParseInt(args[2], 10)) + Util::GetTimeStamp(); return Status::OK(); } Status Execute(Server *svr, Connection *conn, std::string *output) override { Redis::Database redis(svr->storage_, conn->GetNamespace()); - auto s = redis.Expire(args_[1], seconds_); + auto s = redis.Expire(args_[1], seconds_ * 1000); if (s.ok()) { *output = Redis::Integer(1); } else { @@ -143,13 +131,13 @@ class CommandExpire : public Commander { } private: - int seconds_ = 0; + uint64_t seconds_ = 0; }; class CommandPExpire : public Commander { public: Status Parse(const std::vector &args) override { - seconds_ = GET_OR_RET(TTLToTimestamp(TTLMsToS(GET_OR_RET(ParseInt(args[2], 10))))); + seconds_ = GET_OR_RET(ParseInt(args[2], 10)) + Util::GetTimeStampMS(); return Status::OK(); } @@ -165,7 +153,7 @@ class CommandPExpire : public Commander { } private: - int seconds_ = 0; + uint64_t seconds_ = 0; }; class CommandExpireAt : public Commander { @@ -176,18 +164,14 @@ class CommandExpireAt : public Commander { return {Status::RedisParseErr, errValueNotInteger}; } - if (*parse_result >= INT32_MAX) { - return {Status::RedisParseErr, "the expire time was overflow"}; - } - - timestamp_ = static_cast(*parse_result); + timestamp_ = *parse_result; return Commander::Parse(args); } Status Execute(Server *svr, Connection *conn, std::string *output) override { Redis::Database redis(svr->storage_, conn->GetNamespace()); - auto s = redis.Expire(args_[1], timestamp_); + auto s = redis.Expire(args_[1], timestamp_ * 1000); if (s.ok()) { *output = Redis::Integer(1); } else { @@ -197,7 +181,7 @@ class CommandExpireAt : public Commander { } private: - int timestamp_ = 0; + uint64_t timestamp_ = 0; }; class CommandPExpireAt : public Commander { @@ -208,11 +192,7 @@ class CommandPExpireAt : public Commander { return {Status::RedisParseErr, errValueNotInteger}; } - if (*parse_result / 1000 >= INT32_MAX) { - return {Status::RedisParseErr, "the expire time was overflow"}; - } - - timestamp_ = static_cast(*parse_result / 1000); + timestamp_ = *parse_result; return Commander::Parse(args); } @@ -229,13 +209,13 @@ class CommandPExpireAt : public Commander { } private: - int timestamp_ = 0; + uint64_t timestamp_ = 0; }; class CommandPersist : public Commander { public: Status Execute(Server *svr, Connection *conn, std::string *output) override { - int ttl = 0; + int64_t ttl = 0; Redis::Database redis(svr->storage_, conn->GetNamespace()); auto s = redis.TTL(args_[1], &ttl); if (!s.ok()) return {Status::RedisExecErr, s.ToString()}; diff --git a/src/commands/cmd_string.cc b/src/commands/cmd_string.cc index 9184a3d69c2..3b3d56a5875 100644 --- a/src/commands/cmd_string.cc +++ b/src/commands/cmd_string.cc @@ -18,6 +18,7 @@ * */ +#include #include #include "commander.h" @@ -95,7 +96,7 @@ class CommandGetEx : public Commander { } private: - int ttl_ = 0; + uint64_t ttl_ = 0; bool persist_ = false; }; @@ -311,14 +312,14 @@ class CommandSet : public Commander { } private: - int ttl_ = 0; + uint64_t ttl_ = 0; enum { NONE, NX, XX } set_flag_ = NONE; }; class CommandSetEX : public Commander { public: Status Parse(const std::vector &args) override { - auto parse_result = ParseInt(args[2], 10); + auto parse_result = ParseInt(args[2], 10); if (!parse_result) { return {Status::RedisParseErr, errValueNotInteger}; } @@ -332,13 +333,13 @@ class CommandSetEX : public Commander { Status Execute(Server *svr, Connection *conn, std::string *output) override { Redis::String string_db(svr->storage_, conn->GetNamespace()); - auto s = string_db.SetEX(args_[1], args_[3], ttl_); + auto s = string_db.SetEX(args_[1], args_[3], ttl_ * 1000); *output = Redis::SimpleString("OK"); return Status::OK(); } private: - int ttl_ = 0; + uint64_t ttl_ = 0; }; class CommandPSetEX : public Commander { @@ -351,11 +352,7 @@ class CommandPSetEX : public Commander { if (*ttl_ms <= 0) return {Status::RedisParseErr, errInvalidExpireTime}; - if (*ttl_ms < 1000) { - ttl_ = 1; - } else { - ttl_ = static_cast(*ttl_ms / 1000); - } + ttl_ = *ttl_ms; return Commander::Parse(args); } @@ -368,7 +365,7 @@ class CommandPSetEX : public Commander { } private: - int ttl_ = 0; + int64_t ttl_ = 0; }; class CommandMSet : public Commander { @@ -552,9 +549,9 @@ class CommandCAS : public Commander { std::string_view flag; while (parser.Good()) { if (parser.EatEqICaseFlag("EX", flag)) { - ttl_ = GET_OR_RET(parser.TakeInt(TTL_RANGE)); + ttl_ = GET_OR_RET(parser.TakeInt(TTL_RANGE)) * 1000; } else if (parser.EatEqICaseFlag("PX", flag)) { - ttl_ = static_cast(TTLMsToS(GET_OR_RET(parser.TakeInt(TTL_RANGE)))); + ttl_ = GET_OR_RET(parser.TakeInt(TTL_RANGE)); } else { return parser.InvalidSyntax(); } @@ -575,7 +572,7 @@ class CommandCAS : public Commander { } private: - int ttl_ = 0; + uint64_t ttl_ = 0; }; class CommandCAD : public Commander { diff --git a/src/commands/ttl_util.h b/src/commands/ttl_util.h index abed93d299d..64f11a21eaf 100644 --- a/src/commands/ttl_util.h +++ b/src/commands/ttl_util.h @@ -27,35 +27,19 @@ #include "status.h" #include "time_util.h" -template -T TTLMsToS(T ttl) { - if (ttl <= 0) { - return ttl; - } else if (ttl < 1000) { - return 1; - } else { - return ttl / 1000; - } -} - -inline int ExpireToTTL(int64_t expire) { - int64_t now = Util::GetTimeStamp(); - return static_cast(expire - now); -} - template constexpr auto TTL_RANGE = NumericRange{1, std::numeric_limits::max()}; template -StatusOr> ParseTTL(CommandParser &parser, std::string_view &curr_flag) { +StatusOr> ParseTTL(CommandParser &parser, std::string_view &curr_flag) { if (parser.EatEqICaseFlag("EX", curr_flag)) { - return GET_OR_RET(parser.template TakeInt(TTL_RANGE)); + return GET_OR_RET(parser.template TakeInt(TTL_RANGE)) * 1000; } else if (parser.EatEqICaseFlag("EXAT", curr_flag)) { - return ExpireToTTL(GET_OR_RET(parser.template TakeInt(TTL_RANGE))); + return GET_OR_RET(parser.template TakeInt(TTL_RANGE)) * 1000 - Util::GetTimeStampMS(); } else if (parser.EatEqICaseFlag("PX", curr_flag)) { - return TTLMsToS(GET_OR_RET(parser.template TakeInt(TTL_RANGE))); + return GET_OR_RET(parser.template TakeInt(TTL_RANGE)); } else if (parser.EatEqICaseFlag("PXAT", curr_flag)) { - return ExpireToTTL(TTLMsToS(GET_OR_RET(parser.template TakeInt(TTL_RANGE)))); + return GET_OR_RET(parser.template TakeInt(TTL_RANGE)) - Util::GetTimeStampMS(); } else { return std::nullopt; } diff --git a/src/storage/batch_extractor.cc b/src/storage/batch_extractor.cc index ac57ffdc5a3..cd7453ab13c 100644 --- a/src/storage/batch_extractor.cc +++ b/src/storage/batch_extractor.cc @@ -59,10 +59,10 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const Slic Metadata metadata(kRedisNone); metadata.Decode(value.ToString()); if (metadata.Type() == kRedisString) { - command_args = {"SET", user_key, value.ToString().substr(5, value.size() - 5)}; + command_args = {"SET", user_key, value.ToString().substr(Metadata::GetOffsetAfterExpire(value[0]))}; resp_commands_[ns].emplace_back(Redis::Command2RESP(command_args)); if (metadata.expire > 0) { - command_args = {"EXPIREAT", user_key, std::to_string(metadata.expire)}; + command_args = {"PEXPIREAT", user_key, std::to_string(metadata.expire)}; resp_commands_[ns].emplace_back(Redis::Command2RESP(command_args)); } } else if (metadata.expire > 0) { @@ -74,7 +74,7 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const Slic } auto cmd = static_cast(*parse_result); if (cmd == kRedisCmdExpire) { - command_args = {"EXPIREAT", user_key, std::to_string(metadata.expire)}; + command_args = {"PEXPIREAT", user_key, std::to_string(metadata.expire)}; resp_commands_[ns].emplace_back(Redis::Command2RESP(command_args)); } } diff --git a/src/storage/compact_filter.cc b/src/storage/compact_filter.cc index 812544d8ff7..3b4848139e2 100644 --- a/src/storage/compact_filter.cc +++ b/src/storage/compact_filter.cc @@ -90,9 +90,9 @@ bool SubKeyFilter::IsMetadataExpired(const InternalKey &ikey, const Metadata &me // lazy delete to avoid race condition between command Expire and subkey Compaction // Related issue:https://github.com/apache/incubator-kvrocks/issues/1298 // - // `Util::GetTimeStamp() - 300` means extending 5 minutes for expired items, + // `Util::GetTimeStampMS() - 300000` means extending 5 minutes for expired items, // to prevent them from being recycled once they reach the expiration time. - int64_t lazy_expired_ts = Util::GetTimeStamp() - 300; + uint64_t lazy_expired_ts = Util::GetTimeStampMS() - 300000; if (metadata.Type() == kRedisString // metadata key was overwrite by set command || metadata.ExpireAt(lazy_expired_ts) || ikey.GetVersion() != metadata.version) { return true; diff --git a/src/storage/redis_db.cc b/src/storage/redis_db.cc index 82f4174b52f..c347caa485f 100644 --- a/src/storage/redis_db.cc +++ b/src/storage/redis_db.cc @@ -30,6 +30,7 @@ #include "rocksdb/iterator.h" #include "server/server.h" #include "storage/redis_metadata.h" +#include "time_util.h" namespace Redis { @@ -74,7 +75,7 @@ rocksdb::Status Database::GetRawMetadataByUserKey(const Slice &user_key, std::st return GetRawMetadata(ns_key, bytes); } -rocksdb::Status Database::Expire(const Slice &user_key, int timestamp) { +rocksdb::Status Database::Expire(const Slice &user_key, uint64_t timestamp) { std::string ns_key; AppendNamespacePrefix(user_key, &ns_key); @@ -92,14 +93,16 @@ rocksdb::Status Database::Expire(const Slice &user_key, int timestamp) { } if (metadata.expire == timestamp) return rocksdb::Status::OK(); - auto buf = std::make_unique(value.size()); - memcpy(buf.get(), value.data(), value.size()); // +1 to skip the flags - EncodeFixed32(buf.get() + 1, (uint32_t)timestamp); + if (metadata.Is64BitEncoded()) { + EncodeFixed64(value.data() + 1, timestamp); + } else { + EncodeFixed32(value.data() + 1, Metadata::ExpireMsToS(timestamp)); + } auto batch = storage_->GetWriteBatchBase(); WriteBatchLogData log_data(kRedisNone, {std::to_string(kRedisCmdExpire)}); batch->PutLogData(log_data.Encode()); - batch->Put(metadata_cf_handle_, ns_key, Slice(buf.get(), value.size())); + batch->Put(metadata_cf_handle_, ns_key, value); s = storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch()); return s; } @@ -140,7 +143,7 @@ rocksdb::Status Database::Exists(const std::vector &keys, int *ret) { return rocksdb::Status::OK(); } -rocksdb::Status Database::TTL(const Slice &user_key, int *ttl) { +rocksdb::Status Database::TTL(const Slice &user_key, int64_t *ttl) { std::string ns_key; AppendNamespacePrefix(user_key, &ns_key); @@ -155,6 +158,7 @@ rocksdb::Status Database::TTL(const Slice &user_key, int *ttl) { Metadata metadata(kRedisNone, false); metadata.Decode(value); *ttl = metadata.TTL(); + return rocksdb::Status::OK(); } @@ -196,7 +200,7 @@ void Database::Keys(const std::string &prefix, std::vector *keys, K continue; } if (stats) { - int32_t ttl = metadata.TTL(); + int64_t ttl = metadata.TTL(); stats->n_key++; if (ttl != -1) { stats->n_expires++; @@ -219,7 +223,7 @@ void Database::Keys(const std::string &prefix, std::vector *keys, K } if (stats && stats->n_expires > 0) { - stats->avg_ttl = ttl_sum / stats->n_expires; + stats->avg_ttl = ttl_sum / stats->n_expires / 1000; } } @@ -399,9 +403,13 @@ rocksdb::Status Database::Dump(const Slice &user_key, std::vector * infos->emplace_back("version"); infos->emplace_back(std::to_string(metadata.version)); infos->emplace_back("expire"); + infos->emplace_back(std::to_string(Metadata::ExpireMsToS(metadata.expire))); + infos->emplace_back("pexpire"); infos->emplace_back(std::to_string(metadata.expire)); infos->emplace_back("size"); infos->emplace_back(std::to_string(metadata.size)); + infos->emplace_back("is_64bit_common_field"); + infos->emplace_back(std::to_string(metadata.Is64BitEncoded())); infos->emplace_back("created_at"); struct timeval created_at = metadata.Time(); diff --git a/src/storage/redis_db.h b/src/storage/redis_db.h index f19322b7f13..18012bf5aee 100644 --- a/src/storage/redis_db.h +++ b/src/storage/redis_db.h @@ -35,10 +35,10 @@ class Database { rocksdb::Status GetMetadata(RedisType type, const Slice &ns_key, Metadata *metadata); rocksdb::Status GetRawMetadata(const Slice &ns_key, std::string *bytes); rocksdb::Status GetRawMetadataByUserKey(const Slice &user_key, std::string *bytes); - rocksdb::Status Expire(const Slice &user_key, int timestamp); + rocksdb::Status Expire(const Slice &user_key, uint64_t timestamp); rocksdb::Status Del(const Slice &user_key); rocksdb::Status Exists(const std::vector &keys, int *ret); - rocksdb::Status TTL(const Slice &user_key, int *ttl); + rocksdb::Status TTL(const Slice &user_key, int64_t *ttl); rocksdb::Status Type(const Slice &user_key, RedisType *type); rocksdb::Status Dump(const Slice &user_key, std::vector *infos); rocksdb::Status FlushDB(); diff --git a/src/storage/redis_metadata.cc b/src/storage/redis_metadata.cc index e4ecd1ad09b..c6a8c850467 100644 --- a/src/storage/redis_metadata.cc +++ b/src/storage/redis_metadata.cc @@ -24,16 +24,18 @@ #include #include +#include #include #include #include "cluster/redis_slot.h" +#include "encoding.h" #include "time_util.h" // 52 bit for microseconds and 11 bit for counter const int VersionCounterBits = 11; -static std::atomic version_counter_ = {0}; +static std::atomic version_counter_ = 0; constexpr const char *kErrMetadataTooShort = "metadata is too short"; @@ -145,33 +147,39 @@ void ComposeSlotKeyPrefix(const Slice &ns, int slotid, std::string *output) { PutFixed16(output, static_cast(slotid)); } -Metadata::Metadata(RedisType type, bool generate_version) - : flags((uint8_t)0x0f & type), expire(0), version(0), size(0) { - if (generate_version) version = generateVersion(); -} +Metadata::Metadata(RedisType type, bool generate_version, bool use_64bit_common_field) + : flags((use_64bit_common_field ? METADATA_64BIT_ENCODING_MASK : 0) | (METADATA_TYPE_MASK & type)), + expire(0), + version(generate_version ? generateVersion() : 0), + size(0) {} rocksdb::Status Metadata::Decode(const std::string &bytes) { - // flags(1byte) + expire (4byte) - if (bytes.size() < 5) { + Slice input(bytes); + if (!GetFixed8(&input, &flags)) { return rocksdb::Status::InvalidArgument(kErrMetadataTooShort); } - Slice input(bytes); - GetFixed8(&input, &flags); - GetFixed32(&input, reinterpret_cast(&expire)); + + if (!GetExpire(&input)) { + return rocksdb::Status::InvalidArgument(kErrMetadataTooShort); + } + if (Type() != kRedisString) { - if (input.size() < 12) return rocksdb::Status::InvalidArgument(kErrMetadataTooShort); + if (input.size() < 8 + CommonEncodedSize()) { + return rocksdb::Status::InvalidArgument(kErrMetadataTooShort); + } GetFixed64(&input, &version); - GetFixed32(&input, &size); + GetFixedCommon(&input, &size); } + return rocksdb::Status::OK(); } void Metadata::Encode(std::string *dst) { PutFixed8(dst, flags); - PutFixed32(dst, (uint32_t)expire); + PutExpire(dst); if (Type() != kRedisString) { PutFixed64(dst, version); - PutFixed32(dst, size); + PutFixedCommon(dst, size); } } @@ -198,17 +206,94 @@ bool Metadata::operator==(const Metadata &that) const { return true; } -RedisType Metadata::Type() const { return static_cast(flags & (uint8_t)0x0f); } +RedisType Metadata::Type() const { return static_cast(flags & METADATA_TYPE_MASK); } + +size_t Metadata::GetOffsetAfterExpire(uint8_t flags) { + if (flags & METADATA_64BIT_ENCODING_MASK) { + return 1 + 8; + } + + return 1 + 4; +} + +size_t Metadata::GetOffsetAfterSize(uint8_t flags) { + if (flags & METADATA_64BIT_ENCODING_MASK) { + return 1 + 8 + 8 + 8; + } + + return 1 + 4 + 8 + 4; +} + +uint64_t Metadata::ExpireMsToS(uint64_t ms) { + if (ms == 0) { + return 0; + } + + if (ms < 1000) { + return 1; + } + + return ms / 1000; +} + +bool Metadata::Is64BitEncoded() const { return flags & METADATA_64BIT_ENCODING_MASK; } + +size_t Metadata::CommonEncodedSize() const { return Is64BitEncoded() ? 8 : 4; } + +bool Metadata::GetFixedCommon(rocksdb::Slice *input, uint64_t *value) const { + if (Is64BitEncoded()) { + return GetFixed64(input, value); + } else { + uint32_t v = 0; + bool res = GetFixed32(input, &v); + *value = v; + return res; + } +} + +bool Metadata::GetExpire(rocksdb::Slice *input) { + uint64_t v = 0; + + if (!GetFixedCommon(input, &v)) { + return false; + } + + if (Is64BitEncoded()) { + expire = v; + } else { + expire = v * 1000; + } + + return true; +} + +void Metadata::PutFixedCommon(std::string *dst, uint64_t value) const { + if (Is64BitEncoded()) { + PutFixed64(dst, value); + } else { + PutFixed32(dst, value); + } +} + +void Metadata::PutExpire(std::string *dst) { + if (Is64BitEncoded()) { + PutFixed64(dst, expire); + } else { + PutFixed32(dst, ExpireMsToS(expire)); + } +} -int32_t Metadata::TTL() const { - if (expire <= 0) { +int64_t Metadata::TTL() const { + if (expire == 0) { return -1; } - auto now = Util::GetTimeStamp(); + + auto now = Util::GetTimeStampMS(); if (expire < now) { return -2; } - return int32_t(expire - now); + + return int64_t(expire - now); } timeval Metadata::Time() const { @@ -219,17 +304,18 @@ timeval Metadata::Time() const { return created_at; } -bool Metadata::ExpireAt(int64_t expired_ts) const { +bool Metadata::ExpireAt(uint64_t expired_ts) const { if (Type() != kRedisString && Type() != kRedisStream && size == 0) { return true; } - if (expire <= 0) { + if (expire == 0) { return false; } + return expire < expired_ts; } -bool Metadata::Expired() const { return ExpireAt(Util::GetTimeStamp()); } +bool Metadata::Expired() const { return ExpireAt(Util::GetTimeStampMS()); } ListMetadata::ListMetadata(bool generate_version) : Metadata(kRedisList, generate_version), head(UINT64_MAX / 2), tail(head) {} @@ -243,14 +329,14 @@ void ListMetadata::Encode(std::string *dst) { rocksdb::Status ListMetadata::Decode(const std::string &bytes) { Slice input(bytes); GetFixed8(&input, &flags); - GetFixed32(&input, reinterpret_cast(&expire)); + GetExpire(&input); if (Type() != kRedisString) { - if (input.size() < 12) return rocksdb::Status::InvalidArgument(kErrMetadataTooShort); + if (input.size() < 8 + CommonEncodedSize()) return rocksdb::Status::InvalidArgument(kErrMetadataTooShort); GetFixed64(&input, &version); - GetFixed32(&input, &size); + GetFixedCommon(&input, &size); } if (Type() == kRedisList) { - if (input.size() < 16) return rocksdb::Status::InvalidArgument(kErrMetadataTooShort); + if (input.size() < 8 + 8) return rocksdb::Status::InvalidArgument(kErrMetadataTooShort); GetFixed64(&input, &head); GetFixed64(&input, &tail); } @@ -279,21 +365,20 @@ void StreamMetadata::Encode(std::string *dst) { } rocksdb::Status StreamMetadata::Decode(const std::string &bytes) { - // flags(1byte) + expire (4byte) - if (bytes.size() < 5) { + Slice input(bytes); + if (!GetFixed8(&input, &flags)) { + return rocksdb::Status::InvalidArgument(kErrMetadataTooShort); + } + if (!GetExpire(&input)) { return rocksdb::Status::InvalidArgument(kErrMetadataTooShort); } - Slice input(bytes); - GetFixed8(&input, &flags); - GetFixed32(&input, reinterpret_cast(&expire)); - - if (input.size() < 12) { + if (input.size() < 8 + CommonEncodedSize()) { return rocksdb::Status::InvalidArgument(kErrMetadataTooShort); } GetFixed64(&input, &version); - GetFixed32(&input, &size); + GetFixedCommon(&input, &size); if (input.size() < 88) { return rocksdb::Status::InvalidArgument(kErrMetadataTooShort); diff --git a/src/storage/redis_metadata.h b/src/storage/redis_metadata.h index 5d052d07698..062dfce0904 100644 --- a/src/storage/redis_metadata.h +++ b/src/storage/redis_metadata.h @@ -29,6 +29,14 @@ #include "encoding.h" #include "types/redis_stream_base.h" +constexpr bool USE_64BIT_COMMON_FIELD_DEFAULT = +#ifdef ENABLE_NEW_ENCODING + true +#else + false +#endif + ; + enum RedisType { kRedisNone, kRedisString, @@ -97,22 +105,47 @@ class InternalKey { bool slot_id_encoded_; }; +constexpr uint8_t METADATA_64BIT_ENCODING_MASK = 0x80; +constexpr uint8_t METADATA_TYPE_MASK = 0x0f; + class Metadata { public: + // metadata flags + // <(1-bit) 64bit-common-field-indicator> 0 0 0 <(4-bit) redis-type> + // 64bit-common-field-indicator: make `expire` and `size` 64bit instead of 32bit + // NOTE: `expire` is stored in milliseconds for 64bit, seconds for 32bit + // redis-type: RedisType for the key-value uint8_t flags; - int expire; + + // expire timestamp, in milliseconds + uint64_t expire; + + // the current version: 53bit timestamp + 11bit counter uint64_t version; - uint32_t size; - public: - explicit Metadata(RedisType type, bool generate_version = true); + // element size of the key-value + uint64_t size; + + explicit Metadata(RedisType type, bool generate_version = true, + bool use_64bit_common_field = USE_64BIT_COMMON_FIELD_DEFAULT); static void InitVersionCounter(); + static size_t GetOffsetAfterExpire(uint8_t flags); + static size_t GetOffsetAfterSize(uint8_t flags); + static uint64_t ExpireMsToS(uint64_t ms); + + bool Is64BitEncoded() const; + bool GetFixedCommon(rocksdb::Slice *input, uint64_t *value) const; + bool GetExpire(rocksdb::Slice *input); + void PutFixedCommon(std::string *dst, uint64_t value) const; + void PutExpire(std::string *dst); + RedisType Type() const; - virtual int32_t TTL() const; - virtual timeval Time() const; - virtual bool Expired() const; - virtual bool ExpireAt(int64_t expired_ts) const; + size_t CommonEncodedSize() const; + int64_t TTL() const; + timeval Time() const; + bool Expired() const; + bool ExpireAt(uint64_t expired_ts) const; virtual void Encode(std::string *dst); virtual rocksdb::Status Decode(const std::string &bytes); bool operator==(const Metadata &that) const; diff --git a/src/types/redis_bitmap.cc b/src/types/redis_bitmap.cc index 2f01164ee0d..bf5fbf115c0 100644 --- a/src/types/redis_bitmap.cc +++ b/src/types/redis_bitmap.cc @@ -193,8 +193,8 @@ rocksdb::Status Bitmap::SetBit(const Slice &user_key, uint32_t offset, bool new_ if (!s.ok() && !s.IsNotFound()) return s; } uint32_t byte_index = (offset / 8) % kBitmapSegmentBytes; - uint32_t used_size = index + byte_index + 1; - uint32_t bitmap_size = std::max(used_size, metadata.size); + uint64_t used_size = index + byte_index + 1; + uint64_t bitmap_size = std::max(used_size, metadata.size); if (byte_index >= value.size()) { // expand the bitmap size_t expand_size = 0; if (byte_index >= value.size() * 2) { @@ -240,9 +240,9 @@ rocksdb::Status Bitmap::BitCount(const Slice &user_key, int64_t start, int64_t s return bitmap_string_db.BitCount(raw_value, start, stop, cnt); } - if (start < 0) start += metadata.size + 1; - if (stop < 0) stop += metadata.size + 1; - if (stop > static_cast(metadata.size)) stop = metadata.size; + if (start < 0) start += static_cast(metadata.size) + 1; + if (stop < 0) stop += static_cast(metadata.size) + 1; + if (stop > static_cast(metadata.size)) stop = static_cast(metadata.size); if (start < 0 || stop <= 0 || start >= stop) return rocksdb::Status::OK(); auto u_start = static_cast(start); @@ -289,8 +289,8 @@ rocksdb::Status Bitmap::BitPos(const Slice &user_key, bool bit, int64_t start, i return bitmap_string_db.BitPos(raw_value, bit, start, stop, stop_given, pos); } - if (start < 0) start += metadata.size + 1; - if (stop < 0) stop += metadata.size + 1; + if (start < 0) start += static_cast(metadata.size) + 1; + if (stop < 0) stop += static_cast(metadata.size) + 1; if (start < 0 || stop < 0 || start > stop) { *pos = -1; return rocksdb::Status::OK(); diff --git a/src/types/redis_bitmap_string.cc b/src/types/redis_bitmap_string.cc index d68cd656658..de3a2fac2fc 100644 --- a/src/types/redis_bitmap_string.cc +++ b/src/types/redis_bitmap_string.cc @@ -23,13 +23,14 @@ #include #include "redis_string.h" +#include "storage/redis_metadata.h" namespace Redis { extern const uint8_t kNum2Bits[256]; rocksdb::Status BitmapString::GetBit(const std::string &raw_value, uint32_t offset, bool *bit) { - auto string_value = raw_value.substr(STRING_HDR_SIZE, raw_value.size() - STRING_HDR_SIZE); + auto string_value = raw_value.substr(Metadata::GetOffsetAfterExpire(raw_value[0])); uint32_t byte_index = offset >> 3; uint32_t bit_val = 0; uint32_t bit_offset = 7 - (offset & 0x7); @@ -42,7 +43,8 @@ rocksdb::Status BitmapString::GetBit(const std::string &raw_value, uint32_t offs rocksdb::Status BitmapString::SetBit(const Slice &ns_key, std::string *raw_value, uint32_t offset, bool new_bit, bool *old_bit) { - auto string_value = raw_value->substr(STRING_HDR_SIZE, raw_value->size() - STRING_HDR_SIZE); + size_t header_offset = Metadata::GetOffsetAfterExpire((*raw_value)[0]); + auto string_value = raw_value->substr(header_offset); uint32_t byte_index = offset >> 3; if (byte_index >= string_value.size()) { // expand the bitmap string_value.append(byte_index - string_value.size() + 1, 0); @@ -55,7 +57,7 @@ rocksdb::Status BitmapString::SetBit(const Slice &ns_key, std::string *raw_value byteval = static_cast(byteval | ((new_bit & 0x1) << bit_offset)); string_value[byte_index] = byteval; - *raw_value = raw_value->substr(0, STRING_HDR_SIZE); + *raw_value = raw_value->substr(0, header_offset); raw_value->append(string_value); auto batch = storage_->GetWriteBatchBase(); WriteBatchLogData log_data(kRedisString); @@ -66,7 +68,7 @@ rocksdb::Status BitmapString::SetBit(const Slice &ns_key, std::string *raw_value rocksdb::Status BitmapString::BitCount(const std::string &raw_value, int64_t start, int64_t stop, uint32_t *cnt) { *cnt = 0; - auto string_value = raw_value.substr(STRING_HDR_SIZE, raw_value.size() - STRING_HDR_SIZE); + auto string_value = raw_value.substr(Metadata::GetOffsetAfterExpire(raw_value[0])); /* Convert negative indexes */ if (start < 0 && stop < 0 && start > stop) { return rocksdb::Status::OK(); @@ -89,7 +91,7 @@ rocksdb::Status BitmapString::BitCount(const std::string &raw_value, int64_t sta rocksdb::Status BitmapString::BitPos(const std::string &raw_value, bool bit, int64_t start, int64_t stop, bool stop_given, int64_t *pos) { - auto string_value = raw_value.substr(STRING_HDR_SIZE, raw_value.size() - STRING_HDR_SIZE); + auto string_value = raw_value.substr(Metadata::GetOffsetAfterExpire(raw_value[0])); auto strlen = static_cast(string_value.size()); /* Convert negative indexes */ if (start < 0) start = strlen + start; diff --git a/src/types/redis_hash.cc b/src/types/redis_hash.cc index 2a52fdb10b2..8e914ac1237 100644 --- a/src/types/redis_hash.cc +++ b/src/types/redis_hash.cc @@ -189,10 +189,7 @@ rocksdb::Status Hash::MGet(const Slice &user_key, const std::vector &fiel } rocksdb::Status Hash::Set(const Slice &user_key, const Slice &field, const Slice &value, int *ret) { - FieldValue fv = {field.ToString(), value.ToString()}; - std::vector fvs; - fvs.emplace_back(std::move(fv)); - return MSet(user_key, fvs, false, ret); + return MSet(user_key, {{field.ToString(), value.ToString()}}, false, ret); } rocksdb::Status Hash::Delete(const Slice &user_key, const std::vector &fields, int *ret) { diff --git a/src/types/redis_string.cc b/src/types/redis_string.cc index 3024dd0c4e3..edb04c8e736 100644 --- a/src/types/redis_string.cc +++ b/src/types/redis_string.cc @@ -21,10 +21,13 @@ #include "redis_string.h" #include +#include +#include #include #include #include "parse_util.h" +#include "storage/redis_metadata.h" #include "time_util.h" namespace Redis { @@ -86,7 +89,8 @@ rocksdb::Status String::getValue(const std::string &ns_key, std::string *value) std::string raw_value; auto s = getRawValue(ns_key, &raw_value); if (!s.ok()) return s; - *value = raw_value.substr(STRING_HDR_SIZE, raw_value.size() - STRING_HDR_SIZE); + size_t offset = Metadata::GetOffsetAfterExpire(raw_value[0]); + *value = raw_value.substr(offset); return rocksdb::Status::OK(); } @@ -94,7 +98,8 @@ std::vector String::getValues(const std::vector &ns_keys auto statuses = getRawValues(ns_keys, values); for (size_t i = 0; i < ns_keys.size(); i++) { if (!statuses[i].ok()) continue; - (*values)[i] = (*values)[i].substr(STRING_HDR_SIZE, (*values)[i].size() - STRING_HDR_SIZE); + size_t offset = Metadata::GetOffsetAfterExpire((*values)[i][0]); + (*values)[i] = (*values)[i].substr(offset, (*values)[i].size() - offset); } return statuses; } @@ -121,7 +126,7 @@ rocksdb::Status String::Append(const std::string &user_key, const std::string &v metadata.Encode(&raw_value); } raw_value.append(value); - *ret = static_cast(raw_value.size() - STRING_HDR_SIZE); + *ret = static_cast(raw_value.size() - Metadata::GetOffsetAfterExpire(raw_value[0])); return updateRawValue(ns_key, raw_value); } @@ -147,11 +152,11 @@ rocksdb::Status String::Get(const std::string &user_key, std::string *value) { return getValue(ns_key, value); } -rocksdb::Status String::GetEx(const std::string &user_key, std::string *value, int ttl) { - int expire = 0; +rocksdb::Status String::GetEx(const std::string &user_key, std::string *value, uint64_t ttl) { + uint64_t expire = 0; if (ttl > 0) { - int64_t now = Util::GetTimeStamp(); - expire = int(now) + ttl; + uint64_t now = Util::GetTimeStampMS(); + expire = now + ttl; } std::string ns_key; AppendNamespacePrefix(user_key, &ns_key); @@ -206,23 +211,23 @@ rocksdb::Status String::Set(const std::string &user_key, const std::string &valu return MSet(pairs, 0); } -rocksdb::Status String::SetEX(const std::string &user_key, const std::string &value, int ttl) { +rocksdb::Status String::SetEX(const std::string &user_key, const std::string &value, uint64_t ttl) { std::vector pairs{StringPair{user_key, value}}; return MSet(pairs, ttl); } -rocksdb::Status String::SetNX(const std::string &user_key, const std::string &value, int ttl, int *ret) { +rocksdb::Status String::SetNX(const std::string &user_key, const std::string &value, uint64_t ttl, int *ret) { std::vector pairs{StringPair{user_key, value}}; return MSetNX(pairs, ttl, ret); } -rocksdb::Status String::SetXX(const std::string &user_key, const std::string &value, int ttl, int *ret) { +rocksdb::Status String::SetXX(const std::string &user_key, const std::string &value, uint64_t ttl, int *ret) { *ret = 0; int exists = 0; - int expire = 0; + uint64_t expire = 0; if (ttl > 0) { - int64_t now = Util::GetTimeStamp(); - expire = int(now) + ttl; + uint64_t now = Util::GetTimeStampMS(); + expire = now + ttl; } std::string ns_key; @@ -240,7 +245,7 @@ rocksdb::Status String::SetXX(const std::string &user_key, const std::string &va return updateRawValue(ns_key, raw_value); } -rocksdb::Status String::SetRange(const std::string &user_key, int offset, const std::string &value, int *ret) { +rocksdb::Status String::SetRange(const std::string &user_key, size_t offset, const std::string &value, int *ret) { std::string ns_key; AppendNamespacePrefix(user_key, &ns_key); @@ -260,14 +265,15 @@ rocksdb::Status String::SetRange(const std::string &user_key, int offset, const metadata.Encode(&raw_value); } - int size = static_cast(raw_value.size()); - offset += STRING_HDR_SIZE; + size_t size = raw_value.size(); + size_t header_offset = Metadata::GetOffsetAfterExpire(raw_value[0]); + offset += header_offset; if (offset > size) { // padding the value with zero byte while offset is longer than value size - int paddings = offset - size; + size_t paddings = offset - size; raw_value.append(paddings, '\0'); } - if (offset + static_cast(value.size()) >= size) { + if (offset + value.size() >= size) { raw_value = raw_value.substr(0, offset); raw_value.append(value); } else { @@ -275,7 +281,7 @@ rocksdb::Status String::SetRange(const std::string &user_key, int offset, const raw_value[offset + i] = value[i]; } } - *ret = static_cast(raw_value.size() - STRING_HDR_SIZE); + *ret = static_cast(raw_value.size() - header_offset); return updateRawValue(ns_key, raw_value); } @@ -292,7 +298,8 @@ rocksdb::Status String::IncrBy(const std::string &user_key, int64_t increment, i metadata.Encode(&raw_value); } - value = raw_value.substr(STRING_HDR_SIZE, raw_value.size() - STRING_HDR_SIZE); + size_t offset = Metadata::GetOffsetAfterExpire(raw_value[0]); + value = raw_value.substr(offset); int64_t n = 0; if (!value.empty()) { auto parse_result = ParseInt(value, 10); @@ -311,7 +318,7 @@ rocksdb::Status String::IncrBy(const std::string &user_key, int64_t increment, i n += increment; *ret = n; - raw_value = raw_value.substr(0, STRING_HDR_SIZE); + raw_value = raw_value.substr(0, offset); raw_value.append(std::to_string(n)); return updateRawValue(ns_key, raw_value); } @@ -328,7 +335,8 @@ rocksdb::Status String::IncrByFloat(const std::string &user_key, double incremen Metadata metadata(kRedisString, false); metadata.Encode(&raw_value); } - value = raw_value.substr(STRING_HDR_SIZE, raw_value.size() - STRING_HDR_SIZE); + size_t offset = Metadata::GetOffsetAfterExpire(raw_value[0]); + value = raw_value.substr(offset); double n = 0; if (!value.empty()) { auto n_stat = ParseFloat(value); @@ -344,16 +352,16 @@ rocksdb::Status String::IncrByFloat(const std::string &user_key, double incremen } *ret = n; - raw_value = raw_value.substr(0, STRING_HDR_SIZE); + raw_value = raw_value.substr(0, offset); raw_value.append(std::to_string(n)); return updateRawValue(ns_key, raw_value); } -rocksdb::Status String::MSet(const std::vector &pairs, int ttl) { - int expire = 0; +rocksdb::Status String::MSet(const std::vector &pairs, uint64_t ttl) { + uint64_t expire = 0; if (ttl > 0) { - int64_t now = Util::GetTimeStamp(); - expire = int(now) + ttl; + uint64_t now = Util::GetTimeStampMS(); + expire = now + ttl; } // Data race, key string maybe overwrite by other key while didn't lock the key here, @@ -377,13 +385,13 @@ rocksdb::Status String::MSet(const std::vector &pairs, int ttl) { return rocksdb::Status::OK(); } -rocksdb::Status String::MSetNX(const std::vector &pairs, int ttl, int *ret) { +rocksdb::Status String::MSetNX(const std::vector &pairs, uint64_t ttl, int *ret) { *ret = 0; - int expire = 0; + uint64_t expire = 0; if (ttl > 0) { - int64_t now = Util::GetTimeStamp(); - expire = int(now) + ttl; + uint64_t now = Util::GetTimeStampMS(); + expire = now + ttl; } int exists = 0; @@ -425,7 +433,7 @@ rocksdb::Status String::MSetNX(const std::vector &pairs, int ttl, in // -1 if the user_key does not exist // 0 if the operation fails rocksdb::Status String::CAS(const std::string &user_key, const std::string &old_value, const std::string &new_value, - int ttl, int *ret) { + uint64_t ttl, int *ret) { *ret = 0; std::string ns_key, current_value; @@ -445,11 +453,11 @@ rocksdb::Status String::CAS(const std::string &user_key, const std::string &old_ if (old_value == current_value) { std::string raw_value; - int expire = 0; + uint64_t expire = 0; Metadata metadata(kRedisString, false); if (ttl > 0) { - int64_t now = Util::GetTimeStamp(); - expire = int(now) + ttl; + uint64_t now = Util::GetTimeStampMS(); + expire = now + ttl; } metadata.expire = expire; metadata.Encode(&raw_value); diff --git a/src/types/redis_string.h b/src/types/redis_string.h index 98a09c89826..dcbeda83e55 100644 --- a/src/types/redis_string.h +++ b/src/types/redis_string.h @@ -20,6 +20,7 @@ #pragma once +#include #include #include @@ -33,28 +34,26 @@ struct StringPair { namespace Redis { -const int STRING_HDR_SIZE = 5; - class String : public Database { public: explicit String(Engine::Storage *storage, const std::string &ns) : Database(storage, ns) {} rocksdb::Status Append(const std::string &user_key, const std::string &value, int *ret); rocksdb::Status Get(const std::string &user_key, std::string *value); - rocksdb::Status GetEx(const std::string &user_key, std::string *value, int ttl); + rocksdb::Status GetEx(const std::string &user_key, std::string *value, uint64_t ttl); rocksdb::Status GetSet(const std::string &user_key, const std::string &new_value, std::string *old_value); rocksdb::Status GetDel(const std::string &user_key, std::string *value); rocksdb::Status Set(const std::string &user_key, const std::string &value); - rocksdb::Status SetEX(const std::string &user_key, const std::string &value, int ttl); - rocksdb::Status SetNX(const std::string &user_key, const std::string &value, int ttl, int *ret); - rocksdb::Status SetXX(const std::string &user_key, const std::string &value, int ttl, int *ret); - rocksdb::Status SetRange(const std::string &user_key, int offset, const std::string &value, int *ret); + rocksdb::Status SetEX(const std::string &user_key, const std::string &value, uint64_t ttl); + rocksdb::Status SetNX(const std::string &user_key, const std::string &value, uint64_t ttl, int *ret); + rocksdb::Status SetXX(const std::string &user_key, const std::string &value, uint64_t ttl, int *ret); + rocksdb::Status SetRange(const std::string &user_key, size_t offset, const std::string &value, int *ret); rocksdb::Status IncrBy(const std::string &user_key, int64_t increment, int64_t *ret); rocksdb::Status IncrByFloat(const std::string &user_key, double increment, double *ret); std::vector MGet(const std::vector &keys, std::vector *values); - rocksdb::Status MSet(const std::vector &pairs, int ttl = 0); - rocksdb::Status MSetNX(const std::vector &pairs, int ttl, int *ret); - rocksdb::Status CAS(const std::string &user_key, const std::string &old_value, const std::string &new_value, int ttl, - int *ret); + rocksdb::Status MSet(const std::vector &pairs, uint64_t ttl = 0); + rocksdb::Status MSetNX(const std::vector &pairs, uint64_t ttl, int *ret); + rocksdb::Status CAS(const std::string &user_key, const std::string &old_value, const std::string &new_value, + uint64_t ttl, int *ret); rocksdb::Status CAD(const std::string &user_key, const std::string &value, int *ret); private: diff --git a/tests/cppunit/types/hash_test.cc b/tests/cppunit/types/hash_test.cc index 2d64b7af3eb..4696d7eddc1 100644 --- a/tests/cppunit/types/hash_test.cc +++ b/tests/cppunit/types/hash_test.cc @@ -55,6 +55,7 @@ TEST_F(RedisHashTest, GetAndSet) { for (size_t i = 0; i < fields_.size(); i++) { std::string got; auto s = hash->Get(key_, fields_[i], &got); + EXPECT_EQ(s.ToString(), "OK"); EXPECT_EQ(values_[i], got); } auto s = hash->Delete(key_, fields_, &ret); diff --git a/tests/cppunit/types/metadata_test.cc b/tests/cppunit/types/metadata_test.cc index fa1a7e4dc7a..68a3515d7fa 100644 --- a/tests/cppunit/types/metadata_test.cc +++ b/tests/cppunit/types/metadata_test.cc @@ -24,6 +24,7 @@ #include "storage/redis_metadata.h" #include "test_base.h" +#include "time_util.h" #include "types/redis_hash.h" TEST(InternalKey, EncodeAndDecode) { @@ -47,14 +48,14 @@ TEST(InternalKey, EncodeAndDecode) { TEST(Metadata, EncodeAndDeocde) { std::string string_bytes; Metadata string_md(kRedisString); - string_md.expire = 123; + string_md.expire = 123000; string_md.Encode(&string_bytes); Metadata string_md1(kRedisNone); string_md1.Decode(string_bytes); ASSERT_EQ(string_md, string_md1); ListMetadata list_md; list_md.flags = 13; - list_md.expire = 123; + list_md.expire = 123000; list_md.version = 2; list_md.size = 1234; list_md.head = 123; @@ -109,9 +110,83 @@ TEST_F(RedisTypeTest, Expire) { EXPECT_TRUE(s.ok() && static_cast(fvs.size()) == ret); int64_t now = 0; rocksdb::Env::Default()->GetCurrentTime(&now); - redis->Expire(key_, int(now + 2)); - int ttl = 0; + redis->Expire(key_, now * 1000 + 2000); + int64_t ttl = 0; redis->TTL(key_, &ttl); - ASSERT_TRUE(ttl >= 1 && ttl <= 2); + ASSERT_TRUE(ttl >= 1000 && ttl <= 2000); sleep(2); } + +TEST(Metadata, MetadataDecodingBackwardCompatibleSimpleKey) { + auto expire_at = (Util::GetTimeStamp() + 10) * 1000; + Metadata md_old(kRedisString, true, false); + EXPECT_FALSE(md_old.Is64BitEncoded()); + md_old.expire = expire_at; + std::string encoded_bytes; + md_old.Encode(&encoded_bytes); + EXPECT_EQ(encoded_bytes.size(), 5); + + Metadata md_new(kRedisNone, false, true); // decoding existing metadata with 64-bit feature activated + md_new.Decode(encoded_bytes); + EXPECT_FALSE(md_new.Is64BitEncoded()); + EXPECT_EQ(md_new.Type(), kRedisString); + EXPECT_EQ(md_new.expire, expire_at); +} + +TEST(Metadata, MetadataDecoding64BitSimpleKey) { + auto expire_at = (Util::GetTimeStamp() + 10) * 1000; + Metadata md_old(kRedisString, true, true); + EXPECT_TRUE(md_old.Is64BitEncoded()); + md_old.expire = expire_at; + std::string encoded_bytes; + md_old.Encode(&encoded_bytes); + EXPECT_EQ(encoded_bytes.size(), 9); +} + +TEST(Metadata, MetadataDecodingBackwardCompatibleComplexKey) { + auto expire_at = (Util::GetTimeStamp() + 100) * 1000; + uint32_t size = 1000000000; + Metadata md_old(kRedisHash, true, false); + EXPECT_FALSE(md_old.Is64BitEncoded()); + md_old.expire = expire_at; + md_old.size = size; + std::string encoded_bytes; + md_old.Encode(&encoded_bytes); + + Metadata md_new(kRedisHash, false, true); + md_new.Decode(encoded_bytes); + EXPECT_FALSE(md_new.Is64BitEncoded()); + EXPECT_EQ(md_new.Type(), kRedisHash); + EXPECT_EQ(md_new.expire, expire_at); + EXPECT_EQ(md_new.size, size); +} + +TEST(Metadata, Metadata64bitExpiration) { + auto expire_at = Util::GetTimeStampMS() + 1000; + Metadata md_src(kRedisString, true, true); + EXPECT_TRUE(md_src.Is64BitEncoded()); + md_src.expire = expire_at; + std::string encoded_bytes; + md_src.Encode(&encoded_bytes); + + Metadata md_decoded(kRedisNone, false, true); + md_decoded.Decode(encoded_bytes); + EXPECT_TRUE(md_decoded.Is64BitEncoded()); + EXPECT_EQ(md_decoded.Type(), kRedisString); + EXPECT_EQ(md_decoded.expire, expire_at); +} + +TEST(Metadata, Metadata64bitSize) { + uint64_t big_size = 100000000000; + Metadata md_src(kRedisHash, true, true); + EXPECT_TRUE(md_src.Is64BitEncoded()); + md_src.size = big_size; + std::string encoded_bytes; + md_src.Encode(&encoded_bytes); + + Metadata md_decoded(kRedisNone, false, true); + md_decoded.Decode(encoded_bytes); + EXPECT_TRUE(md_decoded.Is64BitEncoded()); + EXPECT_EQ(md_decoded.Type(), kRedisHash); + EXPECT_EQ(md_decoded.size, big_size); +} diff --git a/tests/cppunit/types/string_test.cc b/tests/cppunit/types/string_test.cc index d9d220ea37e..efd49c1929c 100644 --- a/tests/cppunit/types/string_test.cc +++ b/tests/cppunit/types/string_test.cc @@ -72,6 +72,7 @@ TEST_F(RedisStringTest, MGetAndMSet) { string->MSet(pairs_); std::vector keys; std::vector values; + keys.reserve(pairs_.size()); for (const auto &pair : pairs_) { keys.emplace_back(pair.key); } @@ -137,13 +138,13 @@ TEST_F(RedisStringTest, GetEmptyValue) { } TEST_F(RedisStringTest, GetSet) { - int ttl = 0; + int64_t ttl = 0; int64_t now = 0; rocksdb::Env::Default()->GetCurrentTime(&now); std::vector values = {"a", "b", "c", "d"}; for (size_t i = 0; i < values.size(); i++) { std::string old_value; - string->Expire(key_, static_cast(now + 1000)); + string->Expire(key_, now * 1000 + 100000); string->GetSet(key_, values[i], &old_value); if (i != 0) { EXPECT_EQ(values[i - 1], old_value); @@ -172,14 +173,14 @@ TEST_F(RedisStringTest, GetDel) { TEST_F(RedisStringTest, MSetXX) { int ret = 0; - string->SetXX(key_, "test-value", 3, &ret); + string->SetXX(key_, "test-value", 3000, &ret); EXPECT_EQ(ret, 0); string->Set(key_, "test-value"); - string->SetXX(key_, "test-value", 3, &ret); + string->SetXX(key_, "test-value", 3000, &ret); EXPECT_EQ(ret, 1); - int ttl = 0; + int64_t ttl = 0; string->TTL(key_, &ttl); - EXPECT_TRUE(ttl >= 2 && ttl <= 3); + EXPECT_TRUE(ttl >= 2000 && ttl <= 3000); string->Del(key_); } @@ -189,6 +190,7 @@ TEST_F(RedisStringTest, MSetNX) { EXPECT_EQ(1, ret); std::vector keys; std::vector values; + keys.reserve(pairs_.size()); for (const auto &pair : pairs_) { keys.emplace_back(pair.key); } @@ -210,18 +212,18 @@ TEST_F(RedisStringTest, MSetNX) { TEST_F(RedisStringTest, MSetNXWithTTL) { int ret = 0; - string->SetNX(key_, "test-value", 3, &ret); - int ttl = 0; + string->SetNX(key_, "test-value", 3000, &ret); + int64_t ttl = 0; string->TTL(key_, &ttl); - EXPECT_TRUE(ttl >= 2 && ttl <= 3); + EXPECT_TRUE(ttl >= 2000 && ttl <= 3000); string->Del(key_); } TEST_F(RedisStringTest, SetEX) { - string->SetEX(key_, "test-value", 3); - int ttl = 0; + string->SetEX(key_, "test-value", 3000); + int64_t ttl = 0; string->TTL(key_, &ttl); - EXPECT_TRUE(ttl >= 2 && ttl <= 3); + EXPECT_TRUE(ttl >= 2000 && ttl <= 3000); string->Del(key_); } @@ -257,15 +259,15 @@ TEST_F(RedisStringTest, CAS) { auto status = string->Set(key, value); ASSERT_TRUE(status.ok()); - status = string->CAS("non_exist_key", value, new_value, 10, &ret); + status = string->CAS("non_exist_key", value, new_value, 10000, &ret); ASSERT_TRUE(status.ok()); EXPECT_EQ(-1, ret); - status = string->CAS(key, "cas_value_err", new_value, 10, &ret); + status = string->CAS(key, "cas_value_err", new_value, 10000, &ret); ASSERT_TRUE(status.ok()); EXPECT_EQ(0, ret); - status = string->CAS(key, value, new_value, 10, &ret); + status = string->CAS(key, value, new_value, 10000, &ret); ASSERT_TRUE(status.ok()); EXPECT_EQ(1, ret); @@ -274,9 +276,9 @@ TEST_F(RedisStringTest, CAS) { ASSERT_TRUE(status.ok()); EXPECT_EQ(new_value, current_value); - int ttl = 0; + int64_t ttl = 0; string->TTL(key, &ttl); - EXPECT_TRUE(ttl >= 9 && ttl <= 10); + EXPECT_TRUE(ttl >= 9000 && ttl <= 10000); string->Del(key); } diff --git a/tests/gocase/unit/expire/expire_test.go b/tests/gocase/unit/expire/expire_test.go index 5cc6763caa0..d35b84d8801 100644 --- a/tests/gocase/unit/expire/expire_test.go +++ b/tests/gocase/unit/expire/expire_test.go @@ -42,7 +42,7 @@ func TestExpire(t *testing.T) { require.True(t, rdb.Expire(ctx, "x", 5*time.Second).Val()) util.BetweenValues(t, rdb.TTL(ctx, "x").Val(), 4*time.Second, 5*time.Second) require.True(t, rdb.Expire(ctx, "x", 10*time.Second).Val()) - require.Equal(t, 10*time.Second, rdb.TTL(ctx, "x").Val()) + util.BetweenValues(t, rdb.TTL(ctx, "x").Val().Seconds(), 9, 10) require.NoError(t, rdb.Expire(ctx, "x", 2*time.Second).Err()) }) @@ -112,10 +112,10 @@ func TestExpire(t *testing.T) { t.Run("EXPIRE precision is now the millisecond", func(t *testing.T) { util.RetryEventually(t, func() bool { require.NoError(t, rdb.Del(ctx, "x").Err()) - require.NoError(t, rdb.SetEx(ctx, "x", "somevalue", time.Second).Err()) - time.Sleep(900 * time.Millisecond) + require.NoError(t, rdb.SetEx(ctx, "x", "somevalue", 1500*time.Millisecond).Err()) + time.Sleep(50 * time.Millisecond) a := rdb.Get(ctx, "x").Val() - time.Sleep(1100 * time.Millisecond) + time.Sleep(2000 * time.Millisecond) b := rdb.Get(ctx, "x").Val() return a == "somevalue" && b == "" }, 3) @@ -125,10 +125,10 @@ func TestExpire(t *testing.T) { util.RetryEventually(t, func() bool { require.NoError(t, rdb.Del(ctx, "x").Err()) require.NoError(t, rdb.Set(ctx, "x", "somevalue", 0).Err()) - require.NoError(t, rdb.PExpire(ctx, "x", 100*time.Millisecond).Err()) + require.NoError(t, rdb.PExpire(ctx, "x", 1500*time.Millisecond).Err()) time.Sleep(50 * time.Millisecond) a := rdb.Get(ctx, "x").Val() - time.Sleep(2 * time.Second) + time.Sleep(2000 * time.Millisecond) b := rdb.Get(ctx, "x").Val() return a == "somevalue" && b == "" }, 3) @@ -138,10 +138,10 @@ func TestExpire(t *testing.T) { util.RetryEventually(t, func() bool { require.NoError(t, rdb.Del(ctx, "x").Err()) require.NoError(t, rdb.Set(ctx, "x", "somevalue", 0).Err()) - require.NoError(t, rdb.PExpireAt(ctx, "x", time.UnixMilli(time.Now().Unix()*1000+100)).Err()) + require.NoError(t, rdb.PExpireAt(ctx, "x", time.UnixMilli(time.Now().Unix()*1000+1500)).Err()) time.Sleep(50 * time.Millisecond) a := rdb.Get(ctx, "x").Val() - time.Sleep(2 * time.Second) + time.Sleep(2000 * time.Millisecond) b := rdb.Get(ctx, "x").Val() return a == "somevalue" && b == "" }, 3) @@ -150,10 +150,10 @@ func TestExpire(t *testing.T) { t.Run("PSETEX can set sub-second expires", func(t *testing.T) { util.RetryEventually(t, func() bool { require.NoError(t, rdb.Del(ctx, "x").Err()) - require.NoError(t, rdb.Set(ctx, "x", "somevalue", 100*time.Millisecond).Err()) + require.NoError(t, rdb.Set(ctx, "x", "somevalue", 1500*time.Millisecond).Err()) time.Sleep(50 * time.Millisecond) a := rdb.Get(ctx, "x").Val() - time.Sleep(2 * time.Second) + time.Sleep(2000 * time.Millisecond) b := rdb.Get(ctx, "x").Val() return a == "somevalue" && b == "" }, 3) @@ -167,8 +167,8 @@ func TestExpire(t *testing.T) { t.Run("PTTL returns time to live in milliseconds", func(t *testing.T) { require.NoError(t, rdb.Del(ctx, "x").Err()) - require.NoError(t, rdb.SetEx(ctx, "x", "somevalue", 1*time.Second).Err()) - util.BetweenValues(t, rdb.PTTL(ctx, "x").Val(), 900*time.Millisecond, 1000*time.Millisecond) + require.NoError(t, rdb.SetEx(ctx, "x", "somevalue", 2*time.Second).Err()) + util.BetweenValues(t, rdb.PTTL(ctx, "x").Val(), 50*time.Millisecond, 2000*time.Millisecond) }) t.Run("TTL / PTTL return -1 if key has no expire", func(t *testing.T) { @@ -186,11 +186,11 @@ func TestExpire(t *testing.T) { t.Run("Redis should actively expire keys incrementally", func(t *testing.T) { require.NoError(t, rdb.FlushDB(ctx).Err()) - require.NoError(t, rdb.Do(ctx, "PSETEX", "key1", 500, "a").Err()) - require.NoError(t, rdb.Do(ctx, "PSETEX", "key2", 500, "a").Err()) - require.NoError(t, rdb.Do(ctx, "PSETEX", "key3", 500, "a").Err()) + require.NoError(t, rdb.Do(ctx, "PSETEX", "key1", 1500, "a").Err()) + require.NoError(t, rdb.Do(ctx, "PSETEX", "key2", 1500, "a").Err()) + require.NoError(t, rdb.Do(ctx, "PSETEX", "key3", 1500, "a").Err()) require.NoError(t, rdb.Do(ctx, "DBSIZE", "scan").Err()) - time.Sleep(100 * time.Millisecond) + time.Sleep(50 * time.Millisecond) require.EqualValues(t, 3, rdb.DBSize(ctx).Val()) time.Sleep(2000 * time.Millisecond) require.NoError(t, rdb.Do(ctx, "DBSIZE", "scan").Err()) diff --git a/utils/kvrocks2redis/parser.cc b/utils/kvrocks2redis/parser.cc index 89c6ca2edd7..4a521e9e72e 100644 --- a/utils/kvrocks2redis/parser.cc +++ b/utils/kvrocks2redis/parser.cc @@ -28,6 +28,7 @@ #include "cluster/redis_slot.h" #include "db_util.h" #include "server/redis_reply.h" +#include "storage/redis_metadata.h" #include "types/redis_string.h" Status Parser::ParseFullDB() { @@ -59,17 +60,17 @@ Status Parser::ParseFullDB() { return Status::OK(); } -Status Parser::parseSimpleKV(const Slice &ns_key, const Slice &value, int expire) { +Status Parser::parseSimpleKV(const Slice &ns_key, const Slice &value, uint64_t expire) { std::string ns, user_key; ExtractNamespaceKey(ns_key, &ns, &user_key, slot_id_encoded_); - auto command = Redis::Command2RESP( - {"SET", user_key, value.ToString().substr(Redis::STRING_HDR_SIZE, value.size() - Redis::STRING_HDR_SIZE)}); + auto command = + Redis::Command2RESP({"SET", user_key, value.ToString().substr(Metadata::GetOffsetAfterExpire(value[0]))}); Status s = writer_->Write(ns, {command}); if (!s.IsOK()) return s; if (expire > 0) { - command = Redis::Command2RESP({"EXPIREAT", user_key, std::to_string(expire)}); + command = Redis::Command2RESP({"EXPIREAT", user_key, std::to_string(expire / 1000)}); s = writer_->Write(ns, {command}); } @@ -142,7 +143,7 @@ Status Parser::parseComplexKV(const Slice &ns_key, const Metadata &metadata) { } if (metadata.expire > 0) { - output = Redis::Command2RESP({"EXPIREAT", user_key, std::to_string(metadata.expire)}); + output = Redis::Command2RESP({"EXPIREAT", user_key, std::to_string(metadata.expire / 1000)}); Status s = writer_->Write(ns, {output}); if (!s.IsOK()) return s.Prefixed("failed to write the EXPIREAT command to AOF"); } diff --git a/utils/kvrocks2redis/parser.h b/utils/kvrocks2redis/parser.h index 771b9d6ee74..d02df5da535 100644 --- a/utils/kvrocks2redis/parser.h +++ b/utils/kvrocks2redis/parser.h @@ -65,7 +65,7 @@ class Parser { std::unique_ptr latest_snapshot_; bool slot_id_encoded_ = false; - Status parseSimpleKV(const Slice &ns_key, const Slice &value, int expire); + Status parseSimpleKV(const Slice &ns_key, const Slice &value, uint64_t expire); Status parseComplexKV(const Slice &ns_key, const Metadata &metadata); Status parseBitmapSegment(const Slice &ns, const Slice &user_key, int index, const Slice &bitmap); };