Skip to content

Commit

Permalink
Fix memory leak and avoid lock when dstore disabled
Browse files Browse the repository at this point in the history
  • Loading branch information
dispensable committed Sep 3, 2024
1 parent 1bed777 commit 6bdf473
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 7 deletions.
28 changes: 24 additions & 4 deletions cassandra/prefix_switch.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type PrefixSwitcher struct {
lock sync.RWMutex
currentTrieMap map[string]string
cstarEnabled bool
bdbEnabled bool
}

func (s PrefixSwitchStatus) IsReadOnBeansdb() bool {
Expand Down Expand Up @@ -157,30 +158,31 @@ func GetPrefixSwitchTrieFromCfg(
}
}

func NewPrefixSwitcher(config *config.CassandraStoreCfg, cqlStore *CassandraStore) (*PrefixSwitcher, error) {
func NewPrefixSwitcher(cfg *config.ProxyConfig, cqlStore *CassandraStore) (*PrefixSwitcher, error) {
f := new(PrefixSwitcher)

if !config.Enable {
if !cfg.CassandraStoreCfg.Enable {
f.defaultT = PrefixSwitchBrw
f.cstarEnabled = false
return f, nil
}

prefixTrie, nowMap, err := GetPrefixSwitchTrieFromCfg(config, cqlStore)
prefixTrie, nowMap, err := GetPrefixSwitchTrieFromCfg(&cfg.CassandraStoreCfg, cqlStore)
if err != nil {
return nil, err
}

f.trie = prefixTrie
f.cstarEnabled = true

defaultS, err := strToSwitchStatus(config.SwitchToKeyDefault)
defaultS, err := strToSwitchStatus(cfg.SwitchToKeyDefault)
if err != nil {
return nil, err
}

f.defaultT = defaultS
f.currentTrieMap = nowMap
f.bdbEnabled = cfg.DStoreConfig.Enable
return f, nil
}

Expand Down Expand Up @@ -215,6 +217,10 @@ func (s *PrefixSwitcher) matchStatus(key string) PrefixSwitchStatus {
}

func (s *PrefixSwitcher) GetStatus(key string) PrefixSwitchStatus {
if !s.bdbEnabled && s.cstarEnabled {
return PrefixSwitchCrw
}

if !s.cstarEnabled {
return PrefixSwitchBrw
}
Expand All @@ -226,15 +232,25 @@ func (s *PrefixSwitcher) GetStatus(key string) PrefixSwitchStatus {

// check key prefix and return bdb read enable c* read enable
func (s *PrefixSwitcher) ReadEnabledOn(key string) (bool, bool) {
if !s.bdbEnabled && s.cstarEnabled {
return false, true
}

if !s.cstarEnabled {
return true, false
}

status := s.GetStatus(key)
return status.IsReadOnBeansdb(), status.IsReadOnCstar()
}

// check keys prefix list and return bdb read keys and c* read keys
func (s *PrefixSwitcher) ReadEnableOnKeys(keys []string) (bkeys []string, ckeys []string) {
if !s.bdbEnabled && s.cstarEnabled {
ckeys = keys
return
}

if !s.cstarEnabled {
bkeys = keys
return
Expand All @@ -261,6 +277,10 @@ func (s *PrefixSwitcher) ReadEnableOnKeys(keys []string) (bkeys []string, ckeys

// check key prefix and return bdb write enable c* write enable
func (s *PrefixSwitcher) WriteEnabledOn(key string) (bool, bool) {
if !s.bdbEnabled && s.cstarEnabled {
return false, true
}

if !s.cstarEnabled {
return true, false
}
Expand Down
10 changes: 7 additions & 3 deletions dstore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (s *Storage) InitStorageEngine(pCfg *config.ProxyConfig) error {

s.cstar = cstar

switcher, err := cassandra.NewPrefixSwitcher(&proxyConf.CassandraStoreCfg, cstar)
switcher, err := cassandra.NewPrefixSwitcher(proxyConf, cstar)
if err != nil {
return err
}
Expand All @@ -63,7 +63,7 @@ func (s *Storage) InitStorageEngine(pCfg *config.ProxyConfig) error {
s.dualWErrHandler = dualWErrHandler
logger.Infof("dual write log send to: %s", s.dualWErrHandler.EFile)
} else {
switcher, err := cassandra.NewPrefixSwitcher(&proxyConf.CassandraStoreCfg, nil)
switcher, err := cassandra.NewPrefixSwitcher(proxyConf, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -269,14 +269,18 @@ func (c *StorageClient) getMulti(keys []string) (rs map[string]*mc.Item, targets
}

func (c *StorageClient) GetMulti(keys []string) (rs map[string]*mc.Item, err error) {
// The keys args MUST BE DEDUPLICATED, if not, there will be MEMORY LEAK
keys = deduplicateKeys(keys)

timer := prometheus.NewTimer(
cmdE2EDurationSeconds.WithLabelValues("getm"),
)
defer timer.ObserveDuration()

bkeys, ckeys := c.pswitcher.ReadEnableOnKeys(keys)

rs = make(map[string]*mc.Item, len(keys))

if len(bkeys) > 0 {
totalReqs.WithLabelValues("getm", "beansdb").Inc()
c.sched = GetScheduler()
Expand Down
21 changes: 21 additions & 0 deletions dstore/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package dstore

func deduplicateKeys(keys []string) []string {
dedup := make(map[string]struct{}, len(keys))

for _, k := range keys {
if _, ok := dedup[k]; ok {
continue
} else {
dedup[k] = struct{}{}
}
}

dedupKs := make([]string, len(dedup))
i := 0
for k := range dedup {
dedupKs[i] = k
i++
}
return dedupKs
}
38 changes: 38 additions & 0 deletions dstore/utils_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package dstore

import (
"fmt"
"testing"
)

func TestDeduplicateKeys(t *testing.T) {
test := []string{"a", "b", "c", "d", "a"}
dtk := deduplicateKeys(test)
if (len(dtk) != 4) {
t.Errorf("string slice should be deduplicated: %s", dtk)
}
test2 := []string{"a", "n"}
dtk2 := deduplicateKeys(test2)
if (len(dtk2) != 2) {
t.Errorf("string slice %s has no duplications", test2)
}
t.Logf("after dedup: %s | %s", dtk, dtk2)
}

func BenchmarkDeduplicateKeys(b *testing.B) {
test := []string{
"/frodo_feed/title_vecs/3055:4601087161",
"/frodo_feed/title_vecs/3055:4601087162",
"/frodo_feed/title_vecs/3055:4601087161",
"/frodo_feed/title_vecs/3055:4601087165",
"/frodo_feed/title_vecs/3055:4601087161",
}

for j := 0; j < 200; j++ {
test = append(test, fmt.Sprintf("/frodo_feed/title_vecs/3055:460108716%d", j))
}

for i := 0; i < b.N; i++ {
deduplicateKeys(test)
}
}

0 comments on commit 6bdf473

Please sign in to comment.