Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Metric prefix filtering #4878

Merged
merged 5 commits into from
Nov 15, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions command/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ func convertServerConfig(agentConfig *Config, logOutput io.Writer) (*nomad.Confi
// Setup telemetry related config
conf.StatsCollectionInterval = agentConfig.Telemetry.collectionInterval
conf.DisableTaggedMetrics = agentConfig.Telemetry.DisableTaggedMetrics
conf.DisableDispatchedJobSummaryMetrics = agentConfig.Telemetry.DisableDispatchedJobSummaryMetrics
conf.BackwardsCompatibleMetrics = agentConfig.Telemetry.BackwardsCompatibleMetrics

return conf, nil
Expand Down
9 changes: 9 additions & 0 deletions command/agent/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,14 @@ func (c *Command) setupTelemetry(config *Config) (*metrics.InmemSink, error) {
metricsConf.EnableHostname = true
}

allowedPrefixes, blockedPrefixes, err := telConfig.PrefixFilters()
if err != nil {
return inm, err
}

metricsConf.AllowedPrefixes = allowedPrefixes
metricsConf.BlockedPrefixes = blockedPrefixes

// Configure the statsite sink
var fanout metrics.FanoutSink
if telConfig.StatsiteAddr != "" {
Expand Down Expand Up @@ -895,6 +903,7 @@ func (c *Command) setupTelemetry(config *Config) (*metrics.InmemSink, error) {
metricsConf.EnableHostname = false
metrics.NewGlobal(metricsConf, inm)
}

return inm, nil
}

Expand Down
35 changes: 35 additions & 0 deletions command/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,15 @@ type Telemetry struct {
// key/value structure as done in older versions of Nomad
BackwardsCompatibleMetrics bool `mapstructure:"backwards_compatible_metrics"`

// PrefixFilter allows for filtering out metrics from being collected
nickethier marked this conversation as resolved.
Show resolved Hide resolved
PrefixFilter []string `mapstructure:"prefix_filter"`

// DisableDispatchedJobSummaryMetrics allows for ignore dispatched jobs when
// publishing Job summary metrics. This is useful in environment that produce
// high numbers of single count dispatch jobs as the metrics for each take up
// a small memory overhead.
DisableDispatchedJobSummaryMetrics bool `mapstructure:"disable_dispatched_job_summary_metrics"`
nickethier marked this conversation as resolved.
Show resolved Hide resolved

// Circonus: see https://github.com/circonus-labs/circonus-gometrics
// for more details on the various configuration options.
// Valid configuration combinations:
Expand Down Expand Up @@ -506,6 +515,24 @@ type Telemetry struct {
CirconusBrokerSelectTag string `mapstructure:"circonus_broker_select_tag"`
}

// PrefixFilters parses the PrefixFilter field and returns a list of allowed and blocked filters
func (t *Telemetry) PrefixFilters() (allowed, blocked []string, err error) {
for _, rule := range t.PrefixFilter {
if rule == "" {
continue
}
switch rule[0] {
case '+':
allowed = append(allowed, rule[1:])
case '-':
blocked = append(blocked, rule[1:])
default:
return nil, nil, fmt.Errorf("Filter rule must begin with either '+' or '-': %q", rule)
}
}
return allowed, blocked, nil
}

// Ports encapsulates the various ports we bind to for network services. If any
// are not specified then the defaults are used instead.
type Ports struct {
Expand Down Expand Up @@ -1323,6 +1350,14 @@ func (a *Telemetry) Merge(b *Telemetry) *Telemetry {
result.BackwardsCompatibleMetrics = b.BackwardsCompatibleMetrics
}

if b.PrefixFilter != nil {
result.PrefixFilter = b.PrefixFilter
}

if b.DisableDispatchedJobSummaryMetrics {
result.DisableDispatchedJobSummaryMetrics = b.DisableDispatchedJobSummaryMetrics
}

return &result
}

Expand Down
2 changes: 2 additions & 0 deletions command/agent/config_parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -724,6 +724,8 @@ func parseTelemetry(result **Telemetry, list *ast.ObjectList) error {
"circonus_broker_select_tag",
"disable_tagged_metrics",
"backwards_compatible_metrics",
"prefix_filter",
"disable_dispatched_job_summary_metrics",
}
if err := helper.CheckHCLKeys(listVal, valid); err != nil {
return err
Expand Down
44 changes: 44 additions & 0 deletions command/agent/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func TestConfig_Merge(t *testing.T) {
CirconusCheckTags: "cat1:tag1,cat2:tag2",
CirconusBrokerID: "0",
CirconusBrokerSelectTag: "dc:dc1",
PrefixFilter: []string{"filter1", "filter2"},
nickethier marked this conversation as resolved.
Show resolved Hide resolved
},
Client: &ClientConfig{
Enabled: false,
Expand Down Expand Up @@ -215,6 +216,7 @@ func TestConfig_Merge(t *testing.T) {
CirconusCheckTags: "cat1:tag1,cat2:tag2",
CirconusBrokerID: "1",
CirconusBrokerSelectTag: "dc:dc2",
PrefixFilter: []string{"prefix1", "prefix2"},
},
Client: &ClientConfig{
Enabled: true,
Expand Down Expand Up @@ -1013,3 +1015,45 @@ func TestMergeServerJoin(t *testing.T) {
require.Equal(result.RetryInterval, retryInterval)
}
}

func TestTelemetry_PrefixFilters(t *testing.T) {
t.Parallel()
cases := []struct {
in []string
expAllow []string
expBlock []string
expErr bool
}{
{
in: []string{"+foo"},
expAllow: []string{"foo"},
},
{
in: []string{"-foo"},
expBlock: []string{"foo"},
},
{
in: []string{"+a.b.c", "-x.y.z"},
expAllow: []string{"a.b.c"},
expBlock: []string{"x.y.z"},
},
{
in: []string{"+foo", "bad", "-bar"},
expErr: true,
},
}

for i, c := range cases {
t.Run(fmt.Sprintf("PrefixCase%d", i), func(t *testing.T) {
require := require.New(t)
tel := &Telemetry{
PrefixFilter: c.in,
}

allow, block, err := tel.PrefixFilters()
require.Exactly(c.expAllow, allow)
require.Exactly(c.expBlock, block)
require.Equal(c.expErr, err != nil)
})
}
}
4 changes: 4 additions & 0 deletions nomad/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,10 @@ type Config struct {
// key/value/tag format, or simply a key/value format
DisableTaggedMetrics bool

// DisableDispatchedJobSummaryMetrics allows for ignore dispatched jobs when
// publishing Job summary metrics
DisableDispatchedJobSummaryMetrics bool

// BackwardsCompatibleMetrics determines whether to show methods of
// displaying metrics for older versions, or to only show the new format
BackwardsCompatibleMetrics bool
Expand Down
73 changes: 40 additions & 33 deletions nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,45 +613,52 @@ func (s *Server) publishJobSummaryMetrics(stopCh chan struct{}) {
break
}
summary := raw.(*structs.JobSummary)
for name, tgSummary := range summary.Summary {
if !s.config.DisableTaggedMetrics {
labels := []metrics.Label{
{
Name: "job",
Value: summary.JobID,
},
{
Name: "task_group",
Value: name,
},
}
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "queued"},
float32(tgSummary.Queued), labels)
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "complete"},
float32(tgSummary.Complete), labels)
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "failed"},
float32(tgSummary.Failed), labels)
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "running"},
float32(tgSummary.Running), labels)
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "starting"},
float32(tgSummary.Starting), labels)
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "lost"},
float32(tgSummary.Lost), labels)
}
if s.config.BackwardsCompatibleMetrics {
metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "queued"}, float32(tgSummary.Queued))
metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "complete"}, float32(tgSummary.Complete))
metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "failed"}, float32(tgSummary.Failed))
metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "running"}, float32(tgSummary.Running))
metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "starting"}, float32(tgSummary.Starting))
metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "lost"}, float32(tgSummary.Lost))
}
if s.config.DisableDispatchedJobSummaryMetrics && summary.Dispatched {
nickethier marked this conversation as resolved.
Show resolved Hide resolved
continue
}
s.iterateJobSummaryMetrics(summary)
}
}
}
}

func (s *Server) iterateJobSummaryMetrics(summary *structs.JobSummary) {
for name, tgSummary := range summary.Summary {
if !s.config.DisableTaggedMetrics {
labels := []metrics.Label{
{
Name: "job",
Value: summary.JobID,
},
{
Name: "task_group",
Value: name,
},
}
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "queued"},
float32(tgSummary.Queued), labels)
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "complete"},
float32(tgSummary.Complete), labels)
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "failed"},
float32(tgSummary.Failed), labels)
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "running"},
float32(tgSummary.Running), labels)
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "starting"},
float32(tgSummary.Starting), labels)
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "lost"},
float32(tgSummary.Lost), labels)
}
if s.config.BackwardsCompatibleMetrics {
metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "queued"}, float32(tgSummary.Queued))
metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "complete"}, float32(tgSummary.Complete))
metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "failed"}, float32(tgSummary.Failed))
metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "running"}, float32(tgSummary.Running))
metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "starting"}, float32(tgSummary.Starting))
metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "lost"}, float32(tgSummary.Lost))
}
}
}

// revokeLeadership is invoked once we step down as leader.
// This is used to cleanup any state that may be specific to a leader.
func (s *Server) revokeLeadership() error {
Expand Down
8 changes: 5 additions & 3 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3022,9 +3022,10 @@ func (s *StateStore) ReconcileJobSummaries(index uint64) error {

// Create a job summary for the job
summary := &structs.JobSummary{
JobID: job.ID,
Namespace: job.Namespace,
Summary: make(map[string]structs.TaskGroupSummary),
JobID: job.ID,
Namespace: job.Namespace,
Summary: make(map[string]structs.TaskGroupSummary),
Dispatched: job.Dispatched,
}
for _, tg := range job.TaskGroups {
summary.Summary[tg.Name] = structs.TaskGroupSummary{}
Expand Down Expand Up @@ -3321,6 +3322,7 @@ func (s *StateStore) updateSummaryWithJob(index uint64, job *structs.Job,
Summary: make(map[string]structs.TaskGroupSummary),
Children: new(structs.JobChildrenSummary),
CreateIndex: index,
Dispatched: job.Dispatched,
}
hasSummaryChanged = true
}
Expand Down
17 changes: 10 additions & 7 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1574,13 +1574,13 @@ func (n *Node) Stub() *NodeListStub {
addr, _, _ := net.SplitHostPort(n.HTTPAddr)

return &NodeListStub{
Address: addr,
ID: n.ID,
Datacenter: n.Datacenter,
Name: n.Name,
NodeClass: n.NodeClass,
Version: n.Attributes["nomad.version"],
Drain: n.Drain,
Address: addr,
ID: n.ID,
Datacenter: n.Datacenter,
Name: n.Name,
NodeClass: n.NodeClass,
Version: n.Attributes["nomad.version"],
Drain: n.Drain,
SchedulingEligibility: n.SchedulingEligibility,
Status: n.Status,
StatusDescription: n.StatusDescription,
Expand Down Expand Up @@ -2482,6 +2482,9 @@ type JobSummary struct {
// Children contains a summary for the children of this job.
Children *JobChildrenSummary

// Dispatched is true if this job is dispatched from a parameterized job
Dispatched bool

// Raft Indexes
CreateIndex uint64
ModifyIndex uint64
Expand Down
19 changes: 19 additions & 0 deletions website/source/docs/agent/configuration/telemetry.html.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,26 @@ The following options are available on all telemetry configurations.
0.7. Note that this option is used to transition monitoring to tagged
metrics and will eventually be deprecated.

- `prefix_filter` `(list: [])` - This is a list of filter rules to apply for
allowing/blocking metrics by prefix. A leading "<b>+</b>" will enable any
metrics with the given prefix, and a leading "<b>-</b>" will block them. If
there is overlap between two rules, the more specific rule will take
precedence. Blocking will take priority if the same prefix is listed multiple
times.

```javascript
[
"-nomad.raft",
"+nomad.raft.apply",
"-nomad.memberlist",
]
```

- `disable_dispatched_job_summary_metrics` `(bool: false)` - Specifies if Nomad
should ignore jobs dispatched from a parameterized job when publishing job
summary statistics. Since each job has a small memory overhead for tracking
summary statistics, it is sometimes desired to trade these statistics for
more memory when dispatching high volumes of jobs.

### `statsite`

Expand Down