Skip to content

Commit

Permalink
Merge pull request ethereum#306 from OffchainLabs/pebble-extra-options
Browse files Browse the repository at this point in the history
expose pebble open options, add more compact metrics
  • Loading branch information
PlasmaPower authored May 23, 2024
2 parents 3ad1488 + 07f6d7a commit dc3b2d2
Show file tree
Hide file tree
Showing 5 changed files with 202 additions and 33 deletions.
10 changes: 6 additions & 4 deletions core/rawdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,8 +357,8 @@ func NewLevelDBDatabase(file string, cache int, handles int, namespace string, r

// NewPebbleDBDatabase creates a persistent key-value database without a freezer
// moving immutable chain segments into cold storage.
func NewPebbleDBDatabase(file string, cache int, handles int, namespace string, readonly, ephemeral bool) (ethdb.Database, error) {
db, err := pebble.New(file, cache, handles, namespace, readonly, ephemeral)
func NewPebbleDBDatabase(file string, cache int, handles int, namespace string, readonly, ephemeral bool, extraOptions *pebble.ExtraOptions) (ethdb.Database, error) {
db, err := pebble.New(file, cache, handles, namespace, readonly, ephemeral, extraOptions)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -399,6 +399,8 @@ type OpenOptions struct {
// Ephemeral means that filesystem sync operations should be avoided: data integrity in the face of
// a crash is not important. This option should typically be used in tests.
Ephemeral bool

PebbleExtraOptions *pebble.ExtraOptions
}

// openKeyValueDatabase opens a disk-based key-value database, e.g. leveldb or pebble.
Expand All @@ -420,15 +422,15 @@ func openKeyValueDatabase(o OpenOptions) (ethdb.Database, error) {
}
if o.Type == dbPebble || existingDb == dbPebble {
log.Info("Using pebble as the backing database")
return NewPebbleDBDatabase(o.Directory, o.Cache, o.Handles, o.Namespace, o.ReadOnly, o.Ephemeral)
return NewPebbleDBDatabase(o.Directory, o.Cache, o.Handles, o.Namespace, o.ReadOnly, o.Ephemeral, o.PebbleExtraOptions)
}
if o.Type == dbLeveldb || existingDb == dbLeveldb {
log.Info("Using leveldb as the backing database")
return NewLevelDBDatabase(o.Directory, o.Cache, o.Handles, o.Namespace, o.ReadOnly)
}
// No pre-existing database, no user-requested one either. Default to Pebble.
log.Info("Defaulting to pebble as the backing database")
return NewPebbleDBDatabase(o.Directory, o.Cache, o.Handles, o.Namespace, o.ReadOnly, o.Ephemeral)
return NewPebbleDBDatabase(o.Directory, o.Cache, o.Handles, o.Namespace, o.ReadOnly, o.Ephemeral, o.PebbleExtraOptions)
}

// Open opens both a disk-based key-value database such as leveldb or pebble, but also
Expand Down
35 changes: 35 additions & 0 deletions ethdb/pebble/extraoptions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package pebble

import "time"

// ExtraOptions carries additional pebble tuning knobs that New copies into the
// pebble.Options it builds. A nil *ExtraOptions passed to New is treated as an
// empty ExtraOptions. Unless a field comment says otherwise, New forwards the
// field unmodified to the pebble option of the same name, zero value included.
//
// Note that New writes its fallback values for MemTableStopWritesThreshold and
// MaxConcurrentCompactions back into the struct it was given, so a non-nil
// value passed by the caller is modified in place.
type ExtraOptions struct {
BytesPerSync int
L0CompactionFileThreshold int
L0CompactionThreshold int
L0StopWritesThreshold int
LBaseMaxBytes int64
// MemTableStopWritesThreshold is used by New as the memtable limit when it
// derives the memtable size from the cache budget. Values <= 0 are replaced
// with 2.
MemTableStopWritesThreshold int
// MaxConcurrentCompactions, when nil, is replaced by New with a function
// returning runtime.NumCPU().
MaxConcurrentCompactions func() int
DisableAutomaticCompactions bool
WALBytesPerSync int
WALDir string
WALMinSyncInterval func() time.Duration
TargetByteDeletionRate int
// Experimental is copied field by field into pebble's Options.Experimental.
Experimental ExtraOptionsExperimental
// Levels supplies per-level settings. When empty, New falls back to seven
// levels, each with TargetFileSize 2 * 1024 * 1024 and bloom.FilterPolicy(10).
// Otherwise New builds one pebble.LevelOptions per entry, in order.
Levels []ExtraLevelOptions
}

// ExtraOptionsExperimental holds the values New assigns to the matching fields
// of pebble's Options.Experimental.
type ExtraOptionsExperimental struct {
L0CompactionConcurrency int
CompactionDebtConcurrency uint64
ReadCompactionRate int64
// ReadSamplingMultiplier: New first sets the pebble option to -1 (its comment
// says this disables seek compaction) and then assigns this field over it.
// NOTE(review): the guard around that assignment tests the pebble option that
// was just set to -1, not this field, so as written even a zero value here
// replaces the -1 — confirm whether that is intended.
ReadSamplingMultiplier int64
MaxWriterConcurrency int
ForceWriterParallelism bool
}

// ExtraLevelOptions describes one LSM level. New copies the three fields into
// a pebble.LevelOptions and always sets that level's FilterPolicy to
// bloom.FilterPolicy(10); the filter policy is not configurable here.
type ExtraLevelOptions struct {
BlockSize int
IndexBlockSize int
TargetFileSize int64
}
151 changes: 136 additions & 15 deletions ethdb/pebble/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,25 @@ type Database struct {
seekCompGauge metrics.Gauge // Gauge for tracking the number of table compaction caused by read opt
manualMemAllocGauge metrics.Gauge // Gauge for tracking amount of non-managed memory currently allocated

compDebtGauge metrics.Gauge
compInProgressGauge metrics.Gauge

commitCountMeter metrics.Meter
commitTotalDurationMeter metrics.Meter
commitSemaphoreWaitMeter metrics.Meter
commitMemTableWriteStallMeter metrics.Meter
commitL0ReadAmpWriteStallMeter metrics.Meter
commitWALRotationMeter metrics.Meter
commitWaitMeter metrics.Meter

commitCount atomic.Int64
commitTotalDuration atomic.Int64
commitSemaphoreWait atomic.Int64
commitMemTableWriteStall atomic.Int64
commitL0ReadAmpWriteStall atomic.Int64
commitWALRotation atomic.Int64
commitWait atomic.Int64

levelsGauge []metrics.Gauge // Gauge for tracking the number of tables in levels

quitLock sync.RWMutex // Mutex protecting the quit channel and the closed flag
Expand Down Expand Up @@ -137,7 +156,38 @@ func (l panicLogger) Fatalf(format string, args ...interface{}) {

// New returns a wrapped pebble DB object. The namespace is the prefix that the
// metrics reporting should use for surfacing internal stats.
func New(file string, cache int, handles int, namespace string, readonly bool, ephemeral bool) (*Database, error) {
func New(file string, cache int, handles int, namespace string, readonly bool, ephemeral bool, extraOptions *ExtraOptions) (*Database, error) {
if extraOptions == nil {
extraOptions = &ExtraOptions{}
}
if extraOptions.MemTableStopWritesThreshold <= 0 {
extraOptions.MemTableStopWritesThreshold = 2
}
if extraOptions.MaxConcurrentCompactions == nil {
extraOptions.MaxConcurrentCompactions = func() int { return runtime.NumCPU() }
}
var levels []pebble.LevelOptions
if len(extraOptions.Levels) == 0 {
levels = []pebble.LevelOptions{
{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
}
} else {
for _, level := range extraOptions.Levels {
levels = append(levels, pebble.LevelOptions{
BlockSize: level.BlockSize,
IndexBlockSize: level.IndexBlockSize,
TargetFileSize: level.TargetFileSize,
FilterPolicy: bloom.FilterPolicy(10),
})
}
}

// Ensure we have some minimal caching and file guarantees
if cache < minCache {
cache = minCache
Expand All @@ -162,7 +212,7 @@ func New(file string, cache int, handles int, namespace string, readonly bool, e

// By default two memory tables are configured, which is identical to leveldb:
// a frozen memory table and another live one. The limit can be overridden
// through extraOptions.MemTableStopWritesThreshold.
memTableLimit := 2
memTableLimit := extraOptions.MemTableStopWritesThreshold
memTableSize := cache * 1024 * 1024 / 2 / memTableLimit

// The memory table size is currently capped at maxMemTableSize-1 due to a
Expand Down Expand Up @@ -200,19 +250,11 @@ func New(file string, cache int, handles int, namespace string, readonly bool, e

// Pebble's default compaction concurrency is 1 thread. Use the configured
// value instead, which defaults to all available CPUs for faster compaction.
MaxConcurrentCompactions: func() int { return runtime.NumCPU() },
MaxConcurrentCompactions: extraOptions.MaxConcurrentCompactions,

// Per-level options. Options for at least one level must be specified. The
// options for the last level are used for all subsequent levels.
Levels: []pebble.LevelOptions{
{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
},
// Per-level options. Options for at least one level must be specified. The
// options for the last level are used for all subsequent levels.
Levels: levels,
ReadOnly: readonly,
EventListener: &pebble.EventListener{
CompactionBegin: db.onCompactionBegin,
Expand All @@ -221,11 +263,31 @@ func New(file string, cache int, handles int, namespace string, readonly bool, e
WriteStallEnd: db.onWriteStallEnd,
},
Logger: panicLogger{}, // TODO(karalabe): Delete when this is upstreamed in Pebble

BytesPerSync: extraOptions.BytesPerSync,
L0CompactionFileThreshold: extraOptions.L0CompactionFileThreshold,
L0CompactionThreshold: extraOptions.L0CompactionThreshold,
L0StopWritesThreshold: extraOptions.L0StopWritesThreshold,
LBaseMaxBytes: extraOptions.LBaseMaxBytes,
DisableAutomaticCompactions: extraOptions.DisableAutomaticCompactions,
WALBytesPerSync: extraOptions.WALBytesPerSync,
WALDir: extraOptions.WALDir,
WALMinSyncInterval: extraOptions.WALMinSyncInterval,
TargetByteDeletionRate: extraOptions.TargetByteDeletionRate,
}
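// Disable seek compaction explicitly. Check https://github.com/ethereum/go-ethereum/pull/20130
// for more details.
// NOTE(review): the guard that follows this assignment tests
// opt.Experimental.ReadSamplingMultiplier, which has just been set to -1 and is
// therefore always non-zero, so the extraOptions value (including its zero
// value) always replaces the -1. The intent was presumably to test
// extraOptions.Experimental.ReadSamplingMultiplier instead — confirm.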
opt.Experimental.ReadSamplingMultiplier = -1

if opt.Experimental.ReadSamplingMultiplier != 0 {
opt.Experimental.ReadSamplingMultiplier = extraOptions.Experimental.ReadSamplingMultiplier
}
opt.Experimental.L0CompactionConcurrency = extraOptions.Experimental.L0CompactionConcurrency
opt.Experimental.CompactionDebtConcurrency = extraOptions.Experimental.CompactionDebtConcurrency
opt.Experimental.ReadCompactionRate = extraOptions.Experimental.ReadCompactionRate
opt.Experimental.MaxWriterConcurrency = extraOptions.Experimental.MaxWriterConcurrency
opt.Experimental.ForceWriterParallelism = extraOptions.Experimental.ForceWriterParallelism

// Open the db and recover any potential corruptions
innerDB, err := pebble.Open(file, opt)
if err != nil {
Expand All @@ -247,6 +309,17 @@ func New(file string, cache int, handles int, namespace string, readonly bool, e
db.seekCompGauge = metrics.GetOrRegisterGauge(namespace+"compact/seek", nil)
db.manualMemAllocGauge = metrics.GetOrRegisterGauge(namespace+"memory/manualalloc", nil)

db.compDebtGauge = metrics.GetOrRegisterGauge(namespace+"compact/debt", nil)
db.compInProgressGauge = metrics.GetOrRegisterGauge(namespace+"compact/inprogress", nil)

db.commitCountMeter = metrics.GetOrRegisterMeter(namespace+"commit/counter", nil)
db.commitTotalDurationMeter = metrics.GetOrRegisterMeter(namespace+"commit/duration/total", nil)
db.commitSemaphoreWaitMeter = metrics.GetOrRegisterMeter(namespace+"commit/duration/semaphorewait", nil)
db.commitMemTableWriteStallMeter = metrics.GetOrRegisterMeter(namespace+"commit/duration/memtablewritestall", nil)
db.commitL0ReadAmpWriteStallMeter = metrics.GetOrRegisterMeter(namespace+"commit/duration/l0readampwritestall", nil)
db.commitWALRotationMeter = metrics.GetOrRegisterMeter(namespace+"commit/duration/walrotation", nil)
db.commitWaitMeter = metrics.GetOrRegisterMeter(namespace+"commit/duration/commitwait", nil)

// Start up the metrics gathering and return
go db.meter(metricsGatheringInterval, namespace)
return db, nil
Expand Down Expand Up @@ -459,6 +532,14 @@ func (d *Database) meter(refresh time.Duration, namespace string) {
compReads [2]int64

nWrites [2]int64

commitCounts [2]int64
commitTotalDurations [2]int64
commitSemaphoreWaits [2]int64
commitMemTableWriteStalls [2]int64
commitL0ReadAmpWriteStalls [2]int64
commitWALRotations [2]int64
commitWaits [2]int64
)

// Iterate ad infinitum and collect the stats
Expand All @@ -474,6 +555,14 @@ func (d *Database) meter(refresh time.Duration, namespace string) {
writeDelayTime = d.writeDelayTime.Load()
nonLevel0CompCount = int64(d.nonLevel0Comp.Load())
level0CompCount = int64(d.level0Comp.Load())

commitCount = d.commitCount.Load()
commitTotalDuration = d.commitTotalDuration.Load()
commitSemaphoreWait = d.commitSemaphoreWait.Load()
commitMemTableWriteStall = d.commitMemTableWriteStall.Load()
commitL0ReadAmpWriteStall = d.commitL0ReadAmpWriteStall.Load()
commitWALRotation = d.commitWALRotation.Load()
commitWait = d.commitWait.Load()
)
writeDelayTimes[i%2] = writeDelayTime
writeDelayCounts[i%2] = writeDelayCount
Expand Down Expand Up @@ -524,6 +613,25 @@ func (d *Database) meter(refresh time.Duration, namespace string) {
d.level0CompGauge.Update(level0CompCount)
d.seekCompGauge.Update(stats.Compact.ReadCount)

commitCounts[i%2] = commitCount
commitTotalDurations[i%2] = commitTotalDuration
commitSemaphoreWaits[i%2] = commitSemaphoreWait
commitMemTableWriteStalls[i%2] = commitMemTableWriteStall
commitL0ReadAmpWriteStalls[i%2] = commitL0ReadAmpWriteStall
commitWALRotations[i%2] = commitWALRotation
commitWaits[i%2] = commitWait

d.commitCountMeter.Mark(commitCounts[i%2] - commitCounts[(i-1)%2])
d.commitTotalDurationMeter.Mark(commitTotalDurations[i%2] - commitTotalDurations[(i-1)%2])
d.commitSemaphoreWaitMeter.Mark(commitSemaphoreWaits[i%2] - commitSemaphoreWaits[(i-1)%2])
d.commitMemTableWriteStallMeter.Mark(commitMemTableWriteStalls[i%2] - commitMemTableWriteStalls[(i-1)%2])
d.commitL0ReadAmpWriteStallMeter.Mark(commitL0ReadAmpWriteStalls[i%2] - commitL0ReadAmpWriteStalls[(i-1)%2])
d.commitWALRotationMeter.Mark(commitWALRotations[i%2] - commitWALRotations[(i-1)%2])
d.commitWaitMeter.Mark(commitWaits[i%2] - commitWaits[(i-1)%2])

d.compDebtGauge.Update(int64(stats.Compact.EstimatedDebt))
d.compInProgressGauge.Update(stats.Compact.NumInProgress)

for i, level := range stats.Levels {
// Append metrics for additional layers
if i >= len(d.levelsGauge) {
Expand Down Expand Up @@ -578,7 +686,20 @@ func (b *batch) Write() error {
if b.db.closed {
return pebble.ErrClosed
}
return b.b.Commit(b.db.writeOptions)
err := b.b.Commit(b.db.writeOptions)
if err != nil {
return err
}
stats := b.b.CommitStats()
b.db.commitCount.Add(1)
b.db.commitTotalDuration.Add(int64(stats.TotalDuration))
b.db.commitSemaphoreWait.Add(int64(stats.SemaphoreWaitDuration))
b.db.commitMemTableWriteStall.Add(int64(stats.MemTableWriteStallDuration))
b.db.commitL0ReadAmpWriteStall.Add(int64(stats.L0ReadAmpWriteStallDuration))
b.db.commitWALRotation.Add(int64(stats.WALRotationDuration))
b.db.commitWait.Add(int64(stats.CommitWaitDuration))
// TODO add metric for stats.WALQueueWaitDuration when it will be used by pebble (currently it is always 0)
return nil
}

// Reset resets the batch for reuse.
Expand Down
2 changes: 1 addition & 1 deletion ethdb/pebble/pebble_non64bit.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ import (
"github.com/ethereum/go-ethereum/ethdb"
)

func New(file string, cache int, handles int, namespace string, readonly bool, ephemeral bool) (ethdb.Database, error) {
func New(file string, cache int, handles int, namespace string, readonly bool, ephemeral bool, extraOptions *ExtraOptions) (ethdb.Database, error) {
// Stub variant of New: every argument, including extraOptions, is ignored and
// an error is always returned. Presumably selected by a build constraint for
// non-64-bit targets (per the file name) — the constraint is outside this view.
return nil, errors.New("pebble is not supported on this platform")
}
37 changes: 24 additions & 13 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/ethdb/pebble"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
Expand Down Expand Up @@ -746,6 +747,10 @@ func (n *Node) EventMux() *event.TypeMux {
// previous can be found) from within the node's instance directory. If the node is
// ephemeral, a memory database is returned.
func (n *Node) OpenDatabase(name string, cache, handles int, namespace string, readonly bool) (ethdb.Database, error) {
// Delegate with nil pebble extra options so this signature stays unchanged
// for existing callers.
return n.OpenDatabaseWithExtraOptions(name, cache, handles, namespace, readonly, nil)
}

func (n *Node) OpenDatabaseWithExtraOptions(name string, cache, handles int, namespace string, readonly bool, pebbleExtraOptions *pebble.ExtraOptions) (ethdb.Database, error) {
n.lock.Lock()
defer n.lock.Unlock()
if n.state == closedState {
Expand All @@ -758,12 +763,13 @@ func (n *Node) OpenDatabase(name string, cache, handles int, namespace string, r
db = rawdb.NewMemoryDatabase()
} else {
db, err = rawdb.Open(rawdb.OpenOptions{
Type: n.config.DBEngine,
Directory: n.ResolvePath(name),
Namespace: namespace,
Cache: cache,
Handles: handles,
ReadOnly: readonly,
Type: n.config.DBEngine,
Directory: n.ResolvePath(name),
Namespace: namespace,
Cache: cache,
Handles: handles,
ReadOnly: readonly,
PebbleExtraOptions: pebbleExtraOptions,
})
}

Expand All @@ -779,6 +785,10 @@ func (n *Node) OpenDatabase(name string, cache, handles int, namespace string, r
// database to immutable append-only files. If the node is an ephemeral one, a
// memory database is returned.
func (n *Node) OpenDatabaseWithFreezer(name string, cache, handles int, ancient string, namespace string, readonly bool) (ethdb.Database, error) {
// Delegate with nil pebble extra options so this signature stays unchanged
// for existing callers.
return n.OpenDatabaseWithFreezerWithExtraOptions(name, cache, handles, ancient, namespace, readonly, nil)
}

func (n *Node) OpenDatabaseWithFreezerWithExtraOptions(name string, cache, handles int, ancient string, namespace string, readonly bool, pebbleExtraOptions *pebble.ExtraOptions) (ethdb.Database, error) {
n.lock.Lock()
defer n.lock.Unlock()
if n.state == closedState {
Expand All @@ -790,13 +800,14 @@ func (n *Node) OpenDatabaseWithFreezer(name string, cache, handles int, ancient
db = rawdb.NewMemoryDatabase()
} else {
db, err = rawdb.Open(rawdb.OpenOptions{
Type: n.config.DBEngine,
Directory: n.ResolvePath(name),
AncientsDirectory: n.ResolveAncient(name, ancient),
Namespace: namespace,
Cache: cache,
Handles: handles,
ReadOnly: readonly,
Type: n.config.DBEngine,
Directory: n.ResolvePath(name),
AncientsDirectory: n.ResolveAncient(name, ancient),
Namespace: namespace,
Cache: cache,
Handles: handles,
ReadOnly: readonly,
PebbleExtraOptions: pebbleExtraOptions,
})
}
if err == nil {
Expand Down

0 comments on commit dc3b2d2

Please sign in to comment.