Skip to content

Commit

Permalink
Refactor the docker plugin, use go-dockerclient throughout
Browse files Browse the repository at this point in the history
fixes #503
fixes #463
  • Loading branch information
sparrc committed Jan 21, 2016
1 parent 6647cfc commit 5385da4
Show file tree
Hide file tree
Showing 7 changed files with 321 additions and 283 deletions.
1 change: 1 addition & 0 deletions plugins/inputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/apache"
_ "github.com/influxdata/telegraf/plugins/inputs/bcache"
_ "github.com/influxdata/telegraf/plugins/inputs/disque"
_ "github.com/influxdata/telegraf/plugins/inputs/docker"
_ "github.com/influxdata/telegraf/plugins/inputs/elasticsearch"
_ "github.com/influxdata/telegraf/plugins/inputs/exec"
_ "github.com/influxdata/telegraf/plugins/inputs/haproxy"
Expand Down
191 changes: 191 additions & 0 deletions plugins/inputs/docker/docker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
package system

import (
"fmt"
"sync"
"time"

"github.com/influxdata/telegraf/plugins/inputs"

dc "github.com/fsouza/go-dockerclient"
)

type Docker struct {
Endpoint string
ContainerNames []string
ContainerIDs []string `toml:"container_ids"`

client *dc.Client
}

var sampleConfig = `
# Docker Endpoint
# To use on OSX, set endpoint = "ENV" to use docker-machine env variables
endpoint = "unix:///var/run/docker.sock"
# Filter to specific container names, if empty gather all
container_names = []
# Filter to specific container IDs, if empty gather all
container_ids = []
`

func (d *Docker) Description() string {
return "Read metrics about docker containers"
}

func (d *Docker) SampleConfig() string { return sampleConfig }

func (d *Docker) Gather(acc inputs.Accumulator) error {
if d.client == nil {
var c *dc.Client
var err error
if d.Endpoint == "ENV" {
c, err = dc.NewClientFromEnv()
if err != nil {
return err
}
} else if d.Endpoint == "" {
c, err = dc.NewClient("unix:///var/run/docker.sock")
if err != nil {
return err
}
} else {
c, err = dc.NewClient(d.Endpoint)
if err != nil {
return err
}
}
d.client = c
}

opts := dc.ListContainersOptions{}
containers, err := d.client.ListContainers(opts)
if err != nil {
return err
}

var wg sync.WaitGroup
for _, container := range containers {
go func(incontainer dc.APIContainers) {
wg.Add(1)
defer wg.Done()
err := d.gatherContainerStats(incontainer, acc)
if err != nil {
fmt.Println(err.Error())
}
}(container)
}
wg.Wait()

return nil
}

func (d *Docker) gatherContainerStats(
container dc.APIContainers,
acc inputs.Accumulator,
) error {
fmt.Println(container.Names)
statChan := make(chan *dc.Stats)
done := make(chan bool)
statOpts := dc.StatsOptions{
Stream: false,
ID: container.ID,
Stats: statChan,
Done: done,
Timeout: time.Duration(time.Second * 5),
}

var err error
go func() {
err = d.client.Stats(statOpts)
}()

stat := <-statChan
if err != nil {
return err
}

tags := map[string]string{
"id": container.ID,
"name": container.Names[0],
"command": container.Command,
}
for k, v := range container.Labels {
tags[k] = v
}

memfields := map[string]interface{}{
"max_usage": stat.MemoryStats.MaxUsage,
"usage": stat.MemoryStats.Usage,
"fail_count": stat.MemoryStats.Failcnt,
"limit": stat.MemoryStats.Limit,
"total_pgmafault": stat.MemoryStats.Stats.TotalPgmafault,
"cache": stat.MemoryStats.Stats.Cache,
"mapped_file": stat.MemoryStats.Stats.MappedFile,
"total_inactive_file": stat.MemoryStats.Stats.TotalInactiveFile,
"pgpgout": stat.MemoryStats.Stats.Pgpgout,
"rss": stat.MemoryStats.Stats.Rss,
"total_mapped_file": stat.MemoryStats.Stats.TotalMappedFile,
"writeback": stat.MemoryStats.Stats.Writeback,
"unevictable": stat.MemoryStats.Stats.Unevictable,
"pgpgin": stat.MemoryStats.Stats.Pgpgin,
"total_unevictable": stat.MemoryStats.Stats.TotalUnevictable,
"pgmajfault": stat.MemoryStats.Stats.Pgmajfault,
"total_rss": stat.MemoryStats.Stats.TotalRss,
"total_rss_huge": stat.MemoryStats.Stats.TotalRssHuge,
"total_writeback": stat.MemoryStats.Stats.TotalWriteback,
"total_inactive_anon": stat.MemoryStats.Stats.TotalInactiveAnon,
"rss_huge": stat.MemoryStats.Stats.RssHuge,
"hierarchical_memory_limit": stat.MemoryStats.Stats.HierarchicalMemoryLimit,
"total_pgfault": stat.MemoryStats.Stats.TotalPgfault,
"total_active_file": stat.MemoryStats.Stats.TotalActiveFile,
"active_anon": stat.MemoryStats.Stats.ActiveAnon,
"total_active_anon": stat.MemoryStats.Stats.TotalActiveAnon,
"total_pgpgout": stat.MemoryStats.Stats.TotalPgpgout,
"total_cache": stat.MemoryStats.Stats.TotalCache,
"inactive_anon": stat.MemoryStats.Stats.InactiveAnon,
"active_file": stat.MemoryStats.Stats.ActiveFile,
"pgfault": stat.MemoryStats.Stats.Pgfault,
"inactive_file": stat.MemoryStats.Stats.InactiveFile,
"total_pgpgin": stat.MemoryStats.Stats.TotalPgpgin,
}
acc.AddFields("docker_mem", memfields, tags)

cpufields := map[string]interface{}{
"usage_total": stat.CPUStats.CPUUsage.TotalUsage,
"usage_in_usermode": stat.CPUStats.CPUUsage.UsageInUsermode,
"usage_in_kernelmode": stat.CPUStats.CPUUsage.UsageInKernelmode,
"usage_system": stat.CPUStats.SystemCPUUsage,
"periods": stat.CPUStats.ThrottlingData.Periods,
"throttling_periods": stat.CPUStats.ThrottlingData.ThrottledPeriods,
"throttling_time": stat.CPUStats.ThrottlingData.ThrottledTime,
}
acc.AddFields("docker_cpu", cpufields, tags)

for network, netstats := range stat.Networks {
netfields := map[string]interface{}{
"rx_dropped": netstats.RxDropped,
"rx_bytes": netstats.RxBytes,
"rx_errors": netstats.RxErrors,
"tx_packets": netstats.TxPackets,
"tx_dropped": netstats.TxDropped,
"rx_packets": netstats.RxPackets,
"tx_errors": netstats.TxErrors,
"tx_bytes": netstats.TxBytes,
}
// Create a new network tag dictionary for the "network" tag
nettags := make(map[string]string)
for k, v := range tags {
nettags[k] = v
}
nettags["network"] = network
acc.AddFields("docker_net", netfields, nettags)
}

return nil
}

func init() {
inputs.Add("docker", func() inputs.Input {
return &Docker{}
})
}
128 changes: 128 additions & 0 deletions plugins/inputs/docker/docker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// +build linux

package system

// import (
// "testing"

// "github.com/influxdata/telegraf/testutil"
// "github.com/shirou/gopsutil/cpu"
// "github.com/shirou/gopsutil/docker"

// "github.com/stretchr/testify/require"
// )

// func (m *MockPS) DockerStat() ([]*DockerContainerStat, error) {
// ret := m.Called()

// r0 := ret.Get(0).([]*DockerContainerStat)
// r1 := ret.Error(1)

// return r0, r1
// }

// func TestDockerStats_GenerateStats(t *testing.T) {
// var mps MockPS
// var acc testutil.Accumulator

// ds := &DockerContainerStat{
// Name: "blah",
// CPU: &cpu.CPUTimesStat{
// CPU: "all",
// User: 3.1,
// System: 8.2,
// Idle: 80.1,
// Nice: 1.3,
// Iowait: 0.2,
// Irq: 0.1,
// Softirq: 0.11,
// Steal: 0.0001,
// Guest: 8.1,
// GuestNice: 0.324,
// },
// Mem: &docker.CgroupMemStat{
// ContainerID: "blah",
// Cache: 1,
// RSS: 2,
// RSSHuge: 3,
// MappedFile: 4,
// Pgpgin: 5,
// Pgpgout: 6,
// Pgfault: 7,
// Pgmajfault: 8,
// InactiveAnon: 9,
// ActiveAnon: 10,
// InactiveFile: 11,
// ActiveFile: 12,
// Unevictable: 13,
// HierarchicalMemoryLimit: 14,
// TotalCache: 15,
// TotalRSS: 16,
// TotalRSSHuge: 17,
// TotalMappedFile: 18,
// TotalPgpgIn: 19,
// TotalPgpgOut: 20,
// TotalPgFault: 21,
// TotalPgMajFault: 22,
// TotalInactiveAnon: 23,
// TotalActiveAnon: 24,
// TotalInactiveFile: 25,
// TotalActiveFile: 26,
// TotalUnevictable: 27,
// },
// }

// mps.On("DockerStat").Return([]*DockerContainerStat{ds}, nil)

// err := (&DockerStats{&mps}).Gather(&acc)
// require.NoError(t, err)

// dockertags := map[string]string{
// "name": "blah",
// "id": "",
// "command": "",
// }

// fields := map[string]interface{}{
// "user": 3.1,
// "system": 8.2,
// "idle": 80.1,
// "nice": 1.3,
// "iowait": 0.2,
// "irq": 0.1,
// "softirq": 0.11,
// "steal": 0.0001,
// "guest": 8.1,
// "guest_nice": 0.324,

// "cache": uint64(1),
// "rss": uint64(2),
// "rss_huge": uint64(3),
// "mapped_file": uint64(4),
// "swap_in": uint64(5),
// "swap_out": uint64(6),
// "page_fault": uint64(7),
// "page_major_fault": uint64(8),
// "inactive_anon": uint64(9),
// "active_anon": uint64(10),
// "inactive_file": uint64(11),
// "active_file": uint64(12),
// "unevictable": uint64(13),
// "memory_limit": uint64(14),
// "total_cache": uint64(15),
// "total_rss": uint64(16),
// "total_rss_huge": uint64(17),
// "total_mapped_file": uint64(18),
// "total_swap_in": uint64(19),
// "total_swap_out": uint64(20),
// "total_page_fault": uint64(21),
// "total_page_major_fault": uint64(22),
// "total_inactive_anon": uint64(23),
// "total_active_anon": uint64(24),
// "total_inactive_file": uint64(25),
// "total_active_file": uint64(26),
// "total_unevictable": uint64(27),
// }

// acc.AssertContainsTaggedFields(t, "docker", fields, dockertags)
// }
Loading

0 comments on commit 5385da4

Please sign in to comment.