diff --git a/cmd/docker-driver/loki.go b/cmd/docker-driver/loki.go index 63a736bdcb934..e8a313ee46697 100644 --- a/cmd/docker-driver/loki.go +++ b/cmd/docker-driver/loki.go @@ -33,7 +33,7 @@ func New(logCtx logger.Info, logger log.Logger) (logger.Logger, error) { if err != nil { return nil, err } - c, err := client.New(cfg.clientConfig, logger) + c, err := client.New(prometheus.DefaultRegisterer, cfg.clientConfig, logger) if err != nil { return nil, err } diff --git a/cmd/fluent-bit/client.go b/cmd/fluent-bit/client.go index 42be94a06bcd6..25bdb05b2bd5d 100644 --- a/cmd/fluent-bit/client.go +++ b/cmd/fluent-bit/client.go @@ -2,6 +2,7 @@ package main import ( "github.com/go-kit/kit/log" + "github.com/prometheus/client_golang/prometheus" "github.com/grafana/loki/pkg/promtail/client" ) @@ -11,5 +12,5 @@ func NewClient(cfg *config, logger log.Logger) (client.Client, error) { if cfg.bufferConfig.buffer { return NewBuffer(cfg, logger) } - return client.New(cfg.clientConfig, logger) + return client.New(prometheus.DefaultRegisterer, cfg.clientConfig, logger) } diff --git a/cmd/fluent-bit/dque.go b/cmd/fluent-bit/dque.go index 6515b11307f97..9648ba42e7ff2 100644 --- a/cmd/fluent-bit/dque.go +++ b/cmd/fluent-bit/dque.go @@ -9,6 +9,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/joncrlsn/dque" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/grafana/loki/pkg/logproto" @@ -71,7 +72,7 @@ func newDque(cfg *config, logger log.Logger) (client.Client, error) { _ = q.queue.TurboOn() } - q.loki, err = client.New(cfg.clientConfig, logger) + q.loki, err = client.New(prometheus.DefaultRegisterer, cfg.clientConfig, logger) if err != nil { return nil, err } diff --git a/pkg/promtail/client/client.go b/pkg/promtail/client/client.go index f59dea5146bc8..4c4054fdfb156 100644 --- a/pkg/promtail/client/client.go +++ b/pkg/promtail/client/client.go @@ -42,60 +42,62 @@ const ( ) var ( - encodedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{ + UserAgent = fmt.Sprintf("promtail/%s", version.Version) +) + +type metrics struct { + encodedBytes *prometheus.CounterVec + sentBytes *prometheus.CounterVec + droppedBytes *prometheus.CounterVec + sentEntries *prometheus.CounterVec + droppedEntries *prometheus.CounterVec + requestDuration *prometheus.HistogramVec + batchRetries *prometheus.CounterVec + streamLag *metric.Gauges + countersWithHost []*prometheus.CounterVec +} + +func newMetrics(reg prometheus.Registerer) *metrics { + var m metrics + + m.encodedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "promtail", Name: "encoded_bytes_total", Help: "Number of bytes encoded and ready to send.", }, []string{HostLabel}) - sentBytes = prometheus.NewCounterVec(prometheus.CounterOpts{ + m.sentBytes = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "promtail", Name: "sent_bytes_total", Help: "Number of bytes sent.", }, []string{HostLabel}) - droppedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{ + m.droppedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "promtail", Name: "dropped_bytes_total", Help: "Number of bytes dropped because failed to be sent to the ingester after all retries.", }, []string{HostLabel}) - sentEntries = prometheus.NewCounterVec(prometheus.CounterOpts{ + m.sentEntries = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "promtail", Name: "sent_entries_total", Help: "Number of log entries sent to the ingester.", }, []string{HostLabel}) - droppedEntries = prometheus.NewCounterVec(prometheus.CounterOpts{ + m.droppedEntries = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "promtail", Name: "dropped_entries_total", Help: "Number of log entries dropped because failed to be sent to the ingester after all retries.", }, []string{HostLabel}) - requestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + m.requestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ Namespace: "promtail", Name: "request_duration_seconds", Help: "Duration of send requests.", }, []string{"status_code", HostLabel}) - batchRetries = prometheus.NewCounterVec(prometheus.CounterOpts{ + m.batchRetries = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "promtail", Name: "batch_retries_total", Help: "Number of times batches has had to be retried.", }, []string{HostLabel}) - streamLag *metric.Gauges - - countersWithHost = []*prometheus.CounterVec{ - encodedBytes, sentBytes, droppedBytes, sentEntries, droppedEntries, - } - UserAgent = fmt.Sprintf("promtail/%s", version.Version) -) - -func init() { - prometheus.MustRegister(encodedBytes) - prometheus.MustRegister(sentBytes) - prometheus.MustRegister(droppedBytes) - prometheus.MustRegister(sentEntries) - prometheus.MustRegister(droppedEntries) - prometheus.MustRegister(requestDuration) - prometheus.MustRegister(batchRetries) var err error - streamLag, err = metric.NewGauges("promtail_stream_lag_seconds", + m.streamLag, err = metric.NewGauges("promtail_stream_lag_seconds", "Difference between current time and last batch timestamp for successful sends", metric.GaugeConfig{Action: "set"}, int64(1*time.Minute.Seconds()), // This strips out files which update slowly and reduces noise in this metric. @@ -103,7 +105,25 @@ func init() { if err != nil { panic(err) } - prometheus.MustRegister(streamLag) + + m.countersWithHost = []*prometheus.CounterVec{ + m.encodedBytes, m.sentBytes, m.droppedBytes, m.sentEntries, m.droppedEntries, + } + + if reg != nil { + reg.MustRegister( + m.encodedBytes, + m.sentBytes, + m.droppedBytes, + m.sentEntries, + m.droppedEntries, + m.requestDuration, + m.batchRetries, + m.streamLag, + ) + } + + return &m } // Client pushes entries to Loki and can be stopped @@ -115,6 +135,7 @@ type Client interface { // Client for pushing logs in snappy-compressed protos over HTTP. type client struct { + metrics *metrics logger log.Logger cfg Config client *http.Client @@ -131,7 +152,7 @@ type client struct { } // New makes a new Client. -func New(cfg Config, logger log.Logger) (Client, error) { +func New(reg prometheus.Registerer, cfg Config, logger log.Logger) (Client, error) { if cfg.URL.URL == nil { return nil, errors.New("client needs target URL") } @@ -142,6 +163,7 @@ func New(cfg Config, logger log.Logger) (Client, error) { logger: log.With(logger, "component", "client", "host", cfg.URL.Host), cfg: cfg, entries: make(chan api.Entry), + metrics: newMetrics(reg), externalLabels: cfg.ExternalLabels.LabelSet, ctx: ctx, @@ -162,7 +184,7 @@ func New(cfg Config, logger log.Logger) (Client, error) { // Initialize counters to 0 so the metrics are exported before the first // occurrence of incrementing to avoid missing metrics. - for _, counter := range countersWithHost { + for _, counter := range c.metrics.countersWithHost { counter.WithLabelValues(c.cfg.URL.Host).Add(0) } @@ -249,7 +271,7 @@ func (c *client) sendBatch(tenantID string, batch *batch) { return } bufBytes := float64(len(buf)) - encodedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes) + c.metrics.encodedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes) backoff := util.NewBackoff(c.ctx, c.cfg.BackoffConfig) var status int @@ -258,11 +280,11 @@ func (c *client) sendBatch(tenantID string, batch *batch) { // send uses `timeout` internally, so `context.Background` is good enough. status, err = c.send(context.Background(), tenantID, buf) - requestDuration.WithLabelValues(strconv.Itoa(status), c.cfg.URL.Host).Observe(time.Since(start).Seconds()) + c.metrics.requestDuration.WithLabelValues(strconv.Itoa(status), c.cfg.URL.Host).Observe(time.Since(start).Seconds()) if err == nil { - sentBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes) - sentEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount)) + c.metrics.sentBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes) + c.metrics.sentEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount)) for _, s := range batch.streams { lbls, err := parser.ParseMetric(s.Labels) if err != nil { @@ -280,7 +302,7 @@ func (c *client) sendBatch(tenantID string, batch *batch) { } } if lblSet != nil { - streamLag.With(lblSet).Set(time.Since(s.Entries[len(s.Entries)-1].Timestamp).Seconds()) + c.metrics.streamLag.With(lblSet).Set(time.Since(s.Entries[len(s.Entries)-1].Timestamp).Seconds()) } } return @@ -292,7 +314,7 @@ func (c *client) sendBatch(tenantID string, batch *batch) { } level.Warn(c.logger).Log("msg", "error sending batch, will retry", "status", status, "error", err) - batchRetries.WithLabelValues(c.cfg.URL.Host).Inc() + c.metrics.batchRetries.WithLabelValues(c.cfg.URL.Host).Inc() backoff.Wait() // Make sure it sends at least once before checking for retry. @@ -303,8 +325,8 @@ func (c *client) sendBatch(tenantID string, batch *batch) { if err != nil { level.Error(c.logger).Log("msg", "final error sending batch", "status", status, "error", err) - droppedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes) - droppedEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount)) + c.metrics.droppedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes) + c.metrics.droppedEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount)) } } @@ -382,5 +404,5 @@ func (c *client) processEntry(e api.Entry) (api.Entry, string) { func (c *client) UnregisterLatencyMetric(labels model.LabelSet) { labels[HostLabel] = model.LabelValue(c.cfg.URL.Host) - streamLag.Delete(labels) + c.metrics.streamLag.Delete(labels) } diff --git a/pkg/promtail/client/client_test.go b/pkg/promtail/client/client_test.go index 321c356be313e..9cd14271062df 100644 --- a/pkg/promtail/client/client_test.go +++ b/pkg/promtail/client/client_test.go @@ -238,9 +238,7 @@ func TestClient_Handle(t *testing.T) { for testName, testData := range tests { t.Run(testName, func(t *testing.T) { - // Reset metrics - sentEntries.Reset() - droppedEntries.Reset() + reg := prometheus.NewRegistry() // Create a buffer channel where we do enqueue received requests receivedReqsChan := make(chan receivedReq, 10) @@ -267,7 +265,7 @@ func TestClient_Handle(t *testing.T) { TenantID: testData.clientTenantID, } - c, err := New(cfg, log.NewNopLogger()) + c, err := New(reg, cfg, log.NewNopLogger()) require.NoError(t, err) // Send all the input log entries @@ -301,7 +299,7 @@ func TestClient_Handle(t *testing.T) { require.ElementsMatch(t, testData.expectedReqs, receivedReqs) expectedMetrics := strings.Replace(testData.expectedMetrics, "__HOST__", serverURL.Host, -1) - err = testutil.GatherAndCompare(prometheus.DefaultGatherer, strings.NewReader(expectedMetrics), "promtail_sent_entries_total", "promtail_dropped_entries_total") + err = testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), "promtail_sent_entries_total", "promtail_dropped_entries_total") assert.NoError(t, err) }) } @@ -372,9 +370,7 @@ func TestClient_StopNow(t *testing.T) { for _, c := range cases { t.Run(c.name, func(t *testing.T) { - // Reset metrics - sentEntries.Reset() - droppedEntries.Reset() + reg := prometheus.NewRegistry() // Create a buffer channel where we do enqueue received requests receivedReqsChan := make(chan receivedReq, 10) @@ -401,7 +397,7 @@ func TestClient_StopNow(t *testing.T) { TenantID: c.clientTenantID, } - cl, err := New(cfg, log.NewNopLogger()) + cl, err := New(reg, cfg, log.NewNopLogger()) require.NoError(t, err) // Send all the input log entries @@ -441,7 +437,7 @@ func TestClient_StopNow(t *testing.T) { require.ElementsMatch(t, c.expectedReqs, receivedReqs) expectedMetrics := strings.Replace(c.expectedMetrics, "__HOST__", serverURL.Host, -1) - err = testutil.GatherAndCompare(prometheus.DefaultGatherer, strings.NewReader(expectedMetrics), "promtail_sent_entries_total", "promtail_dropped_entries_total") + err = testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), "promtail_sent_entries_total", "promtail_dropped_entries_total") assert.NoError(t, err) }) } diff --git a/pkg/promtail/client/logger.go b/pkg/promtail/client/logger.go index 9a8f960179d44..9f18024e991aa 100644 --- a/pkg/promtail/client/logger.go +++ b/pkg/promtail/client/logger.go @@ -9,6 +9,7 @@ import ( "github.com/fatih/color" "github.com/go-kit/kit/log" + "github.com/prometheus/client_golang/prometheus" "gopkg.in/yaml.v2" "github.com/grafana/loki/pkg/promtail/api" @@ -36,9 +37,9 @@ type logger struct { } // NewLogger creates a new client logger that logs entries instead of sending them. -func NewLogger(log log.Logger, externalLabels lokiflag.LabelSet, cfgs ...Config) (Client, error) { +func NewLogger(reg prometheus.Registerer, log log.Logger, externalLabels lokiflag.LabelSet, cfgs ...Config) (Client, error) { // make sure the clients config is valid - c, err := NewMulti(log, externalLabels, cfgs...) + c, err := NewMulti(reg, log, externalLabels, cfgs...) if err != nil { return nil, err } diff --git a/pkg/promtail/client/logger_test.go b/pkg/promtail/client/logger_test.go index 72f17a4c7ba44..825a2a7bd8fbc 100644 --- a/pkg/promtail/client/logger_test.go +++ b/pkg/promtail/client/logger_test.go @@ -16,12 +16,11 @@ import ( ) func TestNewLogger(t *testing.T) { - _, err := NewLogger(util.Logger, flagext.LabelSet{}, []Config{}...) + _, err := NewLogger(nil, util.Logger, flagext.LabelSet{}, []Config{}...) require.Error(t, err) - l, err := NewLogger(util.Logger, flagext.LabelSet{}, []Config{{URL: cortexflag.URLValue{URL: &url.URL{Host: "string"}}}}...) + l, err := NewLogger(nil, util.Logger, flagext.LabelSet{}, []Config{{URL: cortexflag.URLValue{URL: &url.URL{Host: "string"}}}}...) require.NoError(t, err) l.Chan() <- api.Entry{Labels: model.LabelSet{"foo": "bar"}, Entry: logproto.Entry{Timestamp: time.Now(), Line: "entry"}} l.Stop() - } diff --git a/pkg/promtail/client/multi.go b/pkg/promtail/client/multi.go index 9f962c3fd2e60..380adaf5f5a40 100644 --- a/pkg/promtail/client/multi.go +++ b/pkg/promtail/client/multi.go @@ -5,6 +5,7 @@ import ( "sync" "github.com/go-kit/kit/log" + "github.com/prometheus/client_golang/prometheus" "github.com/grafana/loki/pkg/promtail/api" "github.com/grafana/loki/pkg/util/flagext" @@ -20,7 +21,7 @@ type MultiClient struct { } // NewMulti creates a new client -func NewMulti(logger log.Logger, externalLabels flagext.LabelSet, cfgs ...Config) (Client, error) { +func NewMulti(reg prometheus.Registerer, logger log.Logger, externalLabels flagext.LabelSet, cfgs ...Config) (Client, error) { if len(cfgs) == 0 { return nil, errors.New("at least one client config should be provided") } @@ -34,7 +35,7 @@ func NewMulti(logger log.Logger, externalLabels flagext.LabelSet, cfgs ...Config // not typically the order of precedence, the assumption here is someone providing a specific config in // yaml is doing so explicitly to make a key specific to a client. cfg.ExternalLabels = flagext.LabelSet{LabelSet: externalLabels.Merge(cfg.ExternalLabels.LabelSet)} - client, err := New(cfg, logger) + client, err := New(reg, cfg, logger) if err != nil { return nil, err } diff --git a/pkg/promtail/client/multi_test.go b/pkg/promtail/client/multi_test.go index 980c7ed7ee90e..b23cc40923467 100644 --- a/pkg/promtail/client/multi_test.go +++ b/pkg/promtail/client/multi_test.go @@ -18,7 +18,7 @@ import ( ) func TestNewMulti(t *testing.T) { - _, err := NewMulti(util.Logger, lokiflag.LabelSet{}, []Config{}...) + _, err := NewMulti(nil, util.Logger, lokiflag.LabelSet{}, []Config{}...) if err == nil { t.Fatal("expected err but got nil") } @@ -37,7 +37,7 @@ func TestNewMulti(t *testing.T) { ExternalLabels: lokiflag.LabelSet{LabelSet: model.LabelSet{"hi": "there"}}, } - clients, err := NewMulti(util.Logger, lokiflag.LabelSet{LabelSet: model.LabelSet{"order": "command"}}, cc1, cc2) + clients, err := NewMulti(nil, util.Logger, lokiflag.LabelSet{LabelSet: model.LabelSet{"order": "command"}}, cc1, cc2) if err != nil { t.Fatalf("expected err: nil got:%v", err) } @@ -98,7 +98,6 @@ func TestMultiClient_Stop(t *testing.T) { } func TestMultiClient_Handle(t *testing.T) { - f := fake.New(func() {}) clients := []Client{f, f, f, f, f, f} m := &MultiClient{ @@ -114,5 +113,4 @@ func TestMultiClient_Handle(t *testing.T) { if len(f.Received()) != len(clients) { t.Fatal("missing handle call") } - } diff --git a/pkg/promtail/promtail.go b/pkg/promtail/promtail.go index 82a6859cd6e6b..777df8c1cc4dc 100644 --- a/pkg/promtail/promtail.go +++ b/pkg/promtail/promtail.go @@ -5,6 +5,7 @@ import ( "github.com/cortexproject/cortex/pkg/util" "github.com/go-kit/kit/log" + "github.com/prometheus/client_golang/prometheus" "github.com/grafana/loki/pkg/promtail/client" "github.com/grafana/loki/pkg/promtail/config" @@ -23,12 +24,20 @@ func WithLogger(log log.Logger) Option { } } -// Promtail is the root struct for Promtail... +// WithRegisterer overrides the default registerer for Promtail. +func WithRegisterer(reg prometheus.Registerer) Option { + return func(p *Promtail) { + p.reg = reg + } +} + +// Promtail is the root struct for Promtail. type Promtail struct { client client.Client targetManagers *targets.TargetManagers server server.Server logger log.Logger + reg prometheus.Registerer stopped bool mtx sync.Mutex @@ -40,6 +49,7 @@ func New(cfg config.Config, dryRun bool, opts ...Option) (*Promtail, error) { // them. promtail := &Promtail{ logger: util.Logger, + reg: prometheus.DefaultRegisterer, } for _, o := range opts { o(promtail) @@ -61,19 +71,19 @@ func New(cfg config.Config, dryRun bool, opts ...Option) (*Promtail, error) { var err error if dryRun { - promtail.client, err = client.NewLogger(promtail.logger, cfg.ClientConfig.ExternalLabels, cfg.ClientConfigs...) + promtail.client, err = client.NewLogger(prometheus.DefaultRegisterer, promtail.logger, cfg.ClientConfig.ExternalLabels, cfg.ClientConfigs...) if err != nil { return nil, err } cfg.PositionsConfig.ReadOnly = true } else { - promtail.client, err = client.NewMulti(promtail.logger, cfg.ClientConfig.ExternalLabels, cfg.ClientConfigs...) + promtail.client, err = client.NewMulti(prometheus.DefaultRegisterer, promtail.logger, cfg.ClientConfig.ExternalLabels, cfg.ClientConfigs...) if err != nil { return nil, err } } - tms, err := targets.NewTargetManagers(promtail, promtail.logger, cfg.PositionsConfig, promtail.client, cfg.ScrapeConfig, &cfg.TargetConfig) + tms, err := targets.NewTargetManagers(promtail, promtail.reg, promtail.logger, cfg.PositionsConfig, promtail.client, cfg.ScrapeConfig, &cfg.TargetConfig) if err != nil { return nil, err } diff --git a/pkg/promtail/targets/file/filetarget.go b/pkg/promtail/targets/file/filetarget.go index f22170f26dbd9..8a380e91c6beb 100644 --- a/pkg/promtail/targets/file/filetarget.go +++ b/pkg/promtail/targets/file/filetarget.go @@ -10,8 +10,6 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/pkg/errors" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" fsnotify "gopkg.in/fsnotify.v1" @@ -22,35 +20,6 @@ import ( "github.com/grafana/loki/pkg/promtail/targets/target" ) -var ( - readBytes = promauto.NewGaugeVec(prometheus.GaugeOpts{ - Namespace: "promtail", - Name: "read_bytes_total", - Help: "Number of bytes read.", - }, []string{"path"}) - totalBytes = promauto.NewGaugeVec(prometheus.GaugeOpts{ - Namespace: "promtail", - Name: "file_bytes_total", - Help: "Number of bytes total.", - }, []string{"path"}) - readLines = promauto.NewCounterVec(prometheus.CounterOpts{ - Namespace: "promtail", - Name: "read_lines_total", - Help: "Number of lines read.", - }, []string{"path"}) - filesActive = promauto.NewGauge(prometheus.GaugeOpts{ - Namespace: "promtail", - Name: "files_active_total", - Help: "Number of active files.", - }) - logLengthHistogram = promauto.NewHistogramVec(prometheus.HistogramOpts{ - Namespace: "promtail", - Name: "log_entries_bytes", - Help: "the total count of bytes", - Buckets: prometheus.ExponentialBuckets(16, 2, 8), - }, []string{"path"}) -) - const ( FilenameLabel = "filename" ) @@ -76,7 +45,8 @@ func (cfg *Config) RegisterFlags(flags *flag.FlagSet) { // FileTarget describes a particular set of logs. // nolint:golint type FileTarget struct { - logger log.Logger + metrics *Metrics + logger log.Logger handler api.EntryHandler positions positions.Positions @@ -95,7 +65,7 @@ type FileTarget struct { } // NewFileTarget create a new FileTarget. -func NewFileTarget(logger log.Logger, handler api.EntryHandler, positions positions.Positions, path string, labels model.LabelSet, discoveredLabels model.LabelSet, targetConfig *Config) (*FileTarget, error) { +func NewFileTarget(metrics *Metrics, logger log.Logger, handler api.EntryHandler, positions positions.Positions, path string, labels model.LabelSet, discoveredLabels model.LabelSet, targetConfig *Config) (*FileTarget, error) { watcher, err := fsnotify.NewWatcher() if err != nil { @@ -104,6 +74,7 @@ func NewFileTarget(logger log.Logger, handler api.EntryHandler, positions positi t := &FileTarget{ logger: logger, + metrics: metrics, watcher: watcher, path: path, labels: labels, @@ -301,7 +272,7 @@ func (t *FileTarget) startTailing(ps []string) { continue } level.Debug(t.logger).Log("msg", "tailing new file", "filename", p) - tailer, err := newTailer(t.logger, t.handler, t.positions, p) + tailer, err := newTailer(t.metrics, t.logger, t.handler, t.positions, p) if err != nil { level.Error(t.logger).Log("msg", "failed to start tailer", "error", err, "filename", p) continue @@ -379,7 +350,7 @@ func (t *FileTarget) reportSize(ms []string) { // the tail code will also check if the file exists before creating a tailer. return } - totalBytes.WithLabelValues(m).Set(float64(fi.Size())) + t.metrics.totalBytes.WithLabelValues(m).Set(float64(fi.Size())) } } diff --git a/pkg/promtail/targets/file/filetarget_test.go b/pkg/promtail/targets/file/filetarget_test.go index 15725a57ab706..a5efc9dcc7e5f 100644 --- a/pkg/promtail/targets/file/filetarget_test.go +++ b/pkg/promtail/targets/file/filetarget_test.go @@ -50,7 +50,8 @@ func TestLongPositionsSyncDelayStillSavesCorrectPosition(t *testing.T) { t.Fatal(err) } - target, err := NewFileTarget(logger, client, ps, logFile, nil, nil, &Config{ + metrics := NewMetrics(nil) + target, err := NewFileTarget(metrics, logger, client, ps, logFile, nil, nil, &Config{ SyncPeriod: 10 * time.Second, }) if err != nil { @@ -141,7 +142,8 @@ func TestWatchEntireDirectory(t *testing.T) { t.Fatal(err) } - target, err := NewFileTarget(logger, client, ps, logFileDir+"*", nil, nil, &Config{ + metrics := NewMetrics(nil) + target, err := NewFileTarget(metrics, logger, client, ps, logFileDir+"*", nil, nil, &Config{ SyncPeriod: 10 * time.Second, }) if err != nil { @@ -228,7 +230,8 @@ func TestFileRolls(t *testing.T) { t.Fatal(err) } - target, err := NewFileTarget(logger, client, positions, dirName+"/*.log", nil, nil, &Config{ + metrics := NewMetrics(nil) + target, err := NewFileTarget(metrics, logger, client, positions, dirName+"/*.log", nil, nil, &Config{ SyncPeriod: 10 * time.Second, }) if err != nil { @@ -324,7 +327,8 @@ func TestResumesWhereLeftOff(t *testing.T) { t.Fatal(err) } - target, err := NewFileTarget(logger, client, ps, dirName+"/*.log", nil, nil, &Config{ + metrics := NewMetrics(nil) + target, err := NewFileTarget(metrics, logger, client, ps, dirName+"/*.log", nil, nil, &Config{ SyncPeriod: 10 * time.Second, }) if err != nil { @@ -358,7 +362,7 @@ func TestResumesWhereLeftOff(t *testing.T) { } // Create a new target, keep the same client so we can track what was sent through the handler. - target2, err := NewFileTarget(logger, client, ps2, dirName+"/*.log", nil, nil, &Config{ + target2, err := NewFileTarget(metrics, logger, client, ps2, dirName+"/*.log", nil, nil, &Config{ SyncPeriod: 10 * time.Second, }) if err != nil { @@ -431,7 +435,8 @@ func TestGlobWithMultipleFiles(t *testing.T) { t.Fatal(err) } - target, err := NewFileTarget(logger, client, ps, dirName+"/*.log", nil, nil, &Config{ + metrics := NewMetrics(nil) + target, err := NewFileTarget(metrics, logger, client, ps, dirName+"/*.log", nil, nil, &Config{ SyncPeriod: 10 * time.Second, }) if err != nil { @@ -527,7 +532,8 @@ func TestFileTargetSync(t *testing.T) { client := fake.New(func() {}) defer client.Stop() - target, err := NewFileTarget(logger, client, ps, logDir1+"/*.log", nil, nil, &Config{ + metrics := NewMetrics(nil) + target, err := NewFileTarget(metrics, logger, client, ps, logDir1+"/*.log", nil, nil, &Config{ SyncPeriod: 10 * time.Second, }) if err != nil { diff --git a/pkg/promtail/targets/file/filetargetmanager.go b/pkg/promtail/targets/file/filetargetmanager.go index 4eaf98c3af396..8d4db43e5cecd 100644 --- a/pkg/promtail/targets/file/filetargetmanager.go +++ b/pkg/promtail/targets/file/filetargetmanager.go @@ -10,7 +10,6 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/discovery" "github.com/prometheus/prometheus/discovery/kubernetes" @@ -32,19 +31,6 @@ const ( kubernetesPodNodeField = "spec.nodeName" ) -var ( - failedTargets = promauto.NewCounterVec(prometheus.CounterOpts{ - Namespace: "promtail", - Name: "targets_failed_total", - Help: "Number of failed targets.", - }, []string{"reason"}) - targetsActive = promauto.NewGauge(prometheus.GaugeOpts{ - Namespace: "promtail", - Name: "targets_active_total", - Help: "Number of active total.", - }) -) - // FileTargetManager manages a set of targets. // nolint:golint type FileTargetManager struct { @@ -56,12 +42,18 @@ type FileTargetManager struct { // NewFileTargetManager creates a new TargetManager. func NewFileTargetManager( + metrics *Metrics, logger log.Logger, positions positions.Positions, client api.EntryHandler, scrapeConfigs []scrapeconfig.Config, targetConfig *Config, ) (*FileTargetManager, error) { + reg := metrics.reg + if reg == nil { + reg = prometheus.DefaultRegisterer + } + ctx, quit := context.WithCancel(context.Background()) tm := &FileTargetManager{ log: logger, @@ -81,8 +73,7 @@ func NewFileTargetManager( continue } - registerer := prometheus.DefaultRegisterer - pipeline, err := stages.NewPipeline(log.With(logger, "component", "file_pipeline"), cfg.PipelineStages, &cfg.JobName, registerer) + pipeline, err := stages.NewPipeline(log.With(logger, "component", "file_pipeline"), cfg.PipelineStages, &cfg.JobName, reg) if err != nil { return nil, err } @@ -113,6 +104,7 @@ func NewFileTargetManager( } s := &targetSyncer{ + metrics: metrics, log: logger, positions: positions, relabelConfig: cfg.RelabelConfigs, @@ -181,6 +173,7 @@ func (tm *FileTargetManager) AllTargets() map[string][]target.Target { // targetSyncer sync targets based on service discovery changes. type targetSyncer struct { + metrics *Metrics log log.Logger positions positions.Positions entryHandler api.EntryHandler @@ -223,7 +216,7 @@ func (s *targetSyncer) sync(groups []*targetgroup.Group) { if processedLabels == nil { dropped = append(dropped, target.NewDroppedTarget("dropping target, no labels", discoveredLabels)) level.Debug(s.log).Log("msg", "dropping target, no labels") - failedTargets.WithLabelValues("empty_labels").Inc() + s.metrics.failedTargets.WithLabelValues("empty_labels").Inc() continue } @@ -231,7 +224,7 @@ func (s *targetSyncer) sync(groups []*targetgroup.Group) { if ok && string(host) != s.hostname { dropped = append(dropped, target.NewDroppedTarget(fmt.Sprintf("ignoring target, wrong host (labels:%s hostname:%s)", labels.String(), s.hostname), discoveredLabels)) level.Debug(s.log).Log("msg", "ignoring target, wrong host", "labels", labels.String(), "hostname", s.hostname) - failedTargets.WithLabelValues("wrong_host").Inc() + s.metrics.failedTargets.WithLabelValues("wrong_host").Inc() continue } @@ -239,7 +232,7 @@ func (s *targetSyncer) sync(groups []*targetgroup.Group) { if !ok { dropped = append(dropped, target.NewDroppedTarget("no path for target", discoveredLabels)) level.Info(s.log).Log("msg", "no path for target", "labels", labels.String()) - failedTargets.WithLabelValues("no_path").Inc() + s.metrics.failedTargets.WithLabelValues("no_path").Inc() continue } @@ -254,7 +247,7 @@ func (s *targetSyncer) sync(groups []*targetgroup.Group) { if _, ok := s.targets[key]; ok { dropped = append(dropped, target.NewDroppedTarget("ignoring target, already exists", discoveredLabels)) level.Debug(s.log).Log("msg", "ignoring target, already exists", "labels", labels.String()) - failedTargets.WithLabelValues("exists").Inc() + s.metrics.failedTargets.WithLabelValues("exists").Inc() continue } @@ -263,11 +256,11 @@ func (s *targetSyncer) sync(groups []*targetgroup.Group) { if err != nil { dropped = append(dropped, target.NewDroppedTarget(fmt.Sprintf("Failed to create target: %s", err.Error()), discoveredLabels)) level.Error(s.log).Log("msg", "Failed to create target", "key", key, "error", err) - failedTargets.WithLabelValues("error").Inc() + s.metrics.failedTargets.WithLabelValues("error").Inc() continue } - targetsActive.Add(1.) + s.metrics.targetsActive.Add(1.) s.targets[key] = t } } @@ -276,7 +269,7 @@ func (s *targetSyncer) sync(groups []*targetgroup.Group) { if _, ok := targets[key]; !ok { level.Info(s.log).Log("msg", "Removing target", "key", key) target.Stop() - targetsActive.Add(-1.) + s.metrics.targetsActive.Add(-1.) delete(s.targets, key) } } @@ -284,7 +277,7 @@ func (s *targetSyncer) sync(groups []*targetgroup.Group) { } func (s *targetSyncer) newTarget(path string, labels model.LabelSet, discoveredLabels model.LabelSet) (*FileTarget, error) { - return NewFileTarget(s.log, s.entryHandler, s.positions, path, labels, discoveredLabels, s.targetConfig) + return NewFileTarget(s.metrics, s.log, s.entryHandler, s.positions, path, labels, discoveredLabels, s.targetConfig) } func (s *targetSyncer) DroppedTargets() []target.Target { diff --git a/pkg/promtail/targets/file/metrics.go b/pkg/promtail/targets/file/metrics.go new file mode 100644 index 0000000000000..48e0d6bcd84ac --- /dev/null +++ b/pkg/promtail/targets/file/metrics.go @@ -0,0 +1,79 @@ +package file + +import "github.com/prometheus/client_golang/prometheus" + +// Metrics hold the set of file-based metrics. +type Metrics struct { + // Registerer used. May be nil. + reg prometheus.Registerer + + // File-specific metrics + readBytes *prometheus.GaugeVec + totalBytes *prometheus.GaugeVec + readLines *prometheus.CounterVec + filesActive prometheus.Gauge + logLengthHistogram *prometheus.HistogramVec + + // Manager metrics + failedTargets *prometheus.CounterVec + targetsActive prometheus.Gauge +} + +// NewMetrics creates a new set of file metrics. If reg is non-nil, the metrics +// will be registered. +func NewMetrics(reg prometheus.Registerer) *Metrics { + var m Metrics + m.reg = reg + + m.readBytes = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "promtail", + Name: "read_bytes_total", + Help: "Number of bytes read.", + }, []string{"path"}) + m.totalBytes = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "promtail", + Name: "file_bytes_total", + Help: "Number of bytes total.", + }, []string{"path"}) + m.readLines = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "promtail", + Name: "read_lines_total", + Help: "Number of lines read.", + }, []string{"path"}) + m.filesActive = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "promtail", + Name: "files_active_total", + Help: "Number of active files.", + }) + m.logLengthHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "promtail", + Name: "log_entries_bytes", + Help: "the total count of bytes", + Buckets: prometheus.ExponentialBuckets(16, 2, 8), + }, []string{"path"}) + + m.failedTargets = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "promtail", + Name: "targets_failed_total", + Help: "Number of failed targets.", + }, []string{"reason"}) + m.targetsActive = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "promtail", + Name: "targets_active_total", + Help: "Number of active total.", + }) + + if reg != nil { + reg.MustRegister( + m.readBytes, + m.totalBytes, + m.readLines, + m.filesActive, + m.logLengthHistogram, + m.failedTargets, + m.targetsActive, + ) + } + + return &m +} diff --git a/pkg/promtail/targets/file/tailer.go b/pkg/promtail/targets/file/tailer.go index 38fd94da7a284..15ffdc06b174c 100644 --- a/pkg/promtail/targets/file/tailer.go +++ b/pkg/promtail/targets/file/tailer.go @@ -18,6 +18,7 @@ import ( ) type tailer struct { + metrics *Metrics logger log.Logger handler api.EntryHandler positions positions.Positions @@ -34,7 +35,7 @@ type tailer struct { done chan struct{} } -func newTailer(logger log.Logger, handler api.EntryHandler, positions positions.Positions, path string) (*tailer, error) { +func newTailer(metrics *Metrics, logger log.Logger, handler api.EntryHandler, positions positions.Positions, path string) (*tailer, error) { // Simple check to make sure the file we are tailing doesn't // have a position already saved which is past the end of the file. fi, err := os.Stat(path) @@ -66,6 +67,7 @@ func newTailer(logger log.Logger, handler api.EntryHandler, positions positions. logger = log.With(logger, "component", "tailer") tailer := &tailer{ + metrics: metrics, logger: logger, handler: api.AddLabelsMiddleware(model.LabelSet{FilenameLabel: model.LabelValue(path)}).Wrap(handler), positions: positions, @@ -80,7 +82,7 @@ func newTailer(logger log.Logger, handler api.EntryHandler, positions positions. go tailer.readLines() go tailer.updatePosition() - filesActive.Add(1.) + metrics.filesActive.Add(1.) return tailer, nil } @@ -146,8 +148,8 @@ func (t *tailer) readLines() { continue } - readLines.WithLabelValues(t.path).Inc() - logLengthHistogram.WithLabelValues(t.path).Observe(float64(len(line.Text))) + t.metrics.readLines.WithLabelValues(t.path).Inc() + t.metrics.logLengthHistogram.WithLabelValues(t.path).Observe(float64(len(line.Text))) entries <- api.Entry{ Labels: model.LabelSet{}, Entry: logproto.Entry{ @@ -173,13 +175,13 @@ func (t *tailer) markPositionAndSize() error { } return err } - totalBytes.WithLabelValues(t.path).Set(float64(size)) + t.metrics.totalBytes.WithLabelValues(t.path).Set(float64(size)) pos, err := t.tail.Tell() if err != nil { return err } - readBytes.WithLabelValues(t.path).Set(float64(pos)) + t.metrics.readBytes.WithLabelValues(t.path).Set(float64(pos)) t.positions.Put(t.path, pos) return nil @@ -218,9 +220,9 @@ func (t *tailer) isRunning() bool { // cleanupMetrics removes all metrics exported by this tailer func (t *tailer) cleanupMetrics() { // When we stop tailing the file, also un-export metrics related to the file - filesActive.Add(-1.) - readLines.DeleteLabelValues(t.path) - readBytes.DeleteLabelValues(t.path) - totalBytes.DeleteLabelValues(t.path) - logLengthHistogram.DeleteLabelValues(t.path) + t.metrics.filesActive.Add(-1.) + t.metrics.readLines.DeleteLabelValues(t.path) + t.metrics.readBytes.DeleteLabelValues(t.path) + t.metrics.totalBytes.DeleteLabelValues(t.path) + t.metrics.logLengthHistogram.DeleteLabelValues(t.path) } diff --git a/pkg/promtail/targets/journal/journaltargetmanager.go b/pkg/promtail/targets/journal/journaltargetmanager.go index 812d962180ea7..9b0c2e357f0b4 100644 --- a/pkg/promtail/targets/journal/journaltargetmanager.go +++ b/pkg/promtail/targets/journal/journaltargetmanager.go @@ -5,6 +5,7 @@ package journal import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + "github.com/prometheus/client_golang/prometheus" "github.com/grafana/loki/pkg/promtail/api" "github.com/grafana/loki/pkg/promtail/positions" @@ -19,6 +20,7 @@ type JournalTargetManager struct{} // NewJournalTargetManager returns nil as JournalTargets are not supported // on this platform. func NewJournalTargetManager( + reg prometheus.Registerer, logger log.Logger, positions positions.Positions, client api.EntryHandler, diff --git a/pkg/promtail/targets/journal/journaltargetmanager_linux.go b/pkg/promtail/targets/journal/journaltargetmanager_linux.go index 1171ada40d43b..5f7fb3fda6b58 100644 --- a/pkg/promtail/targets/journal/journaltargetmanager_linux.go +++ b/pkg/promtail/targets/journal/journaltargetmanager_linux.go @@ -23,6 +23,7 @@ type JournalTargetManager struct { // NewJournalTargetManager creates a new JournalTargetManager. func NewJournalTargetManager( + reg prometheus.Registerer, logger log.Logger, positions positions.Positions, client api.EntryHandler, @@ -38,8 +39,7 @@ func NewJournalTargetManager( continue } - registerer := prometheus.DefaultRegisterer - pipeline, err := stages.NewPipeline(log.With(logger, "component", "journal_pipeline"), cfg.PipelineStages, &cfg.JobName, registerer) + pipeline, err := stages.NewPipeline(log.With(logger, "component", "journal_pipeline"), cfg.PipelineStages, &cfg.JobName, reg) if err != nil { return nil, err } diff --git a/pkg/promtail/targets/lokipush/pushtarget_test.go b/pkg/promtail/targets/lokipush/pushtarget_test.go index bf95e10f9fb71..f22bf4593e796 100644 --- a/pkg/promtail/targets/lokipush/pushtarget_test.go +++ b/pkg/promtail/targets/lokipush/pushtarget_test.go @@ -10,6 +10,7 @@ import ( "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/go-kit/kit/log" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/relabel" "github.com/stretchr/testify/require" @@ -77,7 +78,7 @@ func TestPushTarget(t *testing.T) { BatchWait: 1 * time.Second, BatchSize: 100 * 1024, } - pc, err := client.New(ccfg, logger) + pc, err := client.New(prometheus.DefaultRegisterer, ccfg, logger) require.NoError(t, err) defer pc.Stop() diff --git a/pkg/promtail/targets/lokipush/pushtargetmanager.go b/pkg/promtail/targets/lokipush/pushtargetmanager.go index 1f86075aa33ce..cfdd3abbe0352 100644 --- a/pkg/promtail/targets/lokipush/pushtargetmanager.go +++ b/pkg/promtail/targets/lokipush/pushtargetmanager.go @@ -23,6 +23,7 @@ type PushTargetManager struct { // NewPushTargetManager creates a new PushTargetManager. func NewPushTargetManager( + reg prometheus.Registerer, logger log.Logger, client api.EntryHandler, scrapeConfigs []scrapeconfig.Config, @@ -38,8 +39,7 @@ func NewPushTargetManager( } for _, cfg := range scrapeConfigs { - registerer := prometheus.DefaultRegisterer - pipeline, err := stages.NewPipeline(log.With(logger, "component", "push_pipeline_"+cfg.JobName), cfg.PipelineStages, &cfg.JobName, registerer) + pipeline, err := stages.NewPipeline(log.With(logger, "component", "push_pipeline_"+cfg.JobName), cfg.PipelineStages, &cfg.JobName, reg) if err != nil { return nil, err } diff --git a/pkg/promtail/targets/manager.go b/pkg/promtail/targets/manager.go index 313f94f34f4a1..d6f33b5945d6c 100644 --- a/pkg/promtail/targets/manager.go +++ b/pkg/promtail/targets/manager.go @@ -4,6 +4,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/grafana/loki/pkg/promtail/api" "github.com/grafana/loki/pkg/promtail/positions" @@ -39,6 +40,7 @@ type TargetManagers struct { // NewTargetManagers makes a new TargetManagers func NewTargetManagers( app stdin.Shutdownable, + reg prometheus.Registerer, logger log.Logger, positionsConfig positions.Config, client api.EntryHandler, @@ -50,7 +52,7 @@ func NewTargetManagers( if targetConfig.Stdin { level.Debug(logger).Log("msg", "configured to read from stdin") - stdin, err := stdin.NewStdinTargetManager(logger, app, client, scrapeConfigs) + stdin, err := stdin.NewStdinTargetManager(reg, logger, app, client, scrapeConfigs) if err != nil { return nil, err } @@ -78,10 +80,22 @@ func NewTargetManagers( } } + var ( + fileMetrics *file.Metrics + syslogMetrics *syslog.Metrics + ) + if len(targetScrapeConfigs[FileScrapeConfigs]) > 0 { + fileMetrics = file.NewMetrics(reg) + } + if len(targetScrapeConfigs[SyslogScrapeConfigs]) > 0 { + syslogMetrics = syslog.NewMetrics(reg) + } + for target, scrapeConfigs := range targetScrapeConfigs { switch target { case FileScrapeConfigs: fileTargetManager, err := file.NewFileTargetManager( + fileMetrics, logger, positions, client, @@ -94,6 +108,7 @@ func NewTargetManagers( targetManagers = append(targetManagers, fileTargetManager) case JournalScrapeConfigs: journalTargetManager, err := journal.NewJournalTargetManager( + reg, logger, positions, client, @@ -105,6 +120,7 @@ func NewTargetManagers( targetManagers = append(targetManagers, journalTargetManager) case SyslogScrapeConfigs: syslogTargetManager, err := syslog.NewSyslogTargetManager( + syslogMetrics, logger, client, scrapeConfigs, @@ -115,6 +131,7 @@ func NewTargetManagers( targetManagers = append(targetManagers, syslogTargetManager) case PushScrapeConfigs: pushTargetManager, err := lokipush.NewPushTargetManager( + reg, logger, client, scrapeConfigs, diff --git a/pkg/promtail/targets/stdin/stdin_target_manager.go b/pkg/promtail/targets/stdin/stdin_target_manager.go index f19cd5bae4208..72cdb76575e33 100644 --- a/pkg/promtail/targets/stdin/stdin_target_manager.go +++ b/pkg/promtail/targets/stdin/stdin_target_manager.go @@ -57,8 +57,8 @@ type StdinTargetManager struct { app Shutdownable } -func NewStdinTargetManager(log log.Logger, app Shutdownable, client api.EntryHandler, configs []scrapeconfig.Config) (*StdinTargetManager, error) { - reader, err := newReaderTarget(log, stdIn, client, getStdinConfig(log, configs)) +func NewStdinTargetManager(reg prometheus.Registerer, log log.Logger, app Shutdownable, client api.EntryHandler, configs []scrapeconfig.Config) (*StdinTargetManager, error) { + reader, err := newReaderTarget(reg, log, stdIn, client, getStdinConfig(log, configs)) if err != nil { return nil, err } @@ -103,8 +103,8 @@ type readerTarget struct { ctx context.Context } -func newReaderTarget(logger log.Logger, in io.Reader, client api.EntryHandler, cfg scrapeconfig.Config) (*readerTarget, error) { - pipeline, err := stages.NewPipeline(log.With(logger, "component", "pipeline"), cfg.PipelineStages, &cfg.JobName, prometheus.DefaultRegisterer) +func newReaderTarget(reg prometheus.Registerer, logger log.Logger, in io.Reader, client api.EntryHandler, cfg scrapeconfig.Config) (*readerTarget, error) { + pipeline, err := stages.NewPipeline(log.With(logger, "component", "pipeline"), cfg.PipelineStages, &cfg.JobName, reg) if err != nil { return nil, err } diff --git a/pkg/promtail/targets/stdin/stdin_target_manager_test.go b/pkg/promtail/targets/stdin/stdin_target_manager_test.go index 26ec7b1b448a9..b23e33ca75461 100644 --- a/pkg/promtail/targets/stdin/stdin_target_manager_test.go +++ b/pkg/promtail/targets/stdin/stdin_target_manager_test.go @@ -8,6 +8,7 @@ import ( "testing" "github.com/cortexproject/cortex/pkg/util" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" "gopkg.in/yaml.v2" @@ -79,7 +80,7 @@ func Test_newReaderTarget(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { c := fake.New(func() {}) - got, err := newReaderTarget(util.Logger, tt.in, c, tt.cfg) + got, err := newReaderTarget(prometheus.DefaultRegisterer, util.Logger, tt.in, c, tt.cfg) if (err != nil) != tt.wantErr { t.Errorf("newReaderTarget() error = %v, wantErr %v", err, tt.wantErr) return @@ -119,7 +120,7 @@ func Test_Shutdown(t *testing.T) { stdIn = newFakeStdin("line") appMock := &mockShutdownable{called: make(chan bool, 1)} recorder := fake.New(func() {}) - manager, err := NewStdinTargetManager(util.Logger, appMock, recorder, []scrapeconfig.Config{{}}) + manager, err := NewStdinTargetManager(prometheus.DefaultRegisterer, util.Logger, appMock, recorder, []scrapeconfig.Config{{}}) require.NoError(t, err) require.NotNil(t, manager) require.Equal(t, true, <-appMock.called) diff --git a/pkg/promtail/targets/syslog/metrics.go b/pkg/promtail/targets/syslog/metrics.go new file mode 100644 index 0000000000000..705d14598fbd4 --- /dev/null +++ b/pkg/promtail/targets/syslog/metrics.go @@ -0,0 +1,45 @@ +package syslog + +import "github.com/prometheus/client_golang/prometheus" + +// Metrics holds a set of syslog metrics. +type Metrics struct { + reg prometheus.Registerer + + syslogEntries prometheus.Counter + syslogParsingErrors prometheus.Counter + syslogEmptyMessages prometheus.Counter +} + +// NewMetrics creates a new set of syslog metrics. If reg is non-nil, the +// metrics will be registered. +func NewMetrics(reg prometheus.Registerer) *Metrics { + var m Metrics + m.reg = reg + + m.syslogEntries = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "promtail", + Name: "syslog_target_entries_total", + Help: "Total number of successful entries sent to the syslog target", + }) + m.syslogParsingErrors = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "promtail", + Name: "syslog_target_parsing_errors_total", + Help: "Total number of parsing errors while receiving syslog messages", + }) + m.syslogEmptyMessages = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "promtail", + Name: "syslog_empty_messages_total", + Help: "Total number of empty messages receiving from syslog", + }) + + if reg != nil { + reg.MustRegister( + m.syslogEntries, + m.syslogParsingErrors, + m.syslogEmptyMessages, + ) + } + + return &m +} diff --git a/pkg/promtail/targets/syslog/syslogtarget.go b/pkg/promtail/targets/syslog/syslogtarget.go index d52b4ad73ecc2..4023b8bfca7c6 100644 --- a/pkg/promtail/targets/syslog/syslogtarget.go +++ b/pkg/promtail/targets/syslog/syslogtarget.go @@ -15,8 +15,6 @@ import ( "github.com/influxdata/go-syslog/v3" "github.com/influxdata/go-syslog/v3/rfc5424" "github.com/mwitkow/go-conntrack" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/relabel" @@ -29,28 +27,13 @@ import ( ) var ( - syslogEntries = promauto.NewCounter(prometheus.CounterOpts{ - Namespace: "promtail", - Name: "syslog_target_entries_total", - Help: "Total number of successful entries sent to the syslog target", - }) - syslogParsingErrors = promauto.NewCounter(prometheus.CounterOpts{ - Namespace: "promtail", - Name: "syslog_target_parsing_errors_total", - Help: "Total number of parsing errors while receiving syslog messages", - }) - syslogEmptyMessages = promauto.NewCounter(prometheus.CounterOpts{ - Namespace: "promtail", - Name: "syslog_empty_messages_total", - Help: "Total number of empty messages receiving from syslog", - }) - defaultIdleTimeout = 120 * time.Second ) // SyslogTarget listens to syslog messages. // nolint:golint type SyslogTarget struct { + metrics *Metrics logger log.Logger handler api.EntryHandler config *scrapeconfig.SyslogTargetConfig @@ -72,6 +55,7 @@ type message struct { // NewSyslogTarget configures a new SyslogTarget. func NewSyslogTarget( + metrics *Metrics, logger log.Logger, handler api.EntryHandler, relabel []*relabel.Config, @@ -81,6 +65,7 @@ func NewSyslogTarget( ctx, cancel := context.WithCancel(context.Background()) t := &SyslogTarget{ + metrics: metrics, logger: logger, handler: handler, config: config, @@ -182,14 +167,14 @@ func (t *SyslogTarget) handleMessageError(err error) { return } level.Warn(t.logger).Log("msg", "error parsing syslog stream", "err", err) - syslogParsingErrors.Inc() + t.metrics.syslogParsingErrors.Inc() } func (t *SyslogTarget) handleMessage(connLabels labels.Labels, msg syslog.Message) { rfc5424Msg := msg.(*rfc5424.SyslogMessage) if rfc5424Msg.Message == nil { - syslogEmptyMessages.Inc() + t.metrics.syslogEmptyMessages.Inc() return } @@ -251,7 +236,7 @@ func (t *SyslogTarget) messageSender(entries chan<- api.Entry) { Line: msg.message, }, } - syslogEntries.Inc() + t.metrics.syslogEntries.Inc() } } diff --git a/pkg/promtail/targets/syslog/syslogtarget_test.go b/pkg/promtail/targets/syslog/syslogtarget_test.go index d32f2fbfb93cb..e5bd677714874 100644 --- a/pkg/promtail/targets/syslog/syslogtarget_test.go +++ b/pkg/promtail/targets/syslog/syslogtarget_test.go @@ -32,7 +32,8 @@ func testSyslogTarget(t *testing.T, octetCounting bool) { logger := log.NewLogfmtLogger(w) client := fake.New(func() {}) - tgt, err := NewSyslogTarget(logger, client, relabelConfig(t), &scrapeconfig.SyslogTargetConfig{ + metrics := NewMetrics(nil) + tgt, err := NewSyslogTarget(metrics, logger, client, relabelConfig(t), &scrapeconfig.SyslogTargetConfig{ ListenAddress: "127.0.0.1:0", LabelStructuredData: true, Labels: model.LabelSet{ @@ -130,8 +131,9 @@ func TestSyslogTarget_InvalidData(t *testing.T) { w := log.NewSyncWriter(os.Stderr) logger := log.NewLogfmtLogger(w) client := fake.New(func() {}) + metrics := NewMetrics(nil) - tgt, err := NewSyslogTarget(logger, client, relabelConfig(t), &scrapeconfig.SyslogTargetConfig{ + tgt, err := NewSyslogTarget(metrics, logger, client, relabelConfig(t), &scrapeconfig.SyslogTargetConfig{ ListenAddress: "127.0.0.1:0", }) require.NoError(t, err) @@ -160,8 +162,9 @@ func TestSyslogTarget_NonUTF8Message(t *testing.T) { w := log.NewSyncWriter(os.Stderr) logger := log.NewLogfmtLogger(w) client := fake.New(func() {}) + metrics := NewMetrics(nil) - tgt, err := NewSyslogTarget(logger, client, relabelConfig(t), &scrapeconfig.SyslogTargetConfig{ + tgt, err := NewSyslogTarget(metrics, logger, client, relabelConfig(t), &scrapeconfig.SyslogTargetConfig{ ListenAddress: "127.0.0.1:0", }) require.NoError(t, err) @@ -197,8 +200,9 @@ func TestSyslogTarget_IdleTimeout(t *testing.T) { w := log.NewSyncWriter(os.Stderr) logger := log.NewLogfmtLogger(w) client := fake.New(func() {}) + metrics := NewMetrics(nil) - tgt, err := NewSyslogTarget(logger, client, relabelConfig(t), &scrapeconfig.SyslogTargetConfig{ + tgt, err := NewSyslogTarget(metrics, logger, client, relabelConfig(t), &scrapeconfig.SyslogTargetConfig{ ListenAddress: "127.0.0.1:0", IdleTimeout: time.Millisecond, }) diff --git a/pkg/promtail/targets/syslog/syslogtargetmanager.go b/pkg/promtail/targets/syslog/syslogtargetmanager.go index 5379f26027ec6..b2bbce57ddf38 100644 --- a/pkg/promtail/targets/syslog/syslogtargetmanager.go +++ b/pkg/promtail/targets/syslog/syslogtargetmanager.go @@ -20,10 +20,15 @@ type SyslogTargetManager struct { // NewSyslogTargetManager creates a new SyslogTargetManager. func NewSyslogTargetManager( + metrics *Metrics, logger log.Logger, client api.EntryHandler, scrapeConfigs []scrapeconfig.Config, ) (*SyslogTargetManager, error) { + reg := metrics.reg + if reg == nil { + reg = prometheus.DefaultRegisterer + } tm := &SyslogTargetManager{ logger: logger, @@ -31,13 +36,12 @@ func NewSyslogTargetManager( } for _, cfg := range scrapeConfigs { - registerer := prometheus.DefaultRegisterer - pipeline, err := stages.NewPipeline(log.With(logger, "component", "syslog_pipeline"), cfg.PipelineStages, &cfg.JobName, registerer) + pipeline, err := stages.NewPipeline(log.With(logger, "component", "syslog_pipeline"), cfg.PipelineStages, &cfg.JobName, reg) if err != nil { return nil, err } - t, err := NewSyslogTarget(logger, pipeline.Wrap(client), cfg.RelabelConfigs, cfg.SyslogConfig) + t, err := NewSyslogTarget(metrics, logger, pipeline.Wrap(client), cfg.RelabelConfigs, cfg.SyslogConfig) if err != nil { return nil, err }