diff --git a/pkg/ifaces/watcher.go b/pkg/ifaces/watcher.go index 1f25f445..f15597f7 100644 --- a/pkg/ifaces/watcher.go +++ b/pkg/ifaces/watcher.go @@ -20,6 +20,8 @@ const ( netnsVolume = "/var/run/netns" ) +var log = logrus.WithField("component", "ifaces.Watcher") + // Watcher uses system's netlink to get real-time information events about network interfaces' // addition or removal. type Watcher struct { @@ -31,7 +33,7 @@ type Watcher struct { linkSubscriberAt func(ns netns.NsHandle, ch chan<- netlink.LinkUpdate, done <-chan struct{}) error mutex *sync.Mutex netnsWatcher *fsnotify.Watcher - nsDone map[string]chan struct{} + nsDone sync.Map } func NewWatcher(bufLen int) *Watcher { @@ -42,7 +44,7 @@ func NewWatcher(bufLen int) *Watcher { linkSubscriberAt: netlink.LinkSubscribeAt, mutex: &sync.Mutex{}, netnsWatcher: &fsnotify.Watcher{}, - nsDone: make(map[string]chan struct{}), + nsDone: sync.Map{}, } } @@ -50,11 +52,11 @@ func (w *Watcher) Subscribe(ctx context.Context) (<-chan Event, error) { out := make(chan Event, w.bufLen) netns, err := getNetNS() if err != nil { - w.nsDone[""] = make(chan struct{}) + w.nsDone.Store("", make(chan struct{})) go w.sendUpdates(ctx, "", out) } else { for _, n := range netns { - w.nsDone[n] = make(chan struct{}) + w.nsDone.Store(n, make(chan struct{})) go w.sendUpdates(ctx, n, out) } } @@ -66,11 +68,13 @@ func (w *Watcher) Subscribe(ctx context.Context) (<-chan Event, error) { func (w *Watcher) sendUpdates(ctx context.Context, ns string, out chan Event) { var netnsHandle netns.NsHandle var err error - log := logrus.WithField("component", "ifaces.Watcher") - doneChan := w.nsDone[ns] + ch, ok := w.nsDone.Load(ns) + if !ok { + log.WithError(err).Warnf("netns %s not found in netns map", ns) + return + } + doneChan := ch.(chan struct{}) defer func() { - close(doneChan) - delete(w.nsDone, ns) if netnsHandle.IsOpen() { netnsHandle.Close() } @@ -181,9 +185,41 @@ func getNetNS() ([]string, error) { return netns, nil } +func (w *Watcher) handleEvent(ctx context.Context, event fsnotify.Event, out chan Event) { + ns := filepath.Base(event.Name) + + switch { + case event.Op&fsnotify.Create == fsnotify.Create: + log.WithField("netns", ns).Debug("netns create notification") + w.createNamespace(ctx, ns, out) + case event.Op&fsnotify.Remove == fsnotify.Remove: + log.WithField("netns", ns).Debug("netns delete notification") + w.deleteNamespace(ns) + } +} + +func (w *Watcher) createNamespace(ctx context.Context, ns string, out chan Event) { + if ch, ok := w.nsDone.Load(ns); ok { + log.WithField("netns", ns).Debug("netns channel already exists, deleting it") + close(ch.(chan struct{})) + w.nsDone.Delete(ns) + } + + w.nsDone.Store(ns, make(chan struct{})) + go w.sendUpdates(ctx, ns, out) +} + +func (w *Watcher) deleteNamespace(ns string) { + if ch, ok := w.nsDone.Load(ns); ok { + close(ch.(chan struct{})) + w.nsDone.Delete(ns) + } else { + log.WithField("netns", ns).Debug("netns delete but no channel exists") + } +} + func (w *Watcher) netnsNotify(ctx context.Context, out chan Event) { var err error - log := logrus.WithField("component", "ifaces.Watcher") w.netnsWatcher, err = fsnotify.NewWatcher() if err != nil { @@ -198,25 +234,7 @@ func (w *Watcher) netnsNotify(ctx context.Context, out chan Event) { if !ok { return } - if event.Op&fsnotify.Create == fsnotify.Create { - ns := filepath.Base(event.Name) - log.WithField("netns", ns).Debug("netns create notification") - if _, ok := w.nsDone[ns]; ok { - log.WithField("netns", ns).Debug("netns channel already exists, delete it") - delete(w.nsDone, ns) - } - w.nsDone[ns] = make(chan struct{}) - go w.sendUpdates(ctx, ns, out) - } - if event.Op&fsnotify.Remove == fsnotify.Remove { - ns := filepath.Base(event.Name) - log.WithField("netns", ns).Debug("netns delete notification") - if _, ok := w.nsDone[ns]; ok { - w.nsDone[ns] <- struct{}{} - } else { - log.WithField("netns", ns).Debug("netns delete but there is no channel to send events to") - } - } + w.handleEvent(ctx, event, out) case err, ok := <-w.netnsWatcher.Errors: if !ok { return