Skip to content

Commit

Permalink
Refactor the docker plugin, use go-dockerclient throughout
Browse files Browse the repository at this point in the history
fixes #503
fixes #463
  • Loading branch information
sparrc committed Jan 21, 2016
1 parent e0dc1ef commit 96ddd1f
Show file tree
Hide file tree
Showing 7 changed files with 429 additions and 283 deletions.
1 change: 1 addition & 0 deletions plugins/inputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/apache"
_ "github.com/influxdata/telegraf/plugins/inputs/bcache"
_ "github.com/influxdata/telegraf/plugins/inputs/disque"
_ "github.com/influxdata/telegraf/plugins/inputs/docker"
_ "github.com/influxdata/telegraf/plugins/inputs/elasticsearch"
_ "github.com/influxdata/telegraf/plugins/inputs/exec"
_ "github.com/influxdata/telegraf/plugins/inputs/haproxy"
Expand Down
299 changes: 299 additions & 0 deletions plugins/inputs/docker/docker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,299 @@
package system

import (
"fmt"
"strings"
"sync"
"time"

"github.com/influxdata/telegraf/plugins/inputs"

dc "github.com/fsouza/go-dockerclient"
)

type Docker struct {
Endpoint string
ContainerNames []string

client *dc.Client
}

var sampleConfig = `
# Docker Endpoint
# To use on OSX, set endpoint = "ENV" to use docker-machine env variables
endpoint = "unix:///var/run/docker.sock"
# Only collect metrics for these containers, collect all if empty
container_names = []
`

func (d *Docker) Description() string {
return "Read metrics about docker containers"
}

func (d *Docker) SampleConfig() string { return sampleConfig }

func (d *Docker) Gather(acc inputs.Accumulator) error {
if d.client == nil {
var c *dc.Client
var err error
if d.Endpoint == "ENV" {
c, err = dc.NewClientFromEnv()
if err != nil {
return err
}
} else if d.Endpoint == "" {
c, err = dc.NewClient("unix:///var/run/docker.sock")
if err != nil {
return err
}
} else {
c, err = dc.NewClient(d.Endpoint)
if err != nil {
return err
}
}
d.client = c
}

opts := dc.ListContainersOptions{}
containers, err := d.client.ListContainers(opts)
if err != nil {
return err
}

var wg sync.WaitGroup
wg.Add(len(containers))
for _, container := range containers {
go func(c dc.APIContainers) {
defer wg.Done()
err := d.gatherContainerStats(c, acc)
if err != nil {
fmt.Println(err.Error())
}
}(container)
}
wg.Wait()

return nil
}

func (d *Docker) gatherContainerStats(
container dc.APIContainers,
acc inputs.Accumulator,
) error {
// Parse container name
cname := "unknown"
if len(container.Names) > 0 {
// Not sure what to do with other names, just take the first.
cname = strings.TrimPrefix(container.Names[0], "/")
}

tags := map[string]string{
"id": container.ID,
"container_name": cname,
}
if len(d.ContainerNames) > 0 {
if !sliceContains(cname, container.Names) {
return nil
}
}

//now := time.Now()
statChan := make(chan *dc.Stats)
done := make(chan bool)
statOpts := dc.StatsOptions{
Stream: false,
ID: container.ID,
Stats: statChan,
Done: done,
Timeout: time.Duration(time.Second * 5),
}

var err error
go func() {
err = d.client.Stats(statOpts)
}()

stat := <-statChan
if err != nil {
return err
}

now := stat.Read

// Add labels to tags
for k, v := range container.Labels {
tags[k] = v
}

memfields := map[string]interface{}{
"max_usage": stat.MemoryStats.MaxUsage,
"usage": stat.MemoryStats.Usage,
"fail_count": stat.MemoryStats.Failcnt,
"limit": stat.MemoryStats.Limit,
"total_pgmafault": stat.MemoryStats.Stats.TotalPgmafault,
"cache": stat.MemoryStats.Stats.Cache,
"mapped_file": stat.MemoryStats.Stats.MappedFile,
"total_inactive_file": stat.MemoryStats.Stats.TotalInactiveFile,
"pgpgout": stat.MemoryStats.Stats.Pgpgout,
"rss": stat.MemoryStats.Stats.Rss,
"total_mapped_file": stat.MemoryStats.Stats.TotalMappedFile,
"writeback": stat.MemoryStats.Stats.Writeback,
"unevictable": stat.MemoryStats.Stats.Unevictable,
"pgpgin": stat.MemoryStats.Stats.Pgpgin,
"total_unevictable": stat.MemoryStats.Stats.TotalUnevictable,
"pgmajfault": stat.MemoryStats.Stats.Pgmajfault,
"total_rss": stat.MemoryStats.Stats.TotalRss,
"total_rss_huge": stat.MemoryStats.Stats.TotalRssHuge,
"total_writeback": stat.MemoryStats.Stats.TotalWriteback,
"total_inactive_anon": stat.MemoryStats.Stats.TotalInactiveAnon,
"rss_huge": stat.MemoryStats.Stats.RssHuge,
"hierarchical_memory_limit": stat.MemoryStats.Stats.HierarchicalMemoryLimit,
"total_pgfault": stat.MemoryStats.Stats.TotalPgfault,
"total_active_file": stat.MemoryStats.Stats.TotalActiveFile,
"active_anon": stat.MemoryStats.Stats.ActiveAnon,
"total_active_anon": stat.MemoryStats.Stats.TotalActiveAnon,
"total_pgpgout": stat.MemoryStats.Stats.TotalPgpgout,
"total_cache": stat.MemoryStats.Stats.TotalCache,
"inactive_anon": stat.MemoryStats.Stats.InactiveAnon,
"active_file": stat.MemoryStats.Stats.ActiveFile,
"pgfault": stat.MemoryStats.Stats.Pgfault,
"inactive_file": stat.MemoryStats.Stats.InactiveFile,
"total_pgpgin": stat.MemoryStats.Stats.TotalPgpgin,
}
acc.AddFields("docker_mem", memfields, tags, now)

cpufields := map[string]interface{}{
"usage_total": stat.CPUStats.CPUUsage.TotalUsage,
"usage_in_usermode": stat.CPUStats.CPUUsage.UsageInUsermode,
"usage_in_kernelmode": stat.CPUStats.CPUUsage.UsageInKernelmode,
"usage_system": stat.CPUStats.SystemCPUUsage,
"throttling_periods": stat.CPUStats.ThrottlingData.Periods,
"throttling_throttled_periods": stat.CPUStats.ThrottlingData.ThrottledPeriods,
"throttling_throttled_time": stat.CPUStats.ThrottlingData.ThrottledTime,
}
cputags := copyTags(tags)
cputags["cpu"] = "cpu-total"
acc.AddFields("docker_cpu", cpufields, cputags, now)

for i, percpu := range stat.CPUStats.CPUUsage.PercpuUsage {
percputags := copyTags(tags)
percputags["cpu"] = fmt.Sprintf("cpu%d", i)
acc.AddFields("docker_cpu", map[string]interface{}{"usage_total": percpu}, percputags, now)
}

for network, netstats := range stat.Networks {
netfields := map[string]interface{}{
"rx_dropped": netstats.RxDropped,
"rx_bytes": netstats.RxBytes,
"rx_errors": netstats.RxErrors,
"tx_packets": netstats.TxPackets,
"tx_dropped": netstats.TxDropped,
"rx_packets": netstats.RxPackets,
"tx_errors": netstats.TxErrors,
"tx_bytes": netstats.TxBytes,
}
// Create a new network tag dictionary for the "network" tag
nettags := copyTags(tags)
nettags["network"] = network
acc.AddFields("docker_net", netfields, nettags, now)
}

gatherBlockIOMetrics(stat, acc, tags, now)

return nil
}

func gatherBlockIOMetrics(
stat *dc.Stats,
acc inputs.Accumulator,
tags map[string]string,
now time.Time,
) error {
blkioStats := stat.BlkioStats
// Make a map of devices to their block io stats
deviceStatMap := make(map[string]map[string]interface{})

for _, metric := range blkioStats.IOServiceBytesRecursive {
device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
_, ok := deviceStatMap[device]
if !ok {
deviceStatMap[device] = make(map[string]interface{})
}

field := fmt.Sprintf("io_service_bytes_recursive_%s", strings.ToLower(metric.Op))
deviceStatMap[device][field] = metric.Value
}

for _, metric := range blkioStats.IOServicedRecursive {
device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
field := fmt.Sprintf("io_serviced_recursive_%s", strings.ToLower(metric.Op))
deviceStatMap[device][field] = metric.Value
}

for _, metric := range blkioStats.IOQueueRecursive {
device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
field := fmt.Sprintf("io_queue_recursive_%s", strings.ToLower(metric.Op))
deviceStatMap[device][field] = metric.Value
}

for _, metric := range blkioStats.IOServiceTimeRecursive {
device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
field := fmt.Sprintf("io_service_time_recursive_%s", strings.ToLower(metric.Op))
deviceStatMap[device][field] = metric.Value
}

for _, metric := range blkioStats.IOWaitTimeRecursive {
device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
field := fmt.Sprintf("io_wait_time_%s", strings.ToLower(metric.Op))
deviceStatMap[device][field] = metric.Value
}

for _, metric := range blkioStats.IOMergedRecursive {
device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
field := fmt.Sprintf("io_merged_recursive_%s", strings.ToLower(metric.Op))
deviceStatMap[device][field] = metric.Value
}

for _, metric := range blkioStats.IOTimeRecursive {
device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
field := fmt.Sprintf("io_time_recursive_%s", strings.ToLower(metric.Op))
deviceStatMap[device][field] = metric.Value
}

for _, metric := range blkioStats.SectorsRecursive {
device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
field := fmt.Sprintf("sectors_recursive_%s", strings.ToLower(metric.Op))
deviceStatMap[device][field] = metric.Value
}

for device, fields := range deviceStatMap {
iotags := copyTags(tags)
iotags["device"] = device
acc.AddFields("docker_blkio", fields, iotags, now)
}
return nil
}

func copyTags(in map[string]string) map[string]string {
out := make(map[string]string)
for k, v := range in {
out[k] = v
}
return out
}

func sliceContains(in string, sl []string) bool {
for _, str := range sl {
if str == in {
return true
}
}
return false
}

func init() {
inputs.Add("docker", func() inputs.Input {
return &Docker{}
})
}
Loading

0 comments on commit 96ddd1f

Please sign in to comment.