diff --git a/internal/prefixcollector/collector_configmap_output_test.go b/internal/prefixcollector/collector_configmap_output_test.go index 39b58ce..8f4b569 100644 --- a/internal/prefixcollector/collector_configmap_output_test.go +++ b/internal/prefixcollector/collector_configmap_output_test.go @@ -56,14 +56,22 @@ const ( type dummyPrefixSource struct { prefixes []string + notify chan<- struct{} } func (d *dummyPrefixSource) Prefixes() []string { return d.prefixes } -func newDummyPrefixSource(prefixes []string) *dummyPrefixSource { - return &dummyPrefixSource{prefixes} +func (d *dummyPrefixSource) SendNotification() { + d.notify <- struct{}{} +} + +func newDummyPrefixSource(prefixes []string, notify chan<- struct{}) *dummyPrefixSource { + return &dummyPrefixSource{ + prefixes: prefixes, + notify: notify, + } } type ExcludedPrefixesSuite struct { @@ -94,6 +102,7 @@ func (eps *ExcludedPrefixesSuite) TestCollectorWithDummySources() { "127.0.2.1/16", "168.92.0.1/24", }, + notifyChan, ), newDummyPrefixSource( []string{ @@ -101,6 +110,7 @@ func (eps *ExcludedPrefixesSuite) TestCollectorWithDummySources() { "134.56.0.1/8", "168.92.0.1/16", }, + notifyChan, ), } @@ -263,6 +273,7 @@ func (eps *ExcludedPrefixesSuite) TestAllSources() { "127.0.2.1/16", "168.92.0.1/24", }, + notifyChan, ), prefixsource.NewKubeAdmPrefixSource(ctx, notifyChan), prefixsource.NewConfigMapPrefixSource( diff --git a/internal/prefixcollector/collector_file_output_test.go b/internal/prefixcollector/collector_file_output_test.go index 4dff331..6874b05 100644 --- a/internal/prefixcollector/collector_file_output_test.go +++ b/internal/prefixcollector/collector_file_output_test.go @@ -25,6 +25,7 @@ import ( "context" "os" "path/filepath" + "sync/atomic" "time" "github.com/fsnotify/fsnotify" @@ -61,6 +62,7 @@ func (eps *ExcludedPrefixesSuite) TestAllSourcesWithFileOutput() { "127.0.2.1/16", "168.92.0.1/24", }, + notifyChan, ), prefixsource.NewKubeAdmPrefixSource(ctx, notifyChan), prefixsource.NewConfigMapPrefixSource( @@ -75,6 +77,40 @@ func (eps *ExcludedPrefixesSuite) TestAllSourcesWithFileOutput() { eps.testCollectorWithFileOutput(ctx, notifyChan, expectedResult, sources) } +func (eps *ExcludedPrefixesSuite) TestDummySourceWithSeveralNotifications() { + defer goleak.VerifyNone(eps.T(), goleak.IgnoreCurrent()) + expectedResult := []string{ + "127.0.0.0/16", + "168.92.0.0/24", + } + + notificationCount := 5 + + notifyChan := make(chan struct{}, notificationCount) + ctx, cancel := context.WithCancel(prefixcollector.WithKubernetesInterface(context.Background(), eps.clientSet)) + defer cancel() + + eps.createConfigMap(ctx, prefixsource.KubeNamespace, kubeConfigMapPath) + eps.createConfigMap(ctx, configMapNamespace, configMapPath) + + source := newDummyPrefixSource( + []string{ + "127.0.0.1/16", + "127.0.2.1/16", + "168.92.0.1/24", + }, + notifyChan, + ) + + sources := []prefixcollector.PrefixSource{source} + + for i := 0; i < notificationCount; i++ { + source.SendNotification() + } + + eps.testCollectorWithFileOutput(ctx, notifyChan, expectedResult, sources) +} + func (eps *ExcludedPrefixesSuite) testCollectorWithFileOutput(ctx context.Context, notifyChan chan struct{}, expectedResult []string, sources []prefixcollector.PrefixSource) { prefixesFilePath := filepath.Join(os.TempDir(), prefixesFileName) @@ -90,7 +126,8 @@ func (eps *ExcludedPrefixesSuite) testCollectorWithFileOutput(ctx context.Contex ctx, cancel := context.WithTimeout(ctx, time.Second) defer cancel() - watcher, errCh := eps.watchFile(ctx, prefixesFilePath, len(sources)) + var modified atomic.Int32 + watcher, errCh := eps.watchFile(ctx, prefixesFilePath, &modified) go collector.Serve(ctx) @@ -112,14 +149,14 @@ func (eps *ExcludedPrefixesSuite) testCollectorWithFileOutput(ctx context.Contex eps.T().Fatalf("Error transforming yaml to prefixes: %v", err) } + eps.Require().LessOrEqual(int(modified.Load()), len(sources)*2) eps.Require().ElementsMatch(expectedResult, prefixes) } func (eps *ExcludedPrefixesSuite) watchFile(ctx context.Context, prefixesFilePath string, - maxModifyCount int) (watcher *fsnotify.Watcher, errorCh chan error) { + modified *atomic.Int32) (watcher *fsnotify.Watcher, errorCh chan error) { watcher, err := fsnotify.NewWatcher() errorCh = make(chan error) - modifyCount := 0 if err != nil { errorCh <- err @@ -141,11 +178,7 @@ func (eps *ExcludedPrefixesSuite) watchFile(ctx context.Context, prefixesFilePat return } if event.Op&fsnotify.Write == fsnotify.Write { - modifyCount++ - if modifyCount == maxModifyCount { - close(errorCh) - return - } + modified.Add(1) } case watcherError, ok := <-watcher.Errors: if !ok {