Skip to content

Commit

Permalink
swarm/metrics: Send the accounting registry to InfluxDB (#18470)
Browse files Browse the repository at this point in the history
  • Loading branch information
JerzyLa authored and nonsense committed Jan 24, 2019
1 parent 2abeb35 commit f28da4f
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 43 deletions.
5 changes: 3 additions & 2 deletions metrics/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,9 @@ func (r *PrefixedRegistry) UnregisterAll() {
}

var (
DefaultRegistry = NewRegistry()
EphemeralRegistry = NewRegistry()
DefaultRegistry = NewRegistry()
EphemeralRegistry = NewRegistry()
AccountingRegistry = NewRegistry() // registry used in swarm
)

// Call the given function for each registered metric.
Expand Down
35 changes: 11 additions & 24 deletions p2p/protocols/accounting.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,21 @@ var (
// All metrics are cumulative

// total amount of units credited
mBalanceCredit metrics.Counter
mBalanceCredit = metrics.NewRegisteredCounterForced("account.balance.credit", metrics.AccountingRegistry)
// total amount of units debited
mBalanceDebit metrics.Counter
mBalanceDebit = metrics.NewRegisteredCounterForced("account.balance.debit", metrics.AccountingRegistry)
// total amount of bytes credited
mBytesCredit metrics.Counter
mBytesCredit = metrics.NewRegisteredCounterForced("account.bytes.credit", metrics.AccountingRegistry)
// total amount of bytes debited
mBytesDebit metrics.Counter
mBytesDebit = metrics.NewRegisteredCounterForced("account.bytes.debit", metrics.AccountingRegistry)
// total amount of credited messages
mMsgCredit metrics.Counter
mMsgCredit = metrics.NewRegisteredCounterForced("account.msg.credit", metrics.AccountingRegistry)
// total amount of debited messages
mMsgDebit metrics.Counter
mMsgDebit = metrics.NewRegisteredCounterForced("account.msg.debit", metrics.AccountingRegistry)
// how many times local node had to drop remote peers
mPeerDrops metrics.Counter
mPeerDrops = metrics.NewRegisteredCounterForced("account.peerdrops", metrics.AccountingRegistry)
// how many times local node overdrafted and dropped
mSelfDrops metrics.Counter

MetricsRegistry metrics.Registry
mSelfDrops = metrics.NewRegisteredCounterForced("account.selfdrops", metrics.AccountingRegistry)
)

// Prices defines how prices are being passed on to the accounting instance
Expand Down Expand Up @@ -110,24 +108,13 @@ func NewAccounting(balance Balance, po Prices) *Accounting {
return ah
}

// SetupAccountingMetrics creates a separate registry for p2p accounting metrics;
// SetupAccountingMetrics uses a separate registry for p2p accounting metrics;
// this registry should be independent of any other metrics as it persists at different endpoints.
// It also instantiates the given metrics and starts the persisting go-routine which
// It also starts the persisting go-routine which
// at the passed interval writes the metrics to a LevelDB
func SetupAccountingMetrics(reportInterval time.Duration, path string) *AccountingMetrics {
// create an empty registry
MetricsRegistry = metrics.NewRegistry()
// instantiate the metrics
mBalanceCredit = metrics.NewRegisteredCounterForced("account.balance.credit", MetricsRegistry)
mBalanceDebit = metrics.NewRegisteredCounterForced("account.balance.debit", MetricsRegistry)
mBytesCredit = metrics.NewRegisteredCounterForced("account.bytes.credit", MetricsRegistry)
mBytesDebit = metrics.NewRegisteredCounterForced("account.bytes.debit", MetricsRegistry)
mMsgCredit = metrics.NewRegisteredCounterForced("account.msg.credit", MetricsRegistry)
mMsgDebit = metrics.NewRegisteredCounterForced("account.msg.debit", MetricsRegistry)
mPeerDrops = metrics.NewRegisteredCounterForced("account.peerdrops", MetricsRegistry)
mSelfDrops = metrics.NewRegisteredCounterForced("account.selfdrops", MetricsRegistry)
// create the DB and start persisting
return NewAccountingMetrics(MetricsRegistry, reportInterval, path)
return NewAccountingMetrics(metrics.AccountingRegistry, reportInterval, path)
}

// Send takes a peer, a size and a msg and
Expand Down
28 changes: 17 additions & 11 deletions p2p/protocols/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,35 +43,41 @@ func TestReporter(t *testing.T) {
metrics := SetupAccountingMetrics(reportInterval, filepath.Join(dir, "test.db"))
log.Debug("Done.")

//do some metrics
//change metrics
mBalanceCredit.Inc(12)
mBytesCredit.Inc(34)
mMsgDebit.Inc(9)

//store expected metrics
expectedBalanceCredit := mBalanceCredit.Count()
expectedBytesCredit := mBytesCredit.Count()
expectedMsgDebit := mMsgDebit.Count()

//give the reporter time to write the metrics to DB
time.Sleep(20 * time.Millisecond)

//set the metrics to nil - this effectively simulates the node having shut down...
mBalanceCredit = nil
mBytesCredit = nil
mMsgDebit = nil
//close the DB also, or we can't create a new one
metrics.Close()

//clear the metrics - this effectively simulates the node having shut down...
mBalanceCredit.Clear()
mBytesCredit.Clear()
mMsgDebit.Clear()

//setup the metrics again
log.Debug("Setting up metrics second time")
metrics = SetupAccountingMetrics(reportInterval, filepath.Join(dir, "test.db"))
defer metrics.Close()
log.Debug("Done.")

//now check the metrics, they should have the same value as before "shutdown"
if mBalanceCredit.Count() != 12 {
t.Fatalf("Expected counter to be %d, but is %d", 12, mBalanceCredit.Count())
if mBalanceCredit.Count() != expectedBalanceCredit {
t.Fatalf("Expected counter to be %d, but is %d", expectedBalanceCredit, mBalanceCredit.Count())
}
if mBytesCredit.Count() != 34 {
t.Fatalf("Expected counter to be %d, but is %d", 23, mBytesCredit.Count())
if mBytesCredit.Count() != expectedBytesCredit {
t.Fatalf("Expected counter to be %d, but is %d", expectedBytesCredit, mBytesCredit.Count())
}
if mMsgDebit.Count() != 9 {
t.Fatalf("Expected counter to be %d, but is %d", 9, mMsgDebit.Count())
if mMsgDebit.Count() != expectedMsgDebit {
t.Fatalf("Expected counter to be %d, but is %d", expectedMsgDebit, mMsgDebit.Count())
}
}
25 changes: 19 additions & 6 deletions swarm/metrics/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ var (
Name: "metrics.influxdb.export",
Usage: "Enable metrics export/push to an external InfluxDB database",
}
MetricsEnableInfluxDBAccountingExportFlag = cli.BoolFlag{
Name: "metrics.influxdb.accounting",
Usage: "Enable accounting metrics export/push to an external InfluxDB database",
}
MetricsInfluxDBEndpointFlag = cli.StringFlag{
Name: "metrics.influxdb.endpoint",
Usage: "Metrics InfluxDB endpoint",
Expand Down Expand Up @@ -66,6 +70,7 @@ var (
var Flags = []cli.Flag{
utils.MetricsEnabledFlag,
MetricsEnableInfluxDBExportFlag,
MetricsEnableInfluxDBAccountingExportFlag,
MetricsInfluxDBEndpointFlag,
MetricsInfluxDBDatabaseFlag,
MetricsInfluxDBUsernameFlag,
Expand All @@ -77,12 +82,13 @@ func Setup(ctx *cli.Context) {
if gethmetrics.Enabled {
log.Info("Enabling swarm metrics collection")
var (
enableExport = ctx.GlobalBool(MetricsEnableInfluxDBExportFlag.Name)
endpoint = ctx.GlobalString(MetricsInfluxDBEndpointFlag.Name)
database = ctx.GlobalString(MetricsInfluxDBDatabaseFlag.Name)
username = ctx.GlobalString(MetricsInfluxDBUsernameFlag.Name)
password = ctx.GlobalString(MetricsInfluxDBPasswordFlag.Name)
hosttag = ctx.GlobalString(MetricsInfluxDBHostTagFlag.Name)
enableExport = ctx.GlobalBool(MetricsEnableInfluxDBExportFlag.Name)
enableAccountingExport = ctx.GlobalBool(MetricsEnableInfluxDBAccountingExportFlag.Name)
endpoint = ctx.GlobalString(MetricsInfluxDBEndpointFlag.Name)
database = ctx.GlobalString(MetricsInfluxDBDatabaseFlag.Name)
username = ctx.GlobalString(MetricsInfluxDBUsernameFlag.Name)
password = ctx.GlobalString(MetricsInfluxDBPasswordFlag.Name)
hosttag = ctx.GlobalString(MetricsInfluxDBHostTagFlag.Name)
)

// Start system runtime metrics collection
Expand All @@ -94,5 +100,12 @@ func Setup(ctx *cli.Context) {
"host": hosttag,
})
}

if enableAccountingExport {
log.Info("Exporting accounting metrics to InfluxDB")
go influxdb.InfluxDBWithTags(gethmetrics.AccountingRegistry, 10*time.Second, endpoint, database, username, password, "accounting.", map[string]string{
"host": hosttag,
})
}
}
}

0 comments on commit f28da4f

Please sign in to comment.