Skip to content

Commit

Permalink
Merge pull request #4878 from hashicorp/f-metric-prefix
Browse files Browse the repository at this point in the history
Metric prefix filtering
  • Loading branch information
nickethier authored Nov 15, 2018
2 parents a6e257b + b99d089 commit 3a826a0
Show file tree
Hide file tree
Showing 9 changed files with 209 additions and 39 deletions.
1 change: 1 addition & 0 deletions command/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ func convertServerConfig(agentConfig *Config, logOutput io.Writer) (*nomad.Confi
// Setup telemetry related config
conf.StatsCollectionInterval = agentConfig.Telemetry.collectionInterval
conf.DisableTaggedMetrics = agentConfig.Telemetry.DisableTaggedMetrics
conf.DisableDispatchedJobSummaryMetrics = agentConfig.Telemetry.DisableDispatchedJobSummaryMetrics
conf.BackwardsCompatibleMetrics = agentConfig.Telemetry.BackwardsCompatibleMetrics

return conf, nil
Expand Down
13 changes: 13 additions & 0 deletions command/agent/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,18 @@ func (c *Command) setupTelemetry(config *Config) (*metrics.InmemSink, error) {
metricsConf.EnableHostname = true
}

allowedPrefixes, blockedPrefixes, err := telConfig.PrefixFilters()
if err != nil {
return inm, err
}

metricsConf.AllowedPrefixes = allowedPrefixes
metricsConf.BlockedPrefixes = blockedPrefixes

if telConfig.FilterDefault != nil {
metricsConf.FilterDefault = *telConfig.FilterDefault
}

// Configure the statsite sink
var fanout metrics.FanoutSink
if telConfig.StatsiteAddr != "" {
Expand Down Expand Up @@ -895,6 +907,7 @@ func (c *Command) setupTelemetry(config *Config) (*metrics.InmemSink, error) {
metricsConf.EnableHostname = false
metrics.NewGlobal(metricsConf, inm)
}

return inm, nil
}

Expand Down
43 changes: 43 additions & 0 deletions command/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,19 @@ type Telemetry struct {
// key/value structure as done in older versions of Nomad
BackwardsCompatibleMetrics bool `mapstructure:"backwards_compatible_metrics"`

// PrefixFilter allows for filtering out metrics from being collected
PrefixFilter []string `mapstructure:"prefix_filter"`

// FilterDefault controls whether to allow metrics that have not been specified
// by the filter
FilterDefault *bool `mapstructure:"filter_default"`

// DisableDispatchedJobSummaryMetrics allows for ignore dispatched jobs when
// publishing Job summary metrics. This is useful in environment that produce
// high numbers of single count dispatch jobs as the metrics for each take up
// a small memory overhead.
DisableDispatchedJobSummaryMetrics bool `mapstructure:"disable_dispatched_job_summary_metrics"`

// Circonus: see https://github.com/circonus-labs/circonus-gometrics
// for more details on the various configuration options.
// Valid configuration combinations:
Expand Down Expand Up @@ -506,6 +519,24 @@ type Telemetry struct {
CirconusBrokerSelectTag string `mapstructure:"circonus_broker_select_tag"`
}

// PrefixFilters parses the PrefixFilter field and returns a list of allowed and blocked filters
func (t *Telemetry) PrefixFilters() (allowed, blocked []string, err error) {
for _, rule := range t.PrefixFilter {
if rule == "" {
continue
}
switch rule[0] {
case '+':
allowed = append(allowed, rule[1:])
case '-':
blocked = append(blocked, rule[1:])
default:
return nil, nil, fmt.Errorf("Filter rule must begin with either '+' or '-': %q", rule)
}
}
return allowed, blocked, nil
}

// Ports encapsulates the various ports we bind to for network services. If any
// are not specified then the defaults are used instead.
type Ports struct {
Expand Down Expand Up @@ -1323,6 +1354,18 @@ func (a *Telemetry) Merge(b *Telemetry) *Telemetry {
result.BackwardsCompatibleMetrics = b.BackwardsCompatibleMetrics
}

if b.PrefixFilter != nil {
result.PrefixFilter = b.PrefixFilter
}

if b.FilterDefault != nil {
result.FilterDefault = b.FilterDefault
}

if b.DisableDispatchedJobSummaryMetrics {
result.DisableDispatchedJobSummaryMetrics = b.DisableDispatchedJobSummaryMetrics
}

return &result
}

Expand Down
3 changes: 3 additions & 0 deletions command/agent/config_parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -724,6 +724,9 @@ func parseTelemetry(result **Telemetry, list *ast.ObjectList) error {
"circonus_broker_select_tag",
"disable_tagged_metrics",
"backwards_compatible_metrics",
"prefix_filter",
"filter_default",
"disable_dispatched_job_summary_metrics",
}
if err := helper.CheckHCLKeys(listVal, valid); err != nil {
return err
Expand Down
69 changes: 69 additions & 0 deletions command/agent/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func TestConfig_Merge(t *testing.T) {
CirconusCheckTags: "cat1:tag1,cat2:tag2",
CirconusBrokerID: "0",
CirconusBrokerSelectTag: "dc:dc1",
PrefixFilter: []string{"filter1", "filter2"},
},
Client: &ClientConfig{
Enabled: false,
Expand Down Expand Up @@ -215,6 +216,9 @@ func TestConfig_Merge(t *testing.T) {
CirconusCheckTags: "cat1:tag1,cat2:tag2",
CirconusBrokerID: "1",
CirconusBrokerSelectTag: "dc:dc2",
PrefixFilter: []string{"prefix1", "prefix2"},
DisableDispatchedJobSummaryMetrics: true,
FilterDefault: helper.BoolToPtr(false),
},
Client: &ClientConfig{
Enabled: true,
Expand Down Expand Up @@ -1013,3 +1017,68 @@ func TestMergeServerJoin(t *testing.T) {
require.Equal(result.RetryInterval, retryInterval)
}
}

func TestTelemetry_PrefixFilters(t *testing.T) {
t.Parallel()
cases := []struct {
in []string
expAllow []string
expBlock []string
expErr bool
}{
{
in: []string{"+foo"},
expAllow: []string{"foo"},
},
{
in: []string{"-foo"},
expBlock: []string{"foo"},
},
{
in: []string{"+a.b.c", "-x.y.z"},
expAllow: []string{"a.b.c"},
expBlock: []string{"x.y.z"},
},
{
in: []string{"+foo", "bad", "-bar"},
expErr: true,
},
}

for i, c := range cases {
t.Run(fmt.Sprintf("PrefixCase%d", i), func(t *testing.T) {
require := require.New(t)
tel := &Telemetry{
PrefixFilter: c.in,
}

allow, block, err := tel.PrefixFilters()
require.Exactly(c.expAllow, allow)
require.Exactly(c.expBlock, block)
require.Equal(c.expErr, err != nil)
})
}
}

func TestTelemetry_Parse(t *testing.T) {
require := require.New(t)
dir, err := ioutil.TempDir("", "nomad")
require.NoError(err)
defer os.RemoveAll(dir)

file1 := filepath.Join(dir, "config1.hcl")
err = ioutil.WriteFile(file1, []byte(`telemetry{
prefix_filter = ["+nomad.raft"]
filter_default = false
disable_dispatched_job_summary_metrics = true
}`), 0600)
require.NoError(err)

// Works on config dir
config, err := LoadConfig(dir)
require.NoError(err)

require.False(*config.Telemetry.FilterDefault)
require.Exactly([]string{"+nomad.raft"}, config.Telemetry.PrefixFilter)
require.True(config.Telemetry.DisableDispatchedJobSummaryMetrics)
}
4 changes: 4 additions & 0 deletions nomad/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,10 @@ type Config struct {
// key/value/tag format, or simply a key/value format
DisableTaggedMetrics bool

// DisableDispatchedJobSummaryMetrics allows for ignore dispatched jobs when
// publishing Job summary metrics
DisableDispatchedJobSummaryMetrics bool

// BackwardsCompatibleMetrics determines whether to show methods of
// displaying metrics for older versions, or to only show the new format
BackwardsCompatibleMetrics bool
Expand Down
76 changes: 45 additions & 31 deletions nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,45 +613,59 @@ func (s *Server) publishJobSummaryMetrics(stopCh chan struct{}) {
break
}
summary := raw.(*structs.JobSummary)
for name, tgSummary := range summary.Summary {
if !s.config.DisableTaggedMetrics {
labels := []metrics.Label{
{
Name: "job",
Value: summary.JobID,
},
{
Name: "task_group",
Value: name,
},
}
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "queued"},
float32(tgSummary.Queued), labels)
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "complete"},
float32(tgSummary.Complete), labels)
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "failed"},
float32(tgSummary.Failed), labels)
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "running"},
float32(tgSummary.Running), labels)
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "starting"},
float32(tgSummary.Starting), labels)
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "lost"},
float32(tgSummary.Lost), labels)
if s.config.DisableDispatchedJobSummaryMetrics {
job, err := state.JobByID(ws, summary.Namespace, summary.JobID)
if err != nil {
s.logger.Printf("[ERR] nomad: failed to lookup job for summary: %v", err)
continue
}
if s.config.BackwardsCompatibleMetrics {
metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "queued"}, float32(tgSummary.Queued))
metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "complete"}, float32(tgSummary.Complete))
metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "failed"}, float32(tgSummary.Failed))
metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "running"}, float32(tgSummary.Running))
metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "starting"}, float32(tgSummary.Starting))
metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "lost"}, float32(tgSummary.Lost))
if job.Dispatched {
continue
}
}
s.iterateJobSummaryMetrics(summary)
}
}
}
}

func (s *Server) iterateJobSummaryMetrics(summary *structs.JobSummary) {
for name, tgSummary := range summary.Summary {
if !s.config.DisableTaggedMetrics {
labels := []metrics.Label{
{
Name: "job",
Value: summary.JobID,
},
{
Name: "task_group",
Value: name,
},
}
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "queued"},
float32(tgSummary.Queued), labels)
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "complete"},
float32(tgSummary.Complete), labels)
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "failed"},
float32(tgSummary.Failed), labels)
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "running"},
float32(tgSummary.Running), labels)
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "starting"},
float32(tgSummary.Starting), labels)
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "lost"},
float32(tgSummary.Lost), labels)
}
if s.config.BackwardsCompatibleMetrics {
metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "queued"}, float32(tgSummary.Queued))
metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "complete"}, float32(tgSummary.Complete))
metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "failed"}, float32(tgSummary.Failed))
metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "running"}, float32(tgSummary.Running))
metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "starting"}, float32(tgSummary.Starting))
metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "lost"}, float32(tgSummary.Lost))
}
}
}

// revokeLeadership is invoked once we step down as leader.
// This is used to cleanup any state that may be specific to a leader.
func (s *Server) revokeLeadership() error {
Expand Down
14 changes: 7 additions & 7 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1574,13 +1574,13 @@ func (n *Node) Stub() *NodeListStub {
addr, _, _ := net.SplitHostPort(n.HTTPAddr)

return &NodeListStub{
Address: addr,
ID: n.ID,
Datacenter: n.Datacenter,
Name: n.Name,
NodeClass: n.NodeClass,
Version: n.Attributes["nomad.version"],
Drain: n.Drain,
Address: addr,
ID: n.ID,
Datacenter: n.Datacenter,
Name: n.Name,
NodeClass: n.NodeClass,
Version: n.Attributes["nomad.version"],
Drain: n.Drain,
SchedulingEligibility: n.SchedulingEligibility,
Status: n.Status,
StatusDescription: n.StatusDescription,
Expand Down
25 changes: 24 additions & 1 deletion website/source/docs/agent/configuration/telemetry.html.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,36 @@ The following options are available on all telemetry configurations.
only be added to tagged metrics. Note that this option is used to transition
monitoring to tagged metrics and will eventually be deprecated.


- `disable_tagged_metrics` `(bool: false)` - Specifies if Nomad should not emit
tagged metrics and only emit metrics compatible with versions below Nomad
0.7. Note that this option is used to transition monitoring to tagged
metrics and will eventually be deprecated.

- `filter_default` `(bool: true)` - This controls whether to allow metrics that
have not been specified by the filter. Defaults to true, which will allow all
metrics when no filters are provided. When set to false with no filters, no
metrics will be sent.

- `prefix_filter` `(list: [])` - This is a list of filter rules to apply for
allowing/blocking metrics by prefix. A leading "<b>+</b>" will enable any
metrics with the given prefix, and a leading "<b>-</b>" will block them. If
there is overlap between two rules, the more specific rule will take
precedence. Blocking will take priority if the same prefix is listed multiple
times.

```javascript
[
"-nomad.raft",
"+nomad.raft.apply",
"-nomad.memberlist",
]
```

- `disable_dispatched_job_summary_metrics` `(bool: false)` - Specifies if Nomad
should ignore jobs dispatched from a parameterized job when publishing job
summary statistics. Since each job has a small memory overhead for tracking
summary statistics, it is sometimes desired to trade these statistics for
more memory when dispatching high volumes of jobs.

### `statsite`

Expand Down

0 comments on commit 3a826a0

Please sign in to comment.