Skip to content

Commit

Permalink
Revert "revert telemetry changes"
Browse files Browse the repository at this point in the history
This reverts commit 0fed622.

0fed622 was done in order to review and merge Bloomberg’s non-telemetry changes from telemetry changes. Now that the non-telemetry changes are merged into master, in order to view the telemetry changes from the original branch, we need to unrevert the telemetry changes.
  • Loading branch information
lornasong committed Aug 13, 2020
1 parent df7bf0e commit 53794ff
Show file tree
Hide file tree
Showing 7 changed files with 140 additions and 3 deletions.
10 changes: 10 additions & 0 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ import (
"sync"
"time"

"github.com/armon/go-metrics"
"github.com/hashicorp/consul-esm/version"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib"
)

const LeaderKey = "leader"
Expand Down Expand Up @@ -64,6 +66,8 @@ type Agent struct {
watchedNodeFunc func(map[string]bool, []*api.Node)
knownNodeStatuses map[string]lastKnownStatus
knownNodeStatusesLock sync.Mutex

memSink *metrics.InmemSink
}

func NewAgent(config *Config, logger *log.Logger) (*Agent, error) {
Expand All @@ -73,6 +77,11 @@ func NewAgent(config *Config, logger *log.Logger) (*Agent, error) {
return nil, err
}

memSink, err := lib.InitTelemetry(config.Telemetry)
if err != nil {
return nil, err
}

agent := Agent{
config: config,
client: client,
Expand All @@ -81,6 +90,7 @@ func NewAgent(config *Config, logger *log.Logger) (*Agent, error) {
shutdownCh: make(chan struct{}),
inflightPings: make(map[string]struct{}),
knownNodeStatuses: make(map[string]lastKnownStatus),
memSink: memSink,
}

logger.Printf("[INFO] Connecting to Consul on %s...", clientConf.Address)
Expand Down
3 changes: 3 additions & 0 deletions check.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"sync"
"time"

"github.com/armon/go-metrics"
consulchecks "github.com/hashicorp/consul/agent/checks"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib"
Expand Down Expand Up @@ -182,6 +183,7 @@ func (c *CheckRunner) updateCheckTCP(latestCheck *api.HealthCheck, checkHash typ
// UpdateChecks takes a list of checks from the catalog and updates
// our list of running checks to match.
func (c *CheckRunner) UpdateChecks(checks api.HealthChecks) {
defer metrics.MeasureSince([]string{"checks", "update"}, time.Now())
c.Lock()
defer c.Unlock()

Expand Down Expand Up @@ -339,6 +341,7 @@ func (c *CheckRunner) handleCheckUpdate(check *api.HealthCheck, status, output s
},
},
}
metrics.IncrCounter([]string{"check", "txn"}, 1)
ok, resp, _, err := c.client.Txn().Txn(ops, nil)
if err != nil {
c.logger.Printf("[WARN] Error updating check status in Consul: %v", err)
Expand Down
114 changes: 111 additions & 3 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/command/flags"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/go-uuid"
"github.com/hashicorp/hcl"
"github.com/hashicorp/hcl/hcl/ast"
Expand Down Expand Up @@ -51,6 +52,8 @@ type Config struct {
PingType string

DisableCoordinateUpdates bool

Telemetry lib.TelemetryConfig
}

func (c *Config) ClientConfig() *api.Config {
Expand Down Expand Up @@ -111,6 +114,31 @@ func DefaultConfig() (*Config, error) {
}, nil
}

type Telemetry struct {
CirconusAPIApp *string `mapstructure:"circonus_api_app"`
CirconusAPIToken *string `mapstructure:"circonus_api_token"`
CirconusAPIURL *string `mapstructure:"circonus_api_url"`
CirconusBrokerID *string `mapstructure:"circonus_broker_id"`
CirconusBrokerSelectTag *string `mapstructure:"circonus_broker_select_tag"`
CirconusCheckDisplayName *string `mapstructure:"circonus_check_display_name"`
CirconusCheckForceMetricActivation *string `mapstructure:"circonus_check_force_metric_activation"`
CirconusCheckID *string `mapstructure:"circonus_check_id"`
CirconusCheckInstanceID *string `mapstructure:"circonus_check_instance_id"`
CirconusCheckSearchTag *string `mapstructure:"circonus_check_search_tag"`
CirconusCheckTags *string `mapstructure:"circonus_check_tags"`
CirconusSubmissionInterval *string `mapstructure:"circonus_submission_interval"`
CirconusSubmissionURL *string `mapstructure:"circonus_submission_url"`
DisableHostname *bool `mapstructure:"disable_hostname"`
DogstatsdAddr *string `mapstructure:"dogstatsd_addr"`
DogstatsdTags []string `mapstructure:"dogstatsd_tags"`
FilterDefault *bool `mapstructure:"filter_default"`
PrefixFilter []string `mapstructure:"prefix_filter"`
MetricsPrefix *string `mapstructure:"metrics_prefix"`
PrometheusRetentionTime *string `mapstructure:"prometheus_retention_time"`
StatsdAddr *string `mapstructure:"statsd_address"`
StatsiteAddr *string `mapstructure:"statsite_address"`
}

// HumanConfig contains configuration that the practitioner can set
type HumanConfig struct {
LogLevel flags.StringValue `mapstructure:"log_level"`
Expand Down Expand Up @@ -138,6 +166,8 @@ type HumanConfig struct {
PingType flags.StringValue `mapstructure:"ping_type"`

DisableCoordinateUpdates flags.BoolValue `mapstructure:"disable_coordinate_updates"`

Telemetry []Telemetry `mapstructure:"telemetry"`
}

// DecodeConfig takes a reader containing config file and returns
Expand Down Expand Up @@ -166,6 +196,11 @@ func DecodeConfig(r io.Reader) (*HumanConfig, error) {
return nil, fmt.Errorf("only one node_meta block allowed")
}

telemetry := list.Filter("telemetry")
if len(telemetry.Elem().Items) > 1 {
return nil, fmt.Errorf("only one telemetry block allowed")
}

// Decode the full thing into a map[string]interface for ease of use
var config HumanConfig
var m map[string]interface{}
Expand Down Expand Up @@ -253,9 +288,8 @@ func MergeConfigPaths(dst *Config, paths []string) error {
if err != nil {
return err
}
MergeConfig(dst, src)

return nil
return MergeConfig(dst, src)
}

for _, path := range paths {
Expand All @@ -267,9 +301,24 @@ func MergeConfigPaths(dst *Config, paths []string) error {
return nil
}

func stringVal(v *string) string {
if v == nil {
return ""
}
return *v
}

func boolVal(v *bool) bool {
if v == nil {
return false
}

return *v
}

// MergeConfig merges the default config with any configuration
// set by the practitioner
func MergeConfig(dst *Config, src *HumanConfig) {
func MergeConfig(dst *Config, src *HumanConfig) error {
src.LogLevel.Merge(&dst.LogLevel)
src.InstanceID.Merge(&dst.InstanceID)
src.Service.Merge(&dst.Service)
Expand All @@ -290,4 +339,63 @@ func MergeConfig(dst *Config, src *HumanConfig) {
src.TLSServerName.Merge(&dst.TLSServerName)
src.PingType.Merge(&dst.PingType)
src.DisableCoordinateUpdates.Merge(&dst.DisableCoordinateUpdates)

// We check on parse time that there is at most one
if len(src.Telemetry) != 0 {
telemetry := src.Telemetry[0]
// Parse the metric filters
var telemetryAllowedPrefixes, telemetryBlockedPrefixes []string
for _, rule := range telemetry.PrefixFilter {
if rule == "" {
fmt.Println("[WARN] Cannot have empty filter rule in prefix_filter")
continue
}
switch rule[0] {
case '+':
telemetryAllowedPrefixes = append(telemetryAllowedPrefixes, rule[1:])
case '-':
telemetryBlockedPrefixes = append(telemetryBlockedPrefixes, rule[1:])
default:
fmt.Printf("[WARN] Filter rule must begin with either '+' or '-': %q\n", rule)
}
}

var prometheusRetentionTime time.Duration
if telemetry.PrometheusRetentionTime != nil {
d, err := time.ParseDuration(*telemetry.PrometheusRetentionTime)
if err != nil {
return fmt.Errorf("prometheus_retention_time: invalid duration: %q: %s", *telemetry.PrometheusRetentionTime, err)
}

prometheusRetentionTime = d
}

dst.Telemetry = lib.TelemetryConfig{
CirconusAPIApp: stringVal(telemetry.CirconusAPIApp),
CirconusAPIToken: stringVal(telemetry.CirconusAPIToken),
CirconusAPIURL: stringVal(telemetry.CirconusAPIURL),
CirconusBrokerID: stringVal(telemetry.CirconusBrokerID),
CirconusBrokerSelectTag: stringVal(telemetry.CirconusBrokerSelectTag),
CirconusCheckDisplayName: stringVal(telemetry.CirconusCheckDisplayName),
CirconusCheckForceMetricActivation: stringVal(telemetry.CirconusCheckForceMetricActivation),
CirconusCheckID: stringVal(telemetry.CirconusCheckID),
CirconusCheckInstanceID: stringVal(telemetry.CirconusCheckInstanceID),
CirconusCheckSearchTag: stringVal(telemetry.CirconusCheckSearchTag),
CirconusCheckTags: stringVal(telemetry.CirconusCheckTags),
CirconusSubmissionInterval: stringVal(telemetry.CirconusSubmissionInterval),
CirconusSubmissionURL: stringVal(telemetry.CirconusSubmissionURL),
DisableHostname: boolVal(telemetry.DisableHostname),
DogstatsdAddr: stringVal(telemetry.DogstatsdAddr),
DogstatsdTags: telemetry.DogstatsdTags,
PrometheusRetentionTime: prometheusRetentionTime,
FilterDefault: boolVal(telemetry.FilterDefault),
AllowedPrefixes: telemetryAllowedPrefixes,
BlockedPrefixes: telemetryBlockedPrefixes,
MetricsPrefix: stringVal(telemetry.MetricsPrefix),
StatsdAddr: stringVal(telemetry.StatsdAddr),
StatsiteAddr: stringVal(telemetry.StatsiteAddr),
}
}

return nil
}
11 changes: 11 additions & 0 deletions config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"strings"
"testing"
"time"

"github.com/hashicorp/consul/lib"
)

func TestDecodeMergeConfig(t *testing.T) {
Expand All @@ -31,6 +33,10 @@ key_file = "key.pem"
tls_server_name = "example.io"
disable_coordinate_updates = true
ping_type = "socket"
telemetry {
statsd_address = "example.io:8888"
prefix_filter = ["+good", "-bad", "+better", "-worse", "wrong", ""]
}
`)

expected := &Config{
Expand All @@ -55,6 +61,11 @@ ping_type = "socket"
TLSServerName: "example.io",
DisableCoordinateUpdates: true,
PingType: PingTypeSocket,
Telemetry: lib.TelemetryConfig{
StatsdAddr: "example.io:8888",
AllowedPrefixes: []string{"good", "better"},
BlockedPrefixes: []string{"bad", "worse"},
},
}

result := &Config{}
Expand Down
2 changes: 2 additions & 0 deletions coordinate.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strings"
"time"

"github.com/armon/go-metrics"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
multierror "github.com/hashicorp/go-multierror"
Expand Down Expand Up @@ -264,6 +265,7 @@ func (a *Agent) updateFailedNodeTxn(node *api.Node, kvClient *api.KV, key string

// updateNodeCheck updates the node's externalNodeHealth check with the given status/output.
func (a *Agent) updateNodeCheck(node *api.Node, ops api.TxnOps, status, output string) error {
metrics.IncrCounter([]string{"coord", "txn"}, 1)
// Update the external health check status.
ops = append(ops, &api.TxnOp{
Check: &api.CheckTxnOp{
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.13
require (
github.com/DataDog/datadog-go v3.2.0+incompatible // indirect
github.com/Microsoft/go-winio v0.4.5 // indirect
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878
github.com/hashicorp/consul v1.6.1
github.com/hashicorp/consul/api v1.2.0
github.com/hashicorp/consul/sdk v0.4.0
Expand Down
2 changes: 2 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ func main() {
ui.Info(fmt.Sprintf(" Service ID: %q", agent.serviceID()))
ui.Info(fmt.Sprintf("Node Reconnect Timeout: %q", config.NodeReconnectTimeout.String()))
ui.Info(fmt.Sprintf(" Disable coordinates: %t", config.DisableCoordinateUpdates))
ui.Info(fmt.Sprintf(" Statsd address: %q", config.Telemetry.StatsdAddr))
ui.Info(fmt.Sprintf(" Metrix prefix: %q", config.Telemetry.MetricsPrefix))
ui.Info("")
ui.Output("Log data will now stream in as it occurs:\n")

Expand Down

0 comments on commit 53794ff

Please sign in to comment.