Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipe data to Promtail #1649

Merged
merged 12 commits into from
Feb 11, 2020
3 changes: 1 addition & 2 deletions cmd/promtail/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,10 @@ func main() {
}

level.Info(util.Logger).Log("msg", "Starting Promtail", "version", version.Info())
defer p.Shutdown()

if err := p.Run(); err != nil {
level.Error(util.Logger).Log("msg", "error starting promtail", "error", err)
os.Exit(1)
}

p.Shutdown()
}
47 changes: 47 additions & 0 deletions docs/clients/promtail/troubleshooting.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,53 @@
This document describes known failure modes of `promtail` on edge cases and the
adopted trade-offs.

## Pipe data to Promtail

Promtail supports piping data for sending logs to Loki. This is a very useful way to troubleshooting your configuration.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@slim-bean Feel free to reword here. Also should this gets its own page ? I think for now having it in troubleshooting is fine.

Once you have promtail installed you can for instance use the following command to send logs to a local Loki instance:

```bash
cat my.log | promtail --client.url http://127.0.0.1:3100/loki/api/v1/push
```

You can also add additional labels from command line using:

```bash
cat my.log | promtail --client.url http://127.0.0.1:3100/loki/api/v1/push --client.external-labels=k1=v1,k2=v2
```

This will add labels `k1` and `k2` with respective values `v1` and `v2`.

In pipe mode Promtail also support file configuration using `--config.file`, however do note that positions config is not used and
only **the first scrape config is used**.

[`static_configs:`](./configuration) can be used to provide static labels, although the targets property is ignored.

If you don't provide any [`scrape_config:`](./configuration#scrape_config) a default one is used which will automatically adds the following default labels: `{job="stdin",hostname="<detected_hostname>"}`.

For example you could use this config below to parse and add the label `level` on all your piped logs:

```yaml
clients:
- url: http://localhost:3100/loki/api/v1/push

scrape_configs:
- job_name: system
pipeline_stages:
- regex:
expression: '(level|lvl|severity)=(?P<level>\\w+)'
- labels:
level:
static_configs:
- labels:
job: my-stdin-logs
```

```
cat my.log | promtail --config.file promtail.yaml
```


## A tailed file is truncated while `promtail` is not running

Given the following order of events:
Expand Down
47 changes: 29 additions & 18 deletions pkg/promtail/promtail.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,28 @@
package promtail

import (
"sync"

"github.com/cortexproject/cortex/pkg/util"

"github.com/grafana/loki/pkg/promtail/client"
"github.com/grafana/loki/pkg/promtail/config"
"github.com/grafana/loki/pkg/promtail/positions"
"github.com/grafana/loki/pkg/promtail/server"
"github.com/grafana/loki/pkg/promtail/targets"
)

// Promtail is the root struct for Promtail...
type Promtail struct {
client client.Client
positions *positions.Positions
targetManagers *targets.TargetManagers
server *server.Server

stopped bool
mtx sync.Mutex
}

// New makes a new Promtail.
func New(cfg config.Config) (*Promtail, error) {
positions, err := positions.New(util.Logger, cfg.PositionsConfig)
if err != nil {
return nil, err
}

if cfg.ClientConfig.URL.URL != nil {
// if a single client config is used we add it to the multiple client config for backward compatibility
Expand All @@ -35,33 +34,45 @@ func New(cfg config.Config) (*Promtail, error) {
return nil, err
}

tms, err := targets.NewTargetManagers(util.Logger, positions, client, cfg.ScrapeConfig, &cfg.TargetConfig)
promtail := &Promtail{
client: client,
}

tms, err := targets.NewTargetManagers(promtail, util.Logger, cfg.PositionsConfig, client, cfg.ScrapeConfig, &cfg.TargetConfig)
if err != nil {
return nil, err
}

promtail.targetManagers = tms
server, err := server.New(cfg.ServerConfig, tms)
if err != nil {
return nil, err
}

return &Promtail{
client: client,
positions: positions,
targetManagers: tms,
server: server,
}, nil
promtail.server = server
return promtail, nil
}

// Run the promtail; will block until a signal is received.
func (p *Promtail) Run() error {
p.mtx.Lock()
// if we stopped promtail before the server even started we can return without starting.
if p.stopped {
p.mtx.Unlock()
return nil
rfratto marked this conversation as resolved.
Show resolved Hide resolved
}
p.mtx.Unlock() // unlock before blocking
return p.server.Run()
}

// Shutdown the promtail.
func (p *Promtail) Shutdown() {
p.server.Shutdown()
p.targetManagers.Stop()
p.positions.Stop()
p.mtx.Lock()
defer p.mtx.Unlock()
p.stopped = true
if p.server != nil {
p.server.Shutdown()
}
if p.targetManagers != nil {
p.targetManagers.Stop()
}
p.client.Stop()
}
27 changes: 25 additions & 2 deletions pkg/promtail/targets/manager.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package targets

import (
"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log"
"github.com/pkg/errors"

Expand All @@ -19,12 +20,14 @@ type targetManager interface {
// TargetManagers manages a list of target managers.
type TargetManagers struct {
targetManagers []targetManager
positions *positions.Positions
}

// NewTargetManagers makes a new TargetManagers
func NewTargetManagers(
app Shutdownable,
logger log.Logger,
positions *positions.Positions,
positionsConfig positions.Config,
client api.EntryHandler,
scrapeConfigs []scrape.Config,
targetConfig *Config,
Expand All @@ -34,6 +37,20 @@ func NewTargetManagers(
var journalScrapeConfigs []scrape.Config
var syslogScrapeConfigs []scrape.Config

if isStdinPipe() {
stdin, err := newStdinTargetManager(app, client, scrapeConfigs)
if err != nil {
return nil, err
}
targetManagers = append(targetManagers, stdin)
return &TargetManagers{targetManagers: targetManagers}, nil
}

positions, err := positions.New(util.Logger, positionsConfig)
if err != nil {
return nil, err
}

for _, cfg := range scrapeConfigs {
if cfg.HasServiceDiscoveryConfig() {
fileScrapeConfigs = append(fileScrapeConfigs, cfg)
Expand Down Expand Up @@ -84,7 +101,10 @@ func NewTargetManagers(
targetManagers = append(targetManagers, syslogTargetManager)
}

return &TargetManagers{targetManagers: targetManagers}, nil
return &TargetManagers{
targetManagers: targetManagers,
positions: positions,
}, nil

}

Expand Down Expand Up @@ -125,4 +145,7 @@ func (tm *TargetManagers) Stop() {
for _, t := range tm.targetManagers {
t.Stop()
}
if tm.positions != nil {
tm.positions.Stop()
}
}
168 changes: 168 additions & 0 deletions pkg/promtail/targets/stdin_target_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package targets

import (
"bufio"
"context"
"fmt"
"io"
"os"
"strings"
"time"

"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/grafana/loki/pkg/logentry/stages"
"github.com/grafana/loki/pkg/promtail/api"
"github.com/grafana/loki/pkg/promtail/scrape"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/discovery/config"
"github.com/prometheus/prometheus/discovery/targetgroup"
)

// bufferSize is the size of the buffered reader
const bufferSize = 8096

// file is an interface allowing us to abstract a file.
type file interface {
Stat() (os.FileInfo, error)
io.Reader
}

var (
// stdIn is os.Stdin but can be replaced for testing purpose.
stdIn file = os.Stdin
hostName, _ = os.Hostname()
// defaultStdInCfg is the default config for stdin target if none provided.
defaultStdInCfg = scrape.Config{
JobName: "stdin",
ServiceDiscoveryConfig: config.ServiceDiscoveryConfig{
StaticConfigs: []*targetgroup.Group{
{Labels: model.LabelSet{"job": "stdin"}},
{Labels: model.LabelSet{"hostname": model.LabelValue(hostName)}},
},
},
}
)

func isStdinPipe() bool {
info, err := stdIn.Stat()
if err != nil {
level.Warn(util.Logger).Log("err", err)
return false
}
m := info.Mode()
if m&os.ModeCharDevice != 0 || info.Size() <= 0 {
return false
}
return true
}

type Shutdownable interface {
Shutdown()
}

type stdinTargetManager struct {
*readerTarget
app Shutdownable
}

func newStdinTargetManager(app Shutdownable, client api.EntryHandler, configs []scrape.Config) (*stdinTargetManager, error) {
reader, err := newReaderTarget(stdIn, client, getStdinConfig(configs))
if err != nil {
return nil, err
}
stdinManager := &stdinTargetManager{
readerTarget: reader,
app: app,
}
// when we're done flushing our stdin we can shutdown the app.
go func() {
<-reader.ctx.Done()
app.Shutdown()
}()
return stdinManager, nil
}

func getStdinConfig(configs []scrape.Config) scrape.Config {
cfg := defaultStdInCfg
// if we receive configs we use the first one.
if len(configs) > 0 {
if len(configs) > 1 {
level.Warn(util.Logger).Log("msg", fmt.Sprintf("too many scrape configs, skipping %d configs.", len(configs)-1))
}
cfg = configs[0]
}
return cfg
}

func (t *stdinTargetManager) Ready() bool {
return t.ctx.Err() == nil
}
func (t *stdinTargetManager) Stop() { t.cancel() }
func (t *stdinTargetManager) ActiveTargets() map[string][]Target { return nil }
func (t *stdinTargetManager) AllTargets() map[string][]Target { return nil }

type readerTarget struct {
in *bufio.Reader
out api.EntryHandler
lbs model.LabelSet
logger log.Logger

cancel context.CancelFunc
ctx context.Context
}

func newReaderTarget(in io.Reader, client api.EntryHandler, cfg scrape.Config) (*readerTarget, error) {
pipeline, err := stages.NewPipeline(log.With(util.Logger, "component", "pipeline"), cfg.PipelineStages, &cfg.JobName, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
lbs := model.LabelSet{}
for _, static := range cfg.ServiceDiscoveryConfig.StaticConfigs {
if static != nil && static.Labels != nil {
lbs = lbs.Merge(static.Labels)
}
}
ctx, cancel := context.WithCancel(context.Background())
t := &readerTarget{
in: bufio.NewReaderSize(in, bufferSize),
out: pipeline.Wrap(client),
cancel: cancel,
ctx: ctx,
lbs: lbs,
logger: log.With(util.Logger, "component", "reader"),
}
go t.read()

return t, nil
}

func (t *readerTarget) read() {
defer t.cancel()

for {
if t.ctx.Err() != nil {
return
}
line, err := t.in.ReadString('\n')
if err != nil && err != io.EOF {
level.Warn(t.logger).Log("msg", "error reading buffer", "err", err)
return
}
line = strings.TrimRight(line, "\r\n")
if line == "" {
if err == io.EOF {
return
}
continue
}
if err := t.out.Handle(t.lbs, time.Now(), line); err != nil {
level.Error(t.logger).Log("msg", "error sending line", "err", err)
}
if err == io.EOF {
return
}
}
}
Loading