diff --git a/clients/pkg/promtail/targets/file/filetarget.go b/clients/pkg/promtail/targets/file/filetarget.go index 0ade51902b492..ffa168fde43d2 100644 --- a/clients/pkg/promtail/targets/file/filetarget.go +++ b/clients/pkg/promtail/targets/file/filetarget.go @@ -25,6 +25,8 @@ const ( FilenameLabel = "filename" ) +var errFileTargetStopped = errors.New("File target is stopped") + // Config describes behavior for Target type Config struct { SyncPeriod time.Duration `mapstructure:"sync_period" yaml:"sync_period"` @@ -223,6 +225,11 @@ func (t *FileTarget) run() { } case <-ticker.C: err := t.sync() + if errors.Is(err, errFileTargetStopped) { + // This file target has been stopped. + // This is normal and there is no need to log an error. + return + } if err != nil { level.Error(t.logger).Log("msg", "error running sync function", "error", err) } @@ -291,14 +298,20 @@ func (t *FileTarget) sync() error { t.watchesMutex.Lock() toStartWatching := missing(t.watches, dirs) t.watchesMutex.Unlock() - t.startWatching(toStartWatching) + err := t.startWatching(toStartWatching) + if errors.Is(err, errFileTargetStopped) { + return err + } // Remove any directories which no longer need watching. t.watchesMutex.Lock() toStopWatching := missing(dirs, t.watches) t.watchesMutex.Unlock() - t.stopWatching(toStopWatching) + err = t.stopWatching(toStopWatching) + if errors.Is(err, errFileTargetStopped) { + return err + } // fsnotify.Watcher doesn't allow us to see what is currently being watched so we have to track it ourselves. t.watchesMutex.Lock() @@ -321,32 +334,42 @@ func (t *FileTarget) sync() error { return nil } -func (t *FileTarget) startWatching(dirs map[string]struct{}) { +func (t *FileTarget) startWatching(dirs map[string]struct{}) error { for dir := range dirs { if _, ok := t.getWatch(dir); ok { continue } level.Info(t.logger).Log("msg", "watching new directory", "directory", dir) - t.targetEventHandler <- fileTargetEvent{ + select { + case <-t.quit: + return errFileTargetStopped + case t.targetEventHandler <- fileTargetEvent{ path: dir, eventType: fileTargetEventWatchStart, + }: } } + return nil } -func (t *FileTarget) stopWatching(dirs map[string]struct{}) { +func (t *FileTarget) stopWatching(dirs map[string]struct{}) error { for dir := range dirs { if _, ok := t.getWatch(dir); !ok { continue } level.Info(t.logger).Log("msg", "removing directory from watcher", "directory", dir) - t.targetEventHandler <- fileTargetEvent{ + select { + case <-t.quit: + return errFileTargetStopped + case t.targetEventHandler <- fileTargetEvent{ path: dir, eventType: fileTargetEventWatchStop, + }: } } + return nil } func (t *FileTarget) startTailing(ps []string) { diff --git a/clients/pkg/promtail/targets/file/filetarget_test.go b/clients/pkg/promtail/targets/file/filetarget_test.go index 579ea19e2e56e..caf33395ba201 100644 --- a/clients/pkg/promtail/targets/file/filetarget_test.go +++ b/clients/pkg/promtail/targets/file/filetarget_test.go @@ -336,6 +336,93 @@ func TestFileTarget_StopsTailersCleanly_Parallel(t *testing.T) { ps.Stop() } +// Make sure that Stop() doesn't hang if FileTarget is waiting on a channel send. +func TestFileTarget_StopAbruptly(t *testing.T) { + w := log.NewSyncWriter(os.Stderr) + logger := log.NewLogfmtLogger(w) + + dirName := newTestLogDirectories(t) + positionsFileName := filepath.Join(dirName, "positions.yml") + logDir1 := filepath.Join(dirName, "log1") + logDir2 := filepath.Join(dirName, "log2") + logDir3 := filepath.Join(dirName, "log3") + + logfile1 := filepath.Join(logDir1, "test1.log") + logfile2 := filepath.Join(logDir2, "test1.log") + logfile3 := filepath.Join(logDir3, "test1.log") + + ps, err := positions.New(logger, positions.Config{ + SyncPeriod: 10 * time.Millisecond, + PositionsFile: positionsFileName, + }) + require.NoError(t, err) + + client := fake.New(func() {}) + defer client.Stop() + + // fakeHandler has to be a buffered channel so that we can call the len() function on it. + // We need to call len() to check if the channel is full. + fakeHandler := make(chan fileTargetEvent, 1) + pathToWatch := filepath.Join(dirName, "**", "*.log") + registry := prometheus.NewRegistry() + target, err := NewFileTarget(NewMetrics(registry), logger, client, ps, pathToWatch, "", nil, nil, &Config{ + SyncPeriod: 10 * time.Millisecond, + }, DefaultWatchConig, nil, fakeHandler, "", nil) + assert.NoError(t, err) + + // Create a directory, still nothing is watched. + err = os.MkdirAll(logDir1, 0750) + assert.NoError(t, err) + _, err = os.Create(logfile1) + assert.NoError(t, err) + + // There should be only one WatchStart event in the channel so far. + ftEvent := <-fakeHandler + require.Equal(t, fileTargetEventWatchStart, ftEvent.eventType) + + requireEventually(t, func() bool { + return target.getReadersLen() == 1 + }, "expected 1 tailer to be created") + + require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(` + # HELP promtail_files_active_total Number of active files. + # TYPE promtail_files_active_total gauge + promtail_files_active_total 1 + `), "promtail_files_active_total")) + + // Create two directories - one more than the buffer of fakeHandler, + // so that the file target hands until we call Stop(). + err = os.MkdirAll(logDir2, 0750) + assert.NoError(t, err) + _, err = os.Create(logfile2) + assert.NoError(t, err) + + err = os.MkdirAll(logDir3, 0750) + assert.NoError(t, err) + _, err = os.Create(logfile3) + assert.NoError(t, err) + + // Wait until the file target is waiting on a channel send due to a full channel buffer. + requireEventually(t, func() bool { + return len(fakeHandler) == 1 + }, "expected an event in the fakeHandler channel") + + // If FileHandler works well, then it will stop waiting for + // the blocked fakeHandler and stop cleanly. + // This is why this time we don't drain fakeHandler. + requireEventually(t, func() bool { + target.Stop() + ps.Stop() + return true + }, "expected FileTarget not to hang") + + require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(` + # HELP promtail_files_active_total Number of active files. + # TYPE promtail_files_active_total gauge + promtail_files_active_total 0 + `), "promtail_files_active_total")) +} + func TestFileTargetPathExclusion(t *testing.T) { w := log.NewSyncWriter(os.Stderr) logger := log.NewLogfmtLogger(w)