Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(hash): Support hash field expiration #2402

Open
wants to merge 2 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions kvrocks.conf
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,11 @@ json-max-nesting-depth 1024
# Default: json
json-storage-format json

# Whether to enable hash field expiration feature.
# NOTE: This option only affects newly hash object
# Default: no
hash-field-expiration no

################################## TLS ###################################

# By default, TLS/SSL is disabled, i.e. `tls-port` is set to 0.
Expand Down
247 changes: 246 additions & 1 deletion src/commands/cmd_hash.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "error_constants.h"
#include "scan_base.h"
#include "server/server.h"
#include "time_util.h"
#include "types/redis_hash.h"

namespace redis {
Expand Down Expand Up @@ -445,6 +446,241 @@ class CommandHRandField : public Commander {
bool no_parameters_ = true;
};

class CommandFieldExpireBase : public Commander {
protected:
Status commonParse(const std::vector<std::string> &args, int start_idx) {
CommandParser parser(args, start_idx);
std::string_view expire_flag, num_flag;
uint64_t fields_num = 0;
while (parser.Good()) {
if (parser.EatEqICaseFlag("FIELDS", num_flag)) {
fields_num = GET_OR_RET(parser.template TakeInt<uint64_t>());
break;
} else if (parser.EatEqICaseFlag("NX", expire_flag)) {
field_expire_type_ = HashFieldExpireType::NX;
} else if (parser.EatEqICaseFlag("XX", expire_flag)) {
field_expire_type_ = HashFieldExpireType::XX;
} else if (parser.EatEqICaseFlag("GT", expire_flag)) {
field_expire_type_ = HashFieldExpireType::GT;
} else if (parser.EatEqICaseFlag("LT", expire_flag)) {
field_expire_type_ = HashFieldExpireType::LT;
} else {
return parser.InvalidSyntax();
}
}

auto remains = parser.Remains();
auto size = args.size();
if (remains != fields_num) {
return {Status::RedisParseErr, errWrongNumOfArguments};
}

PragmaTwice marked this conversation as resolved.
Show resolved Hide resolved
for (size_t i = size - remains; i < size; i++) {
fields_.emplace_back(args_[i]);
}

return Status::OK();
}

Status expireFieldExecute(Server *srv, Connection *conn, std::string *output) {
if (!srv->storage->GetConfig()->hash_field_expiration) {
return {Status::RedisExecErr, "field expiration feature is disabled"};
}

std::vector<int8_t> ret;
redis::Hash hash_db(srv->storage, conn->GetNamespace());
engine::Context ctx(srv->storage);
auto s = hash_db.ExpireFields(ctx, args_[1], expire_, fields_, field_expire_type_, &ret);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}

*output = redis::MultiLen(ret.size());
for (const auto &i : ret) {
output->append(redis::Integer(i));
}

return Status::OK();
}

Status ttlExpireExecute(Server *srv, Connection *conn, std::vector<int64_t> &ret) {
redis::Hash hash_db(srv->storage, conn->GetNamespace());
engine::Context ctx(srv->storage);
auto s = hash_db.TTLFields(ctx, args_[1], fields_, &ret);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}
return Status::OK();
}

uint64_t expire_ = 0;
HashFieldExpireType field_expire_type_ = HashFieldExpireType::None;
std::vector<Slice> fields_;
};

class CommandHExpire : public CommandFieldExpireBase {
public:
Status Parse(const std::vector<std::string> &args) override {
auto parse_result = ParseInt<uint64_t>(args[2], 10);
if (!parse_result) return {Status::RedisParseErr, errValueNotInteger};

expire_ = *parse_result * 1000 + util::GetTimeStampMS();
return CommandFieldExpireBase::commonParse(args, 3);
}

Status Execute(Server *srv, Connection *conn, std::string *output) override {
return expireFieldExecute(srv, conn, output);
}
};

class CommandHExpireAt : public CommandFieldExpireBase {
public:
Status Parse(const std::vector<std::string> &args) override {
auto parse_result = ParseInt<uint64_t>(args[2], 10);
if (!parse_result) return {Status::RedisParseErr, errValueNotInteger};

expire_ = *parse_result * 1000;
return CommandFieldExpireBase::commonParse(args, 3);
}

Status Execute(Server *srv, Connection *conn, std::string *output) override {
return expireFieldExecute(srv, conn, output);
}
};

class CommandHPExpire : public CommandFieldExpireBase {
public:
Status Parse(const std::vector<std::string> &args) override {
auto parse_result = ParseInt<uint64_t>(args[2], 10);
if (!parse_result) return {Status::RedisParseErr, errValueNotInteger};

expire_ = *parse_result + util::GetTimeStampMS();
return CommandFieldExpireBase::commonParse(args, 3);
}

Status Execute(Server *srv, Connection *conn, std::string *output) override {
return expireFieldExecute(srv, conn, output);
}
};

class CommandHPExpireAt : public CommandFieldExpireBase {
public:
Status Parse(const std::vector<std::string> &args) override {
auto parse_result = ParseInt<uint64_t>(args[2], 10);
if (!parse_result) return {Status::RedisParseErr, errValueNotInteger};

expire_ = *parse_result;
return CommandFieldExpireBase::commonParse(args, 3);
}

Status Execute(Server *srv, Connection *conn, std::string *output) override {
return expireFieldExecute(srv, conn, output);
}
};

class CommandHExpireTime : public CommandFieldExpireBase {
public:
Status Parse(const std::vector<std::string> &args) override { return CommandFieldExpireBase::commonParse(args, 2); }

Status Execute(Server *srv, Connection *conn, std::string *output) override {
std::vector<int64_t> ret;
auto s = ttlExpireExecute(srv, conn, ret);
if (!s.IsOK()) {
return {Status::RedisExecErr, s.Msg()};
}
auto now = util::GetTimeStampMS();
*output = redis::MultiLen(ret.size());
for (const auto &ttl : ret) {
if (ttl > 0) {
output->append(redis::Integer((now + ttl) / 1000));
} else {
output->append(redis::Integer(ttl));
}
}
return Status::OK();
}
};

class CommandHPExpireTime : public CommandFieldExpireBase {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The command HExpireTime/HTTL/HPTTL doesn't support NX/XX/LT/GT flags. By the way, it should be good to remove the ttlExpireExecute/expireFieldExecute and only add a reply helper function like redis::IntArray. @torwig @PragmaTwice What do you think?

public:
Status Parse(const std::vector<std::string> &args) override { return CommandFieldExpireBase::commonParse(args, 2); }

Status Execute(Server *srv, Connection *conn, std::string *output) override {
std::vector<int64_t> ret;
auto s = ttlExpireExecute(srv, conn, ret);
if (!s.IsOK()) {
return {Status::RedisExecErr, s.Msg()};
}
auto now = util::GetTimeStampMS();
*output = redis::MultiLen(ret.size());
for (const auto &ttl : ret) {
if (ttl > 0) {
output->append(redis::Integer(now + ttl));
} else {
output->append(redis::Integer(ttl));
}
}
return Status::OK();
}
};

class CommandHTTL : public CommandFieldExpireBase {
public:
Status Parse(const std::vector<std::string> &args) override { return CommandFieldExpireBase::commonParse(args, 2); }

Status Execute(Server *srv, Connection *conn, std::string *output) override {
std::vector<int64_t> ret;
auto s = ttlExpireExecute(srv, conn, ret);
if (!s.IsOK()) {
return {Status::RedisExecErr, s.Msg()};
}
*output = redis::MultiLen(ret.size());
for (const auto &ttl : ret) {
output->append(redis::Integer(ttl > 0 ? ttl / 1000 : ttl));
}
return Status::OK();
}
};

class CommandHPTTL : public CommandFieldExpireBase {
public:
Status Parse(const std::vector<std::string> &args) override { return CommandFieldExpireBase::commonParse(args, 2); }

Status Execute(Server *srv, Connection *conn, std::string *output) override {
std::vector<int64_t> ret;
auto s = ttlExpireExecute(srv, conn, ret);
if (!s.IsOK()) {
return {Status::RedisExecErr, s.Msg()};
}
*output = redis::MultiLen(ret.size());
for (const auto &ttl : ret) {
output->append(redis::Integer(ttl));
}
return Status::OK();
}
};

class CommandHPersist : public CommandFieldExpireBase {
public:
Status Parse(const std::vector<std::string> &args) override { return CommandFieldExpireBase::commonParse(args, 2); }

Status Execute(Server *srv, Connection *conn, std::string *output) override {
std::vector<int8_t> ret;
redis::Hash hash_db(srv->storage, conn->GetNamespace());
engine::Context ctx(srv->storage);
auto s = hash_db.PersistFields(ctx, args_[1], fields_, &ret);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}

*output = redis::MultiLen(ret.size());
for (const auto &i : ret) {
output->append(redis::Integer(i));
}
return Status::OK();
}
};

REDIS_REGISTER_COMMANDS(Hash, MakeCmdAttr<CommandHGet>("hget", 3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandHIncrBy>("hincrby", 4, "write", 1, 1, 1),
MakeCmdAttr<CommandHIncrByFloat>("hincrbyfloat", 4, "write", 1, 1, 1),
Expand All @@ -461,6 +697,15 @@ REDIS_REGISTER_COMMANDS(Hash, MakeCmdAttr<CommandHGet>("hget", 3, "read-only", 1
MakeCmdAttr<CommandHGetAll>("hgetall", 2, "read-only slow", 1, 1, 1),
MakeCmdAttr<CommandHScan>("hscan", -3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandHRangeByLex>("hrangebylex", -4, "read-only", 1, 1, 1),
MakeCmdAttr<CommandHRandField>("hrandfield", -2, "read-only", 1, 1, 1), )
MakeCmdAttr<CommandHRandField>("hrandfield", -2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandHExpire>("hexpire", -6, "write", 1, 1, 1),
MakeCmdAttr<CommandHExpireAt>("hexpireat", -6, "write", 1, 1, 1),
MakeCmdAttr<CommandHExpireTime>("hexpiretime", -5, "read-only", 1, 1, 1),
MakeCmdAttr<CommandHPExpire>("hpexpire", -6, "write", 1, 1, 1),
MakeCmdAttr<CommandHPExpireAt>("hpexpireat", -6, "write", 1, 1, 1),
MakeCmdAttr<CommandHPExpireTime>("hpexpiretime", -5, "read-only", 1, 1, 1),
MakeCmdAttr<CommandHPersist>("hpersist", -5, "write", 1, 1, 1),
MakeCmdAttr<CommandHTTL>("httl", -5, "read-only", 1, 1, 1),
MakeCmdAttr<CommandHPTTL>("hpttl", -5, "read-only", 1, 1, 1), )

} // namespace redis
1 change: 1 addition & 0 deletions src/config/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ Config::Config() {
{"json-max-nesting-depth", false, new IntField(&json_max_nesting_depth, 1024, 0, INT_MAX)},
{"json-storage-format", false,
new EnumField<JsonStorageFormat>(&json_storage_format, json_storage_formats, JsonStorageFormat::JSON)},
{"hash-field-expiration", false, new YesNoField(&hash_field_expiration, false)},

/* rocksdb options */
{"rocksdb.compression", false,
Expand Down
3 changes: 3 additions & 0 deletions src/config/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,9 @@ struct Config {
int json_max_nesting_depth = 1024;
JsonStorageFormat json_storage_format = JsonStorageFormat::JSON;

// whether to enable hash field expiration feature
bool hash_field_expiration = false;

struct RocksDB {
int block_size;
bool cache_index_and_filter_blocks;
Expand Down
5 changes: 4 additions & 1 deletion src/storage/compact_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "db_util.h"
#include "time_util.h"
#include "types/redis_bitmap.h"
#include "types/redis_hash.h"

namespace engine {

Expand Down Expand Up @@ -132,7 +133,9 @@ bool SubKeyFilter::Filter([[maybe_unused]] int level, const Slice &key, const Sl
return false;
}

return IsMetadataExpired(ikey, metadata) || (metadata.Type() == kRedisBitmap && redis::Bitmap::IsEmptySegment(value));
return IsMetadataExpired(ikey, metadata) ||
(metadata.Type() == kRedisBitmap && redis::Bitmap::IsEmptySegment(value)) ||
(metadata.Type() == kRedisHash && redis::Hash::IsFieldExpired(cached_metadata_, value));
}

} // namespace engine
Loading