Skip to content

Commit

Permalink
update the position and size metrics at the same time to try to end the race between these two metrics and the two timers that care about position and size
Browse files Browse the repository at this point in the history

Signed-off-by: Edward Welch <edward.welch@grafana.com>
  • Loading branch information
slim-bean committed Jan 13, 2020
1 parent 3a2e5d4 commit 2a9b69f
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 16 deletions.
8 changes: 3 additions & 5 deletions pkg/promtail/targets/filetarget.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (t *FileTarget) run() {
defer func() {
helpers.LogError("closing watcher", t.watcher.Close)
for _, v := range t.tails {
helpers.LogError("updating tailer last position", v.markPosition)
helpers.LogError("updating tailer last position", v.markPositionAndSize)
helpers.LogError("stopping tailer", v.stop)
}
level.Debug(t.logger).Log("msg", "watcher closed, tailer stopped, positions saved")
Expand Down Expand Up @@ -325,15 +325,13 @@ func toStopTailing(nt []string, et map[string]*tailer) []string {

func (t *FileTarget) reportSize(ms []string) {
for _, m := range ms {
// Ask the tailer for the file size as it keeps a direct handle on the file
// and avoids issues with renaming/replacing a file
// Ask the tailer to update the size if a tailer exists, this keeps position and size metrics in sync
if tailer, ok := t.tails[m]; ok {
s, err := tailer.size()
err := tailer.markPositionAndSize()
if err != nil {
level.Warn(t.logger).Log("msg", "failed to get file size from tailer, ", "file", m, "error", err)
return
}
totalBytes.WithLabelValues(m).Set(float64(s))
} else {
// Must be a new file, just directly read the size of it
fi, err := os.Stat(m)
Expand Down
27 changes: 16 additions & 11 deletions pkg/promtail/targets/tailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package targets

import (
"os"
"sync"
"time"

"github.com/go-kit/kit/log"
Expand All @@ -22,6 +23,8 @@ type tailer struct {
path string
tail *tail.Tail

posAndSizeMtx sync.Mutex

quit chan struct{}
done chan struct{}
}
Expand Down Expand Up @@ -86,9 +89,9 @@ func (t *tailer) run() {
for {
select {
case <-positionWait.C:
err := t.markPosition()
err := t.markPositionAndSize()
if err != nil {
level.Error(t.logger).Log("msg", "error getting tail position", "path", t.path, "error", err)
level.Error(t.logger).Log("msg", "error getting tail position and/or size", "path", t.path, "error", err)
continue
}

Expand All @@ -112,28 +115,30 @@ func (t *tailer) run() {
}
}

func (t *tailer) markPosition() error {
func (t *tailer) markPositionAndSize() error {
// Lock this update as there are 2 timers calling this routine, the sync in filetarget and the positions sync in this file.
t.posAndSizeMtx.Lock()
defer t.posAndSizeMtx.Unlock()

pos, err := t.tail.Tell()
if err != nil {
return err
}

readBytes.WithLabelValues(t.path).Set(float64(pos))
t.positions.Put(t.path, pos)
return nil
}

func (t *tailer) size() (int64, error) {
s, err := t.tail.Size()
size, err := t.tail.Size()
if err != nil {
return 0, err
return err
}
return s, nil
totalBytes.WithLabelValues(t.path).Set(float64(size))

return nil
}

func (t *tailer) stop() error {
// Save the current position before shutting down tailer
err := t.markPosition()
err := t.markPositionAndSize()
if err != nil {
level.Error(t.logger).Log("msg", "error getting tail position", "path", t.path, "error", err)
}
Expand Down

0 comments on commit 2a9b69f

Please sign in to comment.