From 122f9f85a8ac3760cb3b63d5d9918ae7fef600b9 Mon Sep 17 00:00:00 2001
From: Alex Sharov
Date: Sat, 20 Apr 2024 15:53:32 +0700
Subject: [PATCH] mdbx: `Batch()` (#9999)

This is mostly intended for use in
`erigon/erigon-lib/downloader/mdbx_piece_completion.go` and maybe in
`nodesDB` (where we need many parallel RwTx).

I was against adding this "trick"/"API" in past years, because I thought
we could make our app more 1-big-rwtx-friendly. And we did it in Erigon -
StagedSync. TxPool did too, but a bit less happily - via a "map+mutex with
periodic flush to db". But `anacrolix/torrent` is an external library and
is unlikely to survive such a big mental-model change. Maybe it's time to
add `db.Batch()`.

#### Batch Rw transactions

Each `DB.Update()` waits for the disk to commit the writes. This overhead
can be minimized by combining multiple updates with the `DB.Batch()`
function:

```go
err := db.Batch(func(tx kv.RwTx) error {
	...
	return nil
})
```

Concurrent Batch calls are opportunistically combined into larger
transactions. Batch is only useful when there are multiple goroutines
calling it. The trade-off is that `Batch` can call the given function
multiple times if parts of the transaction fail. The function must be
idempotent, and side effects must take effect only after a successful
return from `DB.Batch()`.

For example: don't display messages from inside the function; instead,
set variables in the enclosing scope:

```go
var id uint64
err := db.Batch(func(tx kv.RwTx) error {
	// Find last key in bucket, decode as bigendian uint64, increment
	// by one, encode back to []byte, and add new key.
	...
	id = newValue
	return nil
})
if err != nil {
	return ...
}
fmt.Printf("Allocated ID %d\n", id)
```

----

Implementation mostly taken from
https://github.com/etcd-io/bbolt/?tab=readme-ov-file#batch-read-write-transactions

Maybe in the future it can be pushed down to
https://github.com/erigontech/mdbx-go
---
 cmd/snapshots/sync/sync.go                    |   3 +-
 erigon-lib/downloader/downloader.go           |   1 +
 .../downloader/mdbx_piece_completion.go       |  57 ++++++
 erigon-lib/kv/mdbx/kv_mdbx.go                 | 156 ++++++++++++++
 erigon-lib/kv/mdbx/kv_mdbx_test.go            | 192 +++++++++++++++++-
 5 files changed, 406 insertions(+), 3 deletions(-)

diff --git a/cmd/snapshots/sync/sync.go b/cmd/snapshots/sync/sync.go
index 6af76249938..165d0e9b63e 100644
--- a/cmd/snapshots/sync/sync.go
+++ b/cmd/snapshots/sync/sync.go
@@ -8,7 +8,6 @@ import (
 	"os"
 	"path/filepath"
 	"regexp"
-	"runtime"
 	"strconv"
 	"strings"
 	"time"
@@ -234,7 +233,7 @@ func NewTorrentClient(config CreateNewTorrentClientConfig) (*TorrentClient, erro
 
 	cfg.ClientConfig.DataDir = torrentDir
 
-	cfg.ClientConfig.PieceHashersPerTorrent = 32 * runtime.NumCPU()
+	cfg.ClientConfig.PieceHashersPerTorrent = 32
 	cfg.ClientConfig.DisableIPv6 = config.DisableIPv6
 	cfg.ClientConfig.DisableIPv4 = config.DisableIPv4
diff --git a/erigon-lib/downloader/downloader.go b/erigon-lib/downloader/downloader.go
index 471fc0d6f6f..53cc0718136 100644
--- a/erigon-lib/downloader/downloader.go
+++ b/erigon-lib/downloader/downloader.go
@@ -2540,6 +2540,7 @@ func openClient(ctx context.Context, dbDir, snapDir string, cfg *torrent.ClientC
 		return nil, nil, nil, nil, fmt.Errorf("torrentcfg.openClient: %w", err)
 	}
 	c, err = NewMdbxPieceCompletion(db)
+	//c, err = NewMdbxPieceCompletionBatch(db)
 	if err != nil {
 		return nil, nil, nil, nil, fmt.Errorf("torrentcfg.NewMdbxPieceCompletion: %w", err)
 	}
diff --git a/erigon-lib/downloader/mdbx_piece_completion.go b/erigon-lib/downloader/mdbx_piece_completion.go
index 6f209cd19c9..32e38ea4d2d 100644
--- 
a/erigon-lib/downloader/mdbx_piece_completion.go +++ b/erigon-lib/downloader/mdbx_piece_completion.go @@ -24,6 +24,7 @@ import ( "github.com/anacrolix/torrent/storage" "github.com/anacrolix/torrent/types/infohash" "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon-lib/kv/mdbx" ) const ( @@ -115,3 +116,59 @@ func (m *mdbxPieceCompletion) Close() error { m.db.Close() return nil } + +type mdbxPieceCompletionBatch struct { + db *mdbx.MdbxKV +} + +var _ storage.PieceCompletion = (*mdbxPieceCompletionBatch)(nil) + +func NewMdbxPieceCompletionBatch(db kv.RwDB) (ret storage.PieceCompletion, err error) { + ret = &mdbxPieceCompletionBatch{db: db.(*mdbx.MdbxKV)} + return +} + +func (m *mdbxPieceCompletionBatch) Get(pk metainfo.PieceKey) (cn storage.Completion, err error) { + err = m.db.View(context.Background(), func(tx kv.Tx) error { + var key [infohash.Size + 4]byte + copy(key[:], pk.InfoHash[:]) + binary.BigEndian.PutUint32(key[infohash.Size:], uint32(pk.Index)) + cn.Ok = true + v, err := tx.GetOne(kv.BittorrentCompletion, key[:]) + if err != nil { + return err + } + switch string(v) { + case complete: + cn.Complete = true + case incomplete: + cn.Complete = false + default: + cn.Ok = false + } + return nil + }) + return +} + +func (m *mdbxPieceCompletionBatch) Set(pk metainfo.PieceKey, b bool) error { + if c, err := m.Get(pk); err == nil && c.Ok && c.Complete == b { + return nil + } + var key [infohash.Size + 4]byte + copy(key[:], pk.InfoHash[:]) + binary.BigEndian.PutUint32(key[infohash.Size:], uint32(pk.Index)) + + v := []byte(incomplete) + if b { + v = []byte(complete) + } + return m.db.Batch(func(tx kv.RwTx) error { + return tx.Put(kv.BittorrentCompletion, key[:], v) + }) +} + +func (m *mdbxPieceCompletionBatch) Close() error { + m.db.Close() + return nil +} diff --git a/erigon-lib/kv/mdbx/kv_mdbx.go b/erigon-lib/kv/mdbx/kv_mdbx.go index 6a71016bbb9..134b18e2671 100644 --- a/erigon-lib/kv/mdbx/kv_mdbx.go +++ b/erigon-lib/kv/mdbx/kv_mdbx.go @@ -20,6 +20,7 @@ import ( "bytes" "context" "encoding/binary" + "errors" "fmt" "os" "path/filepath" @@ -404,6 +405,9 @@ func (opts MdbxOpts) Open(ctx context.Context) (kv.RwDB, error) { txsAllDoneOnCloseCond: sync.NewCond(txsCountMutex), leakDetector: dbg.NewLeakDetector("db."+opts.label.String(), dbg.SlowTx()), + + MaxBatchSize: DefaultMaxBatchSize, + MaxBatchDelay: DefaultMaxBatchDelay, } customBuckets := opts.bucketsCfg(kv.ChaindataTablesCfg) @@ -478,6 +482,158 @@ type MdbxKV struct { txsAllDoneOnCloseCond *sync.Cond leakDetector *dbg.LeakDetector + + // MaxBatchSize is the maximum size of a batch. Default value is + // copied from DefaultMaxBatchSize in Open. + // + // If <=0, disables batching. + // + // Do not change concurrently with calls to Batch. + MaxBatchSize int + + // MaxBatchDelay is the maximum delay before a batch starts. + // Default value is copied from DefaultMaxBatchDelay in Open. + // + // If <=0, effectively disables batching. + // + // Do not change concurrently with calls to Batch. + MaxBatchDelay time.Duration + + batchMu sync.Mutex + batch *batch +} + +// Default values if not set in a DB instance. +const ( + DefaultMaxBatchSize int = 1000 + DefaultMaxBatchDelay = 10 * time.Millisecond +) + +type batch struct { + db *MdbxKV + timer *time.Timer + start sync.Once + calls []call +} + +type call struct { + fn func(kv.RwTx) error + err chan<- error +} + +// trigger runs the batch if it hasn't already been run. 
+func (b *batch) trigger() {
+	b.start.Do(b.run)
+}
+
+// run performs the transactions in the batch and communicates results
+// back to DB.Batch.
+func (b *batch) run() {
+	b.db.batchMu.Lock()
+	b.timer.Stop()
+	// Make sure no new work is added to this batch, but don't break
+	// other batches.
+	if b.db.batch == b {
+		b.db.batch = nil
+	}
+	b.db.batchMu.Unlock()
+
+retry:
+	for len(b.calls) > 0 {
+		var failIdx = -1
+		err := b.db.Update(context.Background(), func(tx kv.RwTx) error {
+			for i, c := range b.calls {
+				if err := safelyCall(c.fn, tx); err != nil {
+					failIdx = i
+					return err
+				}
+			}
+			return nil
+		})
+
+		if failIdx >= 0 {
+			// Take the failing transaction out of the batch. It's
+			// safe to shorten b.calls here because db.batch no longer
+			// points to us, and we hold the mutex anyway.
+			c := b.calls[failIdx]
+			b.calls[failIdx], b.calls = b.calls[len(b.calls)-1], b.calls[:len(b.calls)-1]
+			// Tell the submitter to re-run it solo, and continue with the rest of the batch.
+			c.err <- trySolo
+			continue retry
+		}
+
+		// Pass success, or MDBX internal errors, to all callers.
+		for _, c := range b.calls {
+			c.err <- err
+		}
+		break retry
+	}
+}
+
+// trySolo is a special sentinel error value used for signaling that a
+// transaction function should be re-run. It should never be seen by
+// callers.
+var trySolo = errors.New("batch function returned an error and should be re-run solo")
+
+type panicked struct {
+	reason interface{}
+}
+
+func (p panicked) Error() string {
+	if err, ok := p.reason.(error); ok {
+		return err.Error()
+	}
+	return fmt.Sprintf("panic: %v", p.reason)
+}
+
+func safelyCall(fn func(tx kv.RwTx) error, tx kv.RwTx) (err error) {
+	defer func() {
+		if p := recover(); p != nil {
+			err = panicked{p}
+		}
+	}()
+	return fn(tx)
+}
+
+// Batch is only useful when there are multiple goroutines calling it.
+// It behaves similarly to Update, except:
+//
+// 1. concurrent Batch calls can be combined into a single RwTx.
+//
+// 2. the function passed to Batch may be called multiple times,
+// regardless of whether it returns an error or not.
+//
+// This means that Batch function side effects must be idempotent and
+// take permanent effect only after a successful return is seen by
+// the caller.
+//
+// Example of bad side effects: printing messages, mutating external counters (`i++`).
+//
+// The maximum batch size and delay can be adjusted with DB.MaxBatchSize
+// and DB.MaxBatchDelay, respectively.
+func (db *MdbxKV) Batch(fn func(tx kv.RwTx) error) error {
+	errCh := make(chan error, 1)
+
+	db.batchMu.Lock()
+	if (db.batch == nil) || (db.batch != nil && len(db.batch.calls) >= db.MaxBatchSize) {
+		// There is no existing batch, or the existing batch is full; start a new one.
+ db.batch = &batch{ + db: db, + } + db.batch.timer = time.AfterFunc(db.MaxBatchDelay, db.batch.trigger) + } + db.batch.calls = append(db.batch.calls, call{fn: fn, err: errCh}) + if len(db.batch.calls) >= db.MaxBatchSize { + // wake up batch, it's ready to run + go db.batch.trigger() + } + db.batchMu.Unlock() + + err := <-errCh + if errors.Is(err, trySolo) { + err = db.Update(context.Background(), fn) + } + return err } func (db *MdbxKV) Path() string { return db.opts.path } diff --git a/erigon-lib/kv/mdbx/kv_mdbx_test.go b/erigon-lib/kv/mdbx/kv_mdbx_test.go index 66506ef720f..d30d8a5624d 100644 --- a/erigon-lib/kv/mdbx/kv_mdbx_test.go +++ b/erigon-lib/kv/mdbx/kv_mdbx_test.go @@ -18,6 +18,8 @@ package mdbx import ( "context" + "encoding/binary" + "errors" "sync/atomic" "testing" "time" @@ -31,7 +33,7 @@ import ( "github.com/ledgerwatch/erigon-lib/kv/order" ) -func BaseCase(t *testing.T) (kv.RwDB, kv.RwTx, kv.RwCursorDupSort) { +func BaseCaseDB(t *testing.T) kv.RwDB { t.Helper() path := t.TempDir() logger := log.New() @@ -43,6 +45,13 @@ func BaseCase(t *testing.T) (kv.RwDB, kv.RwTx, kv.RwCursorDupSort) { } }).MapSize(128 * datasize.MB).MustOpen() t.Cleanup(db.Close) + return db +} + +func BaseCase(t *testing.T) (kv.RwDB, kv.RwTx, kv.RwCursorDupSort) { + t.Helper() + db := BaseCaseDB(t) + table := "Table" tx, err := db.BeginRw(context.Background()) require.NoError(t, err) @@ -897,3 +906,184 @@ func TestCloseWaitsAfterTxBegin(t *testing.T) { ) }) } + +// u64tob converts a uint64 into an 8-byte slice. +func u64tob(v uint64) []byte { + b := make([]byte, 8) + binary.BigEndian.PutUint64(b, v) + return b +} + +// Ensure two functions can perform updates in a single batch. +func TestDB_Batch(t *testing.T) { + _db := BaseCaseDB(t) + table := "Table" + db := _db.(*MdbxKV) + + // Iterate over multiple updates in separate goroutines. + n := 2 + ch := make(chan error, n) + for i := 0; i < n; i++ { + go func(i int) { + ch <- db.Batch(func(tx kv.RwTx) error { + return tx.Put(table, u64tob(uint64(i)), []byte{}) + }) + }(i) + } + + // Check all responses to make sure there's no error. + for i := 0; i < n; i++ { + if err := <-ch; err != nil { + t.Fatal(err) + } + } + + // Ensure data is correct. + if err := db.View(context.Background(), func(tx kv.Tx) error { + for i := 0; i < n; i++ { + v, err := tx.GetOne(table, u64tob(uint64(i))) + if err != nil { + panic(err) + } + if v == nil { + t.Errorf("key not found: %d", i) + } + } + return nil + }); err != nil { + t.Fatal(err) + } +} + +func TestDB_Batch_Panic(t *testing.T) { + _db := BaseCaseDB(t) + db := _db.(*MdbxKV) + + var sentinel int + var bork = &sentinel + var problem interface{} + var err error + + // Execute a function inside a batch that panics. + func() { + defer func() { + if p := recover(); p != nil { + problem = p + } + }() + err = db.Batch(func(tx kv.RwTx) error { + panic(bork) + }) + }() + + // Verify there is no error. + if g, e := err, error(nil); !errors.Is(g, e) { + t.Fatalf("wrong error: %v != %v", g, e) + } + // Verify the panic was captured. 
+ if g, e := problem, bork; g != e { + t.Fatalf("wrong error: %v != %v", g, e) + } +} + +func TestDB_BatchFull(t *testing.T) { + _db := BaseCaseDB(t) + table := "Table" + db := _db.(*MdbxKV) + + const size = 3 + // buffered so we never leak goroutines + ch := make(chan error, size) + put := func(i int) { + ch <- db.Batch(func(tx kv.RwTx) error { + return tx.Put(table, u64tob(uint64(i)), []byte{}) + }) + } + + db.MaxBatchSize = size + // high enough to never trigger here + db.MaxBatchDelay = 1 * time.Hour + + go put(1) + go put(2) + + // Give the batch a chance to exhibit bugs. + time.Sleep(10 * time.Millisecond) + + // not triggered yet + select { + case <-ch: + t.Fatalf("batch triggered too early") + default: + } + + go put(3) + + // Check all responses to make sure there's no error. + for i := 0; i < size; i++ { + if err := <-ch; err != nil { + t.Fatal(err) + } + } + + // Ensure data is correct. + if err := db.View(context.Background(), func(tx kv.Tx) error { + for i := 1; i <= size; i++ { + v, err := tx.GetOne(table, u64tob(uint64(i))) + if err != nil { + panic(err) + } + if v == nil { + t.Errorf("key not found: %d", i) + } + } + return nil + }); err != nil { + t.Fatal(err) + } +} + +func TestDB_BatchTime(t *testing.T) { + _db := BaseCaseDB(t) + table := "Table" + db := _db.(*MdbxKV) + + const size = 1 + // buffered so we never leak goroutines + ch := make(chan error, size) + put := func(i int) { + ch <- db.Batch(func(tx kv.RwTx) error { + return tx.Put(table, u64tob(uint64(i)), []byte{}) + }) + } + + db.MaxBatchSize = 1000 + db.MaxBatchDelay = 0 + + go put(1) + + // Batch must trigger by time alone. + + // Check all responses to make sure there's no error. + for i := 0; i < size; i++ { + if err := <-ch; err != nil { + t.Fatal(err) + } + } + + // Ensure data is correct. + if err := db.View(context.Background(), func(tx kv.Tx) error { + for i := 1; i <= size; i++ { + v, err := tx.GetOne(table, u64tob(uint64(i))) + if err != nil { + return err + } + if v == nil { + t.Errorf("key not found: %d", i) + } + } + return nil + }); err != nil { + t.Fatal(err) + } +}
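
For reference (not part of this patch), a minimal usage sketch of the new API: it assumes a `*mdbx.MdbxKV` opened elsewhere and an existing table whose name is passed in by the caller; the helper name `putConcurrently` and the stored values are illustrative only.

```go
package example

import (
	"encoding/binary"

	"github.com/ledgerwatch/erigon-lib/kv"
	"github.com/ledgerwatch/erigon-lib/kv/mdbx"
)

// putConcurrently writes n keys from n goroutines. Each db.Batch call may be
// combined with concurrent calls into a single RwTx; the callback only does an
// idempotent Put, so it is safe for Batch to re-run it solo after a failure.
func putConcurrently(db *mdbx.MdbxKV, table string, n int) error {
	errCh := make(chan error, n)
	for i := 0; i < n; i++ {
		go func(i int) {
			errCh <- db.Batch(func(tx kv.RwTx) error {
				k := make([]byte, 8)
				binary.BigEndian.PutUint64(k, uint64(i))
				return tx.Put(table, k, []byte{1})
			})
		}(i)
	}
	// Wait for every submitted call; each receives the result of the batch it
	// was committed in (or of its solo retry).
	for i := 0; i < n; i++ {
		if err := <-errCh; err != nil {
			return err
		}
	}
	return nil
}
```

Callers that only hold a `kv.RwDB` would need a type assertion to `*mdbx.MdbxKV`, as `NewMdbxPieceCompletionBatch` does above, since `Batch` is not part of the `kv.RwDB` interface in this patch.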