Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update metric names and add a legacy config flag #3535

Merged
merged 3 commits into from
Oct 5, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions agent/acl.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,11 @@ func (m *aclManager) lookupACL(a *Agent, id string) (acl.ACL, error) {
}
if cached != nil && time.Now().Before(cached.Expires) {
metrics.IncrCounter([]string{"consul", "acl", "cache_hit"}, 1)
metrics.IncrCounter([]string{"acl", "cache_hit"}, 1)
return cached.ACL, nil
}
metrics.IncrCounter([]string{"consul", "acl", "cache_miss"}, 1)
metrics.IncrCounter([]string{"acl", "cache_miss"}, 1)

// At this point we might have a stale cached ACL, or none at all, so
// try to contact the servers.
Expand Down
8 changes: 8 additions & 0 deletions agent/config/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,14 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
}
}

// Always add a filter rule for the deprecated "consul.consul"-prefixed metric
// names: allow them when enable_deprecated_names is set, block them otherwise.
enableDeprecatedNames := b.boolVal(c.Telemetry.EnableDeprecatedNames)
if enableDeprecatedNames {
telemetryAllowedPrefixes = append(telemetryAllowedPrefixes, "consul.consul")
} else {
telemetryBlockedPrefixes = append(telemetryBlockedPrefixes, "consul.consul")
}

// raft performance scaling
performanceRaftMultiplier := b.intVal(c.Performance.RaftMultiplier)
if performanceRaftMultiplier < 1 || uint(performanceRaftMultiplier) > consul.MaxRaftMultiplier {
Expand Down
1 change: 1 addition & 0 deletions agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,7 @@ type Telemetry struct {
MetricsPrefix *string `json:"metrics_prefix,omitempty" hcl:"metrics_prefix" mapstructure:"metrics_prefix"`
StatsdAddr *string `json:"statsd_address,omitempty" hcl:"statsd_address" mapstructure:"statsd_address"`
StatsiteAddr *string `json:"statsite_address,omitempty" hcl:"statsite_address" mapstructure:"statsite_address"`
EnableDeprecatedNames *bool `json:"enable_deprecated_names" hcl:"enable_deprecated_names" mapstructure:"enable_deprecated_names"`
}

type Ports struct {
Expand Down
20 changes: 10 additions & 10 deletions agent/config/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,17 @@ type RuntimeConfig struct {
ConsulSerfWANSuspicionMult int
ConsulServerHealthInterval time.Duration

ACLAgentMasterToken string
ACLAgentToken string
ACLDatacenter string
ACLDefaultPolicy string
ACLDownPolicy string
ACLEnforceVersion8 bool
ACLAgentMasterToken string
ACLAgentToken string
ACLDatacenter string
ACLDefaultPolicy string
ACLDownPolicy string
ACLEnforceVersion8 bool
ACLEnableKeyListPolicy bool
ACLMasterToken string
ACLReplicationToken string
ACLTTL time.Duration
ACLToken string
ACLMasterToken string
ACLReplicationToken string
ACLTTL time.Duration
ACLToken string

AutopilotCleanupDeadServers bool
AutopilotDisableUpgradeMigration bool
Expand Down
24 changes: 22 additions & 2 deletions agent/config/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1659,10 +1659,28 @@ func TestConfigFlagsAndEdgecases(t *testing.T) {
patch: func(rt *RuntimeConfig) {
rt.DataDir = dataDir
rt.TelemetryAllowedPrefixes = []string{"foo"}
rt.TelemetryBlockedPrefixes = []string{"bar"}
rt.TelemetryBlockedPrefixes = []string{"bar", "consul.consul"}
},
warns: []string{`Filter rule must begin with either '+' or '-': "nix"`},
},
{
desc: "telemetry.enable_deprecated_names adds allow rule for whitelist",
flags: []string{
`-data-dir=` + dataDir,
},
json: []string{`{
"telemetry": { "enable_deprecated_names": true, "filter_default": false }
}`},
hcl: []string{`
telemetry = { enable_deprecated_names = true filter_default = false }
`},
patch: func(rt *RuntimeConfig) {
rt.DataDir = dataDir
rt.TelemetryFilterDefault = false
rt.TelemetryAllowedPrefixes = []string{"consul.consul"}
rt.TelemetryBlockedPrefixes = []string{}
},
},
{
desc: "encrypt has invalid key",
flags: []string{
Expand Down Expand Up @@ -2312,6 +2330,7 @@ func TestFullConfig(t *testing.T) {
"dogstatsd_tags": [ "3N81zSUB","Xtj8AnXZ" ],
"filter_default": true,
"prefix_filter": [ "+oJotS8XJ","-cazlEhGn" ],
"enable_deprecated_names": true,
"metrics_prefix": "ftO6DySn",
"statsd_address": "drce87cy",
"statsite_address": "HpFwKB8R"
Expand Down Expand Up @@ -2728,6 +2747,7 @@ func TestFullConfig(t *testing.T) {
dogstatsd_tags = [ "3N81zSUB","Xtj8AnXZ" ]
filter_default = true
prefix_filter = [ "+oJotS8XJ","-cazlEhGn" ]
enable_deprecated_names = true
metrics_prefix = "ftO6DySn"
statsd_address = "drce87cy"
statsite_address = "HpFwKB8R"
Expand Down Expand Up @@ -3266,7 +3286,7 @@ func TestFullConfig(t *testing.T) {
TelemetryDogstatsdAddr: "0wSndumK",
TelemetryDogstatsdTags: []string{"3N81zSUB", "Xtj8AnXZ"},
TelemetryFilterDefault: true,
TelemetryAllowedPrefixes: []string{"oJotS8XJ"},
TelemetryAllowedPrefixes: []string{"oJotS8XJ", "consul.consul"},
TelemetryBlockedPrefixes: []string{"cazlEhGn"},
TelemetryMetricsPrefix: "ftO6DySn",
TelemetryStatsdAddr: "drce87cy",
Expand Down
5 changes: 5 additions & 0 deletions agent/consul/acl.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type aclCacheEntry struct {
// using its replicated ACLs during an outage.
func (s *Server) aclLocalFault(id string) (string, string, error) {
defer metrics.MeasureSince([]string{"consul", "acl", "fault"}, time.Now())
defer metrics.MeasureSince([]string{"acl", "fault"}, time.Now())

// Query the state store.
state := s.fsm.State()
Expand Down Expand Up @@ -75,6 +76,7 @@ func (s *Server) resolveToken(id string) (acl.ACL, error) {
return nil, nil
}
defer metrics.MeasureSince([]string{"consul", "acl", "resolveToken"}, time.Now())
defer metrics.MeasureSince([]string{"acl", "resolveToken"}, time.Now())

// Handle the anonymous token
if len(id) == 0 {
Expand Down Expand Up @@ -158,9 +160,11 @@ func (c *aclCache) lookupACL(id, authDC string) (acl.ACL, error) {
// Check for live cache.
if cached != nil && time.Now().Before(cached.Expires) {
metrics.IncrCounter([]string{"consul", "acl", "cache_hit"}, 1)
metrics.IncrCounter([]string{"acl", "cache_hit"}, 1)
return cached.ACL, nil
}
metrics.IncrCounter([]string{"consul", "acl", "cache_miss"}, 1)
metrics.IncrCounter([]string{"acl", "cache_miss"}, 1)

// Attempt to refresh the policy from the ACL datacenter via an RPC.
args := structs.ACLPolicyRequest{
Expand Down Expand Up @@ -223,6 +227,7 @@ func (c *aclCache) lookupACL(id, authDC string) (acl.ACL, error) {
// Note we use the local TTL here, so this'll be used for that
// amount of time even once the ACL datacenter becomes available.
metrics.IncrCounter([]string{"consul", "acl", "replication_hit"}, 1)
metrics.IncrCounter([]string{"acl", "replication_hit"}, 1)
reply.ETag = makeACLETag(parent, policy)
reply.TTL = c.config.ACLTTL
reply.Parent = parent
Expand Down
1 change: 1 addition & 0 deletions agent/consul/acl_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ func (a *ACL) Apply(args *structs.ACLRequest, reply *string) error {
return err
}
defer metrics.MeasureSince([]string{"consul", "acl", "apply"}, time.Now())
defer metrics.MeasureSince([]string{"acl", "apply"}, time.Now())

// Verify we are allowed to serve this request
if a.srv.config.ACLDatacenter != a.srv.config.Datacenter {
Expand Down
3 changes: 3 additions & 0 deletions agent/consul/acl_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ func (s *Server) fetchLocalACLs() (structs.ACLs, error) {
// have replicated to, so this is expected to block until something changes.
func (s *Server) fetchRemoteACLs(lastRemoteIndex uint64) (*structs.IndexedACLs, error) {
defer metrics.MeasureSince([]string{"consul", "leader", "fetchRemoteACLs"}, time.Now())
defer metrics.MeasureSince([]string{"leader", "fetchRemoteACLs"}, time.Now())

args := structs.DCSpecificRequest{
Datacenter: s.config.ACLDatacenter,
Expand All @@ -170,6 +171,7 @@ func (s *Server) fetchRemoteACLs(lastRemoteIndex uint64) (*structs.IndexedACLs,
// local ACLs in-line with the remote ACLs from the ACL datacenter.
func (s *Server) updateLocalACLs(changes structs.ACLRequests) error {
defer metrics.MeasureSince([]string{"consul", "leader", "updateLocalACLs"}, time.Now())
defer metrics.MeasureSince([]string{"leader", "updateLocalACLs"}, time.Now())

minTimePerOp := time.Second / time.Duration(s.config.ACLReplicationApplyLimit)
for _, change := range changes {
Expand Down Expand Up @@ -217,6 +219,7 @@ func (s *Server) replicateACLs(lastRemoteIndex uint64) (uint64, error) {
// periods of time. This metric is a good measure of how expensive the
// replication process is.
defer metrics.MeasureSince([]string{"consul", "leader", "replicateACLs"}, time.Now())
defer metrics.MeasureSince([]string{"leader", "replicateACLs"}, time.Now())

local, err := s.fetchLocalACLs()
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions agent/consul/autopilot.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,10 +360,13 @@ func (s *Server) updateClusterHealth() error {
// Heartbeat a metric for monitoring if we're the leader
if s.IsLeader() {
metrics.SetGauge([]string{"consul", "autopilot", "failure_tolerance"}, float32(clusterHealth.FailureTolerance))
metrics.SetGauge([]string{"autopilot", "failure_tolerance"}, float32(clusterHealth.FailureTolerance))
if clusterHealth.Healthy {
metrics.SetGauge([]string{"consul", "autopilot", "healthy"}, 1)
metrics.SetGauge([]string{"autopilot", "healthy"}, 1)
} else {
metrics.SetGauge([]string{"consul", "autopilot", "healthy"}, 0)
metrics.SetGauge([]string{"autopilot", "healthy"}, 0)
}
}

Expand Down
8 changes: 8 additions & 0 deletions agent/consul/catalog_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error
return err
}
defer metrics.MeasureSince([]string{"consul", "catalog", "register"}, time.Now())
defer metrics.MeasureSince([]string{"catalog", "register"}, time.Now())

// Verify the args.
if args.Node == "" || args.Address == "" {
Expand Down Expand Up @@ -114,6 +115,7 @@ func (c *Catalog) Deregister(args *structs.DeregisterRequest, reply *struct{}) e
return err
}
defer metrics.MeasureSince([]string{"consul", "catalog", "deregister"}, time.Now())
defer metrics.MeasureSince([]string{"catalog", "deregister"}, time.Now())

// Verify the args
if args.Node == "" {
Expand Down Expand Up @@ -272,13 +274,19 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru
if err == nil {
metrics.IncrCounterWithLabels([]string{"consul", "catalog", "service", "query"}, 1,
[]metrics.Label{{Name: "service", Value: args.ServiceName}})
metrics.IncrCounterWithLabels([]string{"catalog", "service", "query"}, 1,
[]metrics.Label{{Name: "service", Value: args.ServiceName}})
if args.ServiceTag != "" {
metrics.IncrCounterWithLabels([]string{"consul", "catalog", "service", "query-tag"}, 1,
[]metrics.Label{{Name: "service", Value: args.ServiceName}, {Name: "tag", Value: args.ServiceTag}})
metrics.IncrCounterWithLabels([]string{"catalog", "service", "query-tag"}, 1,
[]metrics.Label{{Name: "service", Value: args.ServiceName}, {Name: "tag", Value: args.ServiceTag}})
}
if len(reply.ServiceNodes) == 0 {
metrics.IncrCounterWithLabels([]string{"consul", "catalog", "service", "not-found"}, 1,
[]metrics.Label{{Name: "service", Value: args.ServiceName}})
metrics.IncrCounterWithLabels([]string{"catalog", "service", "not-found"}, 1,
[]metrics.Label{{Name: "service", Value: args.ServiceName}})
}
}
return err
Expand Down
4 changes: 4 additions & 0 deletions agent/consul/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,10 @@ func (c *Client) RPC(method string, args interface{}, reply interface{}) error {

// Enforce the RPC limit.
metrics.IncrCounter([]string{"consul", "client", "rpc"}, 1)
metrics.IncrCounter([]string{"client", "rpc"}, 1)
if !c.rpcLimiter.Allow() {
metrics.IncrCounter([]string{"consul", "client", "rpc", "exceeded"}, 1)
metrics.IncrCounter([]string{"client", "rpc", "exceeded"}, 1)
return structs.ErrRPCRateExceeded
}

Expand All @@ -267,8 +269,10 @@ func (c *Client) SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io

// Enforce the RPC limit.
metrics.IncrCounter([]string{"consul", "client", "rpc"}, 1)
metrics.IncrCounter([]string{"client", "rpc"}, 1)
if !c.rpcLimiter.Allow() {
metrics.IncrCounter([]string{"consul", "client", "rpc", "exceeded"}, 1)
metrics.IncrCounter([]string{"client", "rpc", "exceeded"}, 1)
return structs.ErrRPCRateExceeded
}

Expand Down
16 changes: 16 additions & 0 deletions agent/consul/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} {

func (c *consulFSM) applyRegister(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"consul", "fsm", "register"}, time.Now())
defer metrics.MeasureSince([]string{"fsm", "register"}, time.Now())
var req structs.RegisterRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
Expand All @@ -140,6 +141,7 @@ func (c *consulFSM) applyRegister(buf []byte, index uint64) interface{} {

func (c *consulFSM) applyDeregister(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"consul", "fsm", "deregister"}, time.Now())
defer metrics.MeasureSince([]string{"fsm", "deregister"}, time.Now())
var req structs.DeregisterRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
Expand Down Expand Up @@ -174,6 +176,8 @@ func (c *consulFSM) applyKVSOperation(buf []byte, index uint64) interface{} {
}
defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "kvs"}, time.Now(),
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
defer metrics.MeasureSinceWithLabels([]string{"fsm", "kvs"}, time.Now(),
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
switch req.Op {
case api.KVSet:
return c.state.KVSSet(index, &req.DirEnt)
Expand Down Expand Up @@ -219,6 +223,8 @@ func (c *consulFSM) applySessionOperation(buf []byte, index uint64) interface{}
}
defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "session"}, time.Now(),
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
defer metrics.MeasureSinceWithLabels([]string{"fsm", "session"}, time.Now(),
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
switch req.Op {
case structs.SessionCreate:
if err := c.state.SessionCreate(index, &req.Session); err != nil {
Expand All @@ -240,6 +246,8 @@ func (c *consulFSM) applyACLOperation(buf []byte, index uint64) interface{} {
}
defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "acl"}, time.Now(),
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
defer metrics.MeasureSinceWithLabels([]string{"fsm", "acl"}, time.Now(),
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
switch req.Op {
case structs.ACLBootstrapInit:
enabled, err := c.state.ACLBootstrapInit(index)
Expand Down Expand Up @@ -272,6 +280,8 @@ func (c *consulFSM) applyTombstoneOperation(buf []byte, index uint64) interface{
}
defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "tombstone"}, time.Now(),
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
defer metrics.MeasureSinceWithLabels([]string{"fsm", "tombstone"}, time.Now(),
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
switch req.Op {
case structs.TombstoneReap:
return c.state.ReapTombstones(req.ReapIndex)
Expand All @@ -291,6 +301,7 @@ func (c *consulFSM) applyCoordinateBatchUpdate(buf []byte, index uint64) interfa
panic(fmt.Errorf("failed to decode batch updates: %v", err))
}
defer metrics.MeasureSince([]string{"consul", "fsm", "coordinate", "batch-update"}, time.Now())
defer metrics.MeasureSince([]string{"fsm", "coordinate", "batch-update"}, time.Now())
if err := c.state.CoordinateBatchUpdate(index, updates); err != nil {
return err
}
Expand All @@ -307,6 +318,8 @@ func (c *consulFSM) applyPreparedQueryOperation(buf []byte, index uint64) interf

defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "prepared-query"}, time.Now(),
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
defer metrics.MeasureSinceWithLabels([]string{"fsm", "prepared-query"}, time.Now(),
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
switch req.Op {
case structs.PreparedQueryCreate, structs.PreparedQueryUpdate:
return c.state.PreparedQuerySet(index, req.Query)
Expand All @@ -324,6 +337,7 @@ func (c *consulFSM) applyTxn(buf []byte, index uint64) interface{} {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSince([]string{"consul", "fsm", "txn"}, time.Now())
defer metrics.MeasureSince([]string{"fsm", "txn"}, time.Now())
results, errors := c.state.TxnRW(index, req.Ops)
return structs.TxnResponse{
Results: results,
Expand All @@ -337,6 +351,7 @@ func (c *consulFSM) applyAutopilotUpdate(buf []byte, index uint64) interface{} {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSince([]string{"consul", "fsm", "autopilot"}, time.Now())
defer metrics.MeasureSince([]string{"fsm", "autopilot"}, time.Now())

if req.CAS {
act, err := c.state.AutopilotCASConfig(index, req.Config.ModifyIndex, &req.Config)
Expand Down Expand Up @@ -506,6 +521,7 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {

func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error {
defer metrics.MeasureSince([]string{"consul", "fsm", "persist"}, time.Now())
defer metrics.MeasureSince([]string{"fsm", "persist"}, time.Now())

// Register the nodes
encoder := codec.NewEncoder(sink, msgpackHandle)
Expand Down
6 changes: 6 additions & 0 deletions agent/consul/health_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,13 +141,19 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc
if err == nil {
metrics.IncrCounterWithLabels([]string{"consul", "health", "service", "query"}, 1,
[]metrics.Label{{Name: "service", Value: args.ServiceName}})
metrics.IncrCounterWithLabels([]string{"health", "service", "query"}, 1,
[]metrics.Label{{Name: "service", Value: args.ServiceName}})
if args.ServiceTag != "" {
metrics.IncrCounterWithLabels([]string{"consul", "health", "service", "query-tag"}, 1,
[]metrics.Label{{Name: "service", Value: args.ServiceName}, {Name: "tag", Value: args.ServiceTag}})
metrics.IncrCounterWithLabels([]string{"health", "service", "query-tag"}, 1,
[]metrics.Label{{Name: "service", Value: args.ServiceName}, {Name: "tag", Value: args.ServiceTag}})
}
if len(reply.Nodes) == 0 {
metrics.IncrCounterWithLabels([]string{"consul", "health", "service", "not-found"}, 1,
[]metrics.Label{{Name: "service", Value: args.ServiceName}})
metrics.IncrCounterWithLabels([]string{"health", "service", "not-found"}, 1,
[]metrics.Label{{Name: "service", Value: args.ServiceName}})
}
}
return err
Expand Down
1 change: 1 addition & 0 deletions agent/consul/kvs_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error {
return err
}
defer metrics.MeasureSince([]string{"consul", "kvs", "apply"}, time.Now())
defer metrics.MeasureSince([]string{"kvs", "apply"}, time.Now())

// Perform the pre-apply checks.
acl, err := k.srv.resolveToken(args.Token)
Expand Down
Loading