Skip to content

Commit

Permalink
Merge pull request #16 from lxs137/watcher-nil-event
Browse files Browse the repository at this point in the history
restart watcher when receive event with nil Object
  • Loading branch information
byxorna authored May 13, 2019
2 parents 425ac57 + d99c667 commit 700e129
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 5 deletions.
7 changes: 6 additions & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,12 @@ func main() {
glog.Infof("launching watcher for ConfigMaps")
err := configWatcher.Watch(ctx, sigChan)
if err != nil {
glog.Fatalf("error watching for new ConfigMaps (terminating): %s", err.Error())
switch err {
case watcher.WatchChannelClosedError:
glog.Errorf("watcher got error, try to restart watcher: %s", err.Error())
default:
glog.Fatalf("error watching for new ConfigMaps (terminating): %s", err.Error())
}
}
}
}()
Expand Down
20 changes: 16 additions & 4 deletions internal/pkg/config/watcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ package watcher

import (
"context"
"errors"
"fmt"
"io/ioutil"
"strings"

"github.com/golang/glog"
"github.com/tumblr/k8s-sidecar-injector/internal/pkg/config"
"k8s.io/api/core/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/watch"
Expand All @@ -24,6 +26,9 @@ const (
serviceAccountNamespaceFilePath = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
)

// WatchChannelClosedError: should restart watcher
var WatchChannelClosedError = errors.New("watcher channel has closed")

// K8sConfigMapWatcher is a struct that connects to the API and collects, parses, and emits sidecar configurations
type K8sConfigMapWatcher struct {
Config
Expand Down Expand Up @@ -98,14 +103,22 @@ func (c *K8sConfigMapWatcher) Watch(ctx context.Context, notifyMe chan<- interfa
watcher, err := c.client.ConfigMaps(c.Namespace).Watch(metav1.ListOptions{
LabelSelector: mapStringStringToLabelSelector(c.ConfigMapLabels),
})
defer watcher.Stop()
if err != nil {
return fmt.Errorf("unable to create watcher (possible serviceaccount RBAC/ACL failure?): %s", err.Error())
}
ch := watcher.ResultChan()
var e watch.Event
for {
select {
case e = <-ch:
case e, ok := <-watcher.ResultChan():
// channel may closed caused by HTTP timeout, should restart watcher
// detail at https://github.com/kubernetes/client-go/issues/334
if !ok {
glog.Errorf("channel has closed, should restart watcher")
return WatchChannelClosedError
}
if e.Type == watch.Error {
return apierrs.FromObject(e.Object)
}
glog.V(3).Infof("event: %s %s", e.Type, e.Object.GetObjectKind())
switch e.Type {
case watch.Added:
Expand All @@ -123,7 +136,6 @@ func (c *K8sConfigMapWatcher) Watch(ctx context.Context, notifyMe chan<- interfa
case <-ctx.Done():
glog.V(2).Infof("stopping configmap watcher, context indicated we are done")
// clean up, we cancelled the context, so stop the watch
watcher.Stop()
return nil
}
}
Expand Down
23 changes: 23 additions & 0 deletions internal/pkg/config/watcher/watcher_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package watcher

import (
"context"
"k8s.io/apimachinery/pkg/watch"
testcore "k8s.io/client-go/testing"
"testing"

_ "github.com/tumblr/k8s-sidecar-injector/internal/pkg/testing"
Expand Down Expand Up @@ -30,3 +33,23 @@ func TestGet(t *testing.T) {
t.Fatalf("expected 0 messages, but got %d", len(messages))
}
}

func TestWatcherChannelClose(t *testing.T) {
client := fake.NewSimpleClientset()
watcher := watch.NewEmptyWatch()
client.PrependWatchReactor("configmaps", testcore.DefaultWatchReactor(watcher, nil))

w := K8sConfigMapWatcher{
Config: testConfig,
client: client.CoreV1(),
}

sigChan := make(chan interface{}, 10)
// background context never canceled, no deadline
ctx := context.Background()

err := w.Watch(ctx, sigChan)
if err != nil && err != WatchChannelClosedError {
t.Errorf("expect catch WatchChannelClosedError, but got %s", err)
}
}

0 comments on commit 700e129

Please sign in to comment.