Skip to content

Commit

Permalink
POC for caching range response
Browse files Browse the repository at this point in the history
  • Loading branch information
yifeng zhang committed Nov 10, 2021
1 parent 72d3e38 commit 264a756
Show file tree
Hide file tree
Showing 8 changed files with 35 additions and 14 deletions.
2 changes: 1 addition & 1 deletion etcdserver/api/v3rpc/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ func (sws *serverWatchStream) sendLoop() {
opt := mvcc.RangeOptions{Rev: evs[i].Kv.ModRevision - 1}
r, err := sws.watchable.Range(evs[i].Kv.Key, nil, opt)
if err == nil && len(r.KVs) != 0 {
events[i].PrevKv = &(r.KVs[0])
events[i].PrevKv = r.KVs[0]
}
}
}
Expand Down
14 changes: 7 additions & 7 deletions etcdserver/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.Pu
}
if p.PrevKv {
if rr != nil && len(rr.KVs) != 0 {
resp.PrevKv = &rr.KVs[0]
resp.PrevKv = rr.KVs[0]
}
}

Expand All @@ -245,7 +245,7 @@ func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequ
if rr != nil {
resp.PrevKvs = make([]*mvccpb.KeyValue, len(rr.KVs))
for i := range rr.KVs {
resp.PrevKvs[i] = &rr.KVs[i]
resp.PrevKvs[i] = rr.KVs[i]
}
}
}
Expand Down Expand Up @@ -346,7 +346,7 @@ func (a *applierV3backend) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.Ra
if r.KeysOnly {
rr.KVs[i].Value = nil
}
resp.Kvs[i] = &rr.KVs[i]
resp.Kvs[i] = rr.KVs[i]
}
trace.Step("assemble the response")
return resp, nil
Expand Down Expand Up @@ -463,7 +463,7 @@ func applyCompare(rv mvcc.ReadView, c *pb.Compare) bool {
// nil == empty string in grpc; no way to represent missing value
return false
}
return compareKV(c, mvccpb.KeyValue{})
return compareKV(c, &mvccpb.KeyValue{})
}
for _, kv := range rr.KVs {
if !compareKV(c, kv) {
Expand All @@ -473,7 +473,7 @@ func applyCompare(rv mvcc.ReadView, c *pb.Compare) bool {
return true
}

func compareKV(c *pb.Compare, ckv mvccpb.KeyValue) bool {
func compareKV(c *pb.Compare, ckv *mvccpb.KeyValue) bool {
var result int
rev := int64(0)
switch c.Target {
Expand Down Expand Up @@ -868,7 +868,7 @@ func (a *quotaApplierV3) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantRes
return resp, err
}

type kvSort struct{ kvs []mvccpb.KeyValue }
type kvSort struct{ kvs []*mvccpb.KeyValue }

func (s *kvSort) Swap(i, j int) {
t := s.kvs[i]
Expand Down Expand Up @@ -1019,7 +1019,7 @@ func pruneKVs(rr *mvcc.RangeResult, isPrunable func(*mvccpb.KeyValue) bool) {
j := 0
for i := range rr.KVs {
rr.KVs[j] = rr.KVs[i]
if !isPrunable(&rr.KVs[i]) {
if !isPrunable(rr.KVs[i]) {
j++
}
}
Expand Down
2 changes: 1 addition & 1 deletion mvcc/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type RangeOptions struct {
}

type RangeResult struct {
KVs []mvccpb.KeyValue
KVs []*mvccpb.KeyValue
Rev int64
Count int
}
Expand Down
4 changes: 2 additions & 2 deletions mvcc/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,7 +634,7 @@ func TestKVRestore(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
tt(s)
var kvss [][]mvccpb.KeyValue
var kvss [][]*mvccpb.KeyValue
for k := int64(0); k < 10; k++ {
r, _ := s.Range([]byte("a"), []byte("z"), RangeOptions{Rev: k})
kvss = append(kvss, r.KVs)
Expand All @@ -652,7 +652,7 @@ func TestKVRestore(t *testing.T) {

// wait for possible compaction to finish
testutil.WaitSchedule()
var nkvss [][]mvccpb.KeyValue
var nkvss [][]*mvccpb.KeyValue
for k := int64(0); k < 10; k++ {
r, _ := ns.Range([]byte("a"), []byte("z"), RangeOptions{Rev: k})
nkvss = append(nkvss, r.KVs)
Expand Down
5 changes: 5 additions & 0 deletions mvcc/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ type store struct {

ig ConsistentIndexGetter

cache map[revision]mvccpb.KeyValue
clock sync.RWMutex

b backend.Backend
kvindex index

Expand Down Expand Up @@ -125,6 +128,8 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentI
b: b,
ig: ig,
kvindex: newTreeIndex(lg),
cache: map[revision]mvccpb.KeyValue{},

This comment has been minimized.

Copy link
@chaochn47

chaochn47 Nov 11, 2021

IMHO, another mapping from "hot" keys to revisions is needed to reduce the size of the map.

This comment has been minimized.

Copy link
@zyfo2

zyfo2 Nov 21, 2021

Owner

Yeah, but I assume this is the overwhelming part of the memory usage? The key index won't take too much memory, right?

clock: sync.RWMutex{},

le: le,

Expand Down
19 changes: 17 additions & 2 deletions mvcc/kvstore_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"go.etcd.io/etcd/mvcc/mvccpb"
"go.etcd.io/etcd/pkg/traceutil"
"go.uber.org/zap"
"strconv"
)

type storeTxnRead struct {
Expand Down Expand Up @@ -140,9 +141,16 @@ func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions
limit = len(revpairs)
}

kvs := make([]mvccpb.KeyValue, limit)
kvs := make([]*mvccpb.KeyValue, limit)
revBytes := newRevBytes()
for i, revpair := range revpairs[:len(kvs)] {
tr.s.clock.RLock()
v, exists := tr.s.cache[revpair]
tr.s.clock.RUnlock()
if exists {
kvs[i] = &v
continue
}
revToBytes(revpair, revBytes)
_, vs := tr.tx.UnsafeRange(keyBucketName, revBytes, nil, 0)
if len(vs) != 1 {
Expand All @@ -156,7 +164,8 @@ func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions
plog.Fatalf("range cannot find rev (%d,%d)", revpair.main, revpair.sub)
}
}
if err := kvs[i].Unmarshal(vs[0]); err != nil {
x := mvccpb.KeyValue{}
if err := x.Unmarshal(vs[0]); err != nil {
if tr.s.lg != nil {
tr.s.lg.Fatal(
"failed to unmarshal mvccpb.KeyValue",
Expand All @@ -165,6 +174,12 @@ func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions
} else {
plog.Fatalf("cannot unmarshal event: %v", err)
}
} else {
kvs[i] = &x
tr.s.clock.Lock()

This comment has been minimized.

Copy link
@chaochn47

chaochn47 Nov 11, 2021

Looks like the read-write lock will degrade read concurrency. I think creating a copy of the cache whenever a read transaction is opened would reduce the conflicts, but an evaluation of the memory overhead is needed.

This comment has been minimized.

Copy link
@zyfo2

zyfo2 Nov 21, 2021

Owner

yeah, this is just POC.

One idea to optimize would be to use an RW lock; since we won't update the cache frequently, reads should be contention-free.

Another idea is to do the check at the request-key level, so we won't need to take the lock for every revision.

tr.s.cache[revpair] = x
tr.s.clock.Unlock()
plog.Warning("caching " + strconv.Itoa(i))
}
}
tr.trace.Step("range keys from bolt db")
Expand Down
2 changes: 1 addition & 1 deletion tools/benchmark/cmd/put.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func putFunc(cmd *cobra.Command, args []string) {
} else {
binary.PutVarint(k, int64(rand.Intn(keySpaceSize)))
}
requests <- v3.OpPut(string(k), v)
requests <- v3.OpPut("a"+string(k), v)
}
close(requests)
}()
Expand Down
1 change: 1 addition & 0 deletions tools/benchmark/cmd/range.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func rangeFunc(cmd *cobra.Command, args []string) {
}

k := args[0]

end := ""
if len(args) == 2 {
end = args[1]
Expand Down

0 comments on commit 264a756

Please sign in to comment.