Skip to content

Commit

Permalink
Refactor docker watcher to fix flaky test and other small issues (elastic#21851)
Browse files Browse the repository at this point in the history

Refactor docker watcher to fix some small issues and improve testability:
* Actually release resources of previous connections when reconnecting.
* Watcher uses a clock that can be mocked in tests for time-sensitive functionality.
* Use nanosecond precision from event timestamps; this is important to avoid duplicated events on reconnection.
* Fix logger initialization (it was being initialized as docker.docker).
* Refactor test helpers to have more control over the test watcher when needed.
* Some other code refactors.

(cherry picked from commit 4427fa5)
  • Loading branch information
jsoriano committed Oct 16, 2020
1 parent 178002a commit 0dc5af5
Show file tree
Hide file tree
Showing 3 changed files with 242 additions and 180 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ field. You can revert this change by configuring tags for the module and omittin
- Fix `libbeat.output.write.bytes` and `libbeat.output.read.bytes` metrics of the Elasticsearch output. {issue}20752[20752] {pull}21197[21197]
- The `o365input` and `o365` module now recover from an authentication problem or other fatal errors, instead of terminating. {pull}21258[21258]
- Orderly close processors when processing pipelines are not needed anymore to release their resources. {pull}16349[16349]
- Fix memory leak and events duplication in docker autodiscover and add_docker_metadata. {pull}21851[21851]

*Auditbeat*

Expand Down
282 changes: 154 additions & 128 deletions libbeat/common/docker/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
package docker

import (
"fmt"
"context"
"io"
"net/http"
"sync"
"time"
Expand All @@ -29,7 +30,6 @@ import (
"github.com/docker/docker/api/types/events"
"github.com/docker/docker/api/types/filters"
"github.com/docker/go-connections/tlsconfig"
"golang.org/x/net/context"

"github.com/elastic/beats/v7/libbeat/common/bus"
"github.com/elastic/beats/v7/libbeat/logp"
Expand All @@ -39,7 +39,6 @@ import (
const (
shortIDLen = 12
dockerRequestTimeout = 10 * time.Second
dockerWatchRequestTimeout = 60 * time.Minute
dockerEventsWatchPityTimerInterval = 10 * time.Second
dockerEventsWatchPityTimerTimeout = 10 * time.Minute
)
Expand Down Expand Up @@ -74,20 +73,30 @@ type TLSConfig struct {

type watcher struct {
sync.RWMutex
log *logp.Logger
client Client
ctx context.Context
stop context.CancelFunc
containers map[string]*Container
deleted map[string]time.Time // deleted annotations key -> last access time
cleanupTimeout time.Duration
lastValidTimestamp int64
lastWatchReceivedEventTime time.Time
stopped sync.WaitGroup
bus bus.Bus
shortID bool // whether to store short ID in "containers" too
log *logp.Logger
client Client
ctx context.Context
stop context.CancelFunc
containers map[string]*Container
deleted map[string]time.Time // deleted annotations key -> last access time
cleanupTimeout time.Duration
clock clock
stopped sync.WaitGroup
bus bus.Bus
shortID bool // whether to store short ID in "containers" too
}

// clock is the time source used by the watcher. It is an interface so that
// tests can provide a mocked implementation in place of the system clock.
type clock interface {
// Now returns the current time as seen by this clock.
Now() time.Time
}

// systemClock is the clock implementation backed by the real system time,
// obtained through the standard time package.
type systemClock struct{}

// Now reports the current time exactly as time.Now does.
func (c *systemClock) Now() time.Time {
	return time.Now()
}

// Container info retrieved by the watcher
type Container struct {
ID string
Expand Down Expand Up @@ -147,8 +156,6 @@ func NewWatcher(log *logp.Logger, host string, tls *TLSConfig, storeShortID bool

// NewWatcherWithClient creates a new Watcher from a given Docker client
func NewWatcherWithClient(log *logp.Logger, client Client, cleanupTimeout time.Duration, storeShortID bool) (Watcher, error) {
log = log.Named("docker")

ctx, cancel := context.WithCancel(context.Background())
return &watcher{
log: log,
Expand All @@ -160,6 +167,7 @@ func NewWatcherWithClient(log *logp.Logger, client Client, cleanupTimeout time.D
cleanupTimeout: cleanupTimeout,
bus: bus.New(log, "docker"),
shortID: storeShortID,
clock: &systemClock{},
}, nil
}

Expand All @@ -177,7 +185,7 @@ func (w *watcher) Container(ID string) *Container {
// Update last access time if it's deleted
if ok {
w.Lock()
w.deleted[container.ID] = time.Now()
w.deleted[container.ID] = w.clock.Now()
w.Unlock()
}

Expand All @@ -201,7 +209,6 @@ func (w *watcher) Containers() map[string]*Container {
func (w *watcher) Start() error {
// Do initial scan of existing containers
w.log.Debug("Start docker containers scanner")
w.lastValidTimestamp = time.Now().Unix()

w.Lock()
defer w.Unlock()
Expand Down Expand Up @@ -236,108 +243,124 @@ func (w *watcher) Start() error {

// Stop cancels the watcher context through the stored cancel function and
// then blocks until the stopped WaitGroup is released.
func (w *watcher) Stop() {
w.stop()
// watch and cleanupWorker each call w.stopped.Done when they return, so this
// waits for them to finish (assumes they were registered on the WaitGroup
// when started; that code is not visible here, confirm in Start).
w.stopped.Wait()
}

func (w *watcher) watch() {
log := w.log
defer w.stopped.Done()

filter := filters.NewArgs()
filter.Add("type", "container")

for {
// Ticker to restart the watcher when no events are received after some time.
tickChan := time.NewTicker(dockerEventsWatchPityTimerInterval)
defer tickChan.Stop()

lastValidTimestamp := w.clock.Now()

watch := func() bool {
lastReceivedEventTime := w.clock.Now()

w.log.Debugf("Fetching events since %s", lastValidTimestamp)

options := types.EventsOptions{
Since: fmt.Sprintf("%d", w.lastValidTimestamp),
Since: lastValidTimestamp.Format(time.RFC3339Nano),
Filters: filter,
}

log.Debugf("Fetching events since %s", options.Since)
ctx, cancel := context.WithTimeout(w.ctx, dockerWatchRequestTimeout)
ctx, cancel := context.WithCancel(w.ctx)
defer cancel()

events, errors := w.client.Events(ctx, options)

//ticker for timeout to restart watcher when no events are received
w.lastWatchReceivedEventTime = time.Now()
tickChan := time.NewTicker(dockerEventsWatchPityTimerInterval)
defer tickChan.Stop()

WATCH:
for {
select {
case event := <-events:
log.Debugf("Got a new docker event: %v", event)
w.lastValidTimestamp = event.Time
w.lastWatchReceivedEventTime = time.Now()

// Add / update
if event.Action == "start" || event.Action == "update" {
filter := filters.NewArgs()
filter.Add("id", event.Actor.ID)

containers, err := w.listContainers(types.ContainerListOptions{
Filters: filter,
})
if err != nil || len(containers) != 1 {
log.Errorf("Error getting container info: %v", err)
continue
}
container := containers[0]

w.Lock()
w.containers[event.Actor.ID] = container
if w.shortID {
w.containers[event.Actor.ID[:shortIDLen]] = container
}
// un-delete if it's flagged (in case of update or recreation)
delete(w.deleted, event.Actor.ID)
w.Unlock()

w.bus.Publish(bus.Event{
"start": true,
"container": container,
})
}

// Delete
if event.Action == "die" {
container := w.Container(event.Actor.ID)
if container != nil {
w.bus.Publish(bus.Event{
"stop": true,
"container": container,
})
}

w.Lock()
w.deleted[event.Actor.ID] = time.Now()
w.Unlock()
w.log.Debugf("Got a new docker event: %v", event)
lastValidTimestamp = time.Unix(event.Time, event.TimeNano)
lastReceivedEventTime = w.clock.Now()

switch event.Action {
case "start", "update":
w.containerUpdate(event)
case "die":
w.containerDelete(event)
}

case err := <-errors:
// Restart watch call
if err == context.DeadlineExceeded {
log.Info("Context deadline exceeded for docker request, restarting watch call")
} else {
log.Errorf("Error watching for docker events: %+v", err)
switch err {
case io.EOF:
// Client disconnected, watch is not done, reconnect
w.log.Debug("EOF received in events stream, restarting watch call")
case context.DeadlineExceeded:
w.log.Debug("Context deadline exceeded for docker request, restarting watch call")
case context.Canceled:
// Parent context has been canceled, watch is done.
return true
default:
w.log.Errorf("Error watching for docker events: %+v", err)
}

time.Sleep(1 * time.Second)
break WATCH

return false
case <-tickChan.C:
if time.Since(w.lastWatchReceivedEventTime) > dockerEventsWatchPityTimerTimeout {
log.Infof("No events received within %s, restarting watch call", dockerEventsWatchPityTimerTimeout)
time.Sleep(1 * time.Second)
break WATCH
if time.Since(lastReceivedEventTime) > dockerEventsWatchPityTimerTimeout {
w.log.Infof("No events received within %s, restarting watch call", dockerEventsWatchPityTimerTimeout)
return false
}

case <-w.ctx.Done():
log.Debug("Watcher stopped")
w.stopped.Done()
return
w.log.Debug("Watcher stopped")
return true
}
}
}

for {
done := watch()
if done {
return
}
// Wait before trying to reconnect
time.Sleep(1 * time.Second)
}
}

// containerUpdate refreshes the stored information of the container referenced
// by event and announces it on the bus with a "start" event.
//
// The container is looked up through listContainers, filtered by the ID of the
// event actor. Nothing is stored or published when the lookup fails or when it
// does not return exactly one container.
func (w *watcher) containerUpdate(event events.Message) {
	filter := filters.NewArgs()
	filter.Add("id", event.Actor.ID)

	containers, err := w.listContainers(types.ContainerListOptions{
		Filters: filter,
	})
	if err != nil {
		w.log.Errorf("Error getting container info: %v", err)
		return
	}
	if len(containers) != 1 {
		// The request itself succeeded, so there is no error value to print;
		// report the unexpected number of matches instead of a "<nil>" error.
		w.log.Errorf("Error getting container info: expected 1 container with ID %s, found %d",
			event.Actor.ID, len(containers))
		return
	}
	container := containers[0]

	w.Lock()
	w.containers[event.Actor.ID] = container
	if w.shortID {
		w.containers[event.Actor.ID[:shortIDLen]] = container
	}
	// un-delete if it's flagged (in case of update or recreation)
	delete(w.deleted, event.Actor.ID)
	w.Unlock()

	w.bus.Publish(bus.Event{
		"start":     true,
		"container": container,
	})
}

// containerDelete flags the container referenced by event as deleted and, when
// the watcher knows that container, announces it on the bus with a "stop"
// event.
//
// The container is resolved before the deletion time is recorded; the entry in
// w.deleted is written even if the container is unknown.
func (w *watcher) containerDelete(event events.Message) {
	id := event.Actor.ID
	container := w.Container(id)

	// Record when the container was flagged as deleted.
	w.Lock()
	w.deleted[id] = w.clock.Now()
	w.Unlock()

	if container == nil {
		return
	}

	stopEvent := bus.Event{
		"stop":      true,
		"container": container,
	}
	w.bus.Publish(stopEvent)
}

Expand Down Expand Up @@ -393,49 +416,52 @@ func (w *watcher) listContainers(options types.ContainerListOptions) ([]*Contain

// Clean up deleted containers after they are not used anymore
func (w *watcher) cleanupWorker() {
log := w.log
defer w.stopped.Done()

for {
select {
case <-w.ctx.Done():
w.stopped.Done()
return
// Wait a full period
case <-time.After(w.cleanupTimeout):
// Check entries for timeout
var toDelete []string
timeout := time.Now().Add(-w.cleanupTimeout)
w.RLock()
for key, lastSeen := range w.deleted {
if lastSeen.Before(timeout) {
log.Debugf("Removing container %s after cool down timeout", key)
toDelete = append(toDelete, key)
}
}
w.RUnlock()

// Delete timed out entries:
for _, key := range toDelete {
container := w.Container(key)
if container != nil {
w.bus.Publish(bus.Event{
"delete": true,
"container": container,
})
}
}
w.runCleanup()
}
}
}

w.Lock()
for _, key := range toDelete {
delete(w.deleted, key)
delete(w.containers, key)
if w.shortID {
delete(w.containers, key[:shortIDLen])
}
}
w.Unlock()
// runCleanup performs one cleanup pass over the containers flagged as deleted.
//
// Every key in w.deleted whose recorded time is older than cleanupTimeout,
// measured against the watcher clock, is selected. For each selected key that
// still resolves to a container a "delete" event is published, and afterwards
// the key is dropped from both w.deleted and w.containers (including its short
// ID form when short IDs are stored).
func (w *watcher) runCleanup() {
	// Entries recorded before this instant have outlived the cool down period.
	cutoff := w.clock.Now().Add(-w.cleanupTimeout)

	var expired []string
	w.RLock()
	for id, lastAccess := range w.deleted {
		if !lastAccess.Before(cutoff) {
			continue
		}
		w.log.Debugf("Removing container %s after cool down timeout", id)
		expired = append(expired, id)
	}
	w.RUnlock()

	// Publish before removing anything, while the containers can still be
	// looked up.
	for _, id := range expired {
		if container := w.Container(id); container != nil {
			w.bus.Publish(bus.Event{
				"delete":    true,
				"container": container,
			})
		}
	}

	w.Lock()
	for _, id := range expired {
		delete(w.deleted, id)
		delete(w.containers, id)
		if w.shortID {
			delete(w.containers, id[:shortIDLen])
		}
	}
	w.Unlock()
}

// ListenStart returns a bus listener to receive container started events, with a `container` key holding it
Expand Down
Loading

0 comments on commit 0dc5af5

Please sign in to comment.