Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor docker watcher to fix flaky test and other small issues #21851

Merged
merged 9 commits into from
Oct 16, 2020
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix `libbeat.output.write.bytes` and `libbeat.output.read.bytes` metrics of the Elasticsearch output. {issue}20752[20752] {pull}21197[21197]
- The `o365input` and `o365` module now recover from an authentication problem or other fatal errors, instead of terminating. {pull}21259[21258]
- Orderly close processors when processing pipelines are not needed anymore to release their resources. {pull}16349[16349]
- Fix memory leak and events duplication in docker autodiscover and add_docker_metadata. {pull}21851[21851]

*Auditbeat*

Expand Down
279 changes: 151 additions & 128 deletions libbeat/common/docker/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
package docker

import (
"fmt"
"context"
"io"
"net/http"
"sync"
"time"
Expand All @@ -29,7 +30,6 @@ import (
"github.com/docker/docker/api/types/events"
"github.com/docker/docker/api/types/filters"
"github.com/docker/go-connections/tlsconfig"
"golang.org/x/net/context"

"github.com/elastic/beats/v7/libbeat/common/bus"
"github.com/elastic/beats/v7/libbeat/logp"
Expand All @@ -39,7 +39,6 @@ import (
const (
shortIDLen = 12
dockerRequestTimeout = 10 * time.Second
dockerWatchRequestTimeout = 60 * time.Minute
dockerEventsWatchPityTimerInterval = 10 * time.Second
dockerEventsWatchPityTimerTimeout = 10 * time.Minute
)
Expand Down Expand Up @@ -74,18 +73,17 @@ type TLSConfig struct {

type watcher struct {
sync.RWMutex
log *logp.Logger
client Client
ctx context.Context
stop context.CancelFunc
containers map[string]*Container
deleted map[string]time.Time // deleted annotations key -> last access time
cleanupTimeout time.Duration
lastValidTimestamp int64
lastWatchReceivedEventTime time.Time
stopped sync.WaitGroup
bus bus.Bus
shortID bool // whether to store short ID in "containers" too
log *logp.Logger
client Client
ctx context.Context
stop context.CancelFunc
containers map[string]*Container
deleted map[string]time.Time // deleted annotations key -> last access time
cleanupTimeout time.Duration
clock clock
stopped sync.WaitGroup
bus bus.Bus
shortID bool // whether to store short ID in "containers" too
}

// Container info retrieved by the watcher
Expand Down Expand Up @@ -147,8 +145,6 @@ func NewWatcher(log *logp.Logger, host string, tls *TLSConfig, storeShortID bool

// NewWatcherWithClient creates a new Watcher from a given Docker client
func NewWatcherWithClient(log *logp.Logger, client Client, cleanupTimeout time.Duration, storeShortID bool) (Watcher, error) {
log = log.Named("docker")

ctx, cancel := context.WithCancel(context.Background())
return &watcher{
log: log,
Expand All @@ -160,6 +156,7 @@ func NewWatcherWithClient(log *logp.Logger, client Client, cleanupTimeout time.D
cleanupTimeout: cleanupTimeout,
bus: bus.New(log, "docker"),
shortID: storeShortID,
clock: &systemClock{},
}, nil
}

Expand All @@ -177,7 +174,7 @@ func (w *watcher) Container(ID string) *Container {
// Update last access time if it's deleted
if ok {
w.Lock()
w.deleted[container.ID] = time.Now()
w.deleted[container.ID] = w.clock.Now()
w.Unlock()
}

Expand All @@ -201,7 +198,6 @@ func (w *watcher) Containers() map[string]*Container {
func (w *watcher) Start() error {
// Do initial scan of existing containers
w.log.Debug("Start docker containers scanner")
w.lastValidTimestamp = time.Now().Unix()

w.Lock()
defer w.Unlock()
Expand Down Expand Up @@ -236,108 +232,124 @@ func (w *watcher) Start() error {

func (w *watcher) Stop() {
w.stop()
w.stopped.Wait()
}

func (w *watcher) watch() {
log := w.log
defer w.stopped.Done()

filter := filters.NewArgs()
filter.Add("type", "container")

for {
// Ticker to restart the watcher when no events are received after some time.
tickChan := time.NewTicker(dockerEventsWatchPityTimerInterval)
defer tickChan.Stop()

lastValidTimestamp := w.clock.Now()

watch := func() bool {
lastReceivedEventTime := w.clock.Now()

w.log.Debugf("Fetching events since %s", lastValidTimestamp)

options := types.EventsOptions{
Since: fmt.Sprintf("%d", w.lastValidTimestamp),
Since: lastValidTimestamp.Format(time.RFC3339Nano),
Filters: filter,
}

log.Debugf("Fetching events since %s", options.Since)
ctx, cancel := context.WithTimeout(w.ctx, dockerWatchRequestTimeout)
ctx, cancel := context.WithCancel(w.ctx)
defer cancel()

events, errors := w.client.Events(ctx, options)

//ticker for timeout to restart watcher when no events are received
w.lastWatchReceivedEventTime = time.Now()
tickChan := time.NewTicker(dockerEventsWatchPityTimerInterval)
defer tickChan.Stop()

WATCH:
for {
select {
case event := <-events:
log.Debugf("Got a new docker event: %v", event)
w.lastValidTimestamp = event.Time
w.lastWatchReceivedEventTime = time.Now()

// Add / update
if event.Action == "start" || event.Action == "update" {
filter := filters.NewArgs()
filter.Add("id", event.Actor.ID)

containers, err := w.listContainers(types.ContainerListOptions{
Filters: filter,
})
if err != nil || len(containers) != 1 {
log.Errorf("Error getting container info: %v", err)
continue
}
container := containers[0]

w.Lock()
w.containers[event.Actor.ID] = container
if w.shortID {
w.containers[event.Actor.ID[:shortIDLen]] = container
}
// un-delete if it's flagged (in case of update or recreation)
delete(w.deleted, event.Actor.ID)
w.Unlock()

w.bus.Publish(bus.Event{
"start": true,
"container": container,
})
}

// Delete
if event.Action == "die" {
container := w.Container(event.Actor.ID)
if container != nil {
w.bus.Publish(bus.Event{
"stop": true,
"container": container,
})
}

w.Lock()
w.deleted[event.Actor.ID] = time.Now()
w.Unlock()
w.log.Debugf("Got a new docker event: %v", event)
lastValidTimestamp = time.Unix(event.Time, event.TimeNano)
lastReceivedEventTime = w.clock.Now()

switch event.Action {
case "start", "update":
w.containerUpdate(event)
case "die":
w.containerDelete(event)
}

case err := <-errors:
// Restart watch call
if err == context.DeadlineExceeded {
log.Info("Context deadline exceeded for docker request, restarting watch call")
} else {
log.Errorf("Error watching for docker events: %+v", err)
switch err {
case io.EOF:
// Client disconnected, watch is not done, reconnect
w.log.Debug("EOF received in events stream, restarting watch call")
case context.DeadlineExceeded:
w.log.Debug("Context deadline exceeded for docker request, restarting watch call")
case context.Canceled:
// Parent context has been canceled, watch is done.
return true
default:
w.log.Errorf("Error watching for docker events: %+v", err)
}

time.Sleep(1 * time.Second)
break WATCH

return false
case <-tickChan.C:
if time.Since(w.lastWatchReceivedEventTime) > dockerEventsWatchPityTimerTimeout {
log.Infof("No events received within %s, restarting watch call", dockerEventsWatchPityTimerTimeout)
time.Sleep(1 * time.Second)
break WATCH
if time.Since(lastReceivedEventTime) > dockerEventsWatchPityTimerTimeout {
w.log.Infof("No events received within %s, restarting watch call", dockerEventsWatchPityTimerTimeout)
return false
}
Comment on lines +303 to 306
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if this reconnection is needed, leaving it just in case, I guess this is to handle connections being stalled for some reason.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍


case <-w.ctx.Done():
log.Debug("Watcher stopped")
w.stopped.Done()
return
w.log.Debug("Watcher stopped")
return true
}
}
}

for {
done := watch()
if done {
return
}
// Wait before trying to reconnect
time.Sleep(1 * time.Second)
}
}

func (w *watcher) containerUpdate(event events.Message) {
filter := filters.NewArgs()
filter.Add("id", event.Actor.ID)

containers, err := w.listContainers(types.ContainerListOptions{
Filters: filter,
})
if err != nil || len(containers) != 1 {
w.log.Errorf("Error getting container info: %v", err)
return
}
container := containers[0]

w.Lock()
w.containers[event.Actor.ID] = container
if w.shortID {
w.containers[event.Actor.ID[:shortIDLen]] = container
}
// un-delete if it's flagged (in case of update or recreation)
delete(w.deleted, event.Actor.ID)
w.Unlock()

w.bus.Publish(bus.Event{
"start": true,
"container": container,
})
}

func (w *watcher) containerDelete(event events.Message) {
container := w.Container(event.Actor.ID)

w.Lock()
w.deleted[event.Actor.ID] = w.clock.Now()
w.Unlock()

if container != nil {
w.bus.Publish(bus.Event{
"stop": true,
"container": container,
})
}
}

Expand Down Expand Up @@ -393,49 +405,52 @@ func (w *watcher) listContainers(options types.ContainerListOptions) ([]*Contain

// Clean up deleted containers after they are not used anymore
func (w *watcher) cleanupWorker() {
log := w.log
defer w.stopped.Done()

for {
select {
case <-w.ctx.Done():
w.stopped.Done()
return
// Wait a full period
case <-time.After(w.cleanupTimeout):
// Check entries for timeout
var toDelete []string
timeout := time.Now().Add(-w.cleanupTimeout)
w.RLock()
for key, lastSeen := range w.deleted {
if lastSeen.Before(timeout) {
log.Debugf("Removing container %s after cool down timeout", key)
toDelete = append(toDelete, key)
}
}
w.RUnlock()

// Delete timed out entries:
for _, key := range toDelete {
container := w.Container(key)
if container != nil {
w.bus.Publish(bus.Event{
"delete": true,
"container": container,
})
}
}
w.runCleanup()
}
}
}

w.Lock()
for _, key := range toDelete {
delete(w.deleted, key)
delete(w.containers, key)
if w.shortID {
delete(w.containers, key[:shortIDLen])
}
}
w.Unlock()
func (w *watcher) runCleanup() {
// Check entries for timeout
var toDelete []string
timeout := w.clock.Now().Add(-w.cleanupTimeout)
w.RLock()
for key, lastSeen := range w.deleted {
if lastSeen.Before(timeout) {
w.log.Debugf("Removing container %s after cool down timeout", key)
toDelete = append(toDelete, key)
}
}
w.RUnlock()

// Delete timed out entries:
for _, key := range toDelete {
container := w.Container(key)
if container != nil {
w.bus.Publish(bus.Event{
"delete": true,
"container": container,
})
}
}

w.Lock()
for _, key := range toDelete {
delete(w.deleted, key)
delete(w.containers, key)
if w.shortID {
delete(w.containers, key[:shortIDLen])
}
}
w.Unlock()
}

// ListenStart returns a bus listener to receive container started events, with a `container` key holding it
Expand All @@ -447,3 +462,11 @@ func (w *watcher) ListenStart() bus.Listener {
func (w *watcher) ListenStop() bus.Listener {
return w.bus.Subscribe("stop")
}

type clock interface {
jsoriano marked this conversation as resolved.
Show resolved Hide resolved
Now() time.Time
}

type systemClock struct{}
jsoriano marked this conversation as resolved.
Show resolved Hide resolved

func (*systemClock) Now() time.Time { return time.Now() }
jsoriano marked this conversation as resolved.
Show resolved Hide resolved
Loading