Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mdbx: Batch() #9999

Merged
merged 12 commits into from
Apr 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions cmd/snapshots/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"os"
"path/filepath"
"regexp"
"runtime"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -234,7 +233,7 @@ func NewTorrentClient(config CreateNewTorrentClientConfig) (*TorrentClient, erro

cfg.ClientConfig.DataDir = torrentDir

cfg.ClientConfig.PieceHashersPerTorrent = 32 * runtime.NumCPU()
cfg.ClientConfig.PieceHashersPerTorrent = 32
cfg.ClientConfig.DisableIPv6 = config.DisableIPv6
cfg.ClientConfig.DisableIPv4 = config.DisableIPv4

Expand Down
1 change: 1 addition & 0 deletions erigon-lib/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2525,6 +2525,7 @@ func openClient(ctx context.Context, dbDir, snapDir string, cfg *torrent.ClientC
return nil, nil, nil, nil, fmt.Errorf("torrentcfg.openClient: %w", err)
}
c, err = NewMdbxPieceCompletion(db)
//c, err = NewMdbxPieceCompletionBatch(db)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("torrentcfg.NewMdbxPieceCompletion: %w", err)
}
Expand Down
57 changes: 57 additions & 0 deletions erigon-lib/downloader/mdbx_piece_completion.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/anacrolix/torrent/storage"
"github.com/anacrolix/torrent/types/infohash"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/mdbx"
)

const (
Expand Down Expand Up @@ -115,3 +116,59 @@ func (m *mdbxPieceCompletion) Close() error {
m.db.Close()
return nil
}

type mdbxPieceCompletionBatch struct {
db *mdbx.MdbxKV
}

var _ storage.PieceCompletion = (*mdbxPieceCompletionBatch)(nil)

func NewMdbxPieceCompletionBatch(db kv.RwDB) (ret storage.PieceCompletion, err error) {
ret = &mdbxPieceCompletionBatch{db: db.(*mdbx.MdbxKV)}
return
}

func (m *mdbxPieceCompletionBatch) Get(pk metainfo.PieceKey) (cn storage.Completion, err error) {
err = m.db.View(context.Background(), func(tx kv.Tx) error {
var key [infohash.Size + 4]byte
copy(key[:], pk.InfoHash[:])
binary.BigEndian.PutUint32(key[infohash.Size:], uint32(pk.Index))
cn.Ok = true
v, err := tx.GetOne(kv.BittorrentCompletion, key[:])
if err != nil {
return err
}
switch string(v) {
case complete:
cn.Complete = true
case incomplete:
cn.Complete = false
default:
cn.Ok = false
}
return nil
})
return
}

func (m *mdbxPieceCompletionBatch) Set(pk metainfo.PieceKey, b bool) error {
if c, err := m.Get(pk); err == nil && c.Ok && c.Complete == b {
return nil
}
var key [infohash.Size + 4]byte
copy(key[:], pk.InfoHash[:])
binary.BigEndian.PutUint32(key[infohash.Size:], uint32(pk.Index))

v := []byte(incomplete)
if b {
v = []byte(complete)
}
return m.db.Batch(func(tx kv.RwTx) error {
return tx.Put(kv.BittorrentCompletion, key[:], v)
})
}

func (m *mdbxPieceCompletionBatch) Close() error {
m.db.Close()
return nil
}
156 changes: 156 additions & 0 deletions erigon-lib/kv/mdbx/kv_mdbx.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"os"
"path/filepath"
Expand Down Expand Up @@ -404,6 +405,9 @@ func (opts MdbxOpts) Open(ctx context.Context) (kv.RwDB, error) {
txsAllDoneOnCloseCond: sync.NewCond(txsCountMutex),

leakDetector: dbg.NewLeakDetector("db."+opts.label.String(), dbg.SlowTx()),

MaxBatchSize: DefaultMaxBatchSize,
MaxBatchDelay: DefaultMaxBatchDelay,
}

customBuckets := opts.bucketsCfg(kv.ChaindataTablesCfg)
Expand Down Expand Up @@ -478,6 +482,158 @@ type MdbxKV struct {
txsAllDoneOnCloseCond *sync.Cond

leakDetector *dbg.LeakDetector

// MaxBatchSize is the maximum size of a batch. Default value is
// copied from DefaultMaxBatchSize in Open.
//
// If <=0, disables batching.
//
// Do not change concurrently with calls to Batch.
MaxBatchSize int

// MaxBatchDelay is the maximum delay before a batch starts.
// Default value is copied from DefaultMaxBatchDelay in Open.
//
// If <=0, effectively disables batching.
//
// Do not change concurrently with calls to Batch.
MaxBatchDelay time.Duration

batchMu sync.Mutex
batch *batch
}

// Default values if not set in a DB instance.
const (
DefaultMaxBatchSize int = 1000
DefaultMaxBatchDelay = 10 * time.Millisecond
)

type batch struct {
db *MdbxKV
timer *time.Timer
start sync.Once
calls []call
}

type call struct {
fn func(kv.RwTx) error
err chan<- error
}

// trigger runs the batch if it hasn't already been run.
func (b *batch) trigger() {
b.start.Do(b.run)
}

// run performs the transactions in the batch and communicates results
// back to DB.Batch.
func (b *batch) run() {
b.db.batchMu.Lock()
b.timer.Stop()
// Make sure no new work is added to this batch, but don't break
// other batches.
if b.db.batch == b {
b.db.batch = nil
}
b.db.batchMu.Unlock()

retry:
for len(b.calls) > 0 {
var failIdx = -1
err := b.db.Update(context.Background(), func(tx kv.RwTx) error {
for i, c := range b.calls {
if err := safelyCall(c.fn, tx); err != nil {
failIdx = i
return err
}
}
return nil
})

if failIdx >= 0 {
// take the failing transaction out of the batch. it's
// safe to shorten b.calls here because db.batch no longer
// points to us, and we hold the mutex anyway.
c := b.calls[failIdx]
b.calls[failIdx], b.calls = b.calls[len(b.calls)-1], b.calls[:len(b.calls)-1]
// tell the submitter re-run it solo, continue with the rest of the batch
c.err <- trySolo
continue retry
}

// pass success, or bolt internal errors, to all callers
for _, c := range b.calls {
c.err <- err
}
break retry
}
}

// trySolo is a special sentinel error value used for signaling that a
// transaction function should be re-run. It should never be seen by
// callers.
var trySolo = errors.New("batch function returned an error and should be re-run solo")

type panicked struct {
reason interface{}
}

func (p panicked) Error() string {
if err, ok := p.reason.(error); ok {
return err.Error()
}
return fmt.Sprintf("panic: %v", p.reason)
}

func safelyCall(fn func(tx kv.RwTx) error, tx kv.RwTx) (err error) {
defer func() {
if p := recover(); p != nil {
err = panicked{p}
}
}()
return fn(tx)
}

// Batch is only useful when there are multiple goroutines calling it.
// It behaves similar to Update, except:
//
// 1. concurrent Batch calls can be combined into a single RwTx.
//
// 2. the function passed to Batch may be called multiple times,
// regardless of whether it returns error or not.
//
// This means that Batch function side effects must be idempotent and
// take permanent effect only after a successful return is seen in
// caller.
//
// Example of bad side-effects: print messages, mutate external counters `i++`
//
// The maximum batch size and delay can be adjusted with DB.MaxBatchSize
// and DB.MaxBatchDelay, respectively.
func (db *MdbxKV) Batch(fn func(tx kv.RwTx) error) error {
errCh := make(chan error, 1)

db.batchMu.Lock()
if (db.batch == nil) || (db.batch != nil && len(db.batch.calls) >= db.MaxBatchSize) {
// There is no existing batch, or the existing batch is full; start a new one.
db.batch = &batch{
db: db,
}
db.batch.timer = time.AfterFunc(db.MaxBatchDelay, db.batch.trigger)
}
db.batch.calls = append(db.batch.calls, call{fn: fn, err: errCh})
if len(db.batch.calls) >= db.MaxBatchSize {
// wake up batch, it's ready to run
go db.batch.trigger()
}
db.batchMu.Unlock()

err := <-errCh
if errors.Is(err, trySolo) {
err = db.Update(context.Background(), fn)
}
return err
}

func (db *MdbxKV) Path() string { return db.opts.path }
Expand Down
Loading
Loading