Skip to content

Commit

Permalink
restart the tailer if we fail to read and upate current position (#2532)
Browse files Browse the repository at this point in the history
  • Loading branch information
slim-bean authored Aug 21, 2020
1 parent 5fcd399 commit b6d9fd5
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 28 deletions.
31 changes: 25 additions & 6 deletions pkg/promtail/targets/file/filetarget.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,7 @@ func (t *FileTarget) run() {
defer func() {
helpers.LogError("closing watcher", t.watcher.Close)
for _, v := range t.tails {
helpers.LogError("updating tailer last position", v.markPositionAndSize)
helpers.LogError("stopping tailer", v.stop)
v.stop()
}
level.Debug(t.logger).Log("msg", "watcher closed, tailer stopped, positions saved")
close(t.done)
Expand Down Expand Up @@ -245,12 +244,16 @@ func (t *FileTarget) sync() error {
// fsnotify.Watcher doesn't allow us to see what is currently being watched so we have to track it ourselves.
t.watches = dirs

// Check if any running tailers have stopped because of errors and remove them from the running list
// (They will be restarted in startTailing)
t.pruneStoppedTailers()

// Start tailing all of the matched files if not already doing so.
t.startTailing(matches)

// Stop tailing any files which no longer exist
toStopTailing := toStopTailing(matches, t.tails)
t.stopTailing(toStopTailing)
t.stopTailingAndRemovePosition(toStopTailing)

return nil
}
Expand Down Expand Up @@ -304,16 +307,32 @@ func (t *FileTarget) startTailing(ps []string) {
}
}

func (t *FileTarget) stopTailing(ps []string) {
// stopTailingAndRemovePositions will stop the tailer and remove the positions entry.
// Call this when a file no longer exists and you want to remove all traces of it.
func (t *FileTarget) stopTailingAndRemovePosition(ps []string) {
for _, p := range ps {
if tailer, ok := t.tails[p]; ok {
helpers.LogError("stopping tailer", tailer.stop)
tailer.cleanup()
tailer.stop()
t.positions.Remove(tailer.path)
delete(t.tails, p)
}
}
}

// pruneStoppedTailers removes any tailers which have stopped running from
// the list of active tailers. This allows them to be restarted if there were errors.
func (t *FileTarget) pruneStoppedTailers() {
toRemove := make([]string, 0, len(t.tails))
for k, t := range t.tails {
if !t.isRunning() {
toRemove = append(toRemove, k)
}
}
for _, tr := range toRemove {
delete(t.tails, tr)
}
}

func toStopTailing(nt []string, et map[string]*tailer) []string {
// Make a set of all existing tails
existingTails := make(map[string]struct{}, len(et))
Expand Down
62 changes: 40 additions & 22 deletions pkg/promtail/targets/file/tailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/go-kit/kit/log/level"
"github.com/hpcloud/tail"
"github.com/prometheus/common/model"
"go.uber.org/atomic"

"github.com/grafana/loki/pkg/promtail/api"
"github.com/grafana/loki/pkg/promtail/positions"
Expand All @@ -25,8 +26,9 @@ type tailer struct {

posAndSizeMtx sync.Mutex

quit chan struct{}
done chan struct{}
running *atomic.Bool
quit chan struct{}
done chan struct{}
}

func newTailer(logger log.Logger, handler api.EntryHandler, positions positions.Positions, path string) (*tailer, error) {
Expand All @@ -46,9 +48,9 @@ func newTailer(logger log.Logger, handler api.EntryHandler, positions positions.
}

tail, err := tail.TailFile(path, tail.Config{
Follow: true,
Poll: true,
ReOpen: true,
Follow: true,
Poll: true,
ReOpen: true,
MustExist: true,
Location: &tail.SeekInfo{
Offset: pos,
Expand All @@ -64,11 +66,11 @@ func newTailer(logger log.Logger, handler api.EntryHandler, positions positions.
logger: logger,
handler: api.AddLabelsMiddleware(model.LabelSet{FilenameLabel: model.LabelValue(path)}).Wrap(handler),
positions: positions,

path: path,
tail: tail,
quit: make(chan struct{}),
done: make(chan struct{}),
path: path,
tail: tail,
running: atomic.NewBool(false),
quit: make(chan struct{}),
done: make(chan struct{}),
}
tail.Logger = util.NewLogAdapter(logger)

Expand All @@ -81,9 +83,20 @@ func (t *tailer) run() {
level.Info(t.logger).Log("msg", "start tailing file", "path", t.path)
positionSyncPeriod := t.positions.SyncPeriod()
positionWait := time.NewTicker(positionSyncPeriod)
t.running.Store(true)

// This function runs in a goroutine, if it exits this tailer will never do any more tailing.
// Clean everything up.
defer func() {
err := t.tail.Stop()
if err != nil {
level.Error(t.logger).Log("msg", "error stopping tailer when exiting tail goroutine", "path", t.path, "error", err)
}

positionWait.Stop()
t.cleanupMetrics()
t.running.Store(false)

close(t.done)
}()

Expand All @@ -92,17 +105,19 @@ func (t *tailer) run() {
case <-positionWait.C:
err := t.markPositionAndSize()
if err != nil {
level.Error(t.logger).Log("msg", "error getting tail position and/or size", "path", t.path, "error", err)
continue
level.Error(t.logger).Log("msg", "error getting tail position and/or size, stopping tailer", "path", t.path, "error", err)
return
}

case line, ok := <-t.tail.Lines:
if !ok {
return
}

// Note currently the tail implementation hardcodes Err to nil, this should never hit.
if line.Err != nil {
level.Error(t.logger).Log("msg", "error reading line", "path", t.path, "error", line.Err)
continue
}

readLines.WithLabelValues(t.path).Inc()
Expand Down Expand Up @@ -137,25 +152,28 @@ func (t *tailer) markPositionAndSize() error {
return nil
}

func (t *tailer) stop() error {
func (t *tailer) stop() {
// Save the current position before shutting down tailer
err := t.markPositionAndSize()
if err != nil {
level.Error(t.logger).Log("msg", "error getting tail position", "path", t.path, "error", err)
level.Error(t.logger).Log("msg", "error marking file position when stopping tailer", "path", t.path, "error", err)
}
err = t.tail.Stop()
close(t.quit)
<-t.done
filesActive.Add(-1.)
level.Info(t.logger).Log("msg", "stopped tailing file", "path", t.path)
return
}

func (t *tailer) isRunning() bool {
return t.running.Load()
}

// cleanupMetrics removes all metrics exported by this tailer
func (t *tailer) cleanupMetrics() {
// When we stop tailing the file, also un-export metrics related to the file
filesActive.Add(-1.)
readLines.DeleteLabelValues(t.path)
readBytes.DeleteLabelValues(t.path)
totalBytes.DeleteLabelValues(t.path)
logLengthHistogram.DeleteLabelValues(t.path)
level.Info(t.logger).Log("msg", "stopped tailing file", "path", t.path)
return err
}

func (t *tailer) cleanup() {
t.positions.Remove(t.path)
}

0 comments on commit b6d9fd5

Please sign in to comment.