diff --git a/pkg/promtail/targets/filetarget.go b/pkg/promtail/targets/filetarget.go index 94f49817cafdc..2a70dd1e36161 100644 --- a/pkg/promtail/targets/filetarget.go +++ b/pkg/promtail/targets/filetarget.go @@ -153,7 +153,7 @@ func (t *FileTarget) run() { defer func() { helpers.LogError("closing watcher", t.watcher.Close) for _, v := range t.tails { - helpers.LogError("updating tailer last position", v.markPosition) + helpers.LogError("updating tailer last position", v.markPositionAndSize) helpers.LogError("stopping tailer", v.stop) } level.Debug(t.logger).Log("msg", "watcher closed, tailer stopped, positions saved") @@ -325,15 +325,13 @@ func toStopTailing(nt []string, et map[string]*tailer) []string { func (t *FileTarget) reportSize(ms []string) { for _, m := range ms { - // Ask the tailer for the file size as it keeps a direct handle on the file - // and avoids issues with renaming/replacing a file + // Ask the tailer to update the size if a tailer exists, this keeps position and size metrics in sync if tailer, ok := t.tails[m]; ok { - s, err := tailer.size() + err := tailer.markPositionAndSize() if err != nil { level.Warn(t.logger).Log("msg", "failed to get file size from tailer, ", "file", m, "error", err) return } - totalBytes.WithLabelValues(m).Set(float64(s)) } else { // Must be a new file, just directly read the size of it fi, err := os.Stat(m) diff --git a/pkg/promtail/targets/tailer.go b/pkg/promtail/targets/tailer.go index 103e91293b832..ed44867406d2d 100644 --- a/pkg/promtail/targets/tailer.go +++ b/pkg/promtail/targets/tailer.go @@ -2,6 +2,7 @@ package targets import ( "os" + "sync" "time" "github.com/go-kit/kit/log" @@ -22,6 +23,8 @@ type tailer struct { path string tail *tail.Tail + posAndSizeMtx sync.Mutex + quit chan struct{} done chan struct{} } @@ -86,9 +89,9 @@ func (t *tailer) run() { for { select { case <-positionWait.C: - err := t.markPosition() + err := t.markPositionAndSize() if err != nil { - level.Error(t.logger).Log("msg", "error getting tail position", "path", t.path, "error", err) + level.Error(t.logger).Log("msg", "error getting tail position and/or size", "path", t.path, "error", err) continue } @@ -112,28 +115,30 @@ func (t *tailer) run() { } } -func (t *tailer) markPosition() error { +func (t *tailer) markPositionAndSize() error { + // Lock this update as there are 2 timers calling this routine, the sync in filetarget and the positions sync in this file. + t.posAndSizeMtx.Lock() + defer t.posAndSizeMtx.Unlock() + pos, err := t.tail.Tell() if err != nil { return err } - readBytes.WithLabelValues(t.path).Set(float64(pos)) t.positions.Put(t.path, pos) - return nil -} -func (t *tailer) size() (int64, error) { - s, err := t.tail.Size() + size, err := t.tail.Size() if err != nil { - return 0, err + return err } - return s, nil + totalBytes.WithLabelValues(t.path).Set(float64(size)) + + return nil } func (t *tailer) stop() error { // Save the current position before shutting down tailer - err := t.markPosition() + err := t.markPositionAndSize() if err != nil { level.Error(t.logger).Log("msg", "error getting tail position", "path", t.path, "error", err) }