Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

--dry-run Promtail. #1652

Merged
merged 10 commits into from
Feb 13, 2020
3 changes: 2 additions & 1 deletion cmd/promtail/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func init() {

func main() {
printVersion := flag.Bool("version", false, "Print this builds version information")
dryRun := flag.Bool("dry-run", false, "Start Promtail but print entries instead of sending them to Loki.")

// Load config, merging config file and CLI flags
var config config.Config
Expand Down Expand Up @@ -57,7 +58,7 @@ func main() {
stages.Debug = true
}

p, err := promtail.New(config)
p, err := promtail.New(config, *dryRun)
if err != nil {
level.Error(util.Logger).Log("msg", "error creating promtail", "error", err)
os.Exit(1)
Expand Down
13 changes: 13 additions & 0 deletions docs/clients/promtail/troubleshooting.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,19 @@
This document describes known failure modes of `promtail` on edge cases and the
adopted trade-offs.

## Dry running

Promtail can be configured to print log stream entries instead of sending them to Loki.
This can be used in combination with [piping data](#pipe-data-to-promtail) to debug or troubleshoot promtail log parsing.

In dry run mode, Promtail still support reading from a [positions](configuration.md#position_config) file however no update will be made to the targeted file, this is to ensure you can easily retry the same set of lines.

To start Promtail in dry run mode use the flag `--dry-run` as shown in the example below:

```bash
cat my.log | promtail --dry-run --client.url http://127.0.0.1:3100/loki/api/v1/push
```

## Pipe data to Promtail

Promtail supports piping data for sending logs to Loki. This is a very useful way to troubleshooting your configuration.
Expand Down
56 changes: 56 additions & 0 deletions pkg/promtail/client/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package client

import (
"fmt"
"os"
"sync"
"text/tabwriter"
"time"

"github.com/cortexproject/cortex/pkg/util"
"github.com/fatih/color"
"github.com/prometheus/common/model"
"gopkg.in/yaml.v2"
)

type logger struct {
*tabwriter.Writer
sync.Mutex
}

// NewLogger creates a new client logger that logs entries instead of sending them.
func NewLogger(cfgs ...Config) (Client, error) {
// make sure the clients config is valid
c, err := NewMulti(util.Logger, cfgs...)
if err != nil {
return nil, err
}
c.Stop()
fmt.Println(color.YellowString("Clients configured:"))
for _, cfg := range cfgs {
yaml, err := yaml.Marshal(cfg)
if err != nil {
return nil, err
}
fmt.Println("----------------------")
fmt.Println(string(yaml))
}
return &logger{
Writer: tabwriter.NewWriter(os.Stdout, 0, 8, 0, '\t', 0),
}, nil
}

func (*logger) Stop() {}

func (l *logger) Handle(labels model.LabelSet, time time.Time, entry string) error {
l.Lock()
defer l.Unlock()
fmt.Fprint(l.Writer, color.BlueString(time.Format("2006-01-02T15:04:05")))
fmt.Fprint(l.Writer, "\t")
fmt.Fprint(l.Writer, color.YellowString(labels.String()))
fmt.Fprint(l.Writer, "\t")
fmt.Fprint(l.Writer, entry)
fmt.Fprint(l.Writer, "\n")
l.Flush()
return nil
}
21 changes: 21 additions & 0 deletions pkg/promtail/client/logger_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package client

import (
"net/url"
"testing"
"time"

"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
)

func TestNewLogger(t *testing.T) {
_, err := NewLogger([]Config{}...)
require.Error(t, err)

l, err := NewLogger([]Config{{URL: flagext.URLValue{URL: &url.URL{Host: "string"}}}}...)
require.NoError(t, err)
err = l.Handle(model.LabelSet{"foo": "bar"}, time.Now(), "entry")
require.NoError(t, err)
}
72 changes: 43 additions & 29 deletions pkg/promtail/positions/positions.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Config struct {
SyncPeriod time.Duration `yaml:"sync_period"`
PositionsFile string `yaml:"filename"`
IgnoreInvalidYaml bool `yaml:"ignore_invalid_yaml"`
ReadOnly bool `yaml:"-"`
}

// RegisterFlags register flags.
Expand All @@ -33,7 +34,7 @@ func (cfg *Config) RegisterFlags(flags *flag.FlagSet) {
}

// Positions tracks how far through each file we've read.
type Positions struct {
type positions struct {
logger log.Logger
cfg Config
mtx sync.Mutex
Expand All @@ -47,17 +48,40 @@ type File struct {
Positions map[string]string `yaml:"positions"`
}

type Positions interface {
// GetString returns how far we've through a file as a string.
// JournalTarget writes a journal cursor to the positions file, while
// FileTarget writes an integer offset. Use Get to read the integer
// offset.
GetString(path string) string
// Get returns how far we've read through a file. Returns an error
// if the value stored for the file is not an integer.
Get(path string) (int64, error)
// PutString records (asynchronsouly) how far we've read through a file.
// Unlike Put, it records a string offset and is only useful for
// JournalTargets which doesn't have integer offsets.
PutString(path string, pos string)
// Put records (asynchronously) how far we've read through a file.
Put(path string, pos int64)
// Remove removes the position tracking for a filepath
Remove(path string)
// SyncPeriod returns how often the positions file gets resynced
SyncPeriod() time.Duration
// Stop the Position tracker.
Stop()
}

// New makes a new Positions.
func New(logger log.Logger, cfg Config) (*Positions, error) {
positions, err := readPositionsFile(cfg, logger)
func New(logger log.Logger, cfg Config) (Positions, error) {
positionData, err := readPositionsFile(cfg, logger)
if err != nil {
return nil, err
}

p := &Positions{
p := &positions{
logger: logger,
cfg: cfg,
positions: positions,
positions: positionData,
quit: make(chan struct{}),
done: make(chan struct{}),
}
Expand All @@ -66,39 +90,28 @@ func New(logger log.Logger, cfg Config) (*Positions, error) {
return p, nil
}

// Stop the Position tracker.
func (p *Positions) Stop() {
func (p *positions) Stop() {
close(p.quit)
<-p.done
}

// PutString records (asynchronsouly) how far we've read through a file.
// Unlike Put, it records a string offset and is only useful for
// JournalTargets which doesn't have integer offsets.
func (p *Positions) PutString(path string, pos string) {
func (p *positions) PutString(path string, pos string) {
p.mtx.Lock()
defer p.mtx.Unlock()
p.positions[path] = pos
}

// Put records (asynchronously) how far we've read through a file.
func (p *Positions) Put(path string, pos int64) {
func (p *positions) Put(path string, pos int64) {
p.PutString(path, strconv.FormatInt(pos, 10))
}

// GetString returns how far we've through a file as a string.
// JournalTarget writes a journal cursor to the positions file, while
// FileTarget writes an integer offset. Use Get to read the integer
// offset.
func (p *Positions) GetString(path string) string {
func (p *positions) GetString(path string) string {
p.mtx.Lock()
defer p.mtx.Unlock()
return p.positions[path]
}

// Get returns how far we've read through a file. Returns an error
// if the value stored for the file is not an integer.
func (p *Positions) Get(path string) (int64, error) {
func (p *positions) Get(path string) (int64, error) {
p.mtx.Lock()
defer p.mtx.Unlock()
pos, ok := p.positions[path]
Expand All @@ -108,23 +121,21 @@ func (p *Positions) Get(path string) (int64, error) {
return strconv.ParseInt(pos, 10, 64)
}

// Remove removes the position tracking for a filepath
func (p *Positions) Remove(path string) {
func (p *positions) Remove(path string) {
p.mtx.Lock()
defer p.mtx.Unlock()
p.remove(path)
}

func (p *Positions) remove(path string) {
func (p *positions) remove(path string) {
delete(p.positions, path)
}

// SyncPeriod returns how often the positions file gets resynced
func (p *Positions) SyncPeriod() time.Duration {
func (p *positions) SyncPeriod() time.Duration {
return p.cfg.SyncPeriod
}

func (p *Positions) run() {
func (p *positions) run() {
defer func() {
p.save()
level.Debug(p.logger).Log("msg", "positions saved")
Expand All @@ -143,7 +154,10 @@ func (p *Positions) run() {
}
}

func (p *Positions) save() {
func (p *positions) save() {
if p.cfg.ReadOnly {
return
}
p.mtx.Lock()
positions := make(map[string]string, len(p.positions))
for k, v := range p.positions {
Expand All @@ -156,7 +170,7 @@ func (p *Positions) save() {
}
}

func (p *Positions) cleanup() {
func (p *positions) cleanup() {
p.mtx.Lock()
defer p.mtx.Unlock()
toRemove := []string{}
Expand Down
44 changes: 44 additions & 0 deletions pkg/promtail/positions/positions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"os"
"strings"
"testing"
"time"

"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -136,3 +138,45 @@ func TestReadPositionsFromBadYamlIgnoreCorruption(t *testing.T) {
require.NoError(t, err)
require.Equal(t, map[string]string{}, out)
}

func Test_ReadOnly(t *testing.T) {
temp := tempFilename(t)
defer func() {
_ = os.Remove(temp)
}()
yaml := []byte(`positions:
/tmp/random.log: "17623"
`)
err := ioutil.WriteFile(temp, yaml, 0644)
if err != nil {
t.Fatal(err)
}
p, err := New(util.Logger, Config{
SyncPeriod: 20 * time.Nanosecond,
PositionsFile: temp,
ReadOnly: true,
})
if err != nil {
t.Fatal(err)
}
defer p.Stop()
p.Put("/foo/bar/f", 12132132)
p.PutString("/foo/f", "100")
pos, err := p.Get("/tmp/random.log")
if err != nil {
t.Fatal(err)
}
require.Equal(t, int64(17623), pos)
p.(*positions).save()
out, err := readPositionsFile(Config{
PositionsFile: temp,
IgnoreInvalidYaml: true,
ReadOnly: true,
}, log.NewNopLogger())

require.NoError(t, err)
require.Equal(t, map[string]string{
"/tmp/random.log": "17623",
}, out)

}
22 changes: 16 additions & 6 deletions pkg/promtail/promtail.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,33 @@ type Promtail struct {
}

// New makes a new Promtail.
func New(cfg config.Config) (*Promtail, error) {
func New(cfg config.Config, dryRun bool) (*Promtail, error) {

if cfg.ClientConfig.URL.URL != nil {
// if a single client config is used we add it to the multiple client config for backward compatibility
cfg.ClientConfigs = append(cfg.ClientConfigs, cfg.ClientConfig)
}

client, err := client.NewMulti(util.Logger, cfg.ClientConfigs...)
if err != nil {
return nil, err
var err error
var cl client.Client
if dryRun {
cl, err = client.NewLogger(cfg.ClientConfigs...)
if err != nil {
return nil, err
}
cfg.PositionsConfig.ReadOnly = true
} else {
cl, err = client.NewMulti(util.Logger, cfg.ClientConfigs...)
if err != nil {
return nil, err
}
}

promtail := &Promtail{
client: client,
client: cl,
}

tms, err := targets.NewTargetManagers(promtail, util.Logger, cfg.PositionsConfig, client, cfg.ScrapeConfig, &cfg.TargetConfig)
tms, err := targets.NewTargetManagers(promtail, util.Logger, cfg.PositionsConfig, cl, cfg.ScrapeConfig, &cfg.TargetConfig)
if err != nil {
return nil, err
}
Expand Down
Loading