diff --git a/pkg/promtail/targets/file/filetarget.go b/pkg/promtail/targets/file/filetarget.go index 38bde073f58b0..93783ba0a4759 100644 --- a/pkg/promtail/targets/file/filetarget.go +++ b/pkg/promtail/targets/file/filetarget.go @@ -162,8 +162,7 @@ func (t *FileTarget) run() { defer func() { helpers.LogError("closing watcher", t.watcher.Close) for _, v := range t.tails { - helpers.LogError("updating tailer last position", v.markPositionAndSize) - helpers.LogError("stopping tailer", v.stop) + v.stop() } level.Debug(t.logger).Log("msg", "watcher closed, tailer stopped, positions saved") close(t.done) @@ -245,12 +244,16 @@ func (t *FileTarget) sync() error { // fsnotify.Watcher doesn't allow us to see what is currently being watched so we have to track it ourselves. t.watches = dirs + // Check if any running tailers have stopped because of errors and remove them from the running list + // (They will be restarted in startTailing) + t.pruneStoppedTailers() + // Start tailing all of the matched files if not already doing so. t.startTailing(matches) // Stop tailing any files which no longer exist toStopTailing := toStopTailing(matches, t.tails) - t.stopTailing(toStopTailing) + t.stopTailingAndRemovePosition(toStopTailing) return nil } @@ -304,16 +307,32 @@ func (t *FileTarget) startTailing(ps []string) { } } -func (t *FileTarget) stopTailing(ps []string) { +// stopTailingAndRemovePositions will stop the tailer and remove the positions entry. +// Call this when a file no longer exists and you want to remove all traces of it. +func (t *FileTarget) stopTailingAndRemovePosition(ps []string) { for _, p := range ps { if tailer, ok := t.tails[p]; ok { - helpers.LogError("stopping tailer", tailer.stop) - tailer.cleanup() + tailer.stop() + t.positions.Remove(tailer.path) delete(t.tails, p) } } } +// pruneStoppedTailers removes any tailers which have stopped running from +// the list of active tailers. This allows them to be restarted if there were errors. +func (t *FileTarget) pruneStoppedTailers() { + toRemove := make([]string, 0, len(t.tails)) + for k, t := range t.tails { + if !t.isRunning() { + toRemove = append(toRemove, k) + } + } + for _, tr := range toRemove { + delete(t.tails, tr) + } +} + func toStopTailing(nt []string, et map[string]*tailer) []string { // Make a set of all existing tails existingTails := make(map[string]struct{}, len(et)) diff --git a/pkg/promtail/targets/file/tailer.go b/pkg/promtail/targets/file/tailer.go index ebb72f71a652b..3e4258ae79ead 100644 --- a/pkg/promtail/targets/file/tailer.go +++ b/pkg/promtail/targets/file/tailer.go @@ -9,6 +9,7 @@ import ( "github.com/go-kit/kit/log/level" "github.com/hpcloud/tail" "github.com/prometheus/common/model" + "go.uber.org/atomic" "github.com/grafana/loki/pkg/promtail/api" "github.com/grafana/loki/pkg/promtail/positions" @@ -25,8 +26,9 @@ type tailer struct { posAndSizeMtx sync.Mutex - quit chan struct{} - done chan struct{} + running *atomic.Bool + quit chan struct{} + done chan struct{} } func newTailer(logger log.Logger, handler api.EntryHandler, positions positions.Positions, path string) (*tailer, error) { @@ -46,9 +48,9 @@ func newTailer(logger log.Logger, handler api.EntryHandler, positions positions. } tail, err := tail.TailFile(path, tail.Config{ - Follow: true, - Poll: true, - ReOpen: true, + Follow: true, + Poll: true, + ReOpen: true, MustExist: true, Location: &tail.SeekInfo{ Offset: pos, @@ -64,11 +66,11 @@ func newTailer(logger log.Logger, handler api.EntryHandler, positions positions. logger: logger, handler: api.AddLabelsMiddleware(model.LabelSet{FilenameLabel: model.LabelValue(path)}).Wrap(handler), positions: positions, - - path: path, - tail: tail, - quit: make(chan struct{}), - done: make(chan struct{}), + path: path, + tail: tail, + running: atomic.NewBool(false), + quit: make(chan struct{}), + done: make(chan struct{}), } tail.Logger = util.NewLogAdapter(logger) @@ -81,9 +83,20 @@ func (t *tailer) run() { level.Info(t.logger).Log("msg", "start tailing file", "path", t.path) positionSyncPeriod := t.positions.SyncPeriod() positionWait := time.NewTicker(positionSyncPeriod) + t.running.Store(true) + // This function runs in a goroutine, if it exits this tailer will never do any more tailing. + // Clean everything up. defer func() { + err := t.tail.Stop() + if err != nil { + level.Error(t.logger).Log("msg", "error stopping tailer when exiting tail goroutine", "path", t.path, "error", err) + } + positionWait.Stop() + t.cleanupMetrics() + t.running.Store(false) + close(t.done) }() @@ -92,8 +105,8 @@ func (t *tailer) run() { case <-positionWait.C: err := t.markPositionAndSize() if err != nil { - level.Error(t.logger).Log("msg", "error getting tail position and/or size", "path", t.path, "error", err) - continue + level.Error(t.logger).Log("msg", "error getting tail position and/or size, stopping tailer", "path", t.path, "error", err) + return } case line, ok := <-t.tail.Lines: @@ -101,8 +114,10 @@ func (t *tailer) run() { return } + // Note currently the tail implementation hardcodes Err to nil, this should never hit. if line.Err != nil { level.Error(t.logger).Log("msg", "error reading line", "path", t.path, "error", line.Err) + continue } readLines.WithLabelValues(t.path).Inc() @@ -137,25 +152,28 @@ func (t *tailer) markPositionAndSize() error { return nil } -func (t *tailer) stop() error { +func (t *tailer) stop() { // Save the current position before shutting down tailer err := t.markPositionAndSize() if err != nil { - level.Error(t.logger).Log("msg", "error getting tail position", "path", t.path, "error", err) + level.Error(t.logger).Log("msg", "error marking file position when stopping tailer", "path", t.path, "error", err) } - err = t.tail.Stop() close(t.quit) <-t.done - filesActive.Add(-1.) + level.Info(t.logger).Log("msg", "stopped tailing file", "path", t.path) + return +} + +func (t *tailer) isRunning() bool { + return t.running.Load() +} + +// cleanupMetrics removes all metrics exported by this tailer +func (t *tailer) cleanupMetrics() { // When we stop tailing the file, also un-export metrics related to the file + filesActive.Add(-1.) readLines.DeleteLabelValues(t.path) readBytes.DeleteLabelValues(t.path) totalBytes.DeleteLabelValues(t.path) logLengthHistogram.DeleteLabelValues(t.path) - level.Info(t.logger).Log("msg", "stopped tailing file", "path", t.path) - return err -} - -func (t *tailer) cleanup() { - t.positions.Remove(t.path) }