From 0c06a921e63dd68588bb722698813477feec59a5 Mon Sep 17 00:00:00 2001 From: Chris Heng Date: Mon, 25 Jan 2016 09:15:17 +0800 Subject: [PATCH] Add -wait parameter to debounce watched events --- README.md | 6 ++ cmd/docker-gen/main.go | 7 ++ config.go | 53 ++++++++- generator.go | 237 +++++++++++++++++++++++++++++------------ generator_test.go | 207 +++++++++++++++++++++++++++++++++++ 5 files changed, 441 insertions(+), 69 deletions(-) create mode 100644 generator_test.go diff --git a/README.md b/README.md index c59ddfcb..49070d0a 100644 --- a/README.md +++ b/README.md @@ -113,6 +113,8 @@ Options: show version -watch watch for container changes + -wait + minimum (and/or maximum) duration to wait after each container change before triggering Arguments: template - path to a template to generate @@ -153,6 +155,9 @@ path to a template to generate watch = true watch for container changes +wait = "500ms:2s" +debounce changes with a min:max duration. Only applicable if watch = true + [config.NotifyContainers] Starts a notify container section @@ -180,6 +185,7 @@ watch = true template = "/etc/docker-gen/templates/nginx.tmpl" dest = "/etc/nginx/conf.d/default.conf" watch = true +wait = "500ms:2s" [config.NotifyContainers] nginx = 1 # 1 is a signal number to be sent; here SIGINT diff --git a/cmd/docker-gen/main.go b/cmd/docker-gen/main.go index a5a514c4..b5af3228 100644 --- a/cmd/docker-gen/main.go +++ b/cmd/docker-gen/main.go @@ -19,6 +19,7 @@ var ( buildVersion string version bool watch bool + wait string notifyCmd string notifyOutput bool notifySigHUPContainerID string @@ -85,6 +86,7 @@ func initFlags() { } flag.BoolVar(&version, "version", false, "show version") flag.BoolVar(&watch, "watch", false, "watch for container changes") + flag.StringVar(&wait, "wait", "", "minimum and maximum durations to wait (e.g. \"500ms:2s\") before triggering generate") flag.BoolVar(&onlyExposed, "only-exposed", false, "only include containers with exposed ports") flag.BoolVar(&onlyPublished, "only-published", false, @@ -127,10 +129,15 @@ func main() { } } } else { + w, err := dockergen.ParseWait(wait) + if err != nil { + log.Fatalf("error parsing wait interval: %s\n", err) + } config := dockergen.Config{ Template: flag.Arg(0), Dest: flag.Arg(1), Watch: watch, + Wait: w, NotifyCmd: notifyCmd, NotifyOutput: notifyOutput, NotifyContainers: make(map[string]docker.Signal), diff --git a/config.go b/config.go index ed6ac7b2..a67def2d 100644 --- a/config.go +++ b/config.go @@ -1,11 +1,18 @@ package dockergen -import "github.com/fsouza/go-dockerclient" +import ( + "errors" + "strings" + "time" + + "github.com/fsouza/go-dockerclient" +) type Config struct { Template string Dest string Watch bool + Wait *Wait NotifyCmd string NotifyOutput bool NotifyContainers map[string]docker.Signal @@ -31,3 +38,47 @@ func (c *ConfigFile) FilterWatches() ConfigFile { Config: configWithWatches, } } + +type Wait struct { + Min time.Duration + Max time.Duration +} + +func (w *Wait) UnmarshalText(text []byte) error { + wait, err := ParseWait(string(text)) + if err == nil { + w.Min, w.Max = wait.Min, wait.Max + } + return err +} + +func ParseWait(s string) (*Wait, error) { + if len(strings.TrimSpace(s)) < 1 { + return &Wait{0, 0}, nil + } + + parts := strings.Split(s, ":") + + var ( + min time.Duration + max time.Duration + err error + ) + min, err = time.ParseDuration(strings.TrimSpace(parts[0])) + if err != nil { + return nil, err + } + if len(parts) > 1 { + max, err = time.ParseDuration(strings.TrimSpace(parts[1])) + if err != nil { + return nil, err + } + if max < min { + return nil, errors.New("Invalid wait interval: max must be larger than min") + } + } else { + max = 4 * min + } + + return &Wait{min, max}, nil +} diff --git a/generator.go b/generator.go index 1cae3201..e1d8a5ad 100644 --- a/generator.go +++ b/generator.go @@ -21,7 +21,8 @@ type generator struct { TLSVerify bool TLSCert, TLSCaCert, TLSKey string - wg sync.WaitGroup + wg sync.WaitGroup + retry bool } type GeneratorConfig struct { @@ -62,6 +63,7 @@ func NewGenerator(gc GeneratorConfig) (*generator, error) { TLSCaCert: gc.TLSCACert, TLSKey: gc.TLSKey, Configs: gc.ConfigFile, + retry: true, }, nil } @@ -76,15 +78,13 @@ func (g *generator) Generate() error { } func (g *generator) generateFromSignals() { - sigs := make(chan os.Signal, 1) - signal.Notify(sigs, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGKILL) - g.wg.Add(1) go func() { defer g.wg.Done() + sigChan := newSignalChannel() for { - sig := <-sigs + sig := <-sigChan log.Printf("Received signal: %s\n", sig) switch sig { case syscall.SIGHUP: @@ -124,9 +124,10 @@ func (g *generator) generateAtInterval() { log.Printf("Generating every %d seconds", config.Interval) g.wg.Add(1) ticker := time.NewTicker(time.Duration(config.Interval) * time.Second) - quit := make(chan struct{}) go func(config Config) { defer g.wg.Done() + + sigChan := newSignalChannel() for { select { case <-ticker.C: @@ -139,9 +140,13 @@ func (g *generator) generateAtInterval() { GenerateFile(config, containers) g.runNotifyCmd(config) g.sendSignalToContainer(config) - case <-quit: - ticker.Stop() - return + case sig := <-sigChan: + log.Printf("Received signal: %s\n", sig) + switch sig { + case syscall.SIGQUIT, syscall.SIGKILL, syscall.SIGTERM, syscall.SIGINT: + ticker.Stop() + return + } } } }(config) @@ -154,84 +159,128 @@ func (g *generator) generateFromEvents() { return } - g.wg.Add(1) - defer g.wg.Done() - client := g.Client - for { - if client == nil { - var err error - endpoint, err := GetEndpoint(g.Endpoint) - if err != nil { - log.Printf("Bad endpoint: %s", err) - time.Sleep(10 * time.Second) - continue - } + var watchers []chan *docker.APIEvents + + for _, config := range configs.Config { + g.wg.Add(1) + + go func(config Config, watcher chan *docker.APIEvents) { + defer g.wg.Done() + watchers = append(watchers, watcher) - client, err = NewDockerClient(endpoint, g.TLSVerify, g.TLSCert, g.TLSCaCert, g.TLSKey) - if err != nil { - log.Printf("Unable to connect to docker daemon: %s", err) - time.Sleep(10 * time.Second) - continue + debouncedChan := newDebounceChannel(watcher, config.Wait) + for _ = range debouncedChan { + containers, err := g.getContainers() + if err != nil { + log.Printf("Error listing containers: %s\n", err) + continue + } + changed := GenerateFile(config, containers) + if !changed { + log.Printf("Contents of %s did not change. Skipping notification '%s'", config.Dest, config.NotifyCmd) + continue + } + g.runNotifyCmd(config) + g.sendSignalToContainer(config) } - g.generateFromContainers() - } + }(config, make(chan *docker.APIEvents, 100)) + } + // maintains docker client connection and passes events to watchers + go func() { + // channel will be closed by go-dockerclient eventChan := make(chan *docker.APIEvents, 100) - defer close(eventChan) + sigChan := newSignalChannel() - watching := false for { + watching := false if client == nil { - break - } - err := client.Ping() - if err != nil { - log.Printf("Unable to ping docker daemon: %s", err) - if watching { - client.RemoveEventListener(eventChan) - watching = false - client = nil + var err error + endpoint, err := GetEndpoint(g.Endpoint) + if err != nil { + log.Printf("Bad endpoint: %s", err) + time.Sleep(10 * time.Second) + continue } - time.Sleep(10 * time.Second) - break - - } - - if !watching { - err = client.AddEventListener(eventChan) - if err != nil && err != docker.ErrListenerAlreadyExists { - log.Printf("Error registering docker event listener: %s", err) + client, err = NewDockerClient(endpoint, g.TLSVerify, g.TLSCert, g.TLSCaCert, g.TLSKey) + if err != nil { + log.Printf("Unable to connect to docker daemon: %s", err) time.Sleep(10 * time.Second) continue } - watching = true - log.Println("Watching docker events") } - select { - - case event := <-eventChan: - if event == nil { - if watching { - client.RemoveEventListener(eventChan) - watching = false - client = nil - } + for { + if client == nil { break } - - if event.Status == "start" || event.Status == "stop" || event.Status == "die" { - log.Printf("Received event %s for container %s", event.Status, event.ID[:12]) + if !watching { + err := client.AddEventListener(eventChan) + if err != nil && err != docker.ErrListenerAlreadyExists { + log.Printf("Error registering docker event listener: %s", err) + time.Sleep(10 * time.Second) + continue + } + watching = true + log.Println("Watching docker events") + // sync all configs after resuming listener g.generateFromContainers() } - case <-time.After(10 * time.Second): - // check for docker liveness + select { + case event, ok := <-eventChan: + if !ok { + log.Printf("Docker daemon connection interrupted") + if watching { + client.RemoveEventListener(eventChan) + watching = false + client = nil + } + if !g.retry { + // close all watchers and exit + for _, watcher := range watchers { + close(watcher) + } + return + } + // recreate channel and attempt to resume + eventChan = make(chan *docker.APIEvents, 100) + time.Sleep(10 * time.Second) + break + } + if event.Status == "start" || event.Status == "stop" || event.Status == "die" { + log.Printf("Received event %s for container %s", event.Status, event.ID[:12]) + // fanout event to all watchers + for _, watcher := range watchers { + watcher <- event + } + } + case <-time.After(10 * time.Second): + // check for docker liveness + err := client.Ping() + if err != nil { + log.Printf("Unable to ping docker daemon: %s", err) + if watching { + client.RemoveEventListener(eventChan) + watching = false + client = nil + } + } + case sig := <-sigChan: + log.Printf("Received signal: %s\n", sig) + switch sig { + case syscall.SIGQUIT, syscall.SIGKILL, syscall.SIGTERM, syscall.SIGINT: + // close all watchers and exit + for _, watcher := range watchers { + close(watcher) + } + return + } + } } - } - } + }() } func (g *generator) runNotifyCmd(config Config) { @@ -275,10 +324,10 @@ func (g *generator) getContainers() ([]*RuntimeContainer, error) { apiInfo, err := g.Client.Info() if err != nil { log.Printf("error retrieving docker server info: %s\n", err) + } else { + SetServerInfo(apiInfo) } - SetServerInfo(apiInfo) - apiContainers, err := g.Client.ListContainers(docker.ListContainersOptions{ All: false, Size: false, @@ -381,3 +430,55 @@ func (g *generator) getContainers() ([]*RuntimeContainer, error) { return containers, nil } + +func newSignalChannel() <-chan os.Signal { + sig := make(chan os.Signal, 1) + signal.Notify(sig, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGKILL) + + return sig +} + +func newDebounceChannel(input chan *docker.APIEvents, wait *Wait) chan *docker.APIEvents { + if wait == nil { + return input + } + if wait.Min == 0 { + return input + } + + output := make(chan *docker.APIEvents, 100) + + go func() { + var ( + event *docker.APIEvents + minTimer <-chan time.Time + maxTimer <-chan time.Time + ) + + defer close(output) + + for { + select { + case buffer, ok := <-input: + if !ok { + return + } + event = buffer + minTimer = time.After(wait.Min) + if maxTimer == nil { + maxTimer = time.After(wait.Max) + } + case <-minTimer: + log.Println("Debounce minTimer fired") + minTimer, maxTimer = nil, nil + output <- event + case <-maxTimer: + log.Println("Debounce maxTimer fired") + minTimer, maxTimer = nil, nil + output <- event + } + } + }() + + return output +} diff --git a/generator_test.go b/generator_test.go new file mode 100644 index 00000000..87b8ce71 --- /dev/null +++ b/generator_test.go @@ -0,0 +1,207 @@ +package dockergen + +import ( + "bufio" + "encoding/json" + "fmt" + "io/ioutil" + "log" + "net/http" + "os" + "strings" + "testing" + "time" + + "github.com/fsouza/go-dockerclient" + dockertest "github.com/fsouza/go-dockerclient/testing" +) + +func TestGenerateFromEvents(t *testing.T) { + log.SetOutput(ioutil.Discard) + containerID := "8dfafdbc3a40" + counter := 0 + + eventsResponse := ` +{"status":"start","id":"8dfafdbc3a40","from":"base:latest","time":1374067924} +{"status":"stop","id":"8dfafdbc3a40","from":"base:latest","time":1374067966} +{"status":"start","id":"8dfafdbc3a40","from":"base:latest","time":1374067970} +{"status":"destroy","id":"8dfafdbc3a40","from":"base:latest","time":1374067990}` + infoResponse := `{"Containers":1,"Images":1,"Debug":0,"NFd":11,"NGoroutines":21,"MemoryLimit":1,"SwapLimit":0}` + versionResponse := `{"Version":"1.8.0","Os":"Linux","KernelVersion":"3.18.5-tinycore64","GoVersion":"go1.4.1","GitCommit":"a8a31ef","Arch":"amd64","ApiVersion":"1.19"}` + + server, _ := dockertest.NewServer("127.0.0.1:0", nil, nil) + server.CustomHandler("/events", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + rsc := bufio.NewScanner(strings.NewReader(eventsResponse)) + for rsc.Scan() { + w.Write([]byte(rsc.Text())) + w.(http.Flusher).Flush() + time.Sleep(15 * time.Millisecond) + } + time.Sleep(500 * time.Millisecond) + })) + server.CustomHandler("/info", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte(infoResponse)) + w.(http.Flusher).Flush() + })) + server.CustomHandler("/version", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte(versionResponse)) + w.(http.Flusher).Flush() + })) + server.CustomHandler("/containers/json", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + result := []docker.APIContainers{ + docker.APIContainers{ + ID: containerID, + Image: "base:latest", + Command: "/bin/sh", + Created: time.Now().Unix(), + Status: "running", + Ports: []docker.APIPort{}, + Names: []string{"/docker-gen-test"}, + }, + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(result) + })) + server.CustomHandler(fmt.Sprintf("/containers/%s/json", containerID), http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + counter++ + container := docker.Container{ + Name: "docker-gen-test", + ID: containerID, + Created: time.Now(), + Path: "/bin/sh", + Args: []string{}, + Config: &docker.Config{ + Hostname: "docker-gen", + AttachStdout: true, + AttachStderr: true, + Env: []string{fmt.Sprintf("COUNTER=%d", counter)}, + Cmd: []string{"/bin/sh"}, + Image: "base:latest", + }, + State: docker.State{ + Running: true, + Pid: 400, + ExitCode: 0, + StartedAt: time.Now(), + }, + Image: "0ff407d5a7d9ed36acdf3e75de8cc127afecc9af234d05486be2981cdc01a38d", + NetworkSettings: &docker.NetworkSettings{ + IPAddress: fmt.Sprintf("10.0.0.10"), + IPPrefixLen: 24, + Gateway: "10.0.0.1", + Bridge: "docker0", + PortMapping: map[string]docker.PortMapping{}, + Ports: map[docker.Port][]docker.PortBinding{}, + }, + ResolvConfPath: "/etc/resolv.conf", + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(container) + })) + + serverURL := fmt.Sprintf("tcp://%s", strings.TrimRight(strings.TrimPrefix(server.URL(), "http://"), "/")) + client, err := NewDockerClient(serverURL, false, "", "", "") + if err != nil { + t.Errorf("Failed to create client: %s", err) + } + client.SkipServerVersionCheck = true + + tmplFile, err := ioutil.TempFile(os.TempDir(), "docker-gen-tmpl") + if err != nil { + t.Errorf("Failed to create temp file: %v\n", err) + } + defer func() { + tmplFile.Close() + os.Remove(tmplFile.Name()) + }() + err = ioutil.WriteFile(tmplFile.Name(), []byte("{{range $key, $value := .}}{{$value.ID}}.{{$value.Env.COUNTER}}{{end}}"), 0644) + if err != nil { + t.Errorf("Failed to write to temp file: %v\n", err) + } + + var destFiles []*os.File + for i := 0; i < 4; i++ { + destFile, err := ioutil.TempFile(os.TempDir(), "docker-gen-out") + if err != nil { + t.Errorf("Failed to create temp file: %v\n", err) + } + destFiles = append(destFiles, destFile) + } + defer func() { + for _, destFile := range destFiles { + destFile.Close() + os.Remove(destFile.Name()) + } + }() + + apiVersion, err := client.Version() + if err != nil { + t.Errorf("Failed to retrieve docker server version info: %v\n", err) + } + SetDockerEnv(apiVersion) // prevents a panic + + generator := &generator{ + Client: client, + Endpoint: serverURL, + Configs: ConfigFile{ + []Config{ + Config{ + Template: tmplFile.Name(), + Dest: destFiles[0].Name(), + Watch: false, + }, + Config{ + Template: tmplFile.Name(), + Dest: destFiles[1].Name(), + Watch: true, + Wait: &Wait{0, 0}, + }, + Config{ + Template: tmplFile.Name(), + Dest: destFiles[2].Name(), + Watch: true, + Wait: &Wait{20 * time.Millisecond, 25 * time.Millisecond}, + }, + Config{ + Template: tmplFile.Name(), + Dest: destFiles[3].Name(), + Watch: true, + Wait: &Wait{25 * time.Millisecond, 100 * time.Millisecond}, + }, + }, + }, + retry: false, + } + + generator.generateFromEvents() + generator.wg.Wait() + + var ( + value []byte + expected string + ) + + // The counter is incremented in each output file in the following sequence: + // + // init 0ms 5ms 10ms 15ms 20ms 25ms 30ms 35ms 40ms 45ms 50ms 55ms + // ├──────╫──────┼──────┼──────╫──────┼──────┼──────╫──────┼──────┼──────┼──────┼──────┤ + // File0 ├─ 1 ║ ║ ║ + // File1 ├─ 1 ╟─ 2 ╟─ 3 ╟─ 5 + // File2 ├─ 1 ╟───── max (25ms) ───║───────────> 4 ╟─────── min (20ms) ──────> 6 + // File3 └─ 1 ╟──────────────────> ╟──────────────────> ╟─────────── min (25ms) ─────────> 7 + // ┌───╨───┐ ┌───╨──┐ ┌───╨───┐ + // │ start │ │ stop │ │ start │ + // └───────┘ └──────┘ └───────┘ + + expectedCounters := []int{1, 5, 6, 7} + + for i, counter := range expectedCounters { + value, _ = ioutil.ReadFile(destFiles[i].Name()) + expected = fmt.Sprintf("%s.%d", containerID, counter) + if string(value) != expected { + t.Errorf("expected: %s. got: %s", expected, value) + } + } +}