Skip to content

Commit

Permalink
positions.ignore-corruptions (#1472)
Browse files Browse the repository at this point in the history
* positions.ignore-corruptions

* semantic change: s/ignore-corruptions/ignore-invalid-yaml/

* Update pkg/promtail/positions/positions.go

Co-Authored-By: Robert Fratto <robertfratto@gmail.com>
  • Loading branch information
owen-d and rfratto committed Jan 6, 2020
1 parent 52931a1 commit 0b72ddb
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 10 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## master / unreleased

* [FEATURE] promtail positions file corruptions can be ignored with the `positions.ignore-invalid-yaml` flag. In the case the positions yaml is corrupted an empty positions config will be used and should later overwrite the malformed yaml.

# 1.2.0 (2019-12-09)

One week has passed since the last Loki release, and it's time for a new one!
Expand Down
3 changes: 3 additions & 0 deletions docs/clients/promtail/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,9 @@ is restarted to allow it to continue from where it left off.
# How often to update the positions file
[sync_period: <duration> | default = 10s]
# Whether to ignore & later overwrite positions files that are corrupted
[ignore_invalid_yaml: <boolean> | default = false]
```

## scrape_config
Expand Down
24 changes: 17 additions & 7 deletions pkg/promtail/positions/positions.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@ const positionFileMode = 0600

// Config describes where to get postition information from.
type Config struct {
SyncPeriod time.Duration `yaml:"sync_period"`
PositionsFile string `yaml:"filename"`
SyncPeriod time.Duration `yaml:"sync_period"`
PositionsFile string `yaml:"filename"`
IgnoreInvalidYaml bool `yaml:"ignore_invalid_yaml"`
}

// RegisterFlags register flags.
func (cfg *Config) RegisterFlags(flags *flag.FlagSet) {
flags.DurationVar(&cfg.SyncPeriod, "positions.sync-period", 10*time.Second, "Period with this to sync the position file.")
flag.StringVar(&cfg.PositionsFile, "positions.file", "/var/log/positions.yaml", "Location to read/write positions from.")
flag.BoolVar(&cfg.IgnoreInvalidYaml, "positions.ignore-invalid-yaml", false, "whether to ignore & later overwrite positions files that are corrupted")
}

// Positions tracks how far through each file we've read.
Expand All @@ -47,7 +49,7 @@ type File struct {

// New makes a new Positions.
func New(logger log.Logger, cfg Config) (*Positions, error) {
positions, err := readPositionsFile(cfg.PositionsFile)
positions, err := readPositionsFile(cfg, logger)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -181,8 +183,9 @@ func (p *Positions) cleanup() {
}
}

func readPositionsFile(filename string) (map[string]string, error) {
cleanfn := filepath.Clean(filename)
func readPositionsFile(cfg Config, logger log.Logger) (map[string]string, error) {

cleanfn := filepath.Clean(cfg.PositionsFile)
buf, err := ioutil.ReadFile(cleanfn)
if err != nil {
if os.IsNotExist(err) {
Expand All @@ -192,8 +195,15 @@ func readPositionsFile(filename string) (map[string]string, error) {
}

var p File
if err := yaml.UnmarshalStrict(buf, &p); err != nil {
return nil, fmt.Errorf("%s: %v", cleanfn, err)
err = yaml.UnmarshalStrict(buf, &p)
if err != nil {
// return empty if cfg option enabled
if cfg.IgnoreInvalidYaml {
level.Debug(logger).Log("msg", "ignoring invalid positions file", "file", cleanfn, "error", err)
return map[string]string{}, nil
}

return nil, fmt.Errorf("invalid yaml positions file [%s]: %v", cleanfn, err)
}

return p.Positions, nil
Expand Down
39 changes: 36 additions & 3 deletions pkg/promtail/positions/positions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"strings"
"testing"

"github.com/go-kit/kit/log"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -44,7 +45,10 @@ func TestReadPositionsOK(t *testing.T) {
t.Fatal(err)
}

pos, err := readPositionsFile(temp)
pos, err := readPositionsFile(Config{
PositionsFile: temp,
}, log.NewNopLogger())

require.NoError(t, err)
require.Equal(t, "17623", pos["/tmp/random.log"])
}
Expand All @@ -60,7 +64,10 @@ func TestReadPositionsFromDir(t *testing.T) {
_ = os.Remove(temp)
}()

_, err = readPositionsFile(temp)
_, err = readPositionsFile(Config{
PositionsFile: temp,
}, log.NewNopLogger())

require.Error(t, err)
require.True(t, strings.Contains(err.Error(), temp)) // error must contain filename
}
Expand All @@ -79,7 +86,33 @@ func TestReadPositionsFromBadYaml(t *testing.T) {
t.Fatal(err)
}

_, err = readPositionsFile(temp)
_, err = readPositionsFile(Config{
PositionsFile: temp,
}, log.NewNopLogger())

require.Error(t, err)
require.True(t, strings.Contains(err.Error(), temp)) // error must contain filename
}

func TestReadPositionsFromBadYamlIgnoreCorruption(t *testing.T) {
temp := tempFilename(t)
defer func() {
_ = os.Remove(temp)
}()

badYaml := []byte(`positions:
/tmp/random.log: "176
`)
err := ioutil.WriteFile(temp, badYaml, 0644)
if err != nil {
t.Fatal(err)
}

out, err := readPositionsFile(Config{
PositionsFile: temp,
IgnoreInvalidYaml: true,
}, log.NewNopLogger())

require.NoError(t, err)
require.Equal(t, map[string]string{}, out)
}

0 comments on commit 0b72ddb

Please sign in to comment.