Skip to content

Commit

Permalink
Swarm rather stable: LocalStore metrics (ethereum#1349)
Browse files Browse the repository at this point in the history
* swarm/shed: remove metrics fields from DB struct

* swarm/schunk: add String methods to modes

* swarm/storage/localstore: add metrics and traces

* swarm/chunk: unknown modes without spaces in String methods

* swarm/storage/localstore: remove bin number from pull subscription metrics

* swarm/storage/localstore: add resetting time metrics and code improvements
  • Loading branch information
janos authored and acud committed Apr 25, 2019
1 parent 183c55c commit 7ef4555
Show file tree
Hide file tree
Showing 10 changed files with 239 additions and 47 deletions.
39 changes: 39 additions & 0 deletions swarm/chunk/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,19 @@ func Proximity(one, other []byte) (ret int) {
// ModeGet enumerates different Getter modes.
type ModeGet int

func (m ModeGet) String() string {
switch m {
case ModeGetRequest:
return "Request"
case ModeGetSync:
return "Sync"
case ModeGetLookup:
return "Lookup"
default:
return "Unknown"
}
}

// Getter modes.
const (
// ModeGetRequest: when accessed for retrieval
Expand All @@ -125,6 +138,19 @@ const (
// ModePut enumerates different Putter modes.
type ModePut int

func (m ModePut) String() string {
switch m {
case ModePutRequest:
return "Request"
case ModePutSync:
return "Sync"
case ModePutUpload:
return "Upload"
default:
return "Unknown"
}
}

// Putter modes.
const (
// ModePutRequest: when a chunk is received as a result of retrieve request and delivery
Expand All @@ -138,6 +164,19 @@ const (
// ModeSet enumerates different Setter modes.
type ModeSet int

func (m ModeSet) String() string {
switch m {
case ModeSetAccess:
return "Access"
case ModeSetSync:
return "Sync"
case ModeSetRemove:
return "Remove"
default:
return "Unknown"
}
}

// Setter modes.
const (
// ModeSetAccess: when an update request is received for a chunk or chunk is retrieved for delivery
Expand Down
71 changes: 31 additions & 40 deletions swarm/shed/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,7 @@ const (
// It provides a schema functionality to store fields and indexes
// information about naming and types.
type DB struct {
ldb *leveldb.DB

compTimeMeter metrics.Meter // Meter for measuring the total time spent in database compaction
compReadMeter metrics.Meter // Meter for measuring the data read during compaction
compWriteMeter metrics.Meter // Meter for measuring the data written during compaction
writeDelayNMeter metrics.Meter // Meter for measuring the write delay number due to database compaction
writeDelayMeter metrics.Meter // Meter for measuring the write delay duration due to database compaction
diskReadMeter metrics.Meter // Meter for measuring the effective amount of data read
diskWriteMeter metrics.Meter // Meter for measuring the effective amount of data written

ldb *leveldb.DB
quit chan struct{} // Quit channel to stop the metrics collection before closing the database
}

Expand Down Expand Up @@ -86,13 +77,10 @@ func NewDB(path string, metricsPrefix string) (db *DB, err error) {
}
}

// Configure meters for DB
db.configure(metricsPrefix)

// Create a quit channel for the periodic metrics collector and run it
db.quit = make(chan struct{})

go db.meter(10 * time.Second)
go db.meter(metricsPrefix, 10*time.Second)

return db, nil
}
Expand Down Expand Up @@ -169,19 +157,22 @@ func (db *DB) Close() (err error) {
return db.ldb.Close()
}

// Configure configures the database metrics collectors
func (db *DB) configure(prefix string) {
// Initialize all the metrics collector at the requested prefix
db.compTimeMeter = metrics.NewRegisteredMeter(prefix+"compact/time", nil)
db.compReadMeter = metrics.NewRegisteredMeter(prefix+"compact/input", nil)
db.compWriteMeter = metrics.NewRegisteredMeter(prefix+"compact/output", nil)
db.diskReadMeter = metrics.NewRegisteredMeter(prefix+"disk/read", nil)
db.diskWriteMeter = metrics.NewRegisteredMeter(prefix+"disk/write", nil)
db.writeDelayMeter = metrics.NewRegisteredMeter(prefix+"compact/writedelay/duration", nil)
db.writeDelayNMeter = metrics.NewRegisteredMeter(prefix+"compact/writedelay/counter", nil)
}
func (db *DB) meter(prefix string, refresh time.Duration) {
// Meter for measuring the total time spent in database compaction
compTimeMeter := metrics.NewRegisteredMeter(prefix+"compact/time", nil)
// Meter for measuring the data read during compaction
compReadMeter := metrics.NewRegisteredMeter(prefix+"compact/input", nil)
// Meter for measuring the data written during compaction
compWriteMeter := metrics.NewRegisteredMeter(prefix+"compact/output", nil)
// Meter for measuring the write delay number due to database compaction
writeDelayMeter := metrics.NewRegisteredMeter(prefix+"compact/writedelay/duration", nil)
// Meter for measuring the write delay duration due to database compaction
writeDelayNMeter := metrics.NewRegisteredMeter(prefix+"compact/writedelay/counter", nil)
// Meter for measuring the effective amount of data read
diskReadMeter := metrics.NewRegisteredMeter(prefix+"disk/read", nil)
// Meter for measuring the effective amount of data written
diskWriteMeter := metrics.NewRegisteredMeter(prefix+"disk/write", nil)

func (db *DB) meter(refresh time.Duration) {
// Create the counters to store current and previous compaction values
compactions := make([][]float64, 2)
for i := 0; i < 2; i++ {
Expand Down Expand Up @@ -234,14 +225,14 @@ func (db *DB) meter(refresh time.Duration) {
}
}
// Update all the requested meters
if db.compTimeMeter != nil {
db.compTimeMeter.Mark(int64((compactions[i%2][0] - compactions[(i-1)%2][0]) * 1000 * 1000 * 1000))
if compTimeMeter != nil {
compTimeMeter.Mark(int64((compactions[i%2][0] - compactions[(i-1)%2][0]) * 1000 * 1000 * 1000))
}
if db.compReadMeter != nil {
db.compReadMeter.Mark(int64((compactions[i%2][1] - compactions[(i-1)%2][1]) * 1024 * 1024))
if compReadMeter != nil {
compReadMeter.Mark(int64((compactions[i%2][1] - compactions[(i-1)%2][1]) * 1024 * 1024))
}
if db.compWriteMeter != nil {
db.compWriteMeter.Mark(int64((compactions[i%2][2] - compactions[(i-1)%2][2]) * 1024 * 1024))
if compWriteMeter != nil {
compWriteMeter.Mark(int64((compactions[i%2][2] - compactions[(i-1)%2][2]) * 1024 * 1024))
}

// Retrieve the write delay statistic
Expand All @@ -265,11 +256,11 @@ func (db *DB) meter(refresh time.Duration) {
log.Error("Failed to parse delay duration", "err", err)
continue
}
if db.writeDelayNMeter != nil {
db.writeDelayNMeter.Mark(delayN - delaystats[0])
if writeDelayNMeter != nil {
writeDelayNMeter.Mark(delayN - delaystats[0])
}
if db.writeDelayMeter != nil {
db.writeDelayMeter.Mark(duration.Nanoseconds() - delaystats[1])
if writeDelayMeter != nil {
writeDelayMeter.Mark(duration.Nanoseconds() - delaystats[1])
}
// If a warning that db is performing compaction has been displayed, any subsequent
// warnings will be withheld for one minute not to overwhelm the user.
Expand Down Expand Up @@ -300,11 +291,11 @@ func (db *DB) meter(refresh time.Duration) {
log.Error("Bad syntax of write entry", "entry", parts[1])
continue
}
if db.diskReadMeter != nil {
db.diskReadMeter.Mark(int64((nRead - iostats[0]) * 1024 * 1024))
if diskReadMeter != nil {
diskReadMeter.Mark(int64((nRead - iostats[0]) * 1024 * 1024))
}
if db.diskWriteMeter != nil {
db.diskWriteMeter.Mark(int64((nWrite - iostats[1]) * 1024 * 1024))
if diskWriteMeter != nil {
diskWriteMeter.Mark(int64((nWrite - iostats[1]) * 1024 * 1024))
}
iostats[0], iostats[1] = nRead, nWrite

Expand Down
12 changes: 12 additions & 0 deletions swarm/storage/localstore/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
package localstore

import (
"time"

"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/shed"
"github.com/syndtr/goleveldb/leveldb"
)
Expand Down Expand Up @@ -75,6 +78,15 @@ func (db *DB) collectGarbageWorker() {
// the rest of the garbage as the batch size limit is reached.
// This function is called in collectGarbageWorker.
func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) {
metricName := "localstore.gc"
metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
defer totalTimeMetric(metricName, time.Now())
defer func() {
if err != nil {
metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
}
}()

batch := new(leveldb.Batch)
target := db.gcTarget()

Expand Down
10 changes: 10 additions & 0 deletions swarm/storage/localstore/localstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/shed"
"github.com/ethereum/go-ethereum/swarm/storage/mock"
Expand Down Expand Up @@ -388,3 +389,12 @@ func init() {
return time.Now().UTC().UnixNano()
}
}

// totalTimeMetric logs a message about time between provided start time
// and the time when the function is called and sends a resetting timer metric
// with provided name appended with ".total-time".
func totalTimeMetric(name string, start time.Time) {
totalTime := time.Since(start)
log.Trace(name+" total time", "time", totalTime)
metrics.GetOrRegisterResettingTimer(name+".total-time", nil).Update(totalTime)
}
28 changes: 27 additions & 1 deletion swarm/storage/localstore/mode_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@ package localstore

import (
"context"
"fmt"
"time"

"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/shed"
"github.com/ethereum/go-ethereum/swarm/spancontext"
olog "github.com/opentracing/opentracing-go/log"
"github.com/syndtr/goleveldb/leveldb"
)

Expand All @@ -30,7 +35,22 @@ import (
// All required indexes will be updated required by the
// Getter Mode. Get is required to implement chunk.Store
// interface.
func (db *DB) Get(_ context.Context, mode chunk.ModeGet, addr chunk.Address) (ch chunk.Chunk, err error) {
func (db *DB) Get(ctx context.Context, mode chunk.ModeGet, addr chunk.Address) (ch chunk.Chunk, err error) {
metricName := fmt.Sprintf("localstore.Get.%s", mode)

ctx, sp := spancontext.StartSpan(ctx, metricName)
defer sp.Finish()
sp.LogFields(olog.String("ref", addr.String()), olog.String("mode-get", mode.String()))

metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
defer totalTimeMetric(metricName, time.Now())

defer func() {
if err != nil {
metrics.GetOrRegisterCounter(fmt.Sprintf(metricName+".error", mode), nil).Inc(1)
}
}()

out, err := db.get(mode, addr)
if err != nil {
if err == leveldb.ErrNotFound {
Expand Down Expand Up @@ -66,8 +86,14 @@ func (db *DB) get(mode chunk.ModeGet, addr chunk.Address) (out shed.Item, err er
// for a new goroutine
defer func() { <-db.updateGCSem }()
}

metricName := "localstore.updateGC"
metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
defer totalTimeMetric(metricName, time.Now())

err := db.updateGC(out)
if err != nil {
metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
log.Error("localstore update gc", "err", err)
}
// if gc update hook is defined, call it
Expand Down
21 changes: 19 additions & 2 deletions swarm/storage/localstore/mode_has.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,28 @@ package localstore

import (
"context"
"time"

"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/spancontext"
olog "github.com/opentracing/opentracing-go/log"
)

// Has returns true if the chunk is stored in database.
func (db *DB) Has(_ context.Context, addr chunk.Address) (bool, error) {
return db.retrievalDataIndex.Has(addressToItem(addr))
func (db *DB) Has(ctx context.Context, addr chunk.Address) (bool, error) {
metricName := "localstore.Has"

ctx, sp := spancontext.StartSpan(ctx, metricName)
defer sp.Finish()
sp.LogFields(olog.String("ref", addr.String()))

metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
defer totalTimeMetric(metricName, time.Now())

has, err := db.retrievalDataIndex.Has(addressToItem(addr))
if err != nil {
metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
}
return has, err
}
22 changes: 20 additions & 2 deletions swarm/storage/localstore/mode_put.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,36 @@ package localstore

import (
"context"
"fmt"
"time"

"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/shed"
"github.com/ethereum/go-ethereum/swarm/spancontext"
olog "github.com/opentracing/opentracing-go/log"
"github.com/syndtr/goleveldb/leveldb"
)

// Put stores the Chunk to database and depending
// on the Putter mode, it updates required indexes.
// Put is required to implement chunk.Store
// interface.
func (db *DB) Put(_ context.Context, mode chunk.ModePut, ch chunk.Chunk) (exists bool, err error) {
return db.put(mode, chunkToItem(ch))
func (db *DB) Put(ctx context.Context, mode chunk.ModePut, ch chunk.Chunk) (exists bool, err error) {
metricName := fmt.Sprintf("localstore.Put.%s", mode)

ctx, sp := spancontext.StartSpan(ctx, metricName)
defer sp.Finish()
sp.LogFields(olog.String("ref", ch.Address().String()), olog.String("mode-put", mode.String()))

metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
defer totalTimeMetric(metricName, time.Now())

exists, err = db.put(mode, chunkToItem(ch))
if err != nil {
metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
}
return exists, err
}

// put stores Item to database and updates other
Expand Down
22 changes: 20 additions & 2 deletions swarm/storage/localstore/mode_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,35 @@ package localstore

import (
"context"
"fmt"
"time"

"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/spancontext"
olog "github.com/opentracing/opentracing-go/log"
"github.com/syndtr/goleveldb/leveldb"
)

// Set updates database indexes for a specific
// chunk represented by the address.
// Set is required to implement chunk.Store
// interface.
func (db *DB) Set(_ context.Context, mode chunk.ModeSet, addr chunk.Address) (err error) {
return db.set(mode, addr)
func (db *DB) Set(ctx context.Context, mode chunk.ModeSet, addr chunk.Address) (err error) {
metricName := fmt.Sprintf("localstore.Set.%s", mode)

ctx, sp := spancontext.StartSpan(ctx, metricName)
defer sp.Finish()
sp.LogFields(olog.String("ref", addr.String()), olog.String("mode-set", mode.String()))

metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
defer totalTimeMetric(metricName, time.Now())

err = db.set(mode, addr)
if err != nil {
metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
}
return err
}

// set updates database indexes for a specific
Expand Down
Loading

0 comments on commit 7ef4555

Please sign in to comment.