From 7a6e13b9bbfa8e6a1d2e07afbd92a3ab5522de7c Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Fri, 10 Nov 2017 13:55:30 -0800 Subject: [PATCH 01/15] Add DC/OS input plugin --- Godeps | 7 +- plugins/inputs/all/all.go | 1 + plugins/inputs/dcos/README.md | 209 ++++++++++++++ plugins/inputs/dcos/client.go | 340 ++++++++++++++++++++++ plugins/inputs/dcos/client_test.go | 224 +++++++++++++++ plugins/inputs/dcos/dcos.go | 410 ++++++++++++++++++++++++++ plugins/inputs/dcos/dcos_test.go | 445 +++++++++++++++++++++++++++++ 7 files changed, 1633 insertions(+), 3 deletions(-) create mode 100644 plugins/inputs/dcos/README.md create mode 100644 plugins/inputs/dcos/client.go create mode 100644 plugins/inputs/dcos/client_test.go create mode 100644 plugins/inputs/dcos/dcos.go create mode 100644 plugins/inputs/dcos/dcos_test.go diff --git a/Godeps b/Godeps index 7e90dd061f162..0f8c5bebff4fb 100644 --- a/Godeps +++ b/Godeps @@ -10,20 +10,21 @@ github.com/couchbase/go-couchbase bfe555a140d53dc1adf390f1a1d4b0fd4ceadb28 github.com/couchbase/gomemcached 4a25d2f4e1dea9ea7dd76dfd943407abf9b07d29 github.com/couchbase/goutils 5823a0cbaaa9008406021dc5daf80125ea30bba6 github.com/davecgh/go-spew 346938d642f2ec3594ed81d874461961cd0faa76 +github.com/dgrijalva/jwt-go dbeaa9332f19a944acb5736b4456cfcc02140e29 github.com/docker/docker f5ec1e2936dcbe7b5001c2b817188b095c700c27 github.com/docker/go-connections 990a1a1a70b0da4c4cb70e117971a4f0babfbf1a github.com/eapache/go-resiliency b86b1ec0dd4209a588dc1285cdd471e73525c0b3 github.com/eapache/go-xerial-snappy bb955e01b9346ac19dc29eb16586c90ded99a98c github.com/eapache/queue 44cc805cf13205b55f69e14bcb69867d1ae92f98 github.com/eclipse/paho.mqtt.golang d4f545eb108a2d19f9b1a735689dbfb719bc21fb -github.com/go-logfmt/logfmt 390ab7935ee28ec6b286364bba9b4dd6410cb3d5 -github.com/go-sql-driver/mysql 2e00b5cd70399450106cec6431c2e2ce3cae5034 github.com/gobwas/glob bea32b9cd2d6f55753d94a28e959b13f0244797a -github.com/go-ini/ini 9144852efba7c4daf409943ee90767da62d55438 github.com/gogo/protobuf 7b6c6391c4ff245962047fc1e2c6e08b1cdfa0e8 +github.com/go-ini/ini 9144852efba7c4daf409943ee90767da62d55438 github.com/golang/protobuf 8ee79997227bf9b34611aee7946ae64735e6fd93 github.com/golang/snappy 7db9049039a047d955fe8c19b83c8ff5abd765c7 +github.com/go-logfmt/logfmt 390ab7935ee28ec6b286364bba9b4dd6410cb3d5 github.com/go-ole/go-ole be49f7c07711fcb603cff39e1de7c67926dc0ba7 +github.com/go-sql-driver/mysql 2e00b5cd70399450106cec6431c2e2ce3cae5034 github.com/google/go-cmp f94e52cad91c65a63acc1e75d4be223ea22e99bc github.com/gorilla/mux 392c28fe23e1c45ddba891b0320b3b5df220beea github.com/go-sql-driver/mysql 2e00b5cd70399450106cec6431c2e2ce3cae5034 diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 87de7b53e8e86..49adf283c38e5 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -14,6 +14,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/consul" _ "github.com/influxdata/telegraf/plugins/inputs/couchbase" _ "github.com/influxdata/telegraf/plugins/inputs/couchdb" + _ "github.com/influxdata/telegraf/plugins/inputs/dcos" _ "github.com/influxdata/telegraf/plugins/inputs/disque" _ "github.com/influxdata/telegraf/plugins/inputs/dmcache" _ "github.com/influxdata/telegraf/plugins/inputs/dns_query" diff --git a/plugins/inputs/dcos/README.md b/plugins/inputs/dcos/README.md new file mode 100644 index 0000000000000..3d3845221ccde --- /dev/null +++ b/plugins/inputs/dcos/README.md @@ -0,0 +1,209 @@ +# DC/OS Input Plugin + +This input plugin gathers metrics from a DC/OS cluster's [metrics component](https://docs.mesosphere.com/1.10/metrics/). + +**Series Cardinality Warning** + +Depending on the work load of your DC/OS cluster, this plugin can quickly +create a high number of series which, when unchecked, can cause high load on +your database. + +- Use [measurement filtering](https://github.com/influxdata/telegraf/blob/master/docs/CONFIGURATION.md#measurement-filtering) liberally to exclude unneeded metrics as well as the node, container, and app inclue/exclude options. +- Write to a database with an appropriate [retention policy](https://docs.influxdata.com/influxdb/v1.3/concepts/glossary/#retention-policy-rp). +- Limit the number of series allowed in your database. +- Consider enabling the [TSI](https://docs.influxdata.com/influxdb/v1.3/about_the_project/releasenotes-changelog/#release-notes-8) engine. +- Monitor your [series cardinality](https://docs.influxdata.com/influxdb/v1.3/troubleshooting/frequently-asked-questions/#how-can-i-query-for-series-cardinality). + +### Configuration: +```toml +[[inputs.dcos]] + ## The DC/OS cluster URL. + cluster_url = "https://dcos-master-1" + + ## The ID of the service account. + service_account_id = "telegraf" + ## The private key file for the service account. + service_account_private_key = "/etc/telegraf/telegraf-sa-key.pem" + + ## Path containing login token. If set, will read on every gather. + # token_file = "/home/dcos/.dcos/token" + + ## In all filter options if both include and exclude are empty all items + ## will be collected. Arrays may contain glob patterns. + ## + ## Node IDs to collect metrics from. If a node is excluded, no metrics will + ## be collected for its containers or apps. + # node_include = [] + # node_exclude = [] + ## Container IDs to collect container metrics from. + # container_include = [] + # container_exclude = [] + ## Container IDs to collect app metrics from. + # app_include = [] + # app_exclude = [] + + ## Maximum concurrent connections to the cluster. + # max_connections = 1 + ## Maximum time to receive a response from cluster. + # response_timeout = "20s" + + ## Optional SSL Config + # ssl_ca = "/etc/telegraf/ca.pem" + # ssl_cert = "/etc/telegraf/cert.pem" + # ssl_key = "/etc/telegraf/key.pem" + ## If false, skip chain & host verification + # insecure_skip_verify = true + + ## Recommended filtering to reduce series cardinality. + # [inputs.dcos.tagdrop] + # path = ["/var/lib/mesos/slave/slaves/*"] +``` + +#### Enterprise Authentication + +When using Enterprise DC/OS, it is recommended to use a service account to +authenticate with the cluster. + +The plugin requires the following permissions: +``` +dcos:adminrouter:ops:system-metrics full +dcos:adminrouter:ops:mesos full +``` + +Follow the directions to [create a service account and assign permissions](https://docs.mesosphere.com/1.10/security/service-auth/custom-service-auth/). + +Quick configuration using the Enterprise CLI: +``` +dcos security org service-accounts keypair telegraf-sa-key.pem telegraf-sa-cert.pem +dcos security org service-accounts create -p telegraf-sa-cert.pem -d "Telegraf DC/OS input plugin" telegraf +dcos security org users grant telegraf dcos:adminrouter:ops:system-metrics full +dcos security org users grant telegraf dcos:adminrouter:ops:mesos full +``` + +#### Open Source Authentication + +The Open Source DC/OS does not provide service accounts. Instead you can use +of the following options: + +1. [Disable authentication](https://dcos.io/docs/1.10/security/managing-authentication/#authentication-opt-out) +2. Use the `token_file` parameter to read a authentication token from a file. + +Then `token_file` can be set by using the [dcos cli] to login periodically. +The cli can login for at most XXX days, you will need to ensure the cli +performs a new login before this time expires. +``` +dcos auth login --username foo --password bar +dcos config show core.dcos_acs_token > ~/.dcos/token +``` + +Another option to create a `token_file` is to generate a token using the +cluster secret. This will allow you to set the expiration date manually or +even create a never expiring token. However, if the cluster secret or the +token is compromised it cannot be revoked and may require a full reinstall of +the cluster. For more information on this technique reference +[this blog post](https://medium.com/@richardgirges/authenticating-open-source-dc-os-with-third-party-services-125fa33a5add). + +### Metrics: + +Please consult the [Metrics Reference](https://docs.mesosphere.com/1.10/metrics/reference/) +for details on interprete field interpretation. + +- dcos_node + - tags: + - cluster + - hostname + - path (filesystem fields only) + - interface (network fields only) + - fields: + - system_uptime (float) + - cpu_cores (float) + - cpu_total (float) + - cpu_user (float) + - cpu_system (float) + - cpu_idle (float) + - cpu_wait (float) + - load_1min (float) + - load_5min (float) + - load_15min (float) + - filesystem_capacity_total_bytes (int) + - filesystem_capacity_used_bytes (int) + - filesystem_capacity_free_bytes (int) + - filesystem_inode_total (float) + - filesystem_inode_used (float) + - filesystem_inode_free (float) + - memory_total_bytes (int) + - memory_free_bytes (int) + - memory_buffers_bytes (int) + - memory_cached_bytes (int) + - swap_total_bytes (int) + - swap_free_bytes (int) + - swap_used_bytes (int) + - network_in_bytes (int) + - network_out_bytes (int) + - network_in_packets (float) + - network_out_packets (float) + - network_in_dropped (float) + - network_out_dropped (float) + - network_in_errors (float) + - network_out_errors (float) + - process_count (float) + +- dcos_container + - tags: + - cluster + - hostname + - container_id + - task_name + - fields: + - cpus_limit (float) + - cpus_system_time (float) + - cpus_throttled_time (float) + - cpus_user_time (float) + - disk_limit_bytes (int) + - disk_used_bytes (int) + - mem_limit_bytes (int) + - mem_total_bytes (int) + - net_rx_bytes (int) + - net_rx_dropped (float) + - net_rx_errors (float) + - net_rx_packets (float) + - net_tx_bytes (int) + - net_tx_dropped (float) + - net_tx_errors (float) + - net_tx_packets (float) + +- dcos_app + - tags: + - cluster + - hostname + - container_id + - task_name + - fields: + - fields are application specific + +### Example Output: + +``` +dcos_node,cluster=enterprise,hostname=192.168.122.18,path=/boot filesystem_capacity_free_bytes=918188032i,filesystem_capacity_total_bytes=1063256064i,filesystem_capacity_used_bytes=145068032i,filesystem_inode_free=523958,filesystem_inode_total=524288,filesystem_inode_used=330 1511859222000000000 +dcos_node,cluster=enterprise,hostname=192.168.122.18,interface=dummy0 network_in_bytes=0i,network_in_dropped=0,network_in_errors=0,network_in_packets=0,network_out_bytes=0i,network_out_dropped=0,network_out_errors=0,network_out_packets=0 1511859222000000000 +dcos_node,cluster=enterprise,hostname=192.168.122.18,interface=docker0 network_in_bytes=0i,network_in_dropped=0,network_in_errors=0,network_in_packets=0,network_out_bytes=0i,network_out_dropped=0,network_out_errors=0,network_out_packets=0 1511859222000000000 +dcos_node,cluster=enterprise,hostname=192.168.122.18 cpu_cores=2,cpu_idle=81.62,cpu_system=4.19,cpu_total=13.670000000000002,cpu_user=9.48,cpu_wait=0,load_15min=0.7,load_1min=0.22,load_5min=0.6,memory_buffers_bytes=970752i,memory_cached_bytes=1830473728i,memory_free_bytes=1178636288i,memory_total_bytes=3975073792i,process_count=198,swap_free_bytes=859828224i,swap_total_bytes=859828224i,swap_used_bytes=0i,system_uptime=18874 1511859222000000000 +dcos_node,cluster=enterprise,hostname=192.168.122.18,interface=lo network_in_bytes=1090992450i,network_in_dropped=0,network_in_errors=0,network_in_packets=1546938,network_out_bytes=1090992450i,network_out_dropped=0,network_out_errors=0,network_out_packets=1546938 1511859222000000000 +dcos_node,cluster=enterprise,hostname=192.168.122.18,path=/ filesystem_capacity_free_bytes=1668378624i,filesystem_capacity_total_bytes=6641680384i,filesystem_capacity_used_bytes=4973301760i,filesystem_inode_free=3107856,filesystem_inode_total=3248128,filesystem_inode_used=140272 1511859222000000000 +dcos_node,cluster=enterprise,hostname=192.168.122.18,interface=minuteman network_in_bytes=0i,network_in_dropped=0,network_in_errors=0,network_in_packets=0,network_out_bytes=210i,network_out_dropped=0,network_out_errors=0,network_out_packets=3 1511859222000000000 +dcos_node,cluster=enterprise,hostname=192.168.122.18,interface=eth0 network_in_bytes=539886216i,network_in_dropped=1,network_in_errors=0,network_in_packets=979808,network_out_bytes=112395836i,network_out_dropped=0,network_out_errors=0,network_out_packets=891239 1511859222000000000 +dcos_node,cluster=enterprise,hostname=192.168.122.18,interface=spartan network_in_bytes=0i,network_in_dropped=0,network_in_errors=0,network_in_packets=0,network_out_bytes=210i,network_out_dropped=0,network_out_errors=0,network_out_packets=3 1511859222000000000 +dcos_node,cluster=enterprise,hostname=192.168.122.18,path=/var/lib/docker/overlay filesystem_capacity_free_bytes=1668378624i,filesystem_capacity_total_bytes=6641680384i,filesystem_capacity_used_bytes=4973301760i,filesystem_inode_free=3107856,filesystem_inode_total=3248128,filesystem_inode_used=140272 1511859222000000000 +dcos_node,cluster=enterprise,hostname=192.168.122.18,interface=vtep1024 network_in_bytes=0i,network_in_dropped=0,network_in_errors=0,network_in_packets=0,network_out_bytes=0i,network_out_dropped=0,network_out_errors=0,network_out_packets=0 1511859222000000000 +dcos_node,cluster=enterprise,hostname=192.168.122.18,path=/var/lib/docker/plugins filesystem_capacity_free_bytes=1668378624i,filesystem_capacity_total_bytes=6641680384i,filesystem_capacity_used_bytes=4973301760i,filesystem_inode_free=3107856,filesystem_inode_total=3248128,filesystem_inode_used=140272 1511859222000000000 +dcos_node,cluster=enterprise,hostname=192.168.122.18,interface=d-dcos network_in_bytes=0i,network_in_dropped=0,network_in_errors=0,network_in_packets=0,network_out_bytes=0i,network_out_dropped=0,network_out_errors=0,network_out_packets=0 1511859222000000000 +dcos_app,cluster=enterprise,container_id=9a78d34a-3bbf-467e-81cf-a57737f154ee,hostname=192.168.122.18 container_received_bytes_per_sec=0,container_throttled_bytes_per_sec=0 1511859222000000000 +dcos_container,cluster=enterprise,container_id=cbf19b77-3b8d-4bcf-b81f-824b67279629,hostname=192.168.122.18 cpus_limit=0.3,cpus_system_time=307.31,cpus_throttled_time=102.029930607,cpus_user_time=268.57,disk_limit_bytes=268435456i,disk_used_bytes=30953472i,mem_limit_bytes=570425344i,mem_total_bytes=13316096i,net_rx_bytes=0i,net_rx_dropped=0,net_rx_errors=0,net_rx_packets=0,net_tx_bytes=0i,net_tx_dropped=0,net_tx_errors=0,net_tx_packets=0 1511859222000000000 +dcos_app,cluster=enterprise,container_id=cbf19b77-3b8d-4bcf-b81f-824b67279629,hostname=192.168.122.18 container_received_bytes_per_sec=0,container_throttled_bytes_per_sec=0 1511859222000000000 +dcos_container,cluster=enterprise,container_id=5725e219-f66e-40a8-b3ab-519d85f4c4dc,hostname=192.168.122.18,task_name=hello-world cpus_limit=0.6,cpus_system_time=25.6,cpus_throttled_time=327.977109217,cpus_user_time=566.54,disk_limit_bytes=0i,disk_used_bytes=0i,mem_limit_bytes=1107296256i,mem_total_bytes=335941632i,net_rx_bytes=0i,net_rx_dropped=0,net_rx_errors=0,net_rx_packets=0,net_tx_bytes=0i,net_tx_dropped=0,net_tx_errors=0,net_tx_packets=0 1511859222000000000 +dcos_app,cluster=enterprise,container_id=5725e219-f66e-40a8-b3ab-519d85f4c4dc,hostname=192.168.122.18 container_received_bytes_per_sec=0,container_throttled_bytes_per_sec=0 1511859222000000000 +dcos_app,cluster=enterprise,container_id=c76e1488-4fb7-4010-a4cf-25725f8173f9,hostname=192.168.122.18 container_received_bytes_per_sec=0,container_throttled_bytes_per_sec=0 1511859222000000000 +dcos_container,cluster=enterprise,container_id=cbe0b2f9-061f-44ac-8f15-4844229e8231,hostname=192.168.122.18,task_name=telegraf cpus_limit=0.2,cpus_system_time=8.109999999,cpus_throttled_time=93.183916045,cpus_user_time=17.97,disk_limit_bytes=0i,disk_used_bytes=0i,mem_limit_bytes=167772160i,mem_total_bytes=0i,net_rx_bytes=0i,net_rx_dropped=0,net_rx_errors=0,net_rx_packets=0,net_tx_bytes=0i,net_tx_dropped=0,net_tx_errors=0,net_tx_packets=0 1511859222000000000 +dcos_container,cluster=enterprise,container_id=b64115de-3d2a-431d-a805-76e7c46453f1,hostname=192.168.122.18 cpus_limit=0.2,cpus_system_time=2.69,cpus_throttled_time=20.064861214,cpus_user_time=6.56,disk_limit_bytes=268435456i,disk_used_bytes=29360128i,mem_limit_bytes=297795584i,mem_total_bytes=13733888i,net_rx_bytes=0i,net_rx_dropped=0,net_rx_errors=0,net_rx_packets=0,net_tx_bytes=0i,net_tx_dropped=0,net_tx_errors=0,net_tx_packets=0 1511859222000000000 +dcos_app,cluster=enterprise,container_id=b64115de-3d2a-431d-a805-76e7c46453f1,hostname=192.168.122.18 container_received_bytes_per_sec=0,container_throttled_bytes_per_sec=0 1511859222000000000 +``` diff --git a/plugins/inputs/dcos/client.go b/plugins/inputs/dcos/client.go new file mode 100644 index 0000000000000..691b128af44b7 --- /dev/null +++ b/plugins/inputs/dcos/client.go @@ -0,0 +1,340 @@ +package dcos + +import ( + "bytes" + "context" + "crypto/rsa" + "crypto/tls" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "strings" + "time" + "unicode/utf8" + + jwt "github.com/dgrijalva/jwt-go" +) + +const ( + // How long to stayed logged in for + loginDuration = 65 * time.Minute + + // How long before expiration to renew token + relogDuration = 5 * time.Minute +) + +type Client interface { + Token() string + EnsureAuth(ctx context.Context) error + GetSummary(ctx context.Context) (*Summary, error) + GetContainers(ctx context.Context, node string) ([]string, error) + GetNodeMetrics(ctx context.Context, node string) (*Metrics, error) + GetContainerMetrics(ctx context.Context, node, container string) (*Metrics, error) + GetAppMetrics(ctx context.Context, node, container string) (*Metrics, error) +} + +type Credentials struct { + Username string + PrivateKey *rsa.PrivateKey + TokenFile string +} + +type APIError struct { + StatusCode int + Title string + Description string +} +type Login struct { + Token string + Title string + Description string +} + +type Slave struct { + ID string `json:"id"` +} + +type Summary struct { + Cluster string + Slaves []Slave +} + +type DataPoint struct { + Name string + Tags map[string]string + Unit string + Value float64 +} + +type Metrics struct { + Datapoints []DataPoint + Dimensions map[string]interface{} +} + +type client struct { + clusterURL *url.URL + httpClient *http.Client + credentials *Credentials + token *authToken + semaphore chan struct{} +} + +type claims struct { + Uid string `json:"uid"` + jwt.StandardClaims +} + +type authToken struct { + text string + expire time.Time +} + +func (e APIError) Error() string { + if e.Description != "" { + return fmt.Sprintf("%s: %s", e.Title, e.Description) + } + return e.Title +} + +func NewClient( + clusterURL *url.URL, + creds *Credentials, + timeout time.Duration, + maxConns int, + tlsConfig *tls.Config, +) *client { + httpClient := &http.Client{ + Transport: &http.Transport{ + MaxIdleConns: maxConns, + TLSClientConfig: tlsConfig, + }, + Timeout: timeout, + } + semaphore := make(chan struct{}, maxConns) + + c := &client{ + clusterURL: clusterURL, + httpClient: httpClient, + credentials: creds, + semaphore: semaphore, + } + return c +} + +func (c *client) Token() string { + if c.token == nil { + return "" + } + return c.token.text +} + +func (c *client) EnsureAuth(ctx context.Context) error { + if c.credentials == nil { + return nil + } + + if c.credentials.TokenFile != "" { + tf := c.credentials.TokenFile + tokenData, err := ioutil.ReadFile(tf) + if err != nil { + return fmt.Errorf("Error opening token_file %q: %s", tf, err) + } + if !utf8.Valid(tokenData) { + return fmt.Errorf("Token file does not contain utf-8 encoded text: %s", tf) + } + token := strings.TrimSpace(string(tokenData)) + c.token = &authToken{text: token} + } + + if c.token == nil || c.token.expire.Add(relogDuration).After(time.Now()) { + token, err := c.login(ctx) + if err != nil { + return err + } + c.token = token + } + return nil +} + +func (c *client) GetSummary(ctx context.Context) (*Summary, error) { + summary := &Summary{} + err := c.doGet(ctx, c.url("/mesos/master/state-summary"), summary) + if err != nil { + return nil, err + } + + return summary, nil +} + +func (c *client) GetContainers(ctx context.Context, node string) ([]string, error) { + containers := []string{} + + path := fmt.Sprintf("/system/v1/agent/%s/metrics/v0/containers", node) + err := c.doGet(ctx, c.url(path), &containers) + if err != nil { + return nil, err + } + + return containers, nil +} + +func (c *client) GetNodeMetrics(ctx context.Context, node string) (*Metrics, error) { + metrics := &Metrics{} + + path := fmt.Sprintf("/system/v1/agent/%s/metrics/v0/node", node) + err := c.doGet(ctx, c.url(path), metrics) + if err != nil { + return nil, err + } + + return metrics, nil +} + +func (c *client) GetContainerMetrics(ctx context.Context, node, container string) (*Metrics, error) { + metrics := &Metrics{} + + path := fmt.Sprintf("/system/v1/agent/%s/metrics/v0/containers/%s", node, container) + err := c.doGet(ctx, c.url(path), metrics) + if err != nil { + return nil, err + } + + return metrics, nil +} + +func (c *client) GetAppMetrics(ctx context.Context, node, container string) (*Metrics, error) { + metrics := &Metrics{} + + path := fmt.Sprintf("/system/v1/agent/%s/metrics/v0/containers/%s/app", node, container) + err := c.doGet(ctx, c.url(path), metrics) + if err != nil { + return nil, err + } + + return metrics, nil +} + +func createGetRequest(url string, token string) (*http.Request, error) { + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return nil, err + } + + if token != "" { + req.Header.Add("Authorization", "token="+token) + } + req.Header.Add("Accept", "application/json") + + return req, nil +} + +func (c *client) doGet(ctx context.Context, url string, v interface{}) error { + req, err := createGetRequest(url, c.Token()) + if err != nil { + return err + } + + select { + case c.semaphore <- struct{}{}: + break + case <-ctx.Done(): + return ctx.Err() + } + + resp, err := c.httpClient.Do(req.WithContext(ctx)) + if err != nil { + <-c.semaphore + return err + } + defer resp.Body.Close() + + // Clear invalid token if unauthorized + if resp.StatusCode == 401 { + c.token = nil + } + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + <-c.semaphore + return &APIError{ + StatusCode: resp.StatusCode, + Title: resp.Status, + } + } + + if resp.StatusCode == 204 { + <-c.semaphore + return nil + } + + err = json.NewDecoder(resp.Body).Decode(v) + <-c.semaphore + return err +} +func (c *client) url(path string) string { + c.clusterURL.Path = path + return c.clusterURL.String() +} + +func (c *client) login(ctx context.Context) (*authToken, error) { + token, err := c.createLoginToken() + if err != nil { + return nil, err + } + + exp := time.Now().Add(loginDuration) + + body := map[string]interface{}{ + "uid": c.credentials.Username, + "exp": exp.Unix(), + "token": token, + } + + octets, err := json.Marshal(body) + if err != nil { + return nil, err + } + + req, err := http.NewRequest("POST", c.url("/acs/api/v1/auth/login"), bytes.NewBuffer(octets)) + if err != nil { + return nil, err + } + req.Header.Add("Content-Type", "application/json") + + req = req.WithContext(ctx) + resp, err := c.httpClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + login := Login{} + dec := json.NewDecoder(resp.Body) + err = dec.Decode(&login) + if err != nil { + return nil, err + } + + if resp.StatusCode != 200 || login.Token == "" { + return nil, &APIError{resp.StatusCode, login.Title, login.Description} + } + + authToken := &authToken{ + text: login.Token, + expire: exp, + } + + return authToken, err +} + +func (c *client) createLoginToken() (string, error) { + token := jwt.NewWithClaims(jwt.SigningMethodRS256, claims{ + Uid: c.credentials.Username, + StandardClaims: jwt.StandardClaims{ + ExpiresAt: 0, + }, + }) + ss, err := token.SignedString(c.credentials.PrivateKey) + return ss, err +} diff --git a/plugins/inputs/dcos/client_test.go b/plugins/inputs/dcos/client_test.go new file mode 100644 index 0000000000000..7a38a2b0c25ac --- /dev/null +++ b/plugins/inputs/dcos/client_test.go @@ -0,0 +1,224 @@ +package dcos + +import ( + "context" + "fmt" + "net/http" + "net/http/httptest" + "net/url" + "testing" + + jwt "github.com/dgrijalva/jwt-go" + "github.com/stretchr/testify/require" +) + +const ( + privateKey = `-----BEGIN RSA PRIVATE KEY----- +MIICXQIBAAKBgQCwlGyzVp9cqtwiNCgCnaR0kilPZhr4xFBcnXxvQ8/uzOHaWKxj +XWR38cKR3gPh5+4iSmzMdo3HDJM5ks6imXGnp+LPOA5iNewnpLNs7UxA2arwKH/6 +4qIaAXAtf5jE46wZIMgc2EW9wGL3dxC0JY8EXPpBFB/3J8gADkorFR8lwwIDAQAB +AoGBAJaFHxfMmjHK77U0UnrQWFSKFy64cftmlL4t/Nl3q7L68PdIKULWZIMeEWZ4 +I0UZiFOwr4em83oejQ1ByGSwekEuiWaKUI85IaHfcbt+ogp9hY/XbOEo56OPQUAd +bEZv1JqJOqta9Ug1/E1P9LjEEyZ5F5ubx7813rxAE31qKtKJAkEA1zaMlCWIr+Rj +hGvzv5rlHH3wbOB4kQFXO4nqj3J/ttzR5QiJW24STMDcbNngFlVcDVju56LrNTiD +dPh9qvl7nwJBANILguR4u33OMksEZTYB7nQZSurqXsq6382zH7pTl29ANQTROHaM +PKC8dnDWq8RGTqKuvWblIzzGIKqIMovZo10CQC96T0UXirITFolOL3XjvAuvFO1Q +EAkdXJs77805m0dCK+P1IChVfiAEpBw3bKJArpAbQIlFfdI953JUp5SieU0CQEub +BSSEKMjh/cxu6peEHnb/262vayuCFKkQPu1sxWewLuVrAe36EKCy9dcsDmv5+rgo +Odjdxc9Madm4aKlaT6kCQQCpAgeblDrrxTrNQ+Typzo37PlnQrvI+0EceAUuJ72G +P0a+YZUeHNRqT2pPN9lMTAZGGi3CtcF2XScbLNEBeXge +-----END RSA PRIVATE KEY-----` +) + +func TestEnsureAuth(t *testing.T) { + var tests = []struct { + name string + responseCode int + responseBody string + expectedError error + expectedToken string + }{ + { + name: "Login successful", + responseCode: 200, + responseBody: `{"token": "XXX.YYY.ZZZ"}`, + expectedError: nil, + expectedToken: "XXX.YYY.ZZZ", + }, + { + name: "Unauthorized Error", + responseCode: http.StatusUnauthorized, + responseBody: `{"title": "x", "description": "y"}`, + expectedError: &APIError{http.StatusUnauthorized, "x", "y"}, + expectedToken: "", + }, + } + + key, err := jwt.ParseRSAPrivateKeyFromPEM([]byte(privateKey)) + require.NoError(t, err) + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(tt.responseCode) + fmt.Fprintln(w, tt.responseBody) + }) + ts := httptest.NewServer(handler) + u, err := url.Parse(ts.URL) + require.NoError(t, err) + + ctx := context.Background() + client := NewClient(u, &Credentials{"sa", key, ""}, defaultResponseTimeout, 1, nil) + err = client.EnsureAuth(ctx) + + require.Equal(t, tt.expectedToken, client.Token()) + require.Equal(t, tt.expectedError, err) + + ts.Close() + }) + } + +} + +func TestGetSummary(t *testing.T) { + var tests = []struct { + name string + responseCode int + responseBody string + expectedValue *Summary + expectedError error + }{ + { + name: "No nodes", + responseCode: 200, + responseBody: `{"cluster": "a", "slaves": []}`, + expectedValue: &Summary{Cluster: "a", Slaves: []Slave{}}, + expectedError: nil, + }, + { + name: "Unauthorized Error", + responseCode: http.StatusUnauthorized, + responseBody: ``, + expectedValue: nil, + expectedError: &APIError{StatusCode: http.StatusUnauthorized, Title: "401 Unauthorized"}, + }, + { + name: "Has nodes", + responseCode: 200, + responseBody: `{"cluster": "a", "slaves": [{"id": "a"}, {"id": "b"}]}`, + expectedValue: &Summary{ + Cluster: "a", + Slaves: []Slave{ + Slave{ID: "a"}, + Slave{ID: "b"}, + }, + }, + expectedError: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // check the path + w.WriteHeader(tt.responseCode) + fmt.Fprintln(w, tt.responseBody) + }) + ts := httptest.NewServer(handler) + u, err := url.Parse(ts.URL) + require.NoError(t, err) + + ctx := context.Background() + client := NewClient(u, nil, defaultResponseTimeout, 1, nil) + summary, err := client.GetSummary(ctx) + + require.Equal(t, tt.expectedError, err) + require.Equal(t, tt.expectedValue, summary) + + ts.Close() + }) + } + +} + +func TestGetNodeMetrics(t *testing.T) { + var tests = []struct { + name string + responseCode int + responseBody string + expectedValue *Metrics + expectedError error + }{ + { + name: "Empty Body", + responseCode: 200, + responseBody: `{}`, + expectedValue: &Metrics{}, + expectedError: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // check the path + w.WriteHeader(tt.responseCode) + fmt.Fprintln(w, tt.responseBody) + }) + ts := httptest.NewServer(handler) + u, err := url.Parse(ts.URL) + require.NoError(t, err) + + ctx := context.Background() + client := NewClient(u, nil, defaultResponseTimeout, 1, nil) + m, err := client.GetNodeMetrics(ctx, "foo") + + require.Equal(t, tt.expectedError, err) + require.Equal(t, tt.expectedValue, m) + + ts.Close() + }) + } + +} + +func TestGetContainerMetrics(t *testing.T) { + var tests = []struct { + name string + responseCode int + responseBody string + expectedValue *Metrics + expectedError error + }{ + { + name: "204 No Contents", + responseCode: 204, + responseBody: ``, + expectedValue: &Metrics{}, + expectedError: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // check the path + w.WriteHeader(tt.responseCode) + fmt.Fprintln(w, tt.responseBody) + }) + ts := httptest.NewServer(handler) + u, err := url.Parse(ts.URL) + require.NoError(t, err) + + ctx := context.Background() + client := NewClient(u, nil, defaultResponseTimeout, 1, nil) + m, err := client.GetContainerMetrics(ctx, "foo", "bar") + + require.Equal(t, tt.expectedError, err) + require.Equal(t, tt.expectedValue, m) + + ts.Close() + }) + } + +} diff --git a/plugins/inputs/dcos/dcos.go b/plugins/inputs/dcos/dcos.go new file mode 100644 index 0000000000000..5c6b83a4534ec --- /dev/null +++ b/plugins/inputs/dcos/dcos.go @@ -0,0 +1,410 @@ +package dcos + +import ( + "context" + "io/ioutil" + "net/url" + "sort" + "strings" + "sync" + "time" + + jwt "github.com/dgrijalva/jwt-go" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/filter" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/inputs" +) + +const ( + defaultMaxConnections = 10 + defaultResponseTimeout = 20 * time.Second +) + +var ( + nodeDimensions = []string{ + "hostname", + "path", + "interface", + } + containerDimensions = []string{ + "hostname", + "container_id", + "task_name", + } + appDimensions = []string{ + "hostname", + "container_id", + "task_name", + } +) + +type DCOS struct { + ClusterURL string `toml:"cluster_url"` + + ServiceAccountID string `toml:"service_account_id"` + ServiceAccountPrivateKey string + + TokenFile string + + NodeInclude []string + NodeExclude []string + ContainerInclude []string + ContainerExclude []string + AppInclude []string + AppExclude []string + + MaxConnections int + ResponseTimeout internal.Duration + + SSLCA string `toml:"ssl_ca"` + SSLCert string `toml:"ssl_cert"` + SSLKey string `toml:"ssl_key"` + InsecureSkipVerify bool `toml:"insecure_skip_verify"` + + client Client + + initialized bool + nodeFilter filter.Filter + containerFilter filter.Filter + appFilter filter.Filter + taskNameFilter filter.Filter +} + +func (d *DCOS) Description() string { + return "Input plugin for DC/OS metrics" +} + +var sampleConfig = ` + ## The DC/OS cluster URL. + cluster_url = "https://dcos-ee-master-1" + + ## The ID of the service account. + service_account_id = "telegraf" + ## The private key file for the service account. + service_account_private_key = "/etc/telegraf/telegraf-sa-key.pem" + + ## Path containing login token. If set, will read on every gather. + # token_file = "/home/dcos/.dcos/token" + + ## In all filter options if both include and exclude are empty all items + ## will be collected. Arrays may contain glob patterns. + ## + ## Node IDs to collect metrics from. If a node is excluded, no metrics will + ## be collected for its containers or apps. + # node_include = [] + # node_exclude = [] + ## Container IDs to collect container metrics from. + # container_include = [] + # container_exclude = [] + ## Container IDs to collect app metrics from. + # app_include = [] + # app_exclude = [] + + ## Maximum concurrent connections to the cluster. + # max_connections = 1 + ## Maximum time to receive a response from cluster. + # response_timeout = "20s" + + ## Optional SSL Config + # ssl_ca = "/etc/telegraf/ca.pem" + # ssl_cert = "/etc/telegraf/cert.pem" + # ssl_key = "/etc/telegraf/key.pem" + ## If false, skip chain & host verification + # insecure_skip_verify = true + + ## Recommended filtering to reduce series cardinality. + # [inputs.dcos.tagdrop] + # path = ["/var/lib/mesos/slave/slaves/*"] +` + +func (d *DCOS) SampleConfig() string { + return sampleConfig +} + +func (d *DCOS) Gather(acc telegraf.Accumulator) error { + if !d.initialized { + err := d.createFilters() + if err != nil { + return err + } + + d.initialized = true + } + + if d.client == nil { + client, err := d.createClient() + if err != nil { + return err + } + d.client = client + } + + ctx := context.Background() + err := d.client.EnsureAuth(ctx) + if err != nil { + return err + } + + summary, err := d.client.GetSummary(ctx) + if err != nil { + return err + } + + var wg sync.WaitGroup + for _, node := range summary.Slaves { + wg.Add(1) + go func(node string) { + defer wg.Done() + d.GatherNode(ctx, acc, summary.Cluster, node) + }(node.ID) + } + wg.Wait() + + return nil +} + +func (d *DCOS) GatherNode(ctx context.Context, acc telegraf.Accumulator, cluster, node string) { + if !d.nodeFilter.Match(node) { + return + } + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + m, err := d.client.GetNodeMetrics(ctx, node) + if err != nil { + acc.AddError(err) + return + } + d.addNodeMetrics(acc, cluster, m) + }() + + d.GatherContainers(ctx, acc, cluster, node) + wg.Wait() +} + +func (d *DCOS) GatherContainers(ctx context.Context, acc telegraf.Accumulator, cluster, node string) { + containers, err := d.client.GetContainers(ctx, node) + if err != nil { + acc.AddError(err) + return + } + + var wg sync.WaitGroup + for _, container := range containers { + if d.containerFilter.Match(container) { + wg.Add(1) + go func(container string) { + defer wg.Done() + m, err := d.client.GetContainerMetrics(ctx, node, container) + if err != nil { + if err, ok := err.(APIError); ok && err.StatusCode == 404 { + return + } + acc.AddError(err) + return + } + d.addContainerMetrics(acc, cluster, m) + }(container) + } + + if d.appFilter.Match(container) { + wg.Add(1) + go func(container string) { + defer wg.Done() + m, err := d.client.GetAppMetrics(ctx, node, container) + if err != nil { + if err, ok := err.(APIError); ok && err.StatusCode == 404 { + return + } + acc.AddError(err) + return + } + d.addAppMetrics(acc, cluster, m) + }(container) + } + } + wg.Wait() +} + +type point struct { + tags map[string]string + labels map[string]string + fields map[string]interface{} +} + +func (d *DCOS) createPoints(acc telegraf.Accumulator, m *Metrics) []*point { + points := make(map[string]*point) + for _, dp := range m.Datapoints { + fieldKey := strings.Replace(dp.Name, ".", "_", -1) + + tags := dp.Tags + if tags == nil { + tags = make(map[string]string) + } + + if dp.Unit == "bytes" && !strings.HasSuffix(fieldKey, "_bytes") { + fieldKey = fieldKey + "_bytes" + } + + tagset := []string{} + for k, v := range tags { + tagset = append(tagset, k+"="+v) + } + sort.Strings(tagset) + seriesParts := make([]string, 0, len(tagset)) + seriesParts = append(seriesParts, tagset...) + seriesKey := strings.Join(seriesParts, ",") + + p, ok := points[seriesKey] + if !ok { + p = &point{} + p.tags = tags + p.labels = make(map[string]string) + p.fields = make(map[string]interface{}) + points[seriesKey] = p + } + + if dp.Unit == "bytes" { + p.fields[fieldKey] = int64(dp.Value) + } else { + p.fields[fieldKey] = dp.Value + } + } + + results := []*point{} + for _, p := range points { + for k, v := range m.Dimensions { + switch v := v.(type) { + case string: + p.tags[k] = v + case map[string]string: + if k == "labels" { + for k, v := range v { + p.labels[k] = v + } + } + } + } + results = append(results, p) + } + return results +} + +func (d *DCOS) addMetrics(acc telegraf.Accumulator, cluster, mname string, m *Metrics, tagDimensions []string) { + tm := time.Now() + + points := d.createPoints(acc, m) + + for _, p := range points { + tags := make(map[string]string) + tags["cluster"] = cluster + for _, tagkey := range tagDimensions { + v, ok := p.tags[tagkey] + if ok { + tags[tagkey] = v + } + } + for k, v := range p.labels { + tags[k] = v + } + + fields := make(map[string]interface{}) + for k, v := range p.fields { + if strings.HasPrefix(k, "dcos_metrics_module_") { + k = strings.TrimPrefix(k, "dcos_metrics_module_") + } + fields[k] = v + } + + acc.AddFields(mname, fields, tags, tm) + } +} + +func (d *DCOS) addNodeMetrics(acc telegraf.Accumulator, cluster string, m *Metrics) { + d.addMetrics(acc, cluster, "dcos_node", m, nodeDimensions) +} + +func (d *DCOS) addContainerMetrics(acc telegraf.Accumulator, cluster string, m *Metrics) { + d.addMetrics(acc, cluster, "dcos_container", m, containerDimensions) +} + +func (d *DCOS) addAppMetrics(acc telegraf.Accumulator, cluster string, m *Metrics) { + d.addMetrics(acc, cluster, "dcos_app", m, appDimensions) +} + +func (d *DCOS) createClient() (*client, error) { + tlsCfg, err := internal.GetTLSConfig( + d.SSLCert, d.SSLKey, d.SSLCA, d.InsecureSkipVerify) + if err != nil { + return nil, err + } + + var creds *Credentials + + if d.ServiceAccountID != "" && d.ServiceAccountPrivateKey != "" { + bs, err := ioutil.ReadFile(d.ServiceAccountPrivateKey) + if err != nil { + return nil, err + } + + privateKey, err := jwt.ParseRSAPrivateKeyFromPEM(bs) + if err != nil { + return nil, err + } + + creds = &Credentials{d.ServiceAccountID, privateKey, ""} + } else if d.TokenFile != "" { + creds = &Credentials{"", nil, d.TokenFile} + } + + url, err := url.Parse(d.ClusterURL) + if err != nil { + return nil, err + } + + client := NewClient( + url, + creds, + d.ResponseTimeout.Duration, + d.MaxConnections, + tlsCfg, + ) + return client, nil +} + +func (d *DCOS) createFilters() error { + var err error + d.nodeFilter, err = filter.NewIncludeExcludeFilter( + d.NodeInclude, d.NodeExclude) + if err != nil { + return err + } + + d.containerFilter, err = filter.NewIncludeExcludeFilter( + d.ContainerInclude, d.ContainerExclude) + if err != nil { + return err + } + + d.appFilter, err = filter.NewIncludeExcludeFilter( + d.AppInclude, d.AppExclude) + if err != nil { + return err + } + + return nil +} + +func init() { + inputs.Add("dcos", func() telegraf.Input { + return &DCOS{ + MaxConnections: defaultMaxConnections, + ResponseTimeout: internal.Duration{ + Duration: defaultResponseTimeout, + }, + } + }) +} diff --git a/plugins/inputs/dcos/dcos_test.go b/plugins/inputs/dcos/dcos_test.go new file mode 100644 index 0000000000000..7b9fff2d4a9c6 --- /dev/null +++ b/plugins/inputs/dcos/dcos_test.go @@ -0,0 +1,445 @@ +package dcos + +import ( + "context" + "fmt" + "testing" + + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +type mockClient struct { + TokenF func() string + EnsureAuthF func(ctx context.Context) error + GetSummaryF func(ctx context.Context) (*Summary, error) + GetContainersF func(ctx context.Context, node string) ([]string, error) + GetNodeMetricsF func(ctx context.Context, node string) (*Metrics, error) + GetContainerMetricsF func(ctx context.Context, node, container string) (*Metrics, error) + GetAppMetricsF func(ctx context.Context, node, container string) (*Metrics, error) +} + +func (c *mockClient) Token() string { + return c.TokenF() +} + +func (c *mockClient) EnsureAuth(ctx context.Context) error { + return c.EnsureAuthF(ctx) +} + +func (c *mockClient) GetSummary(ctx context.Context) (*Summary, error) { + return c.GetSummaryF(ctx) +} + +func (c *mockClient) GetContainers(ctx context.Context, node string) ([]string, error) { + return c.GetContainersF(ctx, node) +} + +func (c *mockClient) GetNodeMetrics(ctx context.Context, node string) (*Metrics, error) { + return c.GetNodeMetricsF(ctx, node) +} + +func (c *mockClient) GetContainerMetrics(ctx context.Context, node, container string) (*Metrics, error) { + return c.GetContainerMetricsF(ctx, node, container) +} + +func (c *mockClient) GetAppMetrics(ctx context.Context, node, container string) (*Metrics, error) { + return c.GetAppMetricsF(ctx, node, container) +} + +func TestAddNodeMetrics(t *testing.T) { + var tests = []struct { + name string + metrics *Metrics + check func(*testutil.Accumulator) []bool + }{ + { + name: "basic datapoint conversion", + metrics: &Metrics{ + Datapoints: []DataPoint{ + { + Name: "process.count", + Unit: "count", + Value: 42.0, + }, + }, + }, + check: func(acc *testutil.Accumulator) []bool { + return []bool{acc.HasPoint( + "dcos_node", + map[string]string{ + "cluster": "a", + }, + "process_count", 42.0, + )} + }, + }, + { + name: "path added as tag", + metrics: &Metrics{ + Datapoints: []DataPoint{ + { + Name: "filesystem.inode.free", + Tags: map[string]string{ + "path": "/var/lib", + }, + Unit: "count", + Value: 42.0, + }, + }, + }, + check: func(acc *testutil.Accumulator) []bool { + return []bool{acc.HasPoint( + "dcos_node", + map[string]string{ + "cluster": "a", + "path": "/var/lib", + }, + "filesystem_inode_free", 42.0, + )} + }, + }, + { + name: "interface added as tag", + metrics: &Metrics{ + Datapoints: []DataPoint{ + { + Name: "network.out.dropped", + Tags: map[string]string{ + "interface": "eth0", + }, + Unit: "count", + Value: 42.0, + }, + }, + }, + check: func(acc *testutil.Accumulator) []bool { + return []bool{acc.HasPoint( + "dcos_node", + map[string]string{ + "cluster": "a", + "interface": "eth0", + }, + "network_out_dropped", 42.0, + )} + }, + }, + { + name: "bytes unit appended to fieldkey", + metrics: &Metrics{ + Datapoints: []DataPoint{ + { + Name: "network.in", + Tags: map[string]string{ + "interface": "eth0", + }, + Unit: "bytes", + Value: 42.0, + }, + }, + }, + check: func(acc *testutil.Accumulator) []bool { + return []bool{acc.HasPoint( + "dcos_node", + map[string]string{ + "cluster": "a", + "interface": "eth0", + }, + "network_in_bytes", int64(42), + )} + }, + }, + { + name: "dimensions added as tags", + metrics: &Metrics{ + Datapoints: []DataPoint{ + { + Name: "process.count", + Tags: map[string]string{}, + Unit: "count", + Value: 42.0, + }, + { + Name: "memory.total", + Tags: map[string]string{}, + Unit: "bytes", + Value: 42, + }, + }, + Dimensions: map[string]interface{}{ + "cluster_id": "c0760bbd-9e9d-434b-bd4a-39c7cdef8a63", + "hostname": "192.168.122.18", + "mesos_id": "2dfbbd28-29d2-411d-92c4-e2f84c38688e-S1", + }, + }, + check: func(acc *testutil.Accumulator) []bool { + return []bool{ + acc.HasPoint( + "dcos_node", + map[string]string{ + "cluster": "a", + "hostname": "192.168.122.18", + }, + "process_count", 42.0), + acc.HasPoint( + "dcos_node", + map[string]string{ + "cluster": "a", + "hostname": "192.168.122.18", + }, + "memory_total_bytes", int64(42)), + } + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var acc testutil.Accumulator + dcos := &DCOS{} + dcos.addNodeMetrics(&acc, "a", tt.metrics) + for i, ok := range tt.check(&acc) { + require.True(t, ok, fmt.Sprintf("Index was not true: %d", i)) + } + }) + } + +} + +func TestAddContainerMetrics(t *testing.T) { + var tests = []struct { + name string + metrics *Metrics + check func(*testutil.Accumulator) []bool + }{ + { + name: "container", + metrics: &Metrics{ + Datapoints: []DataPoint{ + { + Name: "net.rx.errors", + Tags: map[string]string{ + "container_id": "f25c457b-fceb-44f0-8f5b-38be34cbb6fb", + "executor_id": "telegraf.192fb45f-cc0c-11e7-af48-ea183c0b541a", + "executor_name": "Command Executor (Task: telegraf.192fb45f-cc0c-11e7-af48-ea183c0b541a) (Command: NO EXECUTABLE)", + "framework_id": "ab2f3a8b-06db-4e8c-95b6-fb1940874a30-0001", + "source": "telegraf.192fb45f-cc0c-11e7-af48-ea183c0b541a", + }, + Unit: "count", + Value: 42.0, + }, + }, + Dimensions: map[string]interface{}{ + "cluster_id": "c0760bbd-9e9d-434b-bd4a-39c7cdef8a63", + "container_id": "f25c457b-fceb-44f0-8f5b-38be34cbb6fb", + "executor_id": "telegraf.192fb45f-cc0c-11e7-af48-ea183c0b541a", + "framework_id": "ab2f3a8b-06db-4e8c-95b6-fb1940874a30-0001", + "framework_name": "marathon", + "framework_principal": "dcos_marathon", + "framework_role": "slave_public", + "hostname": "192.168.122.18", + "labels": map[string]string{ + "DCOS_SPACE": "/telegraf", + }, + "mesos_id": "2dfbbd28-29d2-411d-92c4-e2f84c38688e-S1", + "task_id": "telegraf.192fb45f-cc0c-11e7-af48-ea183c0b541a", + "task_name": "telegraf", + }, + }, + check: func(acc *testutil.Accumulator) []bool { + return []bool{ + acc.HasPoint( + "dcos_container", + map[string]string{ + "cluster": "a", + "container_id": "f25c457b-fceb-44f0-8f5b-38be34cbb6fb", + "hostname": "192.168.122.18", + "task_name": "telegraf", + "DCOS_SPACE": "/telegraf", + }, + "net_rx_errors", + 42.0, + ), + } + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var acc testutil.Accumulator + dcos := &DCOS{} + dcos.addContainerMetrics(&acc, "a", tt.metrics) + for i, ok := range tt.check(&acc) { + require.True(t, ok, fmt.Sprintf("Index was not true: %d", i)) + } + }) + } +} + +func TestAddAppMetrics(t *testing.T) { + var tests = []struct { + name string + metrics *Metrics + check func(*testutil.Accumulator) []bool + }{ + { + name: "tags are optional", + metrics: &Metrics{ + Datapoints: []DataPoint{ + { + Name: "dcos.metrics.module.container_throttled_bytes_per_sec", + Unit: "", + Value: 42.0, + }, + }, + }, + check: func(acc *testutil.Accumulator) []bool { + return []bool{ + acc.HasPoint( + "dcos_app", + map[string]string{ + "cluster": "a", + }, + "container_throttled_bytes_per_sec", 42.0, + ), + } + }, + }, + { + name: "dimensions are tagged", + metrics: &Metrics{ + Datapoints: []DataPoint{ + { + Name: "dcos.metrics.module.container_throttled_bytes_per_sec", + Unit: "", + Value: 42.0, + }, + }, + Dimensions: map[string]interface{}{ + "cluster_id": "c0760bbd-9e9d-434b-bd4a-39c7cdef8a63", + "container_id": "02d31175-1c01-4459-8520-ef8b1339bc52", + "hostname": "192.168.122.18", + "mesos_id": "2dfbbd28-29d2-411d-92c4-e2f84c38688e-S1", + }, + }, + check: func(acc *testutil.Accumulator) []bool { + return []bool{ + acc.HasPoint( + "dcos_app", + map[string]string{ + "cluster": "a", + "container_id": "02d31175-1c01-4459-8520-ef8b1339bc52", + "hostname": "192.168.122.18", + }, + "container_throttled_bytes_per_sec", 42.0, + ), + } + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var acc testutil.Accumulator + dcos := &DCOS{} + dcos.addAppMetrics(&acc, "a", tt.metrics) + for i, ok := range tt.check(&acc) { + require.True(t, ok, fmt.Sprintf("Index was not true: %d", i)) + } + }) + } +} + +func TestGatherFilterNode(t *testing.T) { + var tests = []struct { + name string + nodeInclude []string + nodeExclude []string + client Client + check func(*testutil.Accumulator) []bool + }{ + { + name: "cluster without nodes has no metrics", + client: &mockClient{ + EnsureAuthF: func(ctx context.Context) error { + return nil + }, + GetSummaryF: func(ctx context.Context) (*Summary, error) { + return &Summary{ + Cluster: "a", + Slaves: []Slave{}, + }, nil + }, + }, + check: func(acc *testutil.Accumulator) []bool { + return []bool{ + acc.NMetrics() == 0, + } + }, + }, + { + name: "node include", + nodeInclude: []string{"x"}, + client: &mockClient{ + EnsureAuthF: func(ctx context.Context) error { + return nil + }, + GetSummaryF: func(ctx context.Context) (*Summary, error) { + return &Summary{ + Cluster: "a", + Slaves: []Slave{ + Slave{ID: "x"}, + Slave{ID: "y"}, + }, + }, nil + }, + GetContainersF: func(ctx context.Context, node string) ([]string, error) { + return []string{}, nil + }, + GetNodeMetricsF: func(ctx context.Context, node string) (*Metrics, error) { + return &Metrics{ + Datapoints: []DataPoint{ + { + Name: "value", + Value: 42.0, + }, + }, + Dimensions: map[string]interface{}{ + "hostname": "x", + }, + }, nil + }, + }, + check: func(acc *testutil.Accumulator) []bool { + return []bool{ + acc.HasPoint( + "dcos_node", + map[string]string{ + "cluster": "a", + "hostname": "x", + }, + "value", 42.0, + ), + acc.NMetrics() == 1, + } + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var acc testutil.Accumulator + dcos := &DCOS{ + NodeInclude: tt.nodeInclude, + NodeExclude: tt.nodeExclude, + client: tt.client, + } + err := dcos.Gather(&acc) + require.NoError(t, err) + for i, ok := range tt.check(&acc) { + require.True(t, ok, fmt.Sprintf("Index was not true: %d", i)) + } + }) + } +} From 6eb920775a37783ad471019b182fbe143a17e775 Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Tue, 28 Nov 2017 11:09:32 -0800 Subject: [PATCH 02/15] fixup Godeps --- Godeps | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Godeps b/Godeps index 0f8c5bebff4fb..8c13d6db8ee20 100644 --- a/Godeps +++ b/Godeps @@ -17,14 +17,14 @@ github.com/eapache/go-resiliency b86b1ec0dd4209a588dc1285cdd471e73525c0b3 github.com/eapache/go-xerial-snappy bb955e01b9346ac19dc29eb16586c90ded99a98c github.com/eapache/queue 44cc805cf13205b55f69e14bcb69867d1ae92f98 github.com/eclipse/paho.mqtt.golang d4f545eb108a2d19f9b1a735689dbfb719bc21fb +github.com/go-logfmt/logfmt 390ab7935ee28ec6b286364bba9b4dd6410cb3d5 +github.com/go-sql-driver/mysql 2e00b5cd70399450106cec6431c2e2ce3cae5034 github.com/gobwas/glob bea32b9cd2d6f55753d94a28e959b13f0244797a -github.com/gogo/protobuf 7b6c6391c4ff245962047fc1e2c6e08b1cdfa0e8 github.com/go-ini/ini 9144852efba7c4daf409943ee90767da62d55438 +github.com/gogo/protobuf 7b6c6391c4ff245962047fc1e2c6e08b1cdfa0e8 github.com/golang/protobuf 8ee79997227bf9b34611aee7946ae64735e6fd93 github.com/golang/snappy 7db9049039a047d955fe8c19b83c8ff5abd765c7 -github.com/go-logfmt/logfmt 390ab7935ee28ec6b286364bba9b4dd6410cb3d5 github.com/go-ole/go-ole be49f7c07711fcb603cff39e1de7c67926dc0ba7 -github.com/go-sql-driver/mysql 2e00b5cd70399450106cec6431c2e2ce3cae5034 github.com/google/go-cmp f94e52cad91c65a63acc1e75d4be223ea22e99bc github.com/gorilla/mux 392c28fe23e1c45ddba891b0320b3b5df220beea github.com/go-sql-driver/mysql 2e00b5cd70399450106cec6431c2e2ce3cae5034 From ab478f3180419fd100fc834cc74a451246640501 Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Tue, 28 Nov 2017 11:12:15 -0800 Subject: [PATCH 03/15] fixup readme --- plugins/inputs/dcos/README.md | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/plugins/inputs/dcos/README.md b/plugins/inputs/dcos/README.md index 3d3845221ccde..3277a65bb8fd6 100644 --- a/plugins/inputs/dcos/README.md +++ b/plugins/inputs/dcos/README.md @@ -155,14 +155,14 @@ for details on interprete field interpretation. - container_id - task_name - fields: - - cpus_limit (float) - - cpus_system_time (float) - - cpus_throttled_time (float) - - cpus_user_time (float) - - disk_limit_bytes (int) - - disk_used_bytes (int) - - mem_limit_bytes (int) - - mem_total_bytes (int) + - cpus_limit (float) + - cpus_system_time (float) + - cpus_throttled_time (float) + - cpus_user_time (float) + - disk_limit_bytes (int) + - disk_used_bytes (int) + - mem_limit_bytes (int) + - mem_total_bytes (int) - net_rx_bytes (int) - net_rx_dropped (float) - net_rx_errors (float) @@ -179,7 +179,7 @@ for details on interprete field interpretation. - container_id - task_name - fields: - - fields are application specific + - fields are application specific ### Example Output: From 68aa4ccf234408ecd34c92c5af350560ab05c227 Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Tue, 28 Nov 2017 23:51:52 -0800 Subject: [PATCH 04/15] Refactor credentials/login --- plugins/inputs/dcos/README.md | 2 +- plugins/inputs/dcos/client.go | 176 +++++++++++------------------ plugins/inputs/dcos/client_test.go | 24 ++-- plugins/inputs/dcos/creds.go | 75 ++++++++++++ plugins/inputs/dcos/dcos.go | 99 ++++++++++------ plugins/inputs/dcos/dcos_test.go | 20 ++-- 6 files changed, 228 insertions(+), 168 deletions(-) create mode 100644 plugins/inputs/dcos/creds.go diff --git a/plugins/inputs/dcos/README.md b/plugins/inputs/dcos/README.md index 3277a65bb8fd6..ace068c7271fb 100644 --- a/plugins/inputs/dcos/README.md +++ b/plugins/inputs/dcos/README.md @@ -43,7 +43,7 @@ your database. # app_exclude = [] ## Maximum concurrent connections to the cluster. - # max_connections = 1 + # max_connections = 10 ## Maximum time to receive a response from cluster. # response_timeout = "20s" diff --git a/plugins/inputs/dcos/client.go b/plugins/inputs/dcos/client.go index 691b128af44b7..c9d03edabd129 100644 --- a/plugins/inputs/dcos/client.go +++ b/plugins/inputs/dcos/client.go @@ -3,31 +3,20 @@ package dcos import ( "bytes" "context" - "crypto/rsa" "crypto/tls" "encoding/json" "fmt" - "io/ioutil" "net/http" "net/url" - "strings" "time" - "unicode/utf8" jwt "github.com/dgrijalva/jwt-go" ) -const ( - // How long to stayed logged in for - loginDuration = 65 * time.Minute - - // How long before expiration to renew token - relogDuration = 5 * time.Minute -) - type Client interface { - Token() string - EnsureAuth(ctx context.Context) error + SetToken(token string) + + Login(ctx context.Context, sa *ServiceAccount) (*AuthToken, error) GetSummary(ctx context.Context) (*Summary, error) GetContainers(ctx context.Context, node string) ([]string, error) GetNodeMetrics(ctx context.Context, node string) (*Metrics, error) @@ -35,17 +24,12 @@ type Client interface { GetAppMetrics(ctx context.Context, node, container string) (*Metrics, error) } -type Credentials struct { - Username string - PrivateKey *rsa.PrivateKey - TokenFile string -} - type APIError struct { StatusCode int Title string Description string } + type Login struct { Token string Title string @@ -77,7 +61,7 @@ type client struct { clusterURL *url.URL httpClient *http.Client credentials *Credentials - token *authToken + token string semaphore chan struct{} } @@ -86,7 +70,7 @@ type claims struct { jwt.StandardClaims } -type authToken struct { +type AuthToken struct { text string expire time.Time } @@ -100,7 +84,6 @@ func (e APIError) Error() string { func NewClient( clusterURL *url.URL, - creds *Credentials, timeout time.Duration, maxConns int, tlsConfig *tls.Config, @@ -115,47 +98,66 @@ func NewClient( semaphore := make(chan struct{}, maxConns) c := &client{ - clusterURL: clusterURL, - httpClient: httpClient, - credentials: creds, - semaphore: semaphore, + clusterURL: clusterURL, + httpClient: httpClient, + semaphore: semaphore, } return c } -func (c *client) Token() string { - if c.token == nil { - return "" - } - return c.token.text +func (c *client) SetToken(token string) { + c.token = token } -func (c *client) EnsureAuth(ctx context.Context) error { - if c.credentials == nil { - return nil +func (c *client) Login(ctx context.Context, sa *ServiceAccount) (*AuthToken, error) { + token, err := c.createLoginToken(sa) + if err != nil { + return nil, err } - if c.credentials.TokenFile != "" { - tf := c.credentials.TokenFile - tokenData, err := ioutil.ReadFile(tf) - if err != nil { - return fmt.Errorf("Error opening token_file %q: %s", tf, err) - } - if !utf8.Valid(tokenData) { - return fmt.Errorf("Token file does not contain utf-8 encoded text: %s", tf) - } - token := strings.TrimSpace(string(tokenData)) - c.token = &authToken{text: token} + exp := time.Now().Add(loginDuration) + + body := map[string]interface{}{ + "uid": sa.AccountID, + "exp": exp.Unix(), + "token": token, } - if c.token == nil || c.token.expire.Add(relogDuration).After(time.Now()) { - token, err := c.login(ctx) - if err != nil { - return err - } - c.token = token + octets, err := json.Marshal(body) + if err != nil { + return nil, err + } + + req, err := http.NewRequest("POST", c.url("/acs/api/v1/auth/login"), bytes.NewBuffer(octets)) + if err != nil { + return nil, err + } + req.Header.Add("Content-Type", "application/json") + + req = req.WithContext(ctx) + resp, err := c.httpClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + login := Login{} + dec := json.NewDecoder(resp.Body) + err = dec.Decode(&login) + if err != nil { + return nil, err + } + + if resp.StatusCode != 200 || login.Token == "" { + return nil, &APIError{resp.StatusCode, login.Title, login.Description} + } + + authToken := &AuthToken{ + text: login.Token, + expire: exp, } - return nil + + return authToken, err } func (c *client) GetSummary(ctx context.Context) (*Summary, error) { @@ -231,7 +233,7 @@ func createGetRequest(url string, token string) (*http.Request, error) { } func (c *client) doGet(ctx context.Context, url string, v interface{}) error { - req, err := createGetRequest(url, c.Token()) + req, err := createGetRequest(url, c.token) if err != nil { return err } @@ -252,7 +254,7 @@ func (c *client) doGet(ctx context.Context, url string, v interface{}) error { // Clear invalid token if unauthorized if resp.StatusCode == 401 { - c.token = nil + c.token = "" } if resp.StatusCode < 200 || resp.StatusCode >= 300 { @@ -272,69 +274,19 @@ func (c *client) doGet(ctx context.Context, url string, v interface{}) error { <-c.semaphore return err } -func (c *client) url(path string) string { - c.clusterURL.Path = path - return c.clusterURL.String() -} - -func (c *client) login(ctx context.Context) (*authToken, error) { - token, err := c.createLoginToken() - if err != nil { - return nil, err - } - - exp := time.Now().Add(loginDuration) - - body := map[string]interface{}{ - "uid": c.credentials.Username, - "exp": exp.Unix(), - "token": token, - } - - octets, err := json.Marshal(body) - if err != nil { - return nil, err - } - - req, err := http.NewRequest("POST", c.url("/acs/api/v1/auth/login"), bytes.NewBuffer(octets)) - if err != nil { - return nil, err - } - req.Header.Add("Content-Type", "application/json") - req = req.WithContext(ctx) - resp, err := c.httpClient.Do(req) - if err != nil { - return nil, err - } - defer resp.Body.Close() - - login := Login{} - dec := json.NewDecoder(resp.Body) - err = dec.Decode(&login) - if err != nil { - return nil, err - } - - if resp.StatusCode != 200 || login.Token == "" { - return nil, &APIError{resp.StatusCode, login.Title, login.Description} - } - - authToken := &authToken{ - text: login.Token, - expire: exp, - } - - return authToken, err +func (c *client) url(path string) string { + url := c.clusterURL + url.Path = path + return url.String() } -func (c *client) createLoginToken() (string, error) { +func (c *client) createLoginToken(sa *ServiceAccount) (string, error) { token := jwt.NewWithClaims(jwt.SigningMethodRS256, claims{ - Uid: c.credentials.Username, + Uid: sa.AccountID, StandardClaims: jwt.StandardClaims{ ExpiresAt: 0, }, }) - ss, err := token.SignedString(c.credentials.PrivateKey) - return ss, err + return token.SignedString(sa.PrivateKey) } diff --git a/plugins/inputs/dcos/client_test.go b/plugins/inputs/dcos/client_test.go index 7a38a2b0c25ac..9a86a7b8ea2a4 100644 --- a/plugins/inputs/dcos/client_test.go +++ b/plugins/inputs/dcos/client_test.go @@ -30,7 +30,7 @@ P0a+YZUeHNRqT2pPN9lMTAZGGi3CtcF2XScbLNEBeXge -----END RSA PRIVATE KEY-----` ) -func TestEnsureAuth(t *testing.T) { +func TestLogin(t *testing.T) { var tests = []struct { name string responseCode int @@ -68,16 +68,24 @@ func TestEnsureAuth(t *testing.T) { require.NoError(t, err) ctx := context.Background() - client := NewClient(u, &Credentials{"sa", key, ""}, defaultResponseTimeout, 1, nil) - err = client.EnsureAuth(ctx) + sa := &ServiceAccount{ + AccountID: "telegraf", + PrivateKey: key, + } + client := NewClient(u, defaultResponseTimeout, 1, nil) + auth, err := client.Login(ctx, sa) - require.Equal(t, tt.expectedToken, client.Token()) require.Equal(t, tt.expectedError, err) + if tt.expectedToken != "" { + require.Equal(t, tt.expectedToken, auth.text) + } else { + require.Nil(t, auth) + } + ts.Close() }) } - } func TestGetSummary(t *testing.T) { @@ -129,7 +137,7 @@ func TestGetSummary(t *testing.T) { require.NoError(t, err) ctx := context.Background() - client := NewClient(u, nil, defaultResponseTimeout, 1, nil) + client := NewClient(u, defaultResponseTimeout, 1, nil) summary, err := client.GetSummary(ctx) require.Equal(t, tt.expectedError, err) @@ -170,7 +178,7 @@ func TestGetNodeMetrics(t *testing.T) { require.NoError(t, err) ctx := context.Background() - client := NewClient(u, nil, defaultResponseTimeout, 1, nil) + client := NewClient(u, defaultResponseTimeout, 1, nil) m, err := client.GetNodeMetrics(ctx, "foo") require.Equal(t, tt.expectedError, err) @@ -211,7 +219,7 @@ func TestGetContainerMetrics(t *testing.T) { require.NoError(t, err) ctx := context.Background() - client := NewClient(u, nil, defaultResponseTimeout, 1, nil) + client := NewClient(u, defaultResponseTimeout, 1, nil) m, err := client.GetContainerMetrics(ctx, "foo", "bar") require.Equal(t, tt.expectedError, err) diff --git a/plugins/inputs/dcos/creds.go b/plugins/inputs/dcos/creds.go new file mode 100644 index 0000000000000..95cbaaaa15188 --- /dev/null +++ b/plugins/inputs/dcos/creds.go @@ -0,0 +1,75 @@ +package dcos + +import ( + "context" + "crypto/rsa" + "fmt" + "io/ioutil" + "strings" + "time" + "unicode/utf8" +) + +const ( + // How long to stayed logged in for + loginDuration = 65 * time.Minute + + // How long before expiration to renew token + relogDuration = 5 * time.Minute +) + +type Credentials interface { + Token(ctx context.Context, client Client) (string, error) + IsExpired() bool +} + +type ServiceAccount struct { + AccountID string + PrivateKey *rsa.PrivateKey + + auth *AuthToken +} + +type TokenCreds struct { + Path string +} + +type NullCreds struct { +} + +func (c *ServiceAccount) Token(ctx context.Context, client Client) (string, error) { + auth, err := client.Login(ctx, c) + if err != nil { + return "", err + } + c.auth = auth + return auth.text, nil +} + +func (c *ServiceAccount) IsExpired() bool { + return c.auth.text != "" || c.auth.expire.Add(relogDuration).After(time.Now()) +} + +func (c *TokenCreds) Token(ctx context.Context, client Client) (string, error) { + octets, err := ioutil.ReadFile(c.Path) + if err != nil { + return "", fmt.Errorf("Error reading token file %q: %s", c.Path, err) + } + if !utf8.Valid(octets) { + return "", fmt.Errorf("Token file does not contain utf-8 encoded text: %s", c.Path) + } + token := strings.TrimSpace(string(octets)) + return token, nil +} + +func (c *TokenCreds) IsExpired() bool { + return true +} + +func (c *NullCreds) Token(ctx context.Context, client Client) (string, error) { + return "", nil +} + +func (c *NullCreds) IsExpired() bool { + return true +} diff --git a/plugins/inputs/dcos/dcos.go b/plugins/inputs/dcos/dcos.go index 5c6b83a4534ec..2f41f8ef006f8 100644 --- a/plugins/inputs/dcos/dcos.go +++ b/plugins/inputs/dcos/dcos.go @@ -63,6 +63,7 @@ type DCOS struct { InsecureSkipVerify bool `toml:"insecure_skip_verify"` client Client + creds Credentials initialized bool nodeFilter filter.Filter @@ -102,7 +103,7 @@ var sampleConfig = ` # app_exclude = [] ## Maximum concurrent connections to the cluster. - # max_connections = 1 + # max_connections = 10 ## Maximum time to receive a response from cluster. # response_timeout = "20s" @@ -123,28 +124,18 @@ func (d *DCOS) SampleConfig() string { } func (d *DCOS) Gather(acc telegraf.Accumulator) error { - if !d.initialized { - err := d.createFilters() - if err != nil { - return err - } - - d.initialized = true - } - - if d.client == nil { - client, err := d.createClient() - if err != nil { - return err - } - d.client = client + err := d.init() + if err != nil { + return err } ctx := context.Background() - err := d.client.EnsureAuth(ctx) + + token, err := d.creds.Token(ctx, d.client) if err != nil { return err } + d.client.SetToken(token) summary, err := d.client.GetSummary(ctx) if err != nil { @@ -335,15 +326,57 @@ func (d *DCOS) addAppMetrics(acc telegraf.Accumulator, cluster string, m *Metric d.addMetrics(acc, cluster, "dcos_app", m, appDimensions) } -func (d *DCOS) createClient() (*client, error) { +func (d *DCOS) init() error { + if !d.initialized { + err := d.createFilters() + if err != nil { + return err + } + + if d.client == nil { + client, err := d.createClient() + if err != nil { + return err + } + d.client = client + } + + if d.creds == nil { + creds, err := d.createCredentials() + if err != nil { + return err + } + d.creds = creds + } + + d.initialized = true + } + return nil +} + +func (d *DCOS) createClient() (Client, error) { tlsCfg, err := internal.GetTLSConfig( d.SSLCert, d.SSLKey, d.SSLCA, d.InsecureSkipVerify) if err != nil { return nil, err } - var creds *Credentials + url, err := url.Parse(d.ClusterURL) + if err != nil { + return nil, err + } + + client := NewClient( + url, + d.ResponseTimeout.Duration, + d.MaxConnections, + tlsCfg, + ) + + return client, nil +} +func (d *DCOS) createCredentials() (Credentials, error) { if d.ServiceAccountID != "" && d.ServiceAccountPrivateKey != "" { bs, err := ioutil.ReadFile(d.ServiceAccountPrivateKey) if err != nil { @@ -355,24 +388,20 @@ func (d *DCOS) createClient() (*client, error) { return nil, err } - creds = &Credentials{d.ServiceAccountID, privateKey, ""} + creds := &ServiceAccount{ + AccountID: d.ServiceAccountID, + PrivateKey: privateKey, + } + return creds, nil } else if d.TokenFile != "" { - creds = &Credentials{"", nil, d.TokenFile} - } - - url, err := url.Parse(d.ClusterURL) - if err != nil { - return nil, err + creds := &TokenCreds{ + Path: d.TokenFile, + } + return creds, nil + } else { + creds := &NullCreds{} + return creds, nil } - - client := NewClient( - url, - creds, - d.ResponseTimeout.Duration, - d.MaxConnections, - tlsCfg, - ) - return client, nil } func (d *DCOS) createFilters() error { diff --git a/plugins/inputs/dcos/dcos_test.go b/plugins/inputs/dcos/dcos_test.go index 7b9fff2d4a9c6..419d82dd0545a 100644 --- a/plugins/inputs/dcos/dcos_test.go +++ b/plugins/inputs/dcos/dcos_test.go @@ -10,8 +10,8 @@ import ( ) type mockClient struct { - TokenF func() string - EnsureAuthF func(ctx context.Context) error + SetTokenF func(token string) + LoginF func(ctx context.Context, sa *ServiceAccount) (*AuthToken, error) GetSummaryF func(ctx context.Context) (*Summary, error) GetContainersF func(ctx context.Context, node string) ([]string, error) GetNodeMetricsF func(ctx context.Context, node string) (*Metrics, error) @@ -19,12 +19,12 @@ type mockClient struct { GetAppMetricsF func(ctx context.Context, node, container string) (*Metrics, error) } -func (c *mockClient) Token() string { - return c.TokenF() +func (c *mockClient) SetToken(token string) { + c.SetTokenF(token) } -func (c *mockClient) EnsureAuth(ctx context.Context) error { - return c.EnsureAuthF(ctx) +func (c *mockClient) Login(ctx context.Context, sa *ServiceAccount) (*AuthToken, error) { + return c.LoginF(ctx, sa) } func (c *mockClient) GetSummary(ctx context.Context) (*Summary, error) { @@ -362,9 +362,7 @@ func TestGatherFilterNode(t *testing.T) { { name: "cluster without nodes has no metrics", client: &mockClient{ - EnsureAuthF: func(ctx context.Context) error { - return nil - }, + SetTokenF: func(token string) {}, GetSummaryF: func(ctx context.Context) (*Summary, error) { return &Summary{ Cluster: "a", @@ -382,9 +380,7 @@ func TestGatherFilterNode(t *testing.T) { name: "node include", nodeInclude: []string{"x"}, client: &mockClient{ - EnsureAuthF: func(ctx context.Context) error { - return nil - }, + SetTokenF: func(token string) {}, GetSummaryF: func(ctx context.Context) (*Summary, error) { return &Summary{ Cluster: "a", From 767e2f5644b4c6c466b1bc9f987870cbb66da6fd Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Wed, 29 Nov 2017 00:41:21 -0800 Subject: [PATCH 05/15] Improve Login type handling --- plugins/inputs/dcos/client.go | 69 +++++++++++++++++++++++++---------- plugins/inputs/dcos/creds.go | 3 -- 2 files changed, 49 insertions(+), 23 deletions(-) diff --git a/plugins/inputs/dcos/client.go b/plugins/inputs/dcos/client.go index c9d03edabd129..0f5e23b41ffdc 100644 --- a/plugins/inputs/dcos/client.go +++ b/plugins/inputs/dcos/client.go @@ -13,6 +13,11 @@ import ( jwt "github.com/dgrijalva/jwt-go" ) +const ( + // How long to stayed logged in for + loginDuration = 65 * time.Minute +) + type Client interface { SetToken(token string) @@ -31,9 +36,18 @@ type APIError struct { } type Login struct { - Token string - Title string - Description string + UID string `json:"uid"` + Exp int64 `json:"exp"` + Token string `json:"token"` +} + +type LoginError struct { + Title string `json:"title"` + Description string `json:"description"` +} + +type LoginAuth struct { + Token string `json:"token"` } type Slave struct { @@ -66,7 +80,7 @@ type client struct { } type claims struct { - Uid string `json:"uid"` + UID string `json:"uid"` jwt.StandardClaims } @@ -117,10 +131,10 @@ func (c *client) Login(ctx context.Context, sa *ServiceAccount) (*AuthToken, err exp := time.Now().Add(loginDuration) - body := map[string]interface{}{ - "uid": sa.AccountID, - "exp": exp.Unix(), - "token": token, + body := &Login{ + UID: sa.AccountID, + Exp: exp.Unix(), + Token: token, } octets, err := json.Marshal(body) @@ -141,23 +155,38 @@ func (c *client) Login(ctx context.Context, sa *ServiceAccount) (*AuthToken, err } defer resp.Body.Close() - login := Login{} + if resp.StatusCode == http.StatusOK { + auth := &LoginAuth{} + dec := json.NewDecoder(resp.Body) + err = dec.Decode(auth) + if err != nil { + return nil, err + } + + token := &AuthToken{ + text: auth.Token, + expire: exp, + } + return token, nil + } + + loginError := &LoginError{} dec := json.NewDecoder(resp.Body) - err = dec.Decode(&login) + err = dec.Decode(loginError) if err != nil { + err := &APIError{ + StatusCode: resp.StatusCode, + Title: resp.Status, + } return nil, err } - if resp.StatusCode != 200 || login.Token == "" { - return nil, &APIError{resp.StatusCode, login.Title, login.Description} - } - - authToken := &AuthToken{ - text: login.Token, - expire: exp, + err = &APIError{ + StatusCode: resp.StatusCode, + Title: loginError.Title, + Description: loginError.Description, } - - return authToken, err + return nil, err } func (c *client) GetSummary(ctx context.Context) (*Summary, error) { @@ -283,7 +312,7 @@ func (c *client) url(path string) string { func (c *client) createLoginToken(sa *ServiceAccount) (string, error) { token := jwt.NewWithClaims(jwt.SigningMethodRS256, claims{ - Uid: sa.AccountID, + UID: sa.AccountID, StandardClaims: jwt.StandardClaims{ ExpiresAt: 0, }, diff --git a/plugins/inputs/dcos/creds.go b/plugins/inputs/dcos/creds.go index 95cbaaaa15188..c118bed32bad2 100644 --- a/plugins/inputs/dcos/creds.go +++ b/plugins/inputs/dcos/creds.go @@ -11,9 +11,6 @@ import ( ) const ( - // How long to stayed logged in for - loginDuration = 65 * time.Minute - // How long before expiration to renew token relogDuration = 5 * time.Minute ) From a1a160c8cf354c059f3b4e0fb8737e60149560b9 Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Wed, 29 Nov 2017 00:52:21 -0800 Subject: [PATCH 06/15] Add Container struct --- plugins/inputs/dcos/client.go | 18 ++++++++++++++---- plugins/inputs/dcos/dcos.go | 8 ++++---- plugins/inputs/dcos/dcos_test.go | 8 ++++---- 3 files changed, 22 insertions(+), 12 deletions(-) diff --git a/plugins/inputs/dcos/client.go b/plugins/inputs/dcos/client.go index 0f5e23b41ffdc..9c565c603d418 100644 --- a/plugins/inputs/dcos/client.go +++ b/plugins/inputs/dcos/client.go @@ -23,7 +23,7 @@ type Client interface { Login(ctx context.Context, sa *ServiceAccount) (*AuthToken, error) GetSummary(ctx context.Context) (*Summary, error) - GetContainers(ctx context.Context, node string) ([]string, error) + GetContainers(ctx context.Context, node string) ([]Container, error) GetNodeMetrics(ctx context.Context, node string) (*Metrics, error) GetContainerMetrics(ctx context.Context, node, container string) (*Metrics, error) GetAppMetrics(ctx context.Context, node, container string) (*Metrics, error) @@ -59,6 +59,10 @@ type Summary struct { Slaves []Slave } +type Container struct { + ID string +} + type DataPoint struct { Name string Tags map[string]string @@ -199,15 +203,21 @@ func (c *client) GetSummary(ctx context.Context) (*Summary, error) { return summary, nil } -func (c *client) GetContainers(ctx context.Context, node string) ([]string, error) { - containers := []string{} +func (c *client) GetContainers(ctx context.Context, node string) ([]Container, error) { + list := []string{} path := fmt.Sprintf("/system/v1/agent/%s/metrics/v0/containers", node) - err := c.doGet(ctx, c.url(path), &containers) + err := c.doGet(ctx, c.url(path), &list) if err != nil { return nil, err } + containers := make([]Container, 0, len(list)) + for _, c := range list { + containers = append(containers, Container{ID: c}) + + } + return containers, nil } diff --git a/plugins/inputs/dcos/dcos.go b/plugins/inputs/dcos/dcos.go index 2f41f8ef006f8..6ec38004f3b3a 100644 --- a/plugins/inputs/dcos/dcos.go +++ b/plugins/inputs/dcos/dcos.go @@ -185,7 +185,7 @@ func (d *DCOS) GatherContainers(ctx context.Context, acc telegraf.Accumulator, c var wg sync.WaitGroup for _, container := range containers { - if d.containerFilter.Match(container) { + if d.containerFilter.Match(container.ID) { wg.Add(1) go func(container string) { defer wg.Done() @@ -198,10 +198,10 @@ func (d *DCOS) GatherContainers(ctx context.Context, acc telegraf.Accumulator, c return } d.addContainerMetrics(acc, cluster, m) - }(container) + }(container.ID) } - if d.appFilter.Match(container) { + if d.appFilter.Match(container.ID) { wg.Add(1) go func(container string) { defer wg.Done() @@ -214,7 +214,7 @@ func (d *DCOS) GatherContainers(ctx context.Context, acc telegraf.Accumulator, c return } d.addAppMetrics(acc, cluster, m) - }(container) + }(container.ID) } } wg.Wait() diff --git a/plugins/inputs/dcos/dcos_test.go b/plugins/inputs/dcos/dcos_test.go index 419d82dd0545a..6a76f7b6443e1 100644 --- a/plugins/inputs/dcos/dcos_test.go +++ b/plugins/inputs/dcos/dcos_test.go @@ -13,7 +13,7 @@ type mockClient struct { SetTokenF func(token string) LoginF func(ctx context.Context, sa *ServiceAccount) (*AuthToken, error) GetSummaryF func(ctx context.Context) (*Summary, error) - GetContainersF func(ctx context.Context, node string) ([]string, error) + GetContainersF func(ctx context.Context, node string) ([]Container, error) GetNodeMetricsF func(ctx context.Context, node string) (*Metrics, error) GetContainerMetricsF func(ctx context.Context, node, container string) (*Metrics, error) GetAppMetricsF func(ctx context.Context, node, container string) (*Metrics, error) @@ -31,7 +31,7 @@ func (c *mockClient) GetSummary(ctx context.Context) (*Summary, error) { return c.GetSummaryF(ctx) } -func (c *mockClient) GetContainers(ctx context.Context, node string) ([]string, error) { +func (c *mockClient) GetContainers(ctx context.Context, node string) ([]Container, error) { return c.GetContainersF(ctx, node) } @@ -390,8 +390,8 @@ func TestGatherFilterNode(t *testing.T) { }, }, nil }, - GetContainersF: func(ctx context.Context, node string) ([]string, error) { - return []string{}, nil + GetContainersF: func(ctx context.Context, node string) ([]Container, error) { + return []Container{}, nil }, GetNodeMetricsF: func(ctx context.Context, node string) (*Metrics, error) { return &Metrics{ From 8a190a08f62ca37174fb8835704b2baafd95dfd4 Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Wed, 29 Nov 2017 00:56:04 -0800 Subject: [PATCH 07/15] Release semaphore after closing response body --- plugins/inputs/dcos/client.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/plugins/inputs/dcos/client.go b/plugins/inputs/dcos/client.go index 9c565c603d418..a85a3f121b027 100644 --- a/plugins/inputs/dcos/client.go +++ b/plugins/inputs/dcos/client.go @@ -289,28 +289,28 @@ func (c *client) doGet(ctx context.Context, url string, v interface{}) error { <-c.semaphore return err } - defer resp.Body.Close() + defer func() { + resp.Body.Close() + <-c.semaphore + }() // Clear invalid token if unauthorized - if resp.StatusCode == 401 { + if resp.StatusCode == http.StatusUnauthorized { c.token = "" } if resp.StatusCode < 200 || resp.StatusCode >= 300 { - <-c.semaphore return &APIError{ StatusCode: resp.StatusCode, Title: resp.Status, } } - if resp.StatusCode == 204 { - <-c.semaphore + if resp.StatusCode == http.StatusNoContent { return nil } err = json.NewDecoder(resp.Body).Decode(v) - <-c.semaphore return err } From 8b14b6276ba39860a9b2fbbc85724f252d79418c Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Wed, 29 Nov 2017 01:07:33 -0800 Subject: [PATCH 08/15] Allocate capacity when possible --- plugins/inputs/dcos/dcos.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugins/inputs/dcos/dcos.go b/plugins/inputs/dcos/dcos.go index 6ec38004f3b3a..c7980f994867d 100644 --- a/plugins/inputs/dcos/dcos.go +++ b/plugins/inputs/dcos/dcos.go @@ -240,7 +240,7 @@ func (d *DCOS) createPoints(acc telegraf.Accumulator, m *Metrics) []*point { fieldKey = fieldKey + "_bytes" } - tagset := []string{} + tagset := make([]string, 0, len(tags)) for k, v := range tags { tagset = append(tagset, k+"="+v) } @@ -265,7 +265,7 @@ func (d *DCOS) createPoints(acc telegraf.Accumulator, m *Metrics) []*point { } } - results := []*point{} + results := make([]*point, 0, len(points)) for _, p := range points { for k, v := range m.Dimensions { switch v := v.(type) { From 7da586abcc95cdcd5d1fbf6fd2609502e3a2afd8 Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Wed, 29 Nov 2017 01:10:35 -0800 Subject: [PATCH 09/15] Strip dcos_metrics_module_ earlier --- plugins/inputs/dcos/dcos.go | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/plugins/inputs/dcos/dcos.go b/plugins/inputs/dcos/dcos.go index c7980f994867d..451ef0649b5a1 100644 --- a/plugins/inputs/dcos/dcos.go +++ b/plugins/inputs/dcos/dcos.go @@ -240,6 +240,10 @@ func (d *DCOS) createPoints(acc telegraf.Accumulator, m *Metrics) []*point { fieldKey = fieldKey + "_bytes" } + if strings.HasPrefix(fieldKey, "dcos_metrics_module_") { + fieldKey = strings.TrimPrefix(fieldKey, "dcos_metrics_module_") + } + tagset := make([]string, 0, len(tags)) for k, v := range tags { tagset = append(tagset, k+"="+v) @@ -302,15 +306,7 @@ func (d *DCOS) addMetrics(acc telegraf.Accumulator, cluster, mname string, m *Me tags[k] = v } - fields := make(map[string]interface{}) - for k, v := range p.fields { - if strings.HasPrefix(k, "dcos_metrics_module_") { - k = strings.TrimPrefix(k, "dcos_metrics_module_") - } - fields[k] = v - } - - acc.AddFields(mname, fields, tags, tm) + acc.AddFields(mname, p.fields, tags, tm) } } From 9d6019446bcfa637964da16f6ebe16dc173c5f12 Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Wed, 29 Nov 2017 01:14:04 -0800 Subject: [PATCH 10/15] Add getMetrics helper function --- plugins/inputs/dcos/client.go | 28 +++++++++------------------- 1 file changed, 9 insertions(+), 19 deletions(-) diff --git a/plugins/inputs/dcos/client.go b/plugins/inputs/dcos/client.go index a85a3f121b027..6ff2e0c2f78aa 100644 --- a/plugins/inputs/dcos/client.go +++ b/plugins/inputs/dcos/client.go @@ -221,11 +221,10 @@ func (c *client) GetContainers(ctx context.Context, node string) ([]Container, e return containers, nil } -func (c *client) GetNodeMetrics(ctx context.Context, node string) (*Metrics, error) { +func (c *client) getMetrics(ctx context.Context, url string) (*Metrics, error) { metrics := &Metrics{} - path := fmt.Sprintf("/system/v1/agent/%s/metrics/v0/node", node) - err := c.doGet(ctx, c.url(path), metrics) + err := c.doGet(ctx, url, metrics) if err != nil { return nil, err } @@ -233,28 +232,19 @@ func (c *client) GetNodeMetrics(ctx context.Context, node string) (*Metrics, err return metrics, nil } -func (c *client) GetContainerMetrics(ctx context.Context, node, container string) (*Metrics, error) { - metrics := &Metrics{} +func (c *client) GetNodeMetrics(ctx context.Context, node string) (*Metrics, error) { + path := fmt.Sprintf("/system/v1/agent/%s/metrics/v0/node", node) + return c.getMetrics(ctx, c.url(path)) +} +func (c *client) GetContainerMetrics(ctx context.Context, node, container string) (*Metrics, error) { path := fmt.Sprintf("/system/v1/agent/%s/metrics/v0/containers/%s", node, container) - err := c.doGet(ctx, c.url(path), metrics) - if err != nil { - return nil, err - } - - return metrics, nil + return c.getMetrics(ctx, c.url(path)) } func (c *client) GetAppMetrics(ctx context.Context, node, container string) (*Metrics, error) { - metrics := &Metrics{} - path := fmt.Sprintf("/system/v1/agent/%s/metrics/v0/containers/%s/app", node, container) - err := c.doGet(ctx, c.url(path), metrics) - if err != nil { - return nil, err - } - - return metrics, nil + return c.getMetrics(ctx, c.url(path)) } func createGetRequest(url string, token string) (*http.Request, error) { From a90b83877faf79d7b75a85d144cee06dff526bc9 Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Wed, 29 Nov 2017 01:22:39 -0800 Subject: [PATCH 11/15] Create login token for limited time --- plugins/inputs/dcos/client.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/plugins/inputs/dcos/client.go b/plugins/inputs/dcos/client.go index 6ff2e0c2f78aa..c25595a0b3058 100644 --- a/plugins/inputs/dcos/client.go +++ b/plugins/inputs/dcos/client.go @@ -314,7 +314,8 @@ func (c *client) createLoginToken(sa *ServiceAccount) (string, error) { token := jwt.NewWithClaims(jwt.SigningMethodRS256, claims{ UID: sa.AccountID, StandardClaims: jwt.StandardClaims{ - ExpiresAt: 0, + // How long we have to login with this token + ExpiresAt: int64(5 * time.Minute / time.Second), }, }) return token.SignedString(sa.PrivateKey) From 95101371409de0a26b9ca6099dd065cbaebbcff4 Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Wed, 29 Nov 2017 01:26:44 -0800 Subject: [PATCH 12/15] Make client public and rename ClusterClient --- plugins/inputs/dcos/client.go | 40 +++++++++++++++--------------- plugins/inputs/dcos/client_test.go | 8 +++--- plugins/inputs/dcos/dcos.go | 2 +- 3 files changed, 25 insertions(+), 25 deletions(-) diff --git a/plugins/inputs/dcos/client.go b/plugins/inputs/dcos/client.go index c25595a0b3058..e181fc9e6f456 100644 --- a/plugins/inputs/dcos/client.go +++ b/plugins/inputs/dcos/client.go @@ -75,7 +75,12 @@ type Metrics struct { Dimensions map[string]interface{} } -type client struct { +type AuthToken struct { + text string + expire time.Time +} + +type ClusterClient struct { clusterURL *url.URL httpClient *http.Client credentials *Credentials @@ -88,11 +93,6 @@ type claims struct { jwt.StandardClaims } -type AuthToken struct { - text string - expire time.Time -} - func (e APIError) Error() string { if e.Description != "" { return fmt.Sprintf("%s: %s", e.Title, e.Description) @@ -100,12 +100,12 @@ func (e APIError) Error() string { return e.Title } -func NewClient( +func NewClusterClient( clusterURL *url.URL, timeout time.Duration, maxConns int, tlsConfig *tls.Config, -) *client { +) *ClusterClient { httpClient := &http.Client{ Transport: &http.Transport{ MaxIdleConns: maxConns, @@ -115,7 +115,7 @@ func NewClient( } semaphore := make(chan struct{}, maxConns) - c := &client{ + c := &ClusterClient{ clusterURL: clusterURL, httpClient: httpClient, semaphore: semaphore, @@ -123,11 +123,11 @@ func NewClient( return c } -func (c *client) SetToken(token string) { +func (c *ClusterClient) SetToken(token string) { c.token = token } -func (c *client) Login(ctx context.Context, sa *ServiceAccount) (*AuthToken, error) { +func (c *ClusterClient) Login(ctx context.Context, sa *ServiceAccount) (*AuthToken, error) { token, err := c.createLoginToken(sa) if err != nil { return nil, err @@ -193,7 +193,7 @@ func (c *client) Login(ctx context.Context, sa *ServiceAccount) (*AuthToken, err return nil, err } -func (c *client) GetSummary(ctx context.Context) (*Summary, error) { +func (c *ClusterClient) GetSummary(ctx context.Context) (*Summary, error) { summary := &Summary{} err := c.doGet(ctx, c.url("/mesos/master/state-summary"), summary) if err != nil { @@ -203,7 +203,7 @@ func (c *client) GetSummary(ctx context.Context) (*Summary, error) { return summary, nil } -func (c *client) GetContainers(ctx context.Context, node string) ([]Container, error) { +func (c *ClusterClient) GetContainers(ctx context.Context, node string) ([]Container, error) { list := []string{} path := fmt.Sprintf("/system/v1/agent/%s/metrics/v0/containers", node) @@ -221,7 +221,7 @@ func (c *client) GetContainers(ctx context.Context, node string) ([]Container, e return containers, nil } -func (c *client) getMetrics(ctx context.Context, url string) (*Metrics, error) { +func (c *ClusterClient) getMetrics(ctx context.Context, url string) (*Metrics, error) { metrics := &Metrics{} err := c.doGet(ctx, url, metrics) @@ -232,17 +232,17 @@ func (c *client) getMetrics(ctx context.Context, url string) (*Metrics, error) { return metrics, nil } -func (c *client) GetNodeMetrics(ctx context.Context, node string) (*Metrics, error) { +func (c *ClusterClient) GetNodeMetrics(ctx context.Context, node string) (*Metrics, error) { path := fmt.Sprintf("/system/v1/agent/%s/metrics/v0/node", node) return c.getMetrics(ctx, c.url(path)) } -func (c *client) GetContainerMetrics(ctx context.Context, node, container string) (*Metrics, error) { +func (c *ClusterClient) GetContainerMetrics(ctx context.Context, node, container string) (*Metrics, error) { path := fmt.Sprintf("/system/v1/agent/%s/metrics/v0/containers/%s", node, container) return c.getMetrics(ctx, c.url(path)) } -func (c *client) GetAppMetrics(ctx context.Context, node, container string) (*Metrics, error) { +func (c *ClusterClient) GetAppMetrics(ctx context.Context, node, container string) (*Metrics, error) { path := fmt.Sprintf("/system/v1/agent/%s/metrics/v0/containers/%s/app", node, container) return c.getMetrics(ctx, c.url(path)) } @@ -261,7 +261,7 @@ func createGetRequest(url string, token string) (*http.Request, error) { return req, nil } -func (c *client) doGet(ctx context.Context, url string, v interface{}) error { +func (c *ClusterClient) doGet(ctx context.Context, url string, v interface{}) error { req, err := createGetRequest(url, c.token) if err != nil { return err @@ -304,13 +304,13 @@ func (c *client) doGet(ctx context.Context, url string, v interface{}) error { return err } -func (c *client) url(path string) string { +func (c *ClusterClient) url(path string) string { url := c.clusterURL url.Path = path return url.String() } -func (c *client) createLoginToken(sa *ServiceAccount) (string, error) { +func (c *ClusterClient) createLoginToken(sa *ServiceAccount) (string, error) { token := jwt.NewWithClaims(jwt.SigningMethodRS256, claims{ UID: sa.AccountID, StandardClaims: jwt.StandardClaims{ diff --git a/plugins/inputs/dcos/client_test.go b/plugins/inputs/dcos/client_test.go index 9a86a7b8ea2a4..f928a0affeee9 100644 --- a/plugins/inputs/dcos/client_test.go +++ b/plugins/inputs/dcos/client_test.go @@ -72,7 +72,7 @@ func TestLogin(t *testing.T) { AccountID: "telegraf", PrivateKey: key, } - client := NewClient(u, defaultResponseTimeout, 1, nil) + client := NewClusterClient(u, defaultResponseTimeout, 1, nil) auth, err := client.Login(ctx, sa) require.Equal(t, tt.expectedError, err) @@ -137,7 +137,7 @@ func TestGetSummary(t *testing.T) { require.NoError(t, err) ctx := context.Background() - client := NewClient(u, defaultResponseTimeout, 1, nil) + client := NewClusterClient(u, defaultResponseTimeout, 1, nil) summary, err := client.GetSummary(ctx) require.Equal(t, tt.expectedError, err) @@ -178,7 +178,7 @@ func TestGetNodeMetrics(t *testing.T) { require.NoError(t, err) ctx := context.Background() - client := NewClient(u, defaultResponseTimeout, 1, nil) + client := NewClusterClient(u, defaultResponseTimeout, 1, nil) m, err := client.GetNodeMetrics(ctx, "foo") require.Equal(t, tt.expectedError, err) @@ -219,7 +219,7 @@ func TestGetContainerMetrics(t *testing.T) { require.NoError(t, err) ctx := context.Background() - client := NewClient(u, defaultResponseTimeout, 1, nil) + client := NewClusterClient(u, defaultResponseTimeout, 1, nil) m, err := client.GetContainerMetrics(ctx, "foo", "bar") require.Equal(t, tt.expectedError, err) diff --git a/plugins/inputs/dcos/dcos.go b/plugins/inputs/dcos/dcos.go index 451ef0649b5a1..91370b81f81fc 100644 --- a/plugins/inputs/dcos/dcos.go +++ b/plugins/inputs/dcos/dcos.go @@ -362,7 +362,7 @@ func (d *DCOS) createClient() (Client, error) { return nil, err } - client := NewClient( + client := NewClusterClient( url, d.ResponseTimeout.Duration, d.MaxConnections, From 13390d03cf2abc4b2140b90c22282bc54ca4b078 Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Wed, 29 Nov 2017 01:29:55 -0800 Subject: [PATCH 13/15] Make AuthToken values public --- plugins/inputs/dcos/client.go | 16 ++++++++-------- plugins/inputs/dcos/client_test.go | 2 +- plugins/inputs/dcos/creds.go | 4 ++-- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/plugins/inputs/dcos/client.go b/plugins/inputs/dcos/client.go index e181fc9e6f456..4977939b48722 100644 --- a/plugins/inputs/dcos/client.go +++ b/plugins/inputs/dcos/client.go @@ -64,10 +64,10 @@ type Container struct { } type DataPoint struct { - Name string - Tags map[string]string - Unit string - Value float64 + Name string `json:"name"` + Tags map[string]string `json:"tags"` + Unit string `json:"unit"` + Value float64 `json:"value"` } type Metrics struct { @@ -76,8 +76,8 @@ type Metrics struct { } type AuthToken struct { - text string - expire time.Time + Text string + Expire time.Time } type ClusterClient struct { @@ -168,8 +168,8 @@ func (c *ClusterClient) Login(ctx context.Context, sa *ServiceAccount) (*AuthTok } token := &AuthToken{ - text: auth.Token, - expire: exp, + Text: auth.Token, + Expire: exp, } return token, nil } diff --git a/plugins/inputs/dcos/client_test.go b/plugins/inputs/dcos/client_test.go index f928a0affeee9..2781d10b78f41 100644 --- a/plugins/inputs/dcos/client_test.go +++ b/plugins/inputs/dcos/client_test.go @@ -78,7 +78,7 @@ func TestLogin(t *testing.T) { require.Equal(t, tt.expectedError, err) if tt.expectedToken != "" { - require.Equal(t, tt.expectedToken, auth.text) + require.Equal(t, tt.expectedToken, auth.Text) } else { require.Nil(t, auth) } diff --git a/plugins/inputs/dcos/creds.go b/plugins/inputs/dcos/creds.go index c118bed32bad2..0178315bb7076 100644 --- a/plugins/inputs/dcos/creds.go +++ b/plugins/inputs/dcos/creds.go @@ -40,11 +40,11 @@ func (c *ServiceAccount) Token(ctx context.Context, client Client) (string, erro return "", err } c.auth = auth - return auth.text, nil + return auth.Text, nil } func (c *ServiceAccount) IsExpired() bool { - return c.auth.text != "" || c.auth.expire.Add(relogDuration).After(time.Now()) + return c.auth.Text != "" || c.auth.Expire.Add(relogDuration).After(time.Now()) } func (c *TokenCreds) Token(ctx context.Context, client Client) (string, error) { From b1d4fca1296d5c529aff57f327082a9f5d9eae0d Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Wed, 29 Nov 2017 10:24:38 -0800 Subject: [PATCH 14/15] Add more details about limiting series --- plugins/inputs/dcos/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/inputs/dcos/README.md b/plugins/inputs/dcos/README.md index ace068c7271fb..a1384402fb019 100644 --- a/plugins/inputs/dcos/README.md +++ b/plugins/inputs/dcos/README.md @@ -10,7 +10,7 @@ your database. - Use [measurement filtering](https://github.com/influxdata/telegraf/blob/master/docs/CONFIGURATION.md#measurement-filtering) liberally to exclude unneeded metrics as well as the node, container, and app inclue/exclude options. - Write to a database with an appropriate [retention policy](https://docs.influxdata.com/influxdb/v1.3/concepts/glossary/#retention-policy-rp). -- Limit the number of series allowed in your database. +- Limit the number of series allowed in your database using the `max-series-per-database` and `max-values-per-tag` settings. - Consider enabling the [TSI](https://docs.influxdata.com/influxdb/v1.3/about_the_project/releasenotes-changelog/#release-notes-8) engine. - Monitor your [series cardinality](https://docs.influxdata.com/influxdb/v1.3/troubleshooting/frequently-asked-questions/#how-can-i-query-for-series-cardinality). From 839c625027943bef7a18107e2e7633c0ea3bffe0 Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Wed, 29 Nov 2017 10:42:01 -0800 Subject: [PATCH 15/15] Add some comments --- plugins/inputs/dcos/client.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/plugins/inputs/dcos/client.go b/plugins/inputs/dcos/client.go index 4977939b48722..71165e9fbffd7 100644 --- a/plugins/inputs/dcos/client.go +++ b/plugins/inputs/dcos/client.go @@ -18,6 +18,7 @@ const ( loginDuration = 65 * time.Minute ) +// Client is an interface for communicating with the DC/OS API. type Client interface { SetToken(token string) @@ -35,30 +36,36 @@ type APIError struct { Description string } +// Login is request data for logging in. type Login struct { UID string `json:"uid"` Exp int64 `json:"exp"` Token string `json:"token"` } +// LoginError is the response when login fails. type LoginError struct { Title string `json:"title"` Description string `json:"description"` } +// LoginAuth is the response to a successful login. type LoginAuth struct { Token string `json:"token"` } +// Slave is a node in the cluster. type Slave struct { ID string `json:"id"` } +// Summary provides high level cluster wide information. type Summary struct { Cluster string Slaves []Slave } +// Container is a container on a node. type Container struct { ID string } @@ -70,16 +77,19 @@ type DataPoint struct { Value float64 `json:"value"` } +// Metrics are the DCOS metrics type Metrics struct { - Datapoints []DataPoint - Dimensions map[string]interface{} + Datapoints []DataPoint `json:"datapoints"` + Dimensions map[string]interface{} `json:"dimensions"` } +// AuthToken is the authentication token. type AuthToken struct { Text string Expire time.Time } +// ClusterClient is a Client that uses the cluster URL. type ClusterClient struct { clusterURL *url.URL httpClient *http.Client