Skip to content

Commit

Permalink
feat: dont store cmd(Get/Set/Hget/Hset) with too large key in cache(fix
Browse files Browse the repository at this point in the history
  • Loading branch information
QlQlqiqi committed Aug 7, 2024
1 parent a2acd88 commit ba5903b
Show file tree
Hide file tree
Showing 12 changed files with 92 additions and 1 deletion.
4 changes: 4 additions & 0 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,10 @@ zset-cache-field-num-per-key : 512
# If zset-cache-start-direction is -1, cache the last 512[zset-cache-field-num-per-key] elements
zset-cache-start-direction : 0

# Get/Hget commands whose key size exceeds max-key-size-in-cache can not be stored in the redis cache.
# NOTE: any value <= 0 (including -1) is replaced with the default at load time; this value can not exceed the max value of int.
# default: 512 (DEFAULT_CACHE_MAX_KEY_SIZE)
max-key-size-in-cache : 512

# the cache maxmemory of every db, configuration 10G
cache-maxmemory : 10737418240
Expand Down
1 change: 1 addition & 0 deletions include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,7 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
// used for execute multikey command into different slots
virtual void Split(const HintKeys& hint_keys) = 0;
virtual void Merge() = 0;
virtual bool IsTooLargeKey(const int &max_sz) { return false; }

int8_t SubCmdIndex(const std::string& cmdName); // if the command no subCommand,return -1;

Expand Down
3 changes: 3 additions & 0 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,7 @@ class PikaConf : public pstd::BaseConf {
void SetCacheMode(const int value) { cache_mode_ = value; }
void SetCacheStartDirection(const int value) { zset_cache_start_direction_ = value; }
void SetCacheItemsPerKey(const int value) { zset_cache_field_num_per_key_ = value; }
void SetCacheMaxKeySize(const int value) { max_key_size_in_cache_ = value; }
void SetCacheMaxmemory(const int64_t value) { cache_maxmemory_ = value; }
void SetCacheMaxmemoryPolicy(const int value) { cache_maxmemory_policy_ = value; }
void SetCacheMaxmemorySamples(const int value) { cache_maxmemory_samples_ = value; }
Expand Down Expand Up @@ -882,6 +883,7 @@ class PikaConf : public pstd::BaseConf {
void SetCacheDisableFlag() { tmp_cache_disable_flag_ = true; }
int zset_cache_start_direction() { return zset_cache_start_direction_; }
int zset_cache_field_num_per_key() { return zset_cache_field_num_per_key_; }
int max_key_size_in_cache() { return max_key_size_in_cache_; }
int cache_maxmemory_policy() { return cache_maxmemory_policy_; }
int cache_maxmemory_samples() { return cache_maxmemory_samples_; }
int cache_lfu_decay_time() { return cache_lfu_decay_time_; }
Expand Down Expand Up @@ -1031,6 +1033,7 @@ class PikaConf : public pstd::BaseConf {
std::atomic_int cache_bit_ = 1;
std::atomic_int zset_cache_start_direction_ = 0;
std::atomic_int zset_cache_field_num_per_key_ = 512;
std::atomic_int max_key_size_in_cache_ = 512;
std::atomic_int cache_maxmemory_policy_ = 1;
std::atomic_int cache_maxmemory_samples_ = 5;
std::atomic_int cache_lfu_decay_time_ = 1;
Expand Down
1 change: 1 addition & 0 deletions include/pika_hash.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class HGetCmd : public Cmd {
void DoUpdateCache() override;
void Split(const HintKeys& hint_keys) override {};
void Merge() override {};
bool IsTooLargeKey(const int &max_sz) override { return key_.size() > static_cast<uint32_t>(max_sz); }
Cmd* Clone() override { return new HGetCmd(*this); }

private:
Expand Down
2 changes: 2 additions & 0 deletions include/pika_kv.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class SetCmd : public Cmd {
void DoThroughDB() override;
void Split(const HintKeys& hint_keys) override{};
void Merge() override{};
bool IsTooLargeKey(const int& max_sz) override { return key_.size() > static_cast<uint32_t>(max_sz); }
Cmd* Clone() override { return new SetCmd(*this); }

private:
Expand Down Expand Up @@ -64,6 +65,7 @@ class GetCmd : public Cmd {
void ReadCache() override;
void Split(const HintKeys& hint_keys) override{};
void Merge() override{};
bool IsTooLargeKey(const int &max_sz) override { return key_.size() > static_cast<uint32_t>(max_sz); }
Cmd* Clone() override { return new GetCmd(*this); }

private:
Expand Down
1 change: 1 addition & 0 deletions src/cache/include/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ constexpr int CACHE_START_FROM_END = -1;
* cache items per key
*/
#define DEFAULT_CACHE_ITEMS_PER_KEY 512
#define DEFAULT_CACHE_MAX_KEY_SIZE 512

struct CacheConfig {
uint64_t maxmemory; /* Can used max memory */
Expand Down
6 changes: 6 additions & 0 deletions src/pika_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,12 @@ bool PikaClientConn::ReadCmdInCache(const net::RedisCmdArgsType& argv, const std
}
// Initial
c_ptr->Initial(argv, current_db_);
// don't serve a cmd with a too-large key from cache (only Get/HGet cmds can reach here)
// such a key should not exist in the cache, except for ones stored before the limit applied
if(c_ptr->IsTooLargeKey(g_pika_conf->max_key_size_in_cache())) {
resp_num--;
return false;
}
//acl check
int8_t subCmdIndex = -1;
std::string errKey;
Expand Down
7 changes: 7 additions & 0 deletions src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,13 @@ int PikaConf::Load() {
}
zset_cache_field_num_per_key_ = zset_cache_field_num_per_key;

int max_key_size_in_cache = DEFAULT_CACHE_MAX_KEY_SIZE;
GetConfInt("max-key-size-in-cache", &max_key_size_in_cache);
if (max_key_size_in_cache <= 0) {
max_key_size_in_cache = DEFAULT_CACHE_MAX_KEY_SIZE;
}
max_key_size_in_cache_ = max_key_size_in_cache;

int64_t cache_maxmemory = PIKA_CACHE_SIZE_DEFAULT;
GetConfInt64("cache-maxmemory", &cache_maxmemory);
cache_maxmemory_ = (PIKA_CACHE_SIZE_MIN > cache_maxmemory) ? PIKA_CACHE_SIZE_DEFAULT : cache_maxmemory;
Expand Down
7 changes: 7 additions & 0 deletions src/pika_hash.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ void HSetCmd::DoThroughDB() {
}

void HSetCmd::DoUpdateCache() {
// HSetIfKeyExist() would also avoid storing a too-large key, but checking IsTooLargeKey() first is faster
if (IsTooLargeKey(g_pika_conf->max_key_size_in_cache())) {
return;
}
if (s_.ok()) {
db_->cache()->HSetIfKeyExist(key_, field_, value_);
}
Expand Down Expand Up @@ -123,6 +127,9 @@ void HGetCmd::DoThroughDB() {
}

void HGetCmd::DoUpdateCache() {
if (IsTooLargeKey(g_pika_conf->max_key_size_in_cache())) {
return;
}
if (s_.ok()) {
db_->cache()->PushKeyToAsyncLoadQueue(PIKA_KEY_TYPE_HASH, key_, db_);
}
Expand Down
5 changes: 4 additions & 1 deletion src/pika_kv.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ void SetCmd::DoThroughDB() {
}

void SetCmd::DoUpdateCache() {
if (SetCmd::kNX == condition_) {
if (SetCmd::kNX == condition_ || IsTooLargeKey(g_pika_conf->max_key_size_in_cache())) {
return;
}
if (s_.ok()) {
Expand Down Expand Up @@ -185,6 +185,9 @@ void GetCmd::DoThroughDB() {
}

void GetCmd::DoUpdateCache() {
if (IsTooLargeKey(g_pika_conf->max_key_size_in_cache())) {
return;
}
if (s_.ok()) {
db_->cache()->WriteKVToCache(key_, value_, sec_);
}
Expand Down
21 changes: 21 additions & 0 deletions tests/integration/hash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pika_integration
import (
"context"
"sort"
"strings"
"time"

. "github.com/bsm/ginkgo/v2"
Expand Down Expand Up @@ -365,5 +366,25 @@ var _ = Describe("Hash Commands", func() {
// Equal([]redis.KeyValue{{Key: "key2", Value: "hello2"}}),
// ))
//})
It("HSet cmd with too large key should not in cache", func() {
	key := strings.Repeat("A", 1000)
	// HSet takes variadic field/value pairs; the previous trailing 0 argument
	// made the pair list odd-length (a server error). HSet also returns the
	// number of newly-set fields (int64), not the string "OK".
	set := client.HSet(ctx, key, "field", "value")
	Expect(set.Err()).NotTo(HaveOccurred())
	Expect(set.Val()).To(Equal(int64(1)))

	get := client.HGet(ctx, key, "field")
	Expect(get.Err()).NotTo(HaveOccurred())
	Expect(get.Val()).To(Equal("value"))

	// give the cache timer task time to run
	time.Sleep(5 * time.Second)

	// a key above max-key-size-in-cache must not be cached, so cache_keys stays 0
	info := client.Info(ctx, "cache")
	Expect(info.Err()).NotTo(HaveOccurred())
	Expect(info.Val()).NotTo(Equal(""))
	Expect(info.Val()).To(ContainSubstring(`cache_keys`))
	cache_keys := extractValue(info.Val(), "cache_keys")
	Expect(strings.Contains(cache_keys, "0")).To(BeTrue())
})
})
})
35 changes: 35 additions & 0 deletions tests/integration/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pika_integration

import (
"context"
"strings"
"time"

. "github.com/bsm/ginkgo/v2"
Expand Down Expand Up @@ -403,5 +404,39 @@ var _ = Describe("Set Commands", func() {
Expect(sMembers.Err()).NotTo(HaveOccurred())
Expect(sMembers.Val()).To(HaveLen(5))
})

It("Set cmd with too large key should not in cache", func() {
	// A key longer than max-key-size-in-cache must bypass the redis cache.
	bigKey := strings.Repeat("A", 1000)

	setRes := client.Set(ctx, bigKey, "a", 0)
	Expect(setRes.Err()).NotTo(HaveOccurred())
	Expect(setRes.Val()).To(Equal("OK"))

	getRes := client.Get(ctx, bigKey)
	Expect(getRes.Err()).NotTo(HaveOccurred())
	Expect(getRes.Val()).To(Equal("a"))

	// give the cache timer task time to run
	time.Sleep(5 * time.Second)

	// the oversized key must not have been cached, so cache_keys stays 0
	infoRes := client.Info(ctx, "cache")
	Expect(infoRes.Err()).NotTo(HaveOccurred())
	Expect(infoRes.Val()).NotTo(Equal(""))
	Expect(infoRes.Val()).To(ContainSubstring(`cache_keys`))
	cacheKeys := extractValue(infoRes.Val(), "cache_keys")
	Expect(strings.Contains(cacheKeys, "0")).To(BeTrue())
})
})
})

// extractValue returns the value of the "key:value" line in data (Redis INFO
// reply format), or "" when the key is not present.
func extractValue(data, key string) string {
	prefix := key + ":"
	for _, line := range strings.Split(data, "\n") {
		if strings.HasPrefix(line, prefix) {
			// TrimPrefix instead of Split so values containing ':' are kept
			// intact; TrimSpace drops the trailing '\r' left by the CRLF
			// line endings of an INFO reply.
			return strings.TrimSpace(strings.TrimPrefix(line, prefix))
		}
	}
	return ""
}

0 comments on commit ba5903b

Please sign in to comment.