diff --git a/pkg/meta/tkv.go b/pkg/meta/tkv.go index 666ba8f1694a..f5150fcec5c4 100644 --- a/pkg/meta/tkv.go +++ b/pkg/meta/tkv.go @@ -46,7 +46,7 @@ type kvtxn interface { scan(begin, end []byte, keysOnly bool, handler func(k, v []byte) bool) exist(prefix []byte) bool set(key, value []byte) - append(key []byte, value []byte) []byte + append(key []byte, value []byte) incrBy(key []byte, value int64) int64 delete(key []byte) } @@ -3232,9 +3232,11 @@ func (m *kvMeta) LoadMeta(r io.Reader) error { go func() { defer wg.Done() var buffer []*pair + var total int for p := range kv { buffer = append(buffer, p) - if len(buffer) >= batch { + total += len(p.key) + len(p.value) + if len(buffer) >= batch || total > 5<<20 { err := m.txn(func(tx *kvTxn) error { for _, p := range buffer { tx.set(p.key, p.value) @@ -3245,6 +3247,7 @@ func (m *kvMeta) LoadMeta(r io.Reader) error { logger.Fatalf("write %d pairs: %s", len(buffer), err) } buffer = buffer[:0] + total = 0 } } if len(buffer) > 0 { diff --git a/pkg/meta/tkv_badger.go b/pkg/meta/tkv_badger.go index 69d2f300a4b5..432b692ba48d 100644 --- a/pkg/meta/tkv_badger.go +++ b/pkg/meta/tkv_badger.go @@ -112,10 +112,9 @@ func (tx *badgerTxn) set(key, value []byte) { } } -func (tx *badgerTxn) append(key []byte, value []byte) []byte { +func (tx *badgerTxn) append(key []byte, value []byte) { list := append(tx.get(key), value...) tx.set(key, list) - return list } func (tx *badgerTxn) incrBy(key []byte, value int64) int64 { diff --git a/pkg/meta/tkv_etcd.go b/pkg/meta/tkv_etcd.go index 75534546ce85..63dad8189ed0 100644 --- a/pkg/meta/tkv_etcd.go +++ b/pkg/meta/tkv_etcd.go @@ -151,10 +151,9 @@ func (tx *etcdTxn) set(key, value []byte) { } } -func (tx *etcdTxn) append(key []byte, value []byte) []byte { +func (tx *etcdTxn) append(key []byte, value []byte) { new := append(tx.get(key), value...) tx.set(key, new) - return new } func (tx *etcdTxn) incrBy(key []byte, value int64) int64 { diff --git a/pkg/meta/tkv_fdb.go b/pkg/meta/tkv_fdb.go index f98752ec300e..901117e66494 100644 --- a/pkg/meta/tkv_fdb.go +++ b/pkg/meta/tkv_fdb.go @@ -52,6 +52,7 @@ func newFdbClient(addr string) (tkvClient, error) { if err != nil { return nil, fmt.Errorf("open database: %s", err) } + // TODO: database options return withPrefix(&fdbClient{db}, append([]byte(u.Query().Get("prefix")), 0xFD)), nil } @@ -74,6 +75,7 @@ func (c *fdbClient) scan(prefix []byte, handler func(key, value []byte)) error { var done bool for { if _, err := c.client.ReadTransact(func(t fdb.ReadTransaction) (interface{}, error) { + // TODO: t.Options().SetPriorityBatch() snapshot := t.Snapshot() iter := snapshot.GetRange( fdb.KeyRange{Begin: begin, End: end}, @@ -113,7 +115,6 @@ func (c *fdbClient) reset(prefix []byte) error { } func (c *fdbClient) close() error { - // c = &fdbClient{} return nil } @@ -128,10 +129,13 @@ func (tx *fdbTxn) get(key []byte) []byte { } func (tx *fdbTxn) gets(keys ...[]byte) [][]byte { - ret := make([][]byte, len(keys)) + fut := make([]fdb.FutureByteSlice, len(keys)) for i, key := range keys { - val := tx.Get(fdb.Key(key)).MustGet() - ret[i] = val + fut[i] = tx.Get(fdb.Key(key)) + } + ret := make([][]byte, len(keys)) + for i, f := range fut { + ret[i] = f.MustGet() } return ret } @@ -158,13 +162,13 @@ func (tx *fdbTxn) set(key, value []byte) { tx.Set(fdb.Key(key), value) } -func (tx *fdbTxn) append(key []byte, value []byte) []byte { +func (tx *fdbTxn) append(key []byte, value []byte) { tx.AppendIfFits(fdb.Key(key), fdb.Key(value)) - return tx.Get(fdb.Key(key)).MustGet() } func (tx *fdbTxn) incrBy(key []byte, value int64) int64 { tx.Add(fdb.Key(key), packCounter(value)) + // TODO: don't return new value if not needed return parseCounter(tx.Get(fdb.Key(key)).MustGet()) } diff --git a/pkg/meta/tkv_mem.go b/pkg/meta/tkv_mem.go index 11e5808b27e5..e7b2b3ebcc14 100644 --- a/pkg/meta/tkv_mem.go +++ b/pkg/meta/tkv_mem.go @@ -131,10 +131,9 @@ func (tx *memTxn) set(key, value []byte) { tx.buffer[string(key)] = value } -func (tx *memTxn) append(key []byte, value []byte) []byte { +func (tx *memTxn) append(key []byte, value []byte) { new := append(tx.get(key), value...) tx.set(key, new) - return new } func (tx *memTxn) incrBy(key []byte, value int64) int64 { diff --git a/pkg/meta/tkv_prefix.go b/pkg/meta/tkv_prefix.go index 5bfe85d66851..16ddd3738960 100644 --- a/pkg/meta/tkv_prefix.go +++ b/pkg/meta/tkv_prefix.go @@ -60,8 +60,8 @@ func (tx *prefixTxn) set(key, value []byte) { tx.kvTxn.set(tx.realKey(key), value) } -func (tx *prefixTxn) append(key []byte, value []byte) []byte { - return tx.kvTxn.append(tx.realKey(key), value) +func (tx *prefixTxn) append(key []byte, value []byte) { + tx.kvTxn.append(tx.realKey(key), value) } func (tx *prefixTxn) incrBy(key []byte, value int64) int64 { diff --git a/pkg/meta/tkv_tikv.go b/pkg/meta/tkv_tikv.go index 4804eb2a8e3e..c682bd7095be 100644 --- a/pkg/meta/tkv_tikv.go +++ b/pkg/meta/tkv_tikv.go @@ -146,10 +146,9 @@ func (tx *tikvTxn) set(key, value []byte) { } } -func (tx *tikvTxn) append(key []byte, value []byte) []byte { +func (tx *tikvTxn) append(key []byte, value []byte) { new := append(tx.get(key), value...) tx.set(key, new) - return new } func (tx *tikvTxn) incrBy(key []byte, value int64) int64 {