Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Promtail: Improve position and size metrics #1515

Merged
merged 1 commit into from
Jan 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions pkg/promtail/targets/filetarget.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (t *FileTarget) run() {
defer func() {
helpers.LogError("closing watcher", t.watcher.Close)
for _, v := range t.tails {
helpers.LogError("updating tailer last position", v.markPosition)
helpers.LogError("updating tailer last position", v.markPositionAndSize)
helpers.LogError("stopping tailer", v.stop)
}
level.Debug(t.logger).Log("msg", "watcher closed, tailer stopped, positions saved")
Expand Down Expand Up @@ -325,15 +325,13 @@ func toStopTailing(nt []string, et map[string]*tailer) []string {

func (t *FileTarget) reportSize(ms []string) {
for _, m := range ms {
// Ask the tailer for the file size as it keeps a direct handle on the file
// and avoids issues with renaming/replacing a file
// Ask the tailer to update the size if a tailer exists, this keeps position and size metrics in sync
if tailer, ok := t.tails[m]; ok {
s, err := tailer.size()
err := tailer.markPositionAndSize()
if err != nil {
level.Warn(t.logger).Log("msg", "failed to get file size from tailer, ", "file", m, "error", err)
return
}
totalBytes.WithLabelValues(m).Set(float64(s))
} else {
// Must be a new file, just directly read the size of it
fi, err := os.Stat(m)
Expand Down
27 changes: 16 additions & 11 deletions pkg/promtail/targets/tailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package targets

import (
"os"
"sync"
"time"

"github.com/go-kit/kit/log"
Expand All @@ -22,6 +23,8 @@ type tailer struct {
path string
tail *tail.Tail

posAndSizeMtx sync.Mutex

quit chan struct{}
done chan struct{}
}
Expand Down Expand Up @@ -86,9 +89,9 @@ func (t *tailer) run() {
for {
select {
case <-positionWait.C:
err := t.markPosition()
err := t.markPositionAndSize()
if err != nil {
level.Error(t.logger).Log("msg", "error getting tail position", "path", t.path, "error", err)
level.Error(t.logger).Log("msg", "error getting tail position and/or size", "path", t.path, "error", err)
continue
}

Expand All @@ -112,28 +115,30 @@ func (t *tailer) run() {
}
}

func (t *tailer) markPosition() error {
func (t *tailer) markPositionAndSize() error {
// Lock this update as there are 2 timers calling this routine, the sync in filetarget and the positions sync in this file.
t.posAndSizeMtx.Lock()
defer t.posAndSizeMtx.Unlock()

pos, err := t.tail.Tell()
if err != nil {
return err
}

readBytes.WithLabelValues(t.path).Set(float64(pos))
t.positions.Put(t.path, pos)
return nil
}

func (t *tailer) size() (int64, error) {
s, err := t.tail.Size()
size, err := t.tail.Size()
if err != nil {
return 0, err
return err
}
return s, nil
totalBytes.WithLabelValues(t.path).Set(float64(size))

return nil
}

func (t *tailer) stop() error {
// Save the current position before shutting down tailer
err := t.markPosition()
err := t.markPositionAndSize()
if err != nil {
level.Error(t.logger).Log("msg", "error getting tail position", "path", t.path, "error", err)
}
Expand Down