Skip to content

Commit

Permalink
speedup fdb (#3860)
Browse files Browse the repository at this point in the history
  • Loading branch information
davies authored Jun 29, 2023
1 parent c6914c4 commit 7442e7c
Show file tree
Hide file tree
Showing 7 changed files with 21 additions and 18 deletions.
7 changes: 5 additions & 2 deletions pkg/meta/tkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type kvtxn interface {
scan(begin, end []byte, keysOnly bool, handler func(k, v []byte) bool)
exist(prefix []byte) bool
set(key, value []byte)
append(key []byte, value []byte) []byte
append(key []byte, value []byte)
incrBy(key []byte, value int64) int64
delete(key []byte)
}
Expand Down Expand Up @@ -3232,9 +3232,11 @@ func (m *kvMeta) LoadMeta(r io.Reader) error {
go func() {
defer wg.Done()
var buffer []*pair
var total int
for p := range kv {
buffer = append(buffer, p)
if len(buffer) >= batch {
total += len(p.key) + len(p.value)
if len(buffer) >= batch || total > 5<<20 {
err := m.txn(func(tx *kvTxn) error {
for _, p := range buffer {
tx.set(p.key, p.value)
Expand All @@ -3245,6 +3247,7 @@ func (m *kvMeta) LoadMeta(r io.Reader) error {
logger.Fatalf("write %d pairs: %s", len(buffer), err)
}
buffer = buffer[:0]
total = 0
}
}
if len(buffer) > 0 {
Expand Down
3 changes: 1 addition & 2 deletions pkg/meta/tkv_badger.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,9 @@ func (tx *badgerTxn) set(key, value []byte) {
}
}

func (tx *badgerTxn) append(key []byte, value []byte) []byte {
func (tx *badgerTxn) append(key []byte, value []byte) {
list := append(tx.get(key), value...)
tx.set(key, list)
return list
}

func (tx *badgerTxn) incrBy(key []byte, value int64) int64 {
Expand Down
3 changes: 1 addition & 2 deletions pkg/meta/tkv_etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,9 @@ func (tx *etcdTxn) set(key, value []byte) {
}
}

func (tx *etcdTxn) append(key []byte, value []byte) []byte {
func (tx *etcdTxn) append(key []byte, value []byte) {
new := append(tx.get(key), value...)
tx.set(key, new)
return new
}

func (tx *etcdTxn) incrBy(key []byte, value int64) int64 {
Expand Down
16 changes: 10 additions & 6 deletions pkg/meta/tkv_fdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func newFdbClient(addr string) (tkvClient, error) {
if err != nil {
return nil, fmt.Errorf("open database: %s", err)
}
// TODO: database options
return withPrefix(&fdbClient{db}, append([]byte(u.Query().Get("prefix")), 0xFD)), nil
}

Expand All @@ -74,6 +75,7 @@ func (c *fdbClient) scan(prefix []byte, handler func(key, value []byte)) error {
var done bool
for {
if _, err := c.client.ReadTransact(func(t fdb.ReadTransaction) (interface{}, error) {
// TODO: t.Options().SetPriorityBatch()
snapshot := t.Snapshot()
iter := snapshot.GetRange(
fdb.KeyRange{Begin: begin, End: end},
Expand Down Expand Up @@ -113,7 +115,6 @@ func (c *fdbClient) reset(prefix []byte) error {
}

func (c *fdbClient) close() error {
// c = &fdbClient{}
return nil
}

Expand All @@ -128,10 +129,13 @@ func (tx *fdbTxn) get(key []byte) []byte {
}

func (tx *fdbTxn) gets(keys ...[]byte) [][]byte {
ret := make([][]byte, len(keys))
fut := make([]fdb.FutureByteSlice, len(keys))
for i, key := range keys {
val := tx.Get(fdb.Key(key)).MustGet()
ret[i] = val
fut[i] = tx.Get(fdb.Key(key))
}
ret := make([][]byte, len(keys))
for i, f := range fut {
ret[i] = f.MustGet()
}
return ret
}
Expand All @@ -158,13 +162,13 @@ func (tx *fdbTxn) set(key, value []byte) {
tx.Set(fdb.Key(key), value)
}

func (tx *fdbTxn) append(key []byte, value []byte) []byte {
func (tx *fdbTxn) append(key []byte, value []byte) {
tx.AppendIfFits(fdb.Key(key), fdb.Key(value))
return tx.Get(fdb.Key(key)).MustGet()
}

func (tx *fdbTxn) incrBy(key []byte, value int64) int64 {
tx.Add(fdb.Key(key), packCounter(value))
// TODO: don't return new value if not needed
return parseCounter(tx.Get(fdb.Key(key)).MustGet())
}

Expand Down
3 changes: 1 addition & 2 deletions pkg/meta/tkv_mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,9 @@ func (tx *memTxn) set(key, value []byte) {
tx.buffer[string(key)] = value
}

func (tx *memTxn) append(key []byte, value []byte) []byte {
func (tx *memTxn) append(key []byte, value []byte) {
new := append(tx.get(key), value...)
tx.set(key, new)
return new
}

func (tx *memTxn) incrBy(key []byte, value int64) int64 {
Expand Down
4 changes: 2 additions & 2 deletions pkg/meta/tkv_prefix.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ func (tx *prefixTxn) set(key, value []byte) {
tx.kvTxn.set(tx.realKey(key), value)
}

func (tx *prefixTxn) append(key []byte, value []byte) []byte {
return tx.kvTxn.append(tx.realKey(key), value)
func (tx *prefixTxn) append(key []byte, value []byte) {
tx.kvTxn.append(tx.realKey(key), value)
}

func (tx *prefixTxn) incrBy(key []byte, value int64) int64 {
Expand Down
3 changes: 1 addition & 2 deletions pkg/meta/tkv_tikv.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,9 @@ func (tx *tikvTxn) set(key, value []byte) {
}
}

func (tx *tikvTxn) append(key []byte, value []byte) []byte {
func (tx *tikvTxn) append(key []byte, value []byte) {
new := append(tx.get(key), value...)
tx.set(key, new)
return new
}

func (tx *tikvTxn) incrBy(key []byte, value int64) int64 {
Expand Down

0 comments on commit 7442e7c

Please sign in to comment.