Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Promtail: pass a prometheus registerer to promtail components #3175

Merged
merged 1 commit into from
Jan 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/docker-driver/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func New(logCtx logger.Info, logger log.Logger) (logger.Logger, error) {
if err != nil {
return nil, err
}
c, err := client.New(cfg.clientConfig, logger)
c, err := client.New(prometheus.DefaultRegisterer, cfg.clientConfig, logger)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion cmd/fluent-bit/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"github.com/go-kit/kit/log"
"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/loki/pkg/promtail/client"
)
Expand All @@ -11,5 +12,5 @@ func NewClient(cfg *config, logger log.Logger) (client.Client, error) {
if cfg.bufferConfig.buffer {
return NewBuffer(cfg, logger)
}
return client.New(cfg.clientConfig, logger)
return client.New(prometheus.DefaultRegisterer, cfg.clientConfig, logger)
}
3 changes: 2 additions & 1 deletion cmd/fluent-bit/dque.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/joncrlsn/dque"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"

"github.com/grafana/loki/pkg/logproto"
Expand Down Expand Up @@ -71,7 +72,7 @@ func newDque(cfg *config, logger log.Logger) (client.Client, error) {
_ = q.queue.TurboOn()
}

q.loki, err = client.New(cfg.clientConfig, logger)
q.loki, err = client.New(prometheus.DefaultRegisterer, cfg.clientConfig, logger)
if err != nil {
return nil, err
}
Expand Down
94 changes: 58 additions & 36 deletions pkg/promtail/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,68 +42,88 @@ const (
)

var (
encodedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
UserAgent = fmt.Sprintf("promtail/%s", version.Version)
)

// metrics groups the Prometheus collectors used by a single client.
// Instances are built by newMetrics, which also registers the collectors
// with the supplied prometheus.Registerer when it is non-nil.
type metrics struct {
// encodedBytes counts bytes encoded and ready to send, labelled by host.
encodedBytes *prometheus.CounterVec
// sentBytes counts bytes sent, labelled by host.
sentBytes *prometheus.CounterVec
// droppedBytes counts bytes dropped because they could not be sent to
// the ingester after all retries, labelled by host.
droppedBytes *prometheus.CounterVec
// sentEntries counts log entries sent to the ingester, labelled by host.
sentEntries *prometheus.CounterVec
// droppedEntries counts log entries dropped because they could not be
// sent to the ingester after all retries, labelled by host.
droppedEntries *prometheus.CounterVec
// requestDuration observes the duration of send requests in seconds,
// labelled by status code and host.
requestDuration *prometheus.HistogramVec
// batchRetries counts how many times a batch had to be retried,
// labelled by host.
batchRetries *prometheus.CounterVec
// streamLag reports, per stream, the difference between the current time
// and the last batch timestamp for successful sends.
streamLag *metric.Gauges
// countersWithHost lists the host-labelled counters (encodedBytes,
// sentBytes, droppedBytes, sentEntries, droppedEntries) that New
// initializes to 0 so they are exported before their first increment.
countersWithHost []*prometheus.CounterVec
}

func newMetrics(reg prometheus.Registerer) *metrics {
var m metrics

m.encodedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "encoded_bytes_total",
Help: "Number of bytes encoded and ready to send.",
}, []string{HostLabel})
sentBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
m.sentBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "sent_bytes_total",
Help: "Number of bytes sent.",
}, []string{HostLabel})
droppedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
m.droppedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "dropped_bytes_total",
Help: "Number of bytes dropped because failed to be sent to the ingester after all retries.",
}, []string{HostLabel})
sentEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
m.sentEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "sent_entries_total",
Help: "Number of log entries sent to the ingester.",
}, []string{HostLabel})
droppedEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
m.droppedEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "dropped_entries_total",
Help: "Number of log entries dropped because failed to be sent to the ingester after all retries.",
}, []string{HostLabel})
requestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
m.requestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "promtail",
Name: "request_duration_seconds",
Help: "Duration of send requests.",
}, []string{"status_code", HostLabel})
batchRetries = prometheus.NewCounterVec(prometheus.CounterOpts{
m.batchRetries = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "batch_retries_total",
Help: "Number of times batches has had to be retried.",
}, []string{HostLabel})
streamLag *metric.Gauges

countersWithHost = []*prometheus.CounterVec{
encodedBytes, sentBytes, droppedBytes, sentEntries, droppedEntries,
}

UserAgent = fmt.Sprintf("promtail/%s", version.Version)
)

func init() {
prometheus.MustRegister(encodedBytes)
prometheus.MustRegister(sentBytes)
prometheus.MustRegister(droppedBytes)
prometheus.MustRegister(sentEntries)
prometheus.MustRegister(droppedEntries)
prometheus.MustRegister(requestDuration)
prometheus.MustRegister(batchRetries)
var err error
streamLag, err = metric.NewGauges("promtail_stream_lag_seconds",
m.streamLag, err = metric.NewGauges("promtail_stream_lag_seconds",
"Difference between current time and last batch timestamp for successful sends",
metric.GaugeConfig{Action: "set"},
int64(1*time.Minute.Seconds()), // This strips out files which update slowly and reduces noise in this metric.
)
if err != nil {
panic(err)
}
prometheus.MustRegister(streamLag)

m.countersWithHost = []*prometheus.CounterVec{
m.encodedBytes, m.sentBytes, m.droppedBytes, m.sentEntries, m.droppedEntries,
}

if reg != nil {
reg.MustRegister(
m.encodedBytes,
m.sentBytes,
m.droppedBytes,
m.sentEntries,
m.droppedEntries,
m.requestDuration,
m.batchRetries,
m.streamLag,
)
}

return &m
}

// Client pushes entries to Loki and can be stopped
Expand All @@ -115,6 +135,7 @@ type Client interface {

// Client for pushing logs in snappy-compressed protos over HTTP.
type client struct {
metrics *metrics
logger log.Logger
cfg Config
client *http.Client
Expand All @@ -131,7 +152,7 @@ type client struct {
}

// New makes a new Client.
func New(cfg Config, logger log.Logger) (Client, error) {
func New(reg prometheus.Registerer, cfg Config, logger log.Logger) (Client, error) {
if cfg.URL.URL == nil {
return nil, errors.New("client needs target URL")
}
Expand All @@ -142,6 +163,7 @@ func New(cfg Config, logger log.Logger) (Client, error) {
logger: log.With(logger, "component", "client", "host", cfg.URL.Host),
cfg: cfg,
entries: make(chan api.Entry),
metrics: newMetrics(reg),

externalLabels: cfg.ExternalLabels.LabelSet,
ctx: ctx,
Expand All @@ -162,7 +184,7 @@ func New(cfg Config, logger log.Logger) (Client, error) {

// Initialize counters to 0 so the metrics are exported before the first
// occurrence of incrementing to avoid missing metrics.
for _, counter := range countersWithHost {
for _, counter := range c.metrics.countersWithHost {
counter.WithLabelValues(c.cfg.URL.Host).Add(0)
}

Expand Down Expand Up @@ -249,7 +271,7 @@ func (c *client) sendBatch(tenantID string, batch *batch) {
return
}
bufBytes := float64(len(buf))
encodedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)
c.metrics.encodedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)

backoff := util.NewBackoff(c.ctx, c.cfg.BackoffConfig)
var status int
Expand All @@ -258,11 +280,11 @@ func (c *client) sendBatch(tenantID string, batch *batch) {
// send uses `timeout` internally, so `context.Background` is good enough.
status, err = c.send(context.Background(), tenantID, buf)

requestDuration.WithLabelValues(strconv.Itoa(status), c.cfg.URL.Host).Observe(time.Since(start).Seconds())
c.metrics.requestDuration.WithLabelValues(strconv.Itoa(status), c.cfg.URL.Host).Observe(time.Since(start).Seconds())

if err == nil {
sentBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)
sentEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount))
c.metrics.sentBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)
c.metrics.sentEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount))
for _, s := range batch.streams {
lbls, err := parser.ParseMetric(s.Labels)
if err != nil {
Expand All @@ -280,7 +302,7 @@ func (c *client) sendBatch(tenantID string, batch *batch) {
}
}
if lblSet != nil {
streamLag.With(lblSet).Set(time.Since(s.Entries[len(s.Entries)-1].Timestamp).Seconds())
c.metrics.streamLag.With(lblSet).Set(time.Since(s.Entries[len(s.Entries)-1].Timestamp).Seconds())
}
}
return
Expand All @@ -292,7 +314,7 @@ func (c *client) sendBatch(tenantID string, batch *batch) {
}

level.Warn(c.logger).Log("msg", "error sending batch, will retry", "status", status, "error", err)
batchRetries.WithLabelValues(c.cfg.URL.Host).Inc()
c.metrics.batchRetries.WithLabelValues(c.cfg.URL.Host).Inc()
backoff.Wait()

// Make sure it sends at least once before checking for retry.
Expand All @@ -303,8 +325,8 @@ func (c *client) sendBatch(tenantID string, batch *batch) {

if err != nil {
level.Error(c.logger).Log("msg", "final error sending batch", "status", status, "error", err)
droppedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)
droppedEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount))
c.metrics.droppedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)
c.metrics.droppedEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount))
}
}

Expand Down Expand Up @@ -382,5 +404,5 @@ func (c *client) processEntry(e api.Entry) (api.Entry, string) {

func (c *client) UnregisterLatencyMetric(labels model.LabelSet) {
labels[HostLabel] = model.LabelValue(c.cfg.URL.Host)
streamLag.Delete(labels)
c.metrics.streamLag.Delete(labels)
}
16 changes: 6 additions & 10 deletions pkg/promtail/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,9 +238,7 @@ func TestClient_Handle(t *testing.T) {

for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
// Reset metrics
sentEntries.Reset()
droppedEntries.Reset()
reg := prometheus.NewRegistry()

// Create a buffer channel where we do enqueue received requests
receivedReqsChan := make(chan receivedReq, 10)
Expand All @@ -267,7 +265,7 @@ func TestClient_Handle(t *testing.T) {
TenantID: testData.clientTenantID,
}

c, err := New(cfg, log.NewNopLogger())
c, err := New(reg, cfg, log.NewNopLogger())
require.NoError(t, err)

// Send all the input log entries
Expand Down Expand Up @@ -301,7 +299,7 @@ func TestClient_Handle(t *testing.T) {
require.ElementsMatch(t, testData.expectedReqs, receivedReqs)

expectedMetrics := strings.Replace(testData.expectedMetrics, "__HOST__", serverURL.Host, -1)
err = testutil.GatherAndCompare(prometheus.DefaultGatherer, strings.NewReader(expectedMetrics), "promtail_sent_entries_total", "promtail_dropped_entries_total")
err = testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), "promtail_sent_entries_total", "promtail_dropped_entries_total")
assert.NoError(t, err)
})
}
Expand Down Expand Up @@ -372,9 +370,7 @@ func TestClient_StopNow(t *testing.T) {

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
// Reset metrics
sentEntries.Reset()
droppedEntries.Reset()
reg := prometheus.NewRegistry()

// Create a buffer channel where we do enqueue received requests
receivedReqsChan := make(chan receivedReq, 10)
Expand All @@ -401,7 +397,7 @@ func TestClient_StopNow(t *testing.T) {
TenantID: c.clientTenantID,
}

cl, err := New(cfg, log.NewNopLogger())
cl, err := New(reg, cfg, log.NewNopLogger())
require.NoError(t, err)

// Send all the input log entries
Expand Down Expand Up @@ -441,7 +437,7 @@ func TestClient_StopNow(t *testing.T) {
require.ElementsMatch(t, c.expectedReqs, receivedReqs)

expectedMetrics := strings.Replace(c.expectedMetrics, "__HOST__", serverURL.Host, -1)
err = testutil.GatherAndCompare(prometheus.DefaultGatherer, strings.NewReader(expectedMetrics), "promtail_sent_entries_total", "promtail_dropped_entries_total")
err = testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), "promtail_sent_entries_total", "promtail_dropped_entries_total")
assert.NoError(t, err)
})
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/promtail/client/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/fatih/color"
"github.com/go-kit/kit/log"
"github.com/prometheus/client_golang/prometheus"
"gopkg.in/yaml.v2"

"github.com/grafana/loki/pkg/promtail/api"
Expand Down Expand Up @@ -36,9 +37,9 @@ type logger struct {
}

// NewLogger creates a new client logger that logs entries instead of sending them.
func NewLogger(log log.Logger, externalLabels lokiflag.LabelSet, cfgs ...Config) (Client, error) {
func NewLogger(reg prometheus.Registerer, log log.Logger, externalLabels lokiflag.LabelSet, cfgs ...Config) (Client, error) {
// make sure the clients config is valid
c, err := NewMulti(log, externalLabels, cfgs...)
c, err := NewMulti(reg, log, externalLabels, cfgs...)
if err != nil {
return nil, err
}
Expand Down
5 changes: 2 additions & 3 deletions pkg/promtail/client/logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@ import (
)

func TestNewLogger(t *testing.T) {
_, err := NewLogger(util.Logger, flagext.LabelSet{}, []Config{}...)
_, err := NewLogger(nil, util.Logger, flagext.LabelSet{}, []Config{}...)
require.Error(t, err)

l, err := NewLogger(util.Logger, flagext.LabelSet{}, []Config{{URL: cortexflag.URLValue{URL: &url.URL{Host: "string"}}}}...)
l, err := NewLogger(nil, util.Logger, flagext.LabelSet{}, []Config{{URL: cortexflag.URLValue{URL: &url.URL{Host: "string"}}}}...)
require.NoError(t, err)
l.Chan() <- api.Entry{Labels: model.LabelSet{"foo": "bar"}, Entry: logproto.Entry{Timestamp: time.Now(), Line: "entry"}}
l.Stop()

}
5 changes: 3 additions & 2 deletions pkg/promtail/client/multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sync"

"github.com/go-kit/kit/log"
"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/loki/pkg/promtail/api"
"github.com/grafana/loki/pkg/util/flagext"
Expand All @@ -20,7 +21,7 @@ type MultiClient struct {
}

// NewMulti creates a new client
func NewMulti(logger log.Logger, externalLabels flagext.LabelSet, cfgs ...Config) (Client, error) {
func NewMulti(reg prometheus.Registerer, logger log.Logger, externalLabels flagext.LabelSet, cfgs ...Config) (Client, error) {
if len(cfgs) == 0 {
return nil, errors.New("at least one client config should be provided")
}
Expand All @@ -34,7 +35,7 @@ func NewMulti(logger log.Logger, externalLabels flagext.LabelSet, cfgs ...Config
// not typically the order of precedence, the assumption here is someone providing a specific config in
// yaml is doing so explicitly to make a key specific to a client.
cfg.ExternalLabels = flagext.LabelSet{LabelSet: externalLabels.Merge(cfg.ExternalLabels.LabelSet)}
client, err := New(cfg, logger)
client, err := New(reg, cfg, logger)
if err != nil {
return nil, err
}
Expand Down
6 changes: 2 additions & 4 deletions pkg/promtail/client/multi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
)

func TestNewMulti(t *testing.T) {
_, err := NewMulti(util.Logger, lokiflag.LabelSet{}, []Config{}...)
_, err := NewMulti(nil, util.Logger, lokiflag.LabelSet{}, []Config{}...)
if err == nil {
t.Fatal("expected err but got nil")
}
Expand All @@ -37,7 +37,7 @@ func TestNewMulti(t *testing.T) {
ExternalLabels: lokiflag.LabelSet{LabelSet: model.LabelSet{"hi": "there"}},
}

clients, err := NewMulti(util.Logger, lokiflag.LabelSet{LabelSet: model.LabelSet{"order": "command"}}, cc1, cc2)
clients, err := NewMulti(nil, util.Logger, lokiflag.LabelSet{LabelSet: model.LabelSet{"order": "command"}}, cc1, cc2)
if err != nil {
t.Fatalf("expected err: nil got:%v", err)
}
Expand Down Expand Up @@ -98,7 +98,6 @@ func TestMultiClient_Stop(t *testing.T) {
}

func TestMultiClient_Handle(t *testing.T) {

f := fake.New(func() {})
clients := []Client{f, f, f, f, f, f}
m := &MultiClient{
Expand All @@ -114,5 +113,4 @@ func TestMultiClient_Handle(t *testing.T) {
if len(f.Received()) != len(clients) {
t.Fatal("missing handle call")
}

}
Loading