Skip to content

Commit

Permalink
use doWithBatch to optmize concurrent batch code.
Browse files Browse the repository at this point in the history
Signed-off-by: Jimmie Han <hanjinming@outlook.com>
  • Loading branch information
hanjm committed Nov 28, 2021
1 parent 158a3c3 commit 801384f
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 102 deletions.
2 changes: 1 addition & 1 deletion docs/components/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ While the remaining settings are **optional**:

### Redis index cache

The `memcached` index cache allows to use [Redis](https://redis.io) as cache backend. This cache type is configured using `--index-cache.config-file` to reference the configuration file or `--index-cache.config` to put yaml config directly:
The `redis` index cache allows to use [Redis](https://redis.io) as cache backend. This cache type is configured using `--index-cache.config-file` to reference the configuration file or `--index-cache.config` to put yaml config directly:

```yaml mdox-exec="go run scripts/cfggen/main.go --name=cacheutil.RedisClientConfig"
type: REDIS
Expand Down
13 changes: 2 additions & 11 deletions pkg/cache/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,17 +90,8 @@ func TestRedisCache(t *testing.T) {
defer s.Close()
logger := log.NewLogfmtLogger(os.Stderr)
reg := prometheus.NewRegistry()
cfg := cacheutil.RedisClientConfig{
Addr: s.Addr(),
DialTimeout: time.Second,
ReadTimeout: time.Second,
WriteTimeout: time.Second,
MinIdleConns: 10,
MaxConnAge: time.Minute * 10,
IdleTimeout: time.Minute * 5,
MaxGetMultiConcurrency: 2,
GetMultiBatchSize: 2,
}
cfg := cacheutil.DefaultRedisClientConfig
cfg.Addr = s.Addr()
c, err := cacheutil.NewRedisClientWithConfig(logger, t.Name(), cfg, reg)
if err != nil {
testutil.Ok(t, err)
Expand Down
156 changes: 66 additions & 90 deletions pkg/cacheutil/redis_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,52 +190,25 @@ func (c *RedisClient) SetMulti(ctx context.Context, data map[string][]byte, ttl
return
}
start := time.Now()
// Split multi keys to batch.
var batches []map[string][]byte
if c.config.SetMultiBatchSize > 0 {
batch := len(data)/c.config.SetMultiBatchSize + 1
batches = make([]map[string][]byte, batch)
for i := 0; i < batch; i++ {
batches[i] = make(map[string][]byte)
}
var idx int
for k, v := range data {
batches[idx/c.config.SetMultiBatchSize][k] = v
idx++
}
} else {
batches = []map[string][]byte{data}
keys := make([]string, 0, len(data))
for k := range data {
keys = append(keys, k)
}
g, ctx := errgroup.WithContext(ctx)
for _, v := range batches {
v := v
// Wait until we get a free slot from the gate, if the max
// concurrency should be enforced.
if c.config.MaxSetMultiConcurrency > 0 {
if err := c.setMultiGate.Start(ctx); err != nil {
level.Warn(c.logger).Log("msg", "setMultiGate err", "err", err, "items", len(data))
return
}
}
g.Go(func() error {
if c.config.MaxSetMultiConcurrency > 0 {
defer c.setMultiGate.Done()
}
_, err := c.Pipelined(ctx, func(p redis.Pipeliner) error {
for key, val := range v {
p.SetEX(ctx, key, val, ttl)
}
return nil
})
if err != nil {
level.Warn(c.logger).Log("msg", "failed to set multi items from redis",
"err", err, "items", len(data))
return nil
err := doWithBatch(ctx, len(data), c.config.SetMultiBatchSize, c.setMultiGate, func(startIndex, endIndex int) error {
_, err := c.Pipelined(ctx, func(p redis.Pipeliner) error {
for _, key := range keys {
p.SetEX(ctx, key, data[key], ttl)
}
return nil
})
}
if err := g.Wait(); err != nil {
if err != nil {
level.Warn(c.logger).Log("msg", "failed to set multi items from redis",
"err", err, "items", len(data))
return nil
}
return nil
})
if err != nil {
level.Warn(c.logger).Log("msg", "failed to set multi items from redis", "err", err,
"items", len(data))
return
Expand All @@ -249,58 +222,31 @@ func (c *RedisClient) GetMulti(ctx context.Context, keys []string) map[string][]
return nil
}
start := time.Now()
// Split multi keys to batch.
var keysBatches [][]string
if c.config.GetMultiBatchSize > 0 {
batch := len(keys)/c.config.GetMultiBatchSize + 1
keysBatches = make([][]string, batch)
var idx int
for _, v := range keys {
keysBatches[idx/c.config.GetMultiBatchSize] = append(keysBatches[idx/c.config.GetMultiBatchSize], v)
idx++
}
} else {
keysBatches = [][]string{keys}
}
results := make(map[string][]byte, len(keys))
var mu sync.Mutex
g, ctx := errgroup.WithContext(ctx)
for _, v := range keysBatches {
v := v
// Wait until we get a free slot from the gate, if the max
// concurrency should be enforced.
if c.config.MaxGetMultiConcurrency > 0 {
if err := c.getMultiGate.Start(ctx); err != nil {
level.Warn(c.logger).Log("msg", "getMultiGate err", "err", err, "items", len(keys))
return nil
}
err := doWithBatch(ctx, len(keys), c.config.GetMultiBatchSize, c.getMultiGate, func(startIndex, endIndex int) error {
currentKeys := keys[startIndex:endIndex]
resp, err := c.MGet(ctx, currentKeys...).Result()
if err != nil {
level.Warn(c.logger).Log("msg", "failed to mget items from redis", "err", err, "items", len(resp))
return nil
}
g.Go(func() error {
if c.config.MaxGetMultiConcurrency > 0 {
defer c.getMultiGate.Done()
}
resp, err := c.MGet(ctx, v...).Result()
if err != nil {
level.Warn(c.logger).Log("msg", "failed to mget items from redis", "err", err, "items", len(resp))
return nil
}
mu.Lock()
defer mu.Unlock()
for i := 0; i < len(resp); i++ {
key := v[i]
switch val := resp[i].(type) {
case string:
results[key] = []byte(val)
case nil: // miss
default:
level.Warn(c.logger).Log("msg",
fmt.Sprintf("unexpected redis mget result type:%T %v", resp[i], resp[i]))
}
mu.Lock()
defer mu.Unlock()
for i := 0; i < len(resp); i++ {
key := currentKeys[i]
switch val := resp[i].(type) {
case string:
results[key] = []byte(val)
case nil: // miss
default:
level.Warn(c.logger).Log("msg",
fmt.Sprintf("unexpected redis mget result type:%T %v", resp[i], resp[i]))
}
return nil
})
}
if err := g.Wait(); err != nil {
}
return nil
})
if err != nil {
level.Warn(c.logger).Log("msg", "failed to mget items from redis", "err", err, "items", len(keys))
return nil
}
Expand All @@ -315,6 +261,36 @@ func (c *RedisClient) Stop() {
}
}

// doWithBatch do func with batch and gate. batchSize==0 means one batch. gate==nil means no gate.
func doWithBatch(ctx context.Context, totalSize int, batchSize int, ga gate.Gate, f func(startIndex, endIndex int) error) error {
if totalSize == 0 {
return nil
}
if batchSize <= 0 {
return f(0, totalSize)
}
g, ctx := errgroup.WithContext(ctx)
for i := 0; i < totalSize; i += batchSize {
j := i + batchSize
if j > totalSize {
j = totalSize
}
if ga != nil {
if err := ga.Start(ctx); err != nil {
return nil
}
}
startIndex, endIndex := i, j
g.Go(func() error {
if ga != nil {
defer ga.Done()
}
return f(startIndex, endIndex)
})
}
return g.Wait()
}

// parseRedisClientConfig unmarshals a buffer into a RedisClientConfig with default values.
func parseRedisClientConfig(conf []byte) (RedisClientConfig, error) {
config := DefaultRedisClientConfig
Expand Down

0 comments on commit 801384f

Please sign in to comment.