Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bloomberg PR: Telemetry improvements #67

Merged
merged 3 commits into from
Aug 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,34 @@ tls_server_name = ""
// The method to use for pinging external nodes. Defaults to "udp" but can
// also be set to "socket" to use ICMP (which requires root privileges).
ping_type = "udp"

// The telemetry configuration, which matches Consul's telemetry config options.
// See Consul's documentation at https://www.consul.io/docs/agent/options#telemetry
// for more details on how to configure these options.
telemetry {
circonus_api_app = ""
circonus_api_token = ""
circonus_api_url = ""
circonus_broker_id = ""
circonus_broker_select_tag = ""
circonus_check_display_name = ""
circonus_check_force_metric_activation = ""
circonus_check_id = ""
circonus_check_instance_id = ""
circonus_check_search_tag = ""
circonus_check_tags = ""
circonus_submission_interval = ""
circonus_submission_url = ""
disable_hostname = false
dogstatsd_addr = ""
dogstatsd_tags = []
filter_default = false
prefix_filter = []
metrics_prefix = ""
prometheus_retention_time = ""
statsd_address = ""
statsite_address = ""
}
```

[HCL]: https://github.com/hashicorp/hcl "HashiCorp Configuration Language (HCL)"
Expand Down
10 changes: 10 additions & 0 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ import (
"sync"
"time"

"github.com/armon/go-metrics"
"github.com/hashicorp/consul-esm/version"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib"
)

const LeaderKey = "leader"
Expand Down Expand Up @@ -64,6 +66,8 @@ type Agent struct {
watchedNodeFunc func(map[string]bool, []*api.Node)
knownNodeStatuses map[string]lastKnownStatus
knownNodeStatusesLock sync.Mutex

memSink *metrics.InmemSink
}

func NewAgent(config *Config, logger *log.Logger) (*Agent, error) {
Expand All @@ -73,6 +77,11 @@ func NewAgent(config *Config, logger *log.Logger) (*Agent, error) {
return nil, err
}

memSink, err := lib.InitTelemetry(config.Telemetry)
if err != nil {
return nil, err
}

agent := Agent{
config: config,
client: client,
Expand All @@ -81,6 +90,7 @@ func NewAgent(config *Config, logger *log.Logger) (*Agent, error) {
shutdownCh: make(chan struct{}),
inflightPings: make(map[string]struct{}),
knownNodeStatuses: make(map[string]lastKnownStatus),
memSink: memSink,
}

logger.Printf("[INFO] Connecting to Consul on %s...", clientConf.Address)
Expand Down
3 changes: 3 additions & 0 deletions check.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"sync"
"time"

"github.com/armon/go-metrics"
consulchecks "github.com/hashicorp/consul/agent/checks"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib"
Expand Down Expand Up @@ -182,6 +183,7 @@ func (c *CheckRunner) updateCheckTCP(latestCheck *api.HealthCheck, checkHash typ
// UpdateChecks takes a list of checks from the catalog and updates
// our list of running checks to match.
func (c *CheckRunner) UpdateChecks(checks api.HealthChecks) {
defer metrics.MeasureSince([]string{"checks", "update"}, time.Now())
c.Lock()
defer c.Unlock()

Expand Down Expand Up @@ -339,6 +341,7 @@ func (c *CheckRunner) handleCheckUpdate(check *api.HealthCheck, status, output s
},
},
}
metrics.IncrCounter([]string{"check", "txn"}, 1)
ok, resp, _, err := c.client.Txn().Txn(ops, nil)
if err != nil {
c.logger.Printf("[WARN] Error updating check status in Consul: %v", err)
Expand Down
123 changes: 120 additions & 3 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/command/flags"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/go-uuid"
"github.com/hashicorp/hcl"
"github.com/hashicorp/hcl/hcl/ast"
Expand Down Expand Up @@ -51,6 +52,8 @@ type Config struct {
PingType string

DisableCoordinateUpdates bool

Telemetry lib.TelemetryConfig
}

func (c *Config) ClientConfig() *api.Config {
Expand Down Expand Up @@ -111,6 +114,32 @@ func DefaultConfig() (*Config, error) {
}, nil
}

// Telemetry is the practitioner-facing telemetry configuration, written to
// match Consul's go-metrics telemetry config. It is decoded from the
// "telemetry" block of the config file via the mapstructure tags below and
// turned into a lib.TelemetryConfig by convertTelemetry.
//
// Scalar options are pointers; convertTelemetry maps a nil pointer to the
// zero value ("" or false) through stringVal and boolVal.
type Telemetry struct {
// Circonus options; each is copied as a string to the field of the same
// name on lib.TelemetryConfig.
CirconusAPIApp *string `mapstructure:"circonus_api_app"`
CirconusAPIToken *string `mapstructure:"circonus_api_token"`
CirconusAPIURL *string `mapstructure:"circonus_api_url"`
CirconusBrokerID *string `mapstructure:"circonus_broker_id"`
CirconusBrokerSelectTag *string `mapstructure:"circonus_broker_select_tag"`
CirconusCheckDisplayName *string `mapstructure:"circonus_check_display_name"`
CirconusCheckForceMetricActivation *string `mapstructure:"circonus_check_force_metric_activation"`
CirconusCheckID *string `mapstructure:"circonus_check_id"`
CirconusCheckInstanceID *string `mapstructure:"circonus_check_instance_id"`
CirconusCheckSearchTag *string `mapstructure:"circonus_check_search_tag"`
CirconusCheckTags *string `mapstructure:"circonus_check_tags"`
CirconusSubmissionInterval *string `mapstructure:"circonus_submission_interval"`
CirconusSubmissionURL *string `mapstructure:"circonus_submission_url"`
DisableHostname *bool `mapstructure:"disable_hostname"`
DogstatsdAddr *string `mapstructure:"dogstatsd_addr"`
DogstatsdTags []string `mapstructure:"dogstatsd_tags"`
// PrometheusRetentionTime is a duration string; convertTelemetry parses it
// with time.ParseDuration and returns an error if it is invalid.
PrometheusRetentionTime *string `mapstructure:"prometheus_retention_time"`
FilterDefault *bool `mapstructure:"filter_default"`
// PrefixFilter holds filter rules. convertTelemetry puts rules starting
// with '+' into AllowedPrefixes and rules starting with '-' into
// BlockedPrefixes; any other rule is skipped with a printed warning.
PrefixFilter []string `mapstructure:"prefix_filter"`
MetricsPrefix *string `mapstructure:"metrics_prefix"`
StatsdAddr *string `mapstructure:"statsd_address"`
StatsiteAddr *string `mapstructure:"statsite_address"`
}

// HumanConfig contains configuration that the practitioner can set
type HumanConfig struct {
LogLevel flags.StringValue `mapstructure:"log_level"`
Expand Down Expand Up @@ -138,6 +167,8 @@ type HumanConfig struct {
PingType flags.StringValue `mapstructure:"ping_type"`

DisableCoordinateUpdates flags.BoolValue `mapstructure:"disable_coordinate_updates"`

Telemetry []Telemetry `mapstructure:"telemetry"`
}

// DecodeConfig takes a reader containing config file and returns
Expand Down Expand Up @@ -166,6 +197,11 @@ func DecodeConfig(r io.Reader) (*HumanConfig, error) {
return nil, fmt.Errorf("only one node_meta block allowed")
}

telemetry := list.Filter("telemetry")
if len(telemetry.Elem().Items) > 1 {
return nil, fmt.Errorf("only one telemetry block allowed")
}

// Decode the full thing into a map[string]interface for ease of use
var config HumanConfig
var m map[string]interface{}
Expand Down Expand Up @@ -253,9 +289,8 @@ func MergeConfigPaths(dst *Config, paths []string) error {
if err != nil {
return err
}
MergeConfig(dst, src)

return nil
return MergeConfig(dst, src)
}

for _, path := range paths {
Expand All @@ -267,9 +302,82 @@ func MergeConfigPaths(dst *Config, paths []string) error {
return nil
}

// stringVal dereferences v, yielding the empty string when v is nil.
func stringVal(v *string) string {
	if v != nil {
		return *v
	}
	return ""
}

// boolVal dereferences v, yielding false when v is nil.
func boolVal(v *bool) bool {
	return v != nil && *v
}

// convertTelemetry translates the telemetry block decoded into HumanConfig{}
// into the lib.TelemetryConfig form stored on Config{}.
//
// Each prefix_filter rule is classified by its first byte: '+' rules are
// added to AllowedPrefixes and '-' rules to BlockedPrefixes, in both cases
// with the leading marker removed. Empty rules and rules with any other
// leading byte are skipped after a warning is printed to standard output.
//
// An error is returned only when prometheus_retention_time is set but cannot
// be parsed as a duration; the returned config is then the zero value.
func convertTelemetry(telemetry Telemetry) (lib.TelemetryConfig, error) {
	// Partition the filter rules into allowed and blocked prefixes.
	var allowed, blocked []string
	for _, filter := range telemetry.PrefixFilter {
		if len(filter) == 0 {
			fmt.Println("[WARN] Cannot have empty filter rule in prefix_filter")
			continue
		}

		marker, prefix := filter[0], filter[1:]
		if marker == '+' {
			allowed = append(allowed, prefix)
		} else if marker == '-' {
			blocked = append(blocked, prefix)
		} else {
			fmt.Printf("[WARN] Filter rule must begin with either '+' or '-': %q\n", filter)
		}
	}

	// An unset retention time stays at the zero duration.
	var retention time.Duration
	if raw := telemetry.PrometheusRetentionTime; raw != nil {
		parsed, err := time.ParseDuration(*raw)
		if err != nil {
			return lib.TelemetryConfig{},
				fmt.Errorf("prometheus_retention_time: invalid duration: %q: %s", *raw, err)
		}
		retention = parsed
	}

	var cfg lib.TelemetryConfig

	cfg.CirconusAPIApp = stringVal(telemetry.CirconusAPIApp)
	cfg.CirconusAPIToken = stringVal(telemetry.CirconusAPIToken)
	cfg.CirconusAPIURL = stringVal(telemetry.CirconusAPIURL)
	cfg.CirconusBrokerID = stringVal(telemetry.CirconusBrokerID)
	cfg.CirconusBrokerSelectTag = stringVal(telemetry.CirconusBrokerSelectTag)
	cfg.CirconusCheckDisplayName = stringVal(telemetry.CirconusCheckDisplayName)
	cfg.CirconusCheckForceMetricActivation = stringVal(telemetry.CirconusCheckForceMetricActivation)
	cfg.CirconusCheckID = stringVal(telemetry.CirconusCheckID)
	cfg.CirconusCheckInstanceID = stringVal(telemetry.CirconusCheckInstanceID)
	cfg.CirconusCheckSearchTag = stringVal(telemetry.CirconusCheckSearchTag)
	cfg.CirconusCheckTags = stringVal(telemetry.CirconusCheckTags)
	cfg.CirconusSubmissionInterval = stringVal(telemetry.CirconusSubmissionInterval)
	cfg.CirconusSubmissionURL = stringVal(telemetry.CirconusSubmissionURL)

	cfg.DisableHostname = boolVal(telemetry.DisableHostname)
	cfg.DogstatsdAddr = stringVal(telemetry.DogstatsdAddr)
	cfg.DogstatsdTags = telemetry.DogstatsdTags
	cfg.PrometheusRetentionTime = retention
	cfg.FilterDefault = boolVal(telemetry.FilterDefault)
	cfg.AllowedPrefixes = allowed
	cfg.BlockedPrefixes = blocked
	cfg.MetricsPrefix = stringVal(telemetry.MetricsPrefix)
	cfg.StatsdAddr = stringVal(telemetry.StatsdAddr)
	cfg.StatsiteAddr = stringVal(telemetry.StatsiteAddr)

	return cfg, nil
}

// MergeConfig merges the default config with any configuration
// set by the practitioner
func MergeConfig(dst *Config, src *HumanConfig) {
func MergeConfig(dst *Config, src *HumanConfig) error {
src.LogLevel.Merge(&dst.LogLevel)
src.InstanceID.Merge(&dst.InstanceID)
src.Service.Merge(&dst.Service)
Expand All @@ -290,4 +398,13 @@ func MergeConfig(dst *Config, src *HumanConfig) {
src.TLSServerName.Merge(&dst.TLSServerName)
src.PingType.Merge(&dst.PingType)
src.DisableCoordinateUpdates.Merge(&dst.DisableCoordinateUpdates)
if len(src.Telemetry) == 1 {
t, err := convertTelemetry(src.Telemetry[0])
if err != nil {
return err
}
dst.Telemetry = t
}

return nil
}
Loading