From 04705e90f9c907a09c727714c89f9ec4e413a312 Mon Sep 17 00:00:00 2001 From: Chris Piraino Date: Wed, 2 Sep 2020 10:24:16 -0500 Subject: [PATCH 1/9] Add new usage memdb table that tracks usage counts of various elements We update the usage table on Commit() by using the TrackedChanges() API of memdb. Track memdb changes on restore so that usage data can be compiled --- agent/consul/fsm/snapshot_oss_test.go | 6 + agent/consul/state/memdb.go | 30 +++-- agent/consul/state/usage.go | 170 ++++++++++++++++++++++++++ agent/consul/state/usage_oss.go | 33 +++++ agent/consul/state/usage_oss_test.go | 25 ++++ agent/consul/state/usage_test.go | 133 ++++++++++++++++++++ 6 files changed, 387 insertions(+), 10 deletions(-) create mode 100644 agent/consul/state/usage.go create mode 100644 agent/consul/state/usage_oss.go create mode 100644 agent/consul/state/usage_oss_test.go create mode 100644 agent/consul/state/usage_test.go diff --git a/agent/consul/fsm/snapshot_oss_test.go b/agent/consul/fsm/snapshot_oss_test.go index e845c41c91af..f798a0efaa29 100644 --- a/agent/consul/fsm/snapshot_oss_test.go +++ b/agent/consul/fsm/snapshot_oss_test.go @@ -654,6 +654,12 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { require.NoError(t, err) require.Equal(t, fedState2, fedStateLoaded2) + // Verify usage data is correctly updated + idx, nodeCount, err := fsm2.state.NodeCount() + require.NoError(t, err) + require.Equal(t, len(nodes), nodeCount) + require.NotZero(t, idx) + // Snapshot snap, err = fsm2.Snapshot() require.NoError(t, err) diff --git a/agent/consul/state/memdb.go b/agent/consul/state/memdb.go index be4f4348e25e..5cdfd19dd1b8 100644 --- a/agent/consul/state/memdb.go +++ b/agent/consul/state/memdb.go @@ -89,17 +89,20 @@ func (c *changeTrackerDB) publish(changes Changes) error { return nil } -// WriteTxnRestore returns a wrapped RW transaction that does NOT have change -// tracking enabled. This should only be used in Restore where we need to -// replace the entire contents of the Store without a need to track the changes. -// WriteTxnRestore uses a zero index since the whole restore doesn't really occur -// at one index - the effect is to write many values that were previously +// WriteTxnRestore returns a wrapped RW transaction that should only be used in +// Restore where we need to replace the entire contents of the Store. +// WriteTxnRestore uses a zero index since the whole restore doesn't really +// occur at one index - the effect is to write many values that were previously // written across many indexes. func (c *changeTrackerDB) WriteTxnRestore() *txn { - return &txn{ + t := &txn{ Txn: c.db.Txn(true), Index: 0, } + + // We enable change tracking so that usage data is correctly populated. + t.Txn.TrackChanges() + return t } // txn wraps a memdb.Txn to capture changes and send them to the EventPublisher. @@ -125,14 +128,21 @@ type txn struct { // by the caller. A non-nil error indicates that a commit failed and was not // applied. func (tx *txn) Commit() error { + changes := Changes{ + Index: tx.Index, + Changes: tx.Txn.Changes(), + } + + if len(changes.Changes) > 0 { + if err := updateUsage(tx, changes); err != nil { + return err + } + } + // publish may be nil if this is a read-only or WriteTxnRestore transaction. // In those cases changes should also be empty, and there will be nothing // to publish. if tx.publish != nil { - changes := Changes{ - Index: tx.Index, - Changes: tx.Txn.Changes(), - } if err := tx.publish(changes); err != nil { return err } diff --git a/agent/consul/state/usage.go b/agent/consul/state/usage.go new file mode 100644 index 000000000000..397e157f37cf --- /dev/null +++ b/agent/consul/state/usage.go @@ -0,0 +1,170 @@ +package state + +import ( + "fmt" + + memdb "github.com/hashicorp/go-memdb" +) + +// usageTableSchema returns a new table schema used for tracking various indexes +// for the Raft log. +func usageTableSchema() *memdb.TableSchema { + return &memdb.TableSchema{ + Name: "usage", + Indexes: map[string]*memdb.IndexSchema{ + "id": { + Name: "id", + AllowMissing: false, + Unique: true, + Indexer: &memdb.StringFieldIndex{ + Field: "ID", + Lowercase: true, + }, + }, + }, + } +} + +func init() { + registerSchema(usageTableSchema) +} + +type UsageEntry struct { + ID string + Index uint64 + Count int +} + +// updateUsage takes a set of memdb changes and computes a delta for specific +// usage metrics that we track. +func updateUsage(tx *txn, changes Changes) error { + usageDeltas := make(map[string]int) + for _, change := range changes.Changes { + var delta int + if change.Created() { + delta = 1 + } else if change.Deleted() { + delta = -1 + } + switch change.Table { + case "nodes": + usageDeltas[change.Table] += delta + case "services": + usageDeltas[change.Table] += delta + } + + addEnterpriseUsage(usageDeltas, change) + } + + idx := changes.Index + // This will happen when restoring from a snapshot, just take the max index + // of the tables we are tracking. + if idx == 0 { + idx = maxIndexTxn(tx, "nodes", "services") + } + + for id, delta := range usageDeltas { + u, err := tx.First("usage", "id", id) + if err != nil { + return fmt.Errorf("failed to retrieve existing usage entry: %s", err) + } + + if u == nil { + if delta < 0 { + return fmt.Errorf("failed to insert usage entry for %q: delta will cause a negative count", id) + } + err := tx.Insert("usage", &UsageEntry{ + ID: id, + Count: delta, + Index: idx, + }) + if err != nil { + return fmt.Errorf("failed to update usage entry: %s", err) + } + } else if cur, ok := u.(*UsageEntry); ok { + if cur.Count+delta < 0 { + return fmt.Errorf("failed to insert usage entry for %q: delta will cause a negative count", id) + } + err := tx.Insert("usage", &UsageEntry{ + ID: id, + Count: cur.Count + delta, + Index: idx, + }) + if err != nil { + return fmt.Errorf("failed to update usage entry: %s", err) + } + } + } + return nil +} + +// ServiceUsage contains all of the usage data related to services +type ServiceUsage struct { + Services int + ServiceInstances int + EnterpriseServiceUsage +} + +// NodeCount returns the latest seen Raft index, a count of the number of nodes +// registered, and any errors. +func (s *Store) NodeCount() (uint64, int, error) { + tx := s.db.ReadTxn() + defer tx.Abort() + + usage, err := tx.First("usage", "id", "nodes") + if err != nil { + return 0, 0, fmt.Errorf("failed nodes lookup: %s", err) + } + + // If no nodes have been registered, the usage entry will not exist. + if usage == nil { + return 0, 0, nil + } + + nodeUsage, ok := usage.(*UsageEntry) + if !ok { + return 0, 0, fmt.Errorf("failed nodes lookup: type %T is not *UsageEntry", usage) + } + + return nodeUsage.Index, nodeUsage.Count, nil +} + +// ServiceUsage returns the latest seen Raft index, a compiled set of service +// usage data, and any errors. +func (s *Store) ServiceUsage() (uint64, ServiceUsage, error) { + tx := s.db.ReadTxn() + defer tx.Abort() + + usage, err := firstUsageEntry(tx, "services") + if err != nil { + return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err) + } + + results, err := s.compileServiceUsage(tx, usage.Count) + if err != nil { + return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err) + } + + return usage.Index, results, nil +} + +func firstUsageEntry(tx *txn, id string) (*UsageEntry, error) { + usage, err := tx.First("usage", "id", id) + if err != nil { + return nil, err + } + + // If no elements have been inserted, the usage entry will not exist. We + // return a valid value so that can be certain the return value is not nil + // when no error has occurred. + if usage == nil { + return &UsageEntry{ID: id, Count: 0}, nil + } + + realUsage, ok := usage.(*UsageEntry) + if !ok { + return nil, fmt.Errorf("failed usage lookup: type %T is not *UsageEntry", usage) + } + + return realUsage, nil +} diff --git a/agent/consul/state/usage_oss.go b/agent/consul/state/usage_oss.go new file mode 100644 index 000000000000..ec54313d5690 --- /dev/null +++ b/agent/consul/state/usage_oss.go @@ -0,0 +1,33 @@ +// +build !consulent + +package state + +import ( + "fmt" + + memdb "github.com/hashicorp/go-memdb" +) + +type EnterpriseServiceUsage struct{} + +func addEnterpriseUsage(map[string]int, memdb.Change) {} + +func (s *Store) compileServiceUsage(tx *txn, totalInstances int) (ServiceUsage, error) { + var totalServices int + results, err := tx.Get( + "index", + "id_prefix", + serviceIndexName("", nil), + ) + if err != nil { + return ServiceUsage{}, fmt.Errorf("failed services index lookup: %s", err) + } + for i := results.Next(); i != nil; i = results.Next() { + totalServices += 1 + } + + return ServiceUsage{ + Services: totalServices, + ServiceInstances: totalInstances, + }, nil +} diff --git a/agent/consul/state/usage_oss_test.go b/agent/consul/state/usage_oss_test.go new file mode 100644 index 000000000000..b441c7163529 --- /dev/null +++ b/agent/consul/state/usage_oss_test.go @@ -0,0 +1,25 @@ +// +build !consulent + +package state + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestStateStore_Usage_ServiceUsage(t *testing.T) { + s := testStateStore(t) + + testRegisterNode(t, s, 0, "node1") + testRegisterNode(t, s, 1, "node2") + testRegisterService(t, s, 8, "node1", "service1") + testRegisterService(t, s, 9, "node2", "service1") + testRegisterService(t, s, 10, "node2", "service2") + + idx, usage, err := s.ServiceUsage() + require.NoError(t, err) + require.Equal(t, idx, uint64(10)) + require.Equal(t, 2, usage.Services) + require.Equal(t, 3, usage.ServiceInstances) +} diff --git a/agent/consul/state/usage_test.go b/agent/consul/state/usage_test.go new file mode 100644 index 000000000000..a1c07f6546ee --- /dev/null +++ b/agent/consul/state/usage_test.go @@ -0,0 +1,133 @@ +package state + +import ( + "testing" + + "github.com/hashicorp/consul/agent/structs" + memdb "github.com/hashicorp/go-memdb" + "github.com/stretchr/testify/require" +) + +func TestStateStore_Usage_NodeCount(t *testing.T) { + s := testStateStore(t) + + // No nodes have been registered, and thus no usage entry exists + idx, count, err := s.NodeCount() + require.NoError(t, err) + require.Equal(t, idx, uint64(0)) + require.Equal(t, count, 0) + + testRegisterNode(t, s, 0, "node1") + testRegisterNode(t, s, 1, "node2") + + idx, count, err = s.NodeCount() + require.NoError(t, err) + require.Equal(t, idx, uint64(1)) + require.Equal(t, count, 2) +} + +func TestStateStore_Usage_NodeCount_Delete(t *testing.T) { + s := testStateStore(t) + + testRegisterNode(t, s, 0, "node1") + testRegisterNode(t, s, 1, "node2") + + idx, count, err := s.NodeCount() + require.NoError(t, err) + require.Equal(t, idx, uint64(1)) + require.Equal(t, count, 2) + + require.NoError(t, s.DeleteNode(2, "node2")) + idx, count, err = s.NodeCount() + require.NoError(t, err) + require.Equal(t, idx, uint64(2)) + require.Equal(t, count, 1) +} + +func TestStateStore_Usage_ServiceUsageEmpty(t *testing.T) { + s := testStateStore(t) + + // No services have been registered, and thus no usage entry exists + idx, usage, err := s.ServiceUsage() + require.NoError(t, err) + require.Equal(t, idx, uint64(0)) + require.Equal(t, usage.Services, 0) + require.Equal(t, usage.ServiceInstances, 0) +} + +func TestStateStore_Usage_Restore(t *testing.T) { + s := testStateStore(t) + restore := s.Restore() + restore.Registration(9, &structs.RegisterRequest{ + Node: "test-node", + Service: &structs.NodeService{ + ID: "mysql", + Service: "mysql", + Port: 8080, + Address: "198.18.0.2", + }, + }) + require.NoError(t, restore.Commit()) + + idx, count, err := s.NodeCount() + require.NoError(t, err) + require.Equal(t, idx, uint64(9)) + require.Equal(t, count, 1) +} + +func TestStateStore_Usage_updateUsage_Underflow(t *testing.T) { + s := testStateStore(t) + txn := s.db.WriteTxn(1) + + // A single delete change will cause a negative count + changes := Changes{ + Index: 1, + Changes: memdb.Changes{ + { + Table: "nodes", + Before: &structs.Node{}, + After: nil, + }, + }, + } + + err := updateUsage(txn, changes) + require.Error(t, err) + require.Contains(t, err.Error(), "negative count") + + // A insert a change to create a usage entry + changes = Changes{ + Index: 1, + Changes: memdb.Changes{ + { + Table: "nodes", + Before: nil, + After: &structs.Node{}, + }, + }, + } + + err = updateUsage(txn, changes) + require.NoError(t, err) + + // Two deletes will cause a negative count now + changes = Changes{ + Index: 1, + Changes: memdb.Changes{ + { + Table: "nodes", + Before: &structs.Node{}, + After: nil, + }, + { + Table: "nodes", + Before: &structs.Node{}, + After: nil, + }, + }, + } + + err = updateUsage(txn, changes) + require.Error(t, err) + require.Contains(t, err.Error(), "negative count") +} From 3feae7f77b805653a8853dcfdbcfed20011d656e Mon Sep 17 00:00:00 2001 From: Chris Piraino Date: Wed, 2 Sep 2020 10:24:17 -0500 Subject: [PATCH 2/9] Report node/service usage metrics from every server Using the newly provided state store methods, we periodically emit usage metrics from the servers. We decided to emit these metrics from all servers, not just the leader, because that means we do not have to care about leader election flapping causing metrics turbulence, and it seems reasonable for each server to emit its own view of the state, even if they should always converge rapidly. --- agent/consul/config.go | 16 ++- agent/consul/server.go | 14 ++ agent/consul/usagemetrics/usagemetrics.go | 135 ++++++++++++++++++ agent/consul/usagemetrics/usagemetrics_oss.go | 7 + .../usagemetrics/usagemetrics_oss_test.go | 9 ++ .../consul/usagemetrics/usagemetrics_test.go | 128 +++++++++++++++++ logging/names.go | 1 + 7 files changed, 305 insertions(+), 5 deletions(-) create mode 100644 agent/consul/usagemetrics/usagemetrics.go create mode 100644 agent/consul/usagemetrics/usagemetrics_oss.go create mode 100644 agent/consul/usagemetrics/usagemetrics_oss_test.go create mode 100644 agent/consul/usagemetrics/usagemetrics_test.go diff --git a/agent/consul/config.go b/agent/consul/config.go index a48effe4414a..955fb49d6613 100644 --- a/agent/consul/config.go +++ b/agent/consul/config.go @@ -443,6 +443,10 @@ type Config struct { // dead servers. AutopilotInterval time.Duration + // MetricsReportingInterval is the frequency with which the server will + // report usage metrics to the configured go-metrics Sinks. + MetricsReportingInterval time.Duration + // ConnectEnabled is whether to enable Connect features such as the CA. ConnectEnabled bool @@ -589,11 +593,13 @@ func DefaultConfig() *Config { }, }, - ServerHealthInterval: 2 * time.Second, - AutopilotInterval: 10 * time.Second, - DefaultQueryTime: 300 * time.Second, - MaxQueryTime: 600 * time.Second, - EnterpriseConfig: DefaultEnterpriseConfig(), + ServerHealthInterval: 2 * time.Second, + AutopilotInterval: 10 * time.Second, + MetricsReportingInterval: 10 * time.Second, + DefaultQueryTime: 300 * time.Second, + MaxQueryTime: 600 * time.Second, + + EnterpriseConfig: DefaultEnterpriseConfig(), } // Increase our reap interval to 3 days instead of 24h. diff --git a/agent/consul/server.go b/agent/consul/server.go index 04d3b61bd380..c1c1a6d76e28 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -25,6 +25,7 @@ import ( "github.com/hashicorp/consul/agent/consul/autopilot" "github.com/hashicorp/consul/agent/consul/fsm" "github.com/hashicorp/consul/agent/consul/state" + "github.com/hashicorp/consul/agent/consul/usagemetrics" "github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/agent/router" @@ -589,6 +590,19 @@ func NewServer(config *Config, options ...ConsulOption) (*Server, error) { return nil, err } + reporter, err := usagemetrics.NewUsageMetricsReporter( + new(usagemetrics.Config). + WithStateProvider(s.fsm). + WithLogger(s.logger). + WithDatacenter(s.config.Datacenter). + WithReportingInterval(s.config.MetricsReportingInterval), + ) + if err != nil { + s.Shutdown() + return nil, fmt.Errorf("Failed to start usage metrics reporter: %v", err) + } + go reporter.Run(&lib.StopChannelContext{StopCh: s.shutdownCh}) + // Initialize Autopilot. This must happen before starting leadership monitoring // as establishing leadership could attempt to use autopilot and cause a panic. s.initAutopilot(config) diff --git a/agent/consul/usagemetrics/usagemetrics.go b/agent/consul/usagemetrics/usagemetrics.go new file mode 100644 index 000000000000..18b36cfd69a4 --- /dev/null +++ b/agent/consul/usagemetrics/usagemetrics.go @@ -0,0 +1,135 @@ +package usagemetrics + +import ( + "context" + "errors" + "time" + + "github.com/armon/go-metrics" + "github.com/hashicorp/consul/agent/consul/state" + "github.com/hashicorp/consul/logging" + "github.com/hashicorp/go-hclog" +) + +// Config holds the settings for various parameters for the +// UsageMetricsReporter +type Config struct { + logger hclog.Logger + metricLabels []metrics.Label + stateProvider StateProvider + tickerInterval time.Duration +} + +// WithDatacenter adds the datacenter as a label to all metrics emitted by the +// UsageMetricsReporter +func (c *Config) WithDatacenter(dc string) *Config { + c.metricLabels = append(c.metricLabels, metrics.Label{Name: "datacenter", Value: dc}) + return c +} + +// WithLogger takes a logger and creates a new, named sub-logger to use when +// running +func (c *Config) WithLogger(logger hclog.Logger) *Config { + c.logger = logger.Named(logging.UsageMetrics) + return c +} + +// WithReportingInterval specifies the interval on which UsageMetricsReporter +// should emit metrics +func (c *Config) WithReportingInterval(dur time.Duration) *Config { + c.tickerInterval = dur + return c +} + +func (c *Config) WithStateProvider(sp StateProvider) *Config { + c.stateProvider = sp + return c +} + +// StateProvider defines an inteface for retrieving a state.Store handle. In +// non-test code, this is satisfied by the fsm.FSM struct. +type StateProvider interface { + State() *state.Store +} + +// UsageMetricsReporter provides functionality for emitting usage metrics into +// the metrics stream. This makes it essentially a translation layer +// between the state store and metrics stream. +type UsageMetricsReporter struct { + logger hclog.Logger + metricLabels []metrics.Label + stateProvider StateProvider + tickerInterval time.Duration +} + +func NewUsageMetricsReporter(cfg *Config) (*UsageMetricsReporter, error) { + if cfg.stateProvider == nil { + return nil, errors.New("must provide a StateProvider to usage reporter") + } + + if cfg.logger == nil { + cfg.logger = hclog.NewNullLogger() + } + + if cfg.tickerInterval == 0 { + // Metrics are aggregated every 10 seconds, so we default to that. + cfg.tickerInterval = 10 * time.Second + } + + u := &UsageMetricsReporter{ + logger: cfg.logger, + stateProvider: cfg.stateProvider, + metricLabels: cfg.metricLabels, + tickerInterval: cfg.tickerInterval, + } + + return u, nil +} + +// Run must be run in a goroutine, and can be stopped by closing or sending +// data to the passed in shutdownCh +func (u *UsageMetricsReporter) Run(ctx context.Context) { + ticker := time.NewTicker(u.tickerInterval) + for { + select { + case <-ctx.Done(): + u.logger.Debug("usage metrics reporter shutting down") + ticker.Stop() + return + case <-ticker.C: + u.runOnce() + } + } +} + +func (u *UsageMetricsReporter) runOnce() { + state := u.stateProvider.State() + _, nodes, err := state.NodeCount() + if err != nil { + u.logger.Warn("failed to retrieve nodes from state store", "error", err) + } + metrics.SetGaugeWithLabels( + []string{"consul", "state", "nodes"}, + float32(nodes), + u.metricLabels, + ) + + _, serviceUsage, err := state.ServiceUsage() + if err != nil { + u.logger.Warn("failed to retrieve services from state store", "error", err) + } + + metrics.SetGaugeWithLabels( + []string{"consul", "state", "services"}, + float32(serviceUsage.Services), + u.metricLabels, + ) + + metrics.SetGaugeWithLabels( + []string{"consul", "state", "service_instances"}, + float32(serviceUsage.ServiceInstances), + u.metricLabels, + ) + + u.emitEnterpriseUsage(serviceUsage) +} diff --git a/agent/consul/usagemetrics/usagemetrics_oss.go b/agent/consul/usagemetrics/usagemetrics_oss.go new file mode 100644 index 000000000000..37d71b83f817 --- /dev/null +++ b/agent/consul/usagemetrics/usagemetrics_oss.go @@ -0,0 +1,7 @@ +// +build !consulent + +package usagemetrics + +import "github.com/hashicorp/consul/agent/consul/state" + +func (u *UsageMetricsReporter) emitEnterpriseUsage(state.ServiceUsage) {} diff --git a/agent/consul/usagemetrics/usagemetrics_oss_test.go b/agent/consul/usagemetrics/usagemetrics_oss_test.go new file mode 100644 index 000000000000..3d5263c0b2c6 --- /dev/null +++ b/agent/consul/usagemetrics/usagemetrics_oss_test.go @@ -0,0 +1,9 @@ +// +build !consulent + +package usagemetrics + +import "github.com/hashicorp/consul/agent/consul/state" + +func newStateStore() (*state.Store, error) { + return state.NewStateStore(nil) +} diff --git a/agent/consul/usagemetrics/usagemetrics_test.go b/agent/consul/usagemetrics/usagemetrics_test.go new file mode 100644 index 000000000000..c293cbb1de2e --- /dev/null +++ b/agent/consul/usagemetrics/usagemetrics_test.go @@ -0,0 +1,128 @@ +package usagemetrics + +import ( + "testing" + "time" + + "github.com/armon/go-metrics" + "github.com/hashicorp/consul/agent/consul/state" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/sdk/testutil" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +type mockStateProvider struct { + mock.Mock +} + +func (m *mockStateProvider) State() *state.Store { + retValues := m.Called() + return retValues.Get(0).(*state.Store) +} + +func TestUsageReporter_Run(t *testing.T) { + type testCase struct { + modfiyStateStore func(t *testing.T, s *state.Store) + expectedGauges map[string]metrics.GaugeValue + } + cases := map[string]testCase{ + "empty-state": { + expectedGauges: map[string]metrics.GaugeValue{ + "consul.usage.test.consul.state.nodes;datacenter=dc1": { + Name: "consul.usage.test.consul.state.nodes", + Value: 0, + Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}}, + }, + "consul.usage.test.consul.state.services;datacenter=dc1": { + Name: "consul.usage.test.consul.state.services", + Value: 0, + Labels: []metrics.Label{ + {Name: "datacenter", Value: "dc1"}, + }, + }, + "consul.usage.test.consul.state.service_instances;datacenter=dc1": { + Name: "consul.usage.test.consul.state.service_instances", + Value: 0, + Labels: []metrics.Label{ + {Name: "datacenter", Value: "dc1"}, + }, + }, + }, + }, + "nodes-and-services": { + modfiyStateStore: func(t *testing.T, s *state.Store) { + require.Nil(t, s.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})) + require.Nil(t, s.EnsureNode(2, &structs.Node{Node: "bar", Address: "127.0.0.2"})) + require.Nil(t, s.EnsureNode(3, &structs.Node{Node: "baz", Address: "127.0.0.2"})) + + // Typical services and some consul services spread across two nodes + require.Nil(t, s.EnsureService(4, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: nil, Address: "", Port: 5000})) + require.Nil(t, s.EnsureService(5, "bar", &structs.NodeService{ID: "api", Service: "api", Tags: nil, Address: "", Port: 5000})) + require.Nil(t, s.EnsureService(6, "foo", &structs.NodeService{ID: "consul", Service: "consul", Tags: nil})) + require.Nil(t, s.EnsureService(7, "bar", &structs.NodeService{ID: "consul", Service: "consul", Tags: nil})) + }, + expectedGauges: map[string]metrics.GaugeValue{ + "consul.usage.test.consul.state.nodes;datacenter=dc1": { + Name: "consul.usage.test.consul.state.nodes", + Value: 3, + Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}}, + }, + "consul.usage.test.consul.state.services;datacenter=dc1": { + Name: "consul.usage.test.consul.state.services", + Value: 3, + Labels: []metrics.Label{ + {Name: "datacenter", Value: "dc1"}, + }, + }, + "consul.usage.test.consul.state.service_instances;datacenter=dc1": { + Name: "consul.usage.test.consul.state.service_instances", + Value: 4, + Labels: []metrics.Label{ + {Name: "datacenter", Value: "dc1"}, + }, + }, + }, + }, + } + + for name, tcase := range cases { + t.Run(name, func(t *testing.T) { + // Only have a single interval for the test + sink := metrics.NewInmemSink(1*time.Minute, 1*time.Minute) + cfg := metrics.DefaultConfig("consul.usage.test") + cfg.EnableHostname = false + metrics.NewGlobal(cfg, sink) + + mockStateProvider := &mockStateProvider{} + s, err := newStateStore() + require.NoError(t, err) + if tcase.modfiyStateStore != nil { + tcase.modfiyStateStore(t, s) + } + mockStateProvider.On("State").Return(s) + + reporter, err := NewUsageMetricsReporter( + new(Config). + WithStateProvider(mockStateProvider). + WithLogger(testutil.Logger(t)). + WithDatacenter("dc1"), + ) + require.NoError(t, err) + + reporter.runOnce() + + intervals := sink.Data() + require.Len(t, intervals, 1) + intv := intervals[0] + + // Range over the expected values instead of just doing an Equal + // comparison on the maps because of different metrics emitted between + // OSS and Ent. The enterprise tests have a full equality comparison on + // the maps. + for key, expected := range tcase.expectedGauges { + require.Equal(t, expected, intv.Gauges[key]) + } + }) + } +} diff --git a/logging/names.go b/logging/names.go index 6ade11bf691c..02c0fbf69fa6 100644 --- a/logging/names.go +++ b/logging/names.go @@ -51,6 +51,7 @@ const ( TerminatingGateway string = "terminating_gateway" TLSUtil string = "tlsutil" Transaction string = "txn" + UsageMetrics string = "usage_metrics" WAN string = "wan" Watch string = "watch" ) From 69dbc926ad592fa362576868d5223fdf43dbf940 Mon Sep 17 00:00:00 2001 From: Chris Piraino Date: Wed, 2 Sep 2020 10:24:19 -0500 Subject: [PATCH 3/9] Add WriteTxn interface and convert more functions to ReadTxn We add a WriteTxn interface for use in updating the usage memdb table, with the forward-looking prospect of incrementally converting other functions to accept interfaces. As well, we use the ReadTxn in new usage code, and as a side effect convert a couple of existing functions to use that interface as well. --- agent/consul/state/memdb.go | 7 +++++++ agent/consul/state/usage.go | 6 +++--- agent/consul/state/usage_oss.go | 2 +- 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/agent/consul/state/memdb.go b/agent/consul/state/memdb.go index 5cdfd19dd1b8..3b5d5e696eb1 100644 --- a/agent/consul/state/memdb.go +++ b/agent/consul/state/memdb.go @@ -15,6 +15,13 @@ type ReadTxn interface { Abort() } +// WriteTxn is implemented by memdb.Txn to perform write operations. +type WriteTxn interface { + ReadTxn + Insert(table string, obj interface{}) error + Commit() error +} + // Changes wraps a memdb.Changes to include the index at which these changes // were made. type Changes struct { diff --git a/agent/consul/state/usage.go b/agent/consul/state/usage.go index 397e157f37cf..8b226b3e19bd 100644 --- a/agent/consul/state/usage.go +++ b/agent/consul/state/usage.go @@ -37,7 +37,7 @@ type UsageEntry struct { // updateUsage takes a set of memdb changes and computes a delta for specific // usage metrics that we track. -func updateUsage(tx *txn, changes Changes) error { +func updateUsage(tx WriteTxn, changes Changes) error { usageDeltas := make(map[string]int) for _, change := range changes.Changes { var delta int @@ -140,7 +140,7 @@ func (s *Store) ServiceUsage() (uint64, ServiceUsage, error) { return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err) } - results, err := s.compileServiceUsage(tx, usage.Count) + results, err := compileServiceUsage(tx, usage.Count) if err != nil { return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err) } @@ -148,7 +148,7 @@ func (s *Store) ServiceUsage() (uint64, ServiceUsage, error) { return usage.Index, results, nil } -func firstUsageEntry(tx *txn, id string) (*UsageEntry, error) { +func firstUsageEntry(tx ReadTxn, id string) (*UsageEntry, error) { usage, err := tx.First("usage", "id", id) if err != nil { return nil, err diff --git a/agent/consul/state/usage_oss.go b/agent/consul/state/usage_oss.go index ec54313d5690..825b80494838 100644 --- a/agent/consul/state/usage_oss.go +++ b/agent/consul/state/usage_oss.go @@ -12,7 +12,7 @@ type EnterpriseServiceUsage struct{} func addEnterpriseUsage(map[string]int, memdb.Change) {} -func (s *Store) compileServiceUsage(tx *txn, totalInstances int) (ServiceUsage, error) { +func compileServiceUsage(tx ReadTxn, totalInstances int) (ServiceUsage, error) { var totalServices int results, err := tx.Get( "index", From 086a8ea8eb2069966e2b9d6fbb19c3cd39e5ea48 Mon Sep 17 00:00:00 2001 From: Chris Piraino Date: Wed, 2 Sep 2020 10:24:20 -0500 Subject: [PATCH 4/9] Use ReadTxn interface in state store helper functions --- agent/consul/state/operations_oss.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/agent/consul/state/operations_oss.go b/agent/consul/state/operations_oss.go index 4c382694b9ec..30deb706832f 100644 --- a/agent/consul/state/operations_oss.go +++ b/agent/consul/state/operations_oss.go @@ -7,30 +7,30 @@ import ( "github.com/hashicorp/go-memdb" ) -func firstWithTxn(tx *txn, +func firstWithTxn(tx ReadTxn, table, index, idxVal string, entMeta *structs.EnterpriseMeta) (interface{}, error) { return tx.First(table, index, idxVal) } -func firstWatchWithTxn(tx *txn, +func firstWatchWithTxn(tx ReadTxn, table, index, idxVal string, entMeta *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) { return tx.FirstWatch(table, index, idxVal) } -func firstWatchCompoundWithTxn(tx *txn, +func firstWatchCompoundWithTxn(tx ReadTxn, table, index string, _ *structs.EnterpriseMeta, idxVals ...interface{}) (<-chan struct{}, interface{}, error) { return tx.FirstWatch(table, index, idxVals...) } -func getWithTxn(tx *txn, +func getWithTxn(tx ReadTxn, table, index, idxVal string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) { return tx.Get(table, index, idxVal) } -func getCompoundWithTxn(tx *txn, table, index string, +func getCompoundWithTxn(tx ReadTxn, table, index string, _ *structs.EnterpriseMeta, idxVals ...interface{}) (memdb.ResultIterator, error) { return tx.Get(table, index, idxVals...) From d301145e623fadc5ef14f02d31330156feabf0e6 Mon Sep 17 00:00:00 2001 From: Chris Piraino Date: Wed, 2 Sep 2020 10:24:21 -0500 Subject: [PATCH 5/9] Refactor state store usage to track unique service names This commit refactors the state store usage code to track unique service name changes on transaction commit. This means we only need to lookup usage entries when reading the information, as opposed to iterating over a large number of service indices. - Take into account a service instance's name being changed - Do not iterate through entire list of service instances, we only care about whether there is 0, 1, or more than 1. --- agent/consul/state/usage.go | 138 +++++++++++++++++++++++++------ agent/consul/state/usage_oss.go | 24 +----- agent/consul/state/usage_test.go | 61 ++++++++++++++ 3 files changed, 177 insertions(+), 46 deletions(-) diff --git a/agent/consul/state/usage.go b/agent/consul/state/usage.go index 8b226b3e19bd..6e43f3729f7f 100644 --- a/agent/consul/state/usage.go +++ b/agent/consul/state/usage.go @@ -3,9 +3,14 @@ package state import ( "fmt" + "github.com/hashicorp/consul/agent/structs" memdb "github.com/hashicorp/go-memdb" ) +const ( + serviceNamesUsageTable = "service-names" +) + // usageTableSchema returns a new table schema used for tracking various indexes // for the Raft log. func usageTableSchema() *memdb.TableSchema { @@ -29,12 +34,29 @@ func init() { registerSchema(usageTableSchema) } +// UsageEntry represents a count of some arbitrary identifier within the +// state store, along with the last seen index. type UsageEntry struct { ID string Index uint64 Count int } +// ServiceUsage contains all of the usage data related to services +type ServiceUsage struct { + Services int + ServiceInstances int + EnterpriseServiceUsage +} + +type uniqueServiceState int + +const ( + NoChange uniqueServiceState = 0 + Deleted uniqueServiceState = 1 + Created uniqueServiceState = 2 +) + // updateUsage takes a set of memdb changes and computes a delta for specific // usage metrics that we track. func updateUsage(tx WriteTxn, changes Changes) error { @@ -46,23 +68,98 @@ func updateUsage(tx WriteTxn, changes Changes) error { } else if change.Deleted() { delta = -1 } + switch change.Table { case "nodes": usageDeltas[change.Table] += delta case "services": + svc := changeObject(change).(*structs.ServiceNode) usageDeltas[change.Table] += delta - } + serviceIter, err := getWithTxn(tx, servicesTableName, "service", svc.ServiceName, &svc.EnterpriseMeta) + if err != nil { + return err + } + + var serviceState uniqueServiceState + if serviceIter.Next() == nil { + // If no services exist, we know we deleted the last service + // instance. + serviceState = Deleted + usageDeltas[serviceNamesUsageTable] -= 1 + } else if serviceIter.Next() == nil { + // If a second call to Next() returns nil, we know only a single + // instance exists. If, in addition, a new service name has been + // registered, either via creating a new service instance or via + // renaming an existing service, than we update our service count. + // + // We only care about two cases here: + // 1. A new service instance has been created with a unique name + // 2. An existing service instance has been updated with a new unique name + // + // These are the only ways a new unique service can be created. The + // other valid cases here: an update that does not change the service + // name, and a deletion, both do not impact the count of unique service + // names in the system. + + if change.Created() { + // Given a single existing service instance of the service: If a + // service has just been created, then we know this is a new unique + // service. + serviceState = Created + usageDeltas[serviceNamesUsageTable] += 1 + } else if serviceNameChanged(change) { + // Given a single existing service instance of the service: If a + // service has been updated with a new service name, then we know + // this is a new unique service. + serviceState = Created + usageDeltas[serviceNamesUsageTable] += 1 - addEnterpriseUsage(usageDeltas, change) + // Check whether the previous name was deleted in this rename, this + // is a special case of renaming a service which does not result in + // changing the count of unique service names. + before := change.Before.(*structs.ServiceNode) + beforeSvc, err := firstWithTxn(tx, servicesTableName, "service", before.ServiceName, &before.EnterpriseMeta) + if err != nil { + return err + } + if beforeSvc == nil { + usageDeltas[serviceNamesUsageTable] -= 1 + // set serviceState to NoChange since we have both gained and lost a + // service, cancelling each other out + serviceState = NoChange + } + } + } + addEnterpriseServiceUsage(usageDeltas, change, serviceState) + } } idx := changes.Index // This will happen when restoring from a snapshot, just take the max index // of the tables we are tracking. if idx == 0 { - idx = maxIndexTxn(tx, "nodes", "services") + idx = maxIndexTxn(tx, "nodes", servicesTableName) } + return writeUsageDeltas(tx, idx, usageDeltas) +} + +// serviceNameChanged returns a boolean that indicates whether the +// provided change resulted in an update to the service's service name. +func serviceNameChanged(change memdb.Change) bool { + if change.Updated() { + before := change.Before.(*structs.ServiceNode) + after := change.After.(*structs.ServiceNode) + return before.ServiceName != after.ServiceName + } + + return false +} + +// writeUsageDeltas will take in a map of IDs to deltas and update each +// entry accordingly, checking for integer underflow. The index that is +// passed in will be recorded on the entry as well. +func writeUsageDeltas(tx WriteTxn, idx uint64, usageDeltas map[string]int) error { for id, delta := range usageDeltas { u, err := tx.First("usage", "id", id) if err != nil { @@ -98,34 +195,16 @@ func updateUsage(tx WriteTxn, changes Changes) error { return nil } -// ServiceUsage contains all of the usage data related to services -type ServiceUsage struct { - Services int - ServiceInstances int - EnterpriseServiceUsage -} - // NodeCount returns the latest seen Raft index, a count of the number of nodes // registered, and any errors. func (s *Store) NodeCount() (uint64, int, error) { tx := s.db.ReadTxn() defer tx.Abort() - usage, err := tx.First("usage", "id", "nodes") + nodeUsage, err := firstUsageEntry(tx, "nodes") if err != nil { return 0, 0, fmt.Errorf("failed nodes lookup: %s", err) } - - // If no nodes have been registered, the usage entry will not exist. - if usage == nil { - return 0, 0, nil - } - - nodeUsage, ok := usage.(*UsageEntry) - if !ok { - return 0, 0, fmt.Errorf("failed nodes lookup: type %T is not *UsageEntry", usage) - } - return nodeUsage.Index, nodeUsage.Count, nil } @@ -135,17 +214,26 @@ func (s *Store) ServiceUsage() (uint64, ServiceUsage, error) { tx := s.db.ReadTxn() defer tx.Abort() - usage, err := firstUsageEntry(tx, "services") + serviceInstances, err := firstUsageEntry(tx, servicesTableName) if err != nil { return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err) } - results, err := compileServiceUsage(tx, usage.Count) + services, err := firstUsageEntry(tx, serviceNamesUsageTable) + if err != nil { + return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err) + } + + usage := ServiceUsage{ + ServiceInstances: serviceInstances.Count, + Services: services.Count, + } + results, err := compileEnterpriseUsage(tx, usage) if err != nil { return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err) } - return usage.Index, results, nil + return serviceInstances.Index, results, nil } func firstUsageEntry(tx ReadTxn, id string) (*UsageEntry, error) { diff --git a/agent/consul/state/usage_oss.go b/agent/consul/state/usage_oss.go index 825b80494838..f35576abf5f0 100644 --- a/agent/consul/state/usage_oss.go +++ b/agent/consul/state/usage_oss.go @@ -3,31 +3,13 @@ package state import ( - "fmt" - memdb "github.com/hashicorp/go-memdb" ) type EnterpriseServiceUsage struct{} -func addEnterpriseUsage(map[string]int, memdb.Change) {} - -func compileServiceUsage(tx ReadTxn, totalInstances int) (ServiceUsage, error) { - var totalServices int - results, err := tx.Get( - "index", - "id_prefix", - serviceIndexName("", nil), - ) - if err != nil { - return ServiceUsage{}, fmt.Errorf("failed services index lookup: %s", err) - } - for i := results.Next(); i != nil; i = results.Next() { - totalServices += 1 - } +func addEnterpriseServiceUsage(map[string]int, memdb.Change, uniqueServiceState) {} - return ServiceUsage{ - Services: totalServices, - ServiceInstances: totalInstances, - }, nil +func compileEnterpriseUsage(tx ReadTxn, usage ServiceUsage) (ServiceUsage, error) { + return usage, nil } diff --git a/agent/consul/state/usage_test.go b/agent/consul/state/usage_test.go index a1c07f6546ee..f608d7d75cb4 100644 --- a/agent/consul/state/usage_test.go +++ b/agent/consul/state/usage_test.go @@ -131,3 +131,64 @@ func TestStateStore_Usage_updateUsage_Underflow(t *testing.T) { require.Error(t, err) require.Contains(t, err.Error(), "negative count") } + +func TestStateStore_Usage_ServiceUsage_updatingServiceName(t *testing.T) { + s := testStateStore(t) + testRegisterNode(t, s, 1, "node1") + testRegisterService(t, s, 1, "node1", "service1") + + t.Run("rename service with a single instance", func(t *testing.T) { + svc := &structs.NodeService{ + ID: "service1", + Service: "after", + Address: "1.1.1.1", + Port: 1111, + } + require.NoError(t, s.EnsureService(2, "node1", svc)) + + // We renamed a service with a single instance, so we maintain 1 service. + idx, usage, err := s.ServiceUsage() + require.NoError(t, err) + require.Equal(t, idx, uint64(2)) + require.Equal(t, usage.Services, 1) + require.Equal(t, usage.ServiceInstances, 1) + }) + + t.Run("rename service with a multiple instances", func(t *testing.T) { + svc2 := &structs.NodeService{ + ID: "service2", + Service: "before", + Address: "1.1.1.2", + Port: 1111, + } + require.NoError(t, s.EnsureService(3, "node1", svc2)) + + svc3 := &structs.NodeService{ + ID: "service3", + Service: "before", + Address: "1.1.1.3", + Port: 1111, + } + require.NoError(t, s.EnsureService(4, "node1", svc3)) + + idx, usage, err := s.ServiceUsage() + require.NoError(t, err) + require.Equal(t, idx, uint64(4)) + require.Equal(t, usage.Services, 2) + require.Equal(t, usage.ServiceInstances, 3) + + update := &structs.NodeService{ + ID: "service2", + Service: "another-name", + Address: "1.1.1.2", + Port: 1111, + } + require.NoError(t, s.EnsureService(5, "node1", update)) + + idx, usage, err = s.ServiceUsage() + require.NoError(t, err) + require.Equal(t, idx, uint64(5)) + require.Equal(t, usage.Services, 3) + require.Equal(t, usage.ServiceInstances, 3) + }) +} From a3028cad89e751b2c7c12baca6eca77601066baf Mon Sep 17 00:00:00 2001 From: Chris Piraino Date: Wed, 2 Sep 2020 10:24:22 -0500 Subject: [PATCH 6/9] Update godoc string for memdb wrapper functions/structs --- agent/consul/state/memdb.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/agent/consul/state/memdb.go b/agent/consul/state/memdb.go index 3b5d5e696eb1..895da9e06941 100644 --- a/agent/consul/state/memdb.go +++ b/agent/consul/state/memdb.go @@ -31,8 +31,9 @@ type Changes struct { } // changeTrackerDB is a thin wrapper around memdb.DB which enables TrackChanges on -// all write transactions. When the transaction is committed the changes are -// sent to the eventPublisher which will create and emit change events. +// all write transactions. When the transaction is committed the changes are: +// 1. Used to update our internal usage tracking +// 2. Sent to the eventPublisher which will create and emit change events type changeTrackerDB struct { db *memdb.MemDB publisher eventPublisher @@ -100,7 +101,8 @@ func (c *changeTrackerDB) publish(changes Changes) error { // Restore where we need to replace the entire contents of the Store. // WriteTxnRestore uses a zero index since the whole restore doesn't really // occur at one index - the effect is to write many values that were previously -// written across many indexes. +// written across many indexes. WriteTxnRestore also does not publish any +// change events to subscribers. func (c *changeTrackerDB) WriteTxnRestore() *txn { t := &txn{ Txn: c.db.Txn(true), From bcb586bee2e5637869d7a310de738ae9daf6c683 Mon Sep 17 00:00:00 2001 From: Chris Piraino Date: Wed, 2 Sep 2020 10:24:23 -0500 Subject: [PATCH 7/9] Set metrics reporting interval to 9 seconds This is below the 10 second interval that lib/telemetry.go implements as its aggregation interval, ensuring that we always report these metrics. --- agent/consul/config.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/agent/consul/config.go b/agent/consul/config.go index 955fb49d6613..43164756516f 100644 --- a/agent/consul/config.go +++ b/agent/consul/config.go @@ -593,9 +593,12 @@ func DefaultConfig() *Config { }, }, + // Stay under the 10 second aggregation interval of + // go-metrics. This ensures we always report the + // usage metrics in each cycle. + MetricsReportingInterval: 9 * time.Second, ServerHealthInterval: 2 * time.Second, AutopilotInterval: 10 * time.Second, - MetricsReportingInterval: 10 * time.Second, DefaultQueryTime: 300 * time.Second, MaxQueryTime: 600 * time.Second, From bf50c478db42b2cc8d14588f40e5cc61e955fbb2 Mon Sep 17 00:00:00 2001 From: Chris Piraino Date: Wed, 2 Sep 2020 10:38:29 -0500 Subject: [PATCH 8/9] docs: add new usage metrics to telemetry page --- website/pages/docs/agent/telemetry.mdx | 3 +++ 1 file changed, 3 insertions(+) diff --git a/website/pages/docs/agent/telemetry.mdx b/website/pages/docs/agent/telemetry.mdx index 449997fcd7b3..ea40e2ee6460 100644 --- a/website/pages/docs/agent/telemetry.mdx +++ b/website/pages/docs/agent/telemetry.mdx @@ -171,6 +171,9 @@ This is a full list of metrics emitted by Consul. | `consul.runtime.num_goroutines` | This tracks the number of running goroutines and is a general load pressure indicator. This may burst from time to time but should return to a steady state value. | number of goroutines | gauge | | `consul.runtime.alloc_bytes` | This measures the number of bytes allocated by the Consul process. This may burst from time to time but should return to a steady state value. | bytes | gauge | | `consul.runtime.heap_objects` | This measures the number of objects allocated on the heap and is a general memory pressure indicator. This may burst from time to time but should return to a steady state value. | number of objects | gauge | +| `consul.state.nodes` | This meansures the current number of nodes registered with Consul. It is only emitted by Consul servers. | number of objects | gauge | +| `consul.state.services` | This meansures the current number of unique services registered with Consul, based on service name. It is only emitted by Consul servers. | number of objects | gauge | +| `consul.state.service_instances` | This meansures the current number of unique service instances registered with Consul. It is only emitted by Consul servers. | number of objects | gauge | | `consul.acl.cache_hit` | The number of ACL cache hits. | hits | counter | | `consul.acl.cache_miss` | The number of ACL cache misses. | misses | counter | | `consul.acl.replication_hit` | The number of ACL replication cache hits (when not running in the ACL datacenter). | hits | counter | From 40cbd5a8f3251b829331644dd75d4c95818549da Mon Sep 17 00:00:00 2001 From: Chris Piraino Date: Wed, 2 Sep 2020 10:39:35 -0500 Subject: [PATCH 9/9] Changelog entry for usage metrics --- .changelog/8603.txt | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 .changelog/8603.txt diff --git a/.changelog/8603.txt b/.changelog/8603.txt new file mode 100644 index 000000000000..ffe9a9401f8e --- /dev/null +++ b/.changelog/8603.txt @@ -0,0 +1,3 @@ +```release-note:feature +telemetry: track node and service counts and emit them as metrics +```