Skip to content

Commit

Permalink
pass a prometheus registerer to promtail components (#3175)
Browse files Browse the repository at this point in the history
Signed-off-by: Robert Fratto <robertfratto@gmail.com>
  • Loading branch information
rfratto authored Jan 15, 2021
1 parent 54d1d3b commit 628c1fa
Show file tree
Hide file tree
Showing 26 changed files with 320 additions and 181 deletions.
2 changes: 1 addition & 1 deletion cmd/docker-driver/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func New(logCtx logger.Info, logger log.Logger) (logger.Logger, error) {
if err != nil {
return nil, err
}
c, err := client.New(cfg.clientConfig, logger)
c, err := client.New(prometheus.DefaultRegisterer, cfg.clientConfig, logger)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion cmd/fluent-bit/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"github.com/go-kit/kit/log"
"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/loki/pkg/promtail/client"
)
Expand All @@ -11,5 +12,5 @@ func NewClient(cfg *config, logger log.Logger) (client.Client, error) {
if cfg.bufferConfig.buffer {
return NewBuffer(cfg, logger)
}
return client.New(cfg.clientConfig, logger)
return client.New(prometheus.DefaultRegisterer, cfg.clientConfig, logger)
}
3 changes: 2 additions & 1 deletion cmd/fluent-bit/dque.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/joncrlsn/dque"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"

"github.com/grafana/loki/pkg/logproto"
Expand Down Expand Up @@ -71,7 +72,7 @@ func newDque(cfg *config, logger log.Logger) (client.Client, error) {
_ = q.queue.TurboOn()
}

q.loki, err = client.New(cfg.clientConfig, logger)
q.loki, err = client.New(prometheus.DefaultRegisterer, cfg.clientConfig, logger)
if err != nil {
return nil, err
}
Expand Down
94 changes: 58 additions & 36 deletions pkg/promtail/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,68 +42,88 @@ const (
)

var (
encodedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
UserAgent = fmt.Sprintf("promtail/%s", version.Version)
)

type metrics struct {
encodedBytes *prometheus.CounterVec
sentBytes *prometheus.CounterVec
droppedBytes *prometheus.CounterVec
sentEntries *prometheus.CounterVec
droppedEntries *prometheus.CounterVec
requestDuration *prometheus.HistogramVec
batchRetries *prometheus.CounterVec
streamLag *metric.Gauges
countersWithHost []*prometheus.CounterVec
}

func newMetrics(reg prometheus.Registerer) *metrics {
var m metrics

m.encodedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "encoded_bytes_total",
Help: "Number of bytes encoded and ready to send.",
}, []string{HostLabel})
sentBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
m.sentBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "sent_bytes_total",
Help: "Number of bytes sent.",
}, []string{HostLabel})
droppedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
m.droppedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "dropped_bytes_total",
Help: "Number of bytes dropped because failed to be sent to the ingester after all retries.",
}, []string{HostLabel})
sentEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
m.sentEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "sent_entries_total",
Help: "Number of log entries sent to the ingester.",
}, []string{HostLabel})
droppedEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
m.droppedEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "dropped_entries_total",
Help: "Number of log entries dropped because failed to be sent to the ingester after all retries.",
}, []string{HostLabel})
requestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
m.requestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "promtail",
Name: "request_duration_seconds",
Help: "Duration of send requests.",
}, []string{"status_code", HostLabel})
batchRetries = prometheus.NewCounterVec(prometheus.CounterOpts{
m.batchRetries = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "batch_retries_total",
Help: "Number of times batches has had to be retried.",
}, []string{HostLabel})
streamLag *metric.Gauges

countersWithHost = []*prometheus.CounterVec{
encodedBytes, sentBytes, droppedBytes, sentEntries, droppedEntries,
}

UserAgent = fmt.Sprintf("promtail/%s", version.Version)
)

func init() {
prometheus.MustRegister(encodedBytes)
prometheus.MustRegister(sentBytes)
prometheus.MustRegister(droppedBytes)
prometheus.MustRegister(sentEntries)
prometheus.MustRegister(droppedEntries)
prometheus.MustRegister(requestDuration)
prometheus.MustRegister(batchRetries)
var err error
streamLag, err = metric.NewGauges("promtail_stream_lag_seconds",
m.streamLag, err = metric.NewGauges("promtail_stream_lag_seconds",
"Difference between current time and last batch timestamp for successful sends",
metric.GaugeConfig{Action: "set"},
int64(1*time.Minute.Seconds()), // This strips out files which update slowly and reduces noise in this metric.
)
if err != nil {
panic(err)
}
prometheus.MustRegister(streamLag)

m.countersWithHost = []*prometheus.CounterVec{
m.encodedBytes, m.sentBytes, m.droppedBytes, m.sentEntries, m.droppedEntries,
}

if reg != nil {
reg.MustRegister(
m.encodedBytes,
m.sentBytes,
m.droppedBytes,
m.sentEntries,
m.droppedEntries,
m.requestDuration,
m.batchRetries,
m.streamLag,
)
}

return &m
}

// Client pushes entries to Loki and can be stopped
Expand All @@ -115,6 +135,7 @@ type Client interface {

// Client for pushing logs in snappy-compressed protos over HTTP.
type client struct {
metrics *metrics
logger log.Logger
cfg Config
client *http.Client
Expand All @@ -131,7 +152,7 @@ type client struct {
}

// New makes a new Client.
func New(cfg Config, logger log.Logger) (Client, error) {
func New(reg prometheus.Registerer, cfg Config, logger log.Logger) (Client, error) {
if cfg.URL.URL == nil {
return nil, errors.New("client needs target URL")
}
Expand All @@ -142,6 +163,7 @@ func New(cfg Config, logger log.Logger) (Client, error) {
logger: log.With(logger, "component", "client", "host", cfg.URL.Host),
cfg: cfg,
entries: make(chan api.Entry),
metrics: newMetrics(reg),

externalLabels: cfg.ExternalLabels.LabelSet,
ctx: ctx,
Expand All @@ -162,7 +184,7 @@ func New(cfg Config, logger log.Logger) (Client, error) {

// Initialize counters to 0 so the metrics are exported before the first
// occurrence of incrementing to avoid missing metrics.
for _, counter := range countersWithHost {
for _, counter := range c.metrics.countersWithHost {
counter.WithLabelValues(c.cfg.URL.Host).Add(0)
}

Expand Down Expand Up @@ -249,7 +271,7 @@ func (c *client) sendBatch(tenantID string, batch *batch) {
return
}
bufBytes := float64(len(buf))
encodedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)
c.metrics.encodedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)

backoff := util.NewBackoff(c.ctx, c.cfg.BackoffConfig)
var status int
Expand All @@ -258,11 +280,11 @@ func (c *client) sendBatch(tenantID string, batch *batch) {
// send uses `timeout` internally, so `context.Background` is good enough.
status, err = c.send(context.Background(), tenantID, buf)

requestDuration.WithLabelValues(strconv.Itoa(status), c.cfg.URL.Host).Observe(time.Since(start).Seconds())
c.metrics.requestDuration.WithLabelValues(strconv.Itoa(status), c.cfg.URL.Host).Observe(time.Since(start).Seconds())

if err == nil {
sentBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)
sentEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount))
c.metrics.sentBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)
c.metrics.sentEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount))
for _, s := range batch.streams {
lbls, err := parser.ParseMetric(s.Labels)
if err != nil {
Expand All @@ -280,7 +302,7 @@ func (c *client) sendBatch(tenantID string, batch *batch) {
}
}
if lblSet != nil {
streamLag.With(lblSet).Set(time.Since(s.Entries[len(s.Entries)-1].Timestamp).Seconds())
c.metrics.streamLag.With(lblSet).Set(time.Since(s.Entries[len(s.Entries)-1].Timestamp).Seconds())
}
}
return
Expand All @@ -292,7 +314,7 @@ func (c *client) sendBatch(tenantID string, batch *batch) {
}

level.Warn(c.logger).Log("msg", "error sending batch, will retry", "status", status, "error", err)
batchRetries.WithLabelValues(c.cfg.URL.Host).Inc()
c.metrics.batchRetries.WithLabelValues(c.cfg.URL.Host).Inc()
backoff.Wait()

// Make sure it sends at least once before checking for retry.
Expand All @@ -303,8 +325,8 @@ func (c *client) sendBatch(tenantID string, batch *batch) {

if err != nil {
level.Error(c.logger).Log("msg", "final error sending batch", "status", status, "error", err)
droppedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)
droppedEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount))
c.metrics.droppedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)
c.metrics.droppedEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount))
}
}

Expand Down Expand Up @@ -382,5 +404,5 @@ func (c *client) processEntry(e api.Entry) (api.Entry, string) {

func (c *client) UnregisterLatencyMetric(labels model.LabelSet) {
labels[HostLabel] = model.LabelValue(c.cfg.URL.Host)
streamLag.Delete(labels)
c.metrics.streamLag.Delete(labels)
}
16 changes: 6 additions & 10 deletions pkg/promtail/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,9 +238,7 @@ func TestClient_Handle(t *testing.T) {

for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
// Reset metrics
sentEntries.Reset()
droppedEntries.Reset()
reg := prometheus.NewRegistry()

// Create a buffer channel where we do enqueue received requests
receivedReqsChan := make(chan receivedReq, 10)
Expand All @@ -267,7 +265,7 @@ func TestClient_Handle(t *testing.T) {
TenantID: testData.clientTenantID,
}

c, err := New(cfg, log.NewNopLogger())
c, err := New(reg, cfg, log.NewNopLogger())
require.NoError(t, err)

// Send all the input log entries
Expand Down Expand Up @@ -301,7 +299,7 @@ func TestClient_Handle(t *testing.T) {
require.ElementsMatch(t, testData.expectedReqs, receivedReqs)

expectedMetrics := strings.Replace(testData.expectedMetrics, "__HOST__", serverURL.Host, -1)
err = testutil.GatherAndCompare(prometheus.DefaultGatherer, strings.NewReader(expectedMetrics), "promtail_sent_entries_total", "promtail_dropped_entries_total")
err = testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), "promtail_sent_entries_total", "promtail_dropped_entries_total")
assert.NoError(t, err)
})
}
Expand Down Expand Up @@ -372,9 +370,7 @@ func TestClient_StopNow(t *testing.T) {

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
// Reset metrics
sentEntries.Reset()
droppedEntries.Reset()
reg := prometheus.NewRegistry()

// Create a buffer channel where we do enqueue received requests
receivedReqsChan := make(chan receivedReq, 10)
Expand All @@ -401,7 +397,7 @@ func TestClient_StopNow(t *testing.T) {
TenantID: c.clientTenantID,
}

cl, err := New(cfg, log.NewNopLogger())
cl, err := New(reg, cfg, log.NewNopLogger())
require.NoError(t, err)

// Send all the input log entries
Expand Down Expand Up @@ -441,7 +437,7 @@ func TestClient_StopNow(t *testing.T) {
require.ElementsMatch(t, c.expectedReqs, receivedReqs)

expectedMetrics := strings.Replace(c.expectedMetrics, "__HOST__", serverURL.Host, -1)
err = testutil.GatherAndCompare(prometheus.DefaultGatherer, strings.NewReader(expectedMetrics), "promtail_sent_entries_total", "promtail_dropped_entries_total")
err = testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), "promtail_sent_entries_total", "promtail_dropped_entries_total")
assert.NoError(t, err)
})
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/promtail/client/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/fatih/color"
"github.com/go-kit/kit/log"
"github.com/prometheus/client_golang/prometheus"
"gopkg.in/yaml.v2"

"github.com/grafana/loki/pkg/promtail/api"
Expand Down Expand Up @@ -36,9 +37,9 @@ type logger struct {
}

// NewLogger creates a new client logger that logs entries instead of sending them.
func NewLogger(log log.Logger, externalLabels lokiflag.LabelSet, cfgs ...Config) (Client, error) {
func NewLogger(reg prometheus.Registerer, log log.Logger, externalLabels lokiflag.LabelSet, cfgs ...Config) (Client, error) {
// make sure the clients config is valid
c, err := NewMulti(log, externalLabels, cfgs...)
c, err := NewMulti(reg, log, externalLabels, cfgs...)
if err != nil {
return nil, err
}
Expand Down
5 changes: 2 additions & 3 deletions pkg/promtail/client/logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@ import (
)

func TestNewLogger(t *testing.T) {
_, err := NewLogger(util.Logger, flagext.LabelSet{}, []Config{}...)
_, err := NewLogger(nil, util.Logger, flagext.LabelSet{}, []Config{}...)
require.Error(t, err)

l, err := NewLogger(util.Logger, flagext.LabelSet{}, []Config{{URL: cortexflag.URLValue{URL: &url.URL{Host: "string"}}}}...)
l, err := NewLogger(nil, util.Logger, flagext.LabelSet{}, []Config{{URL: cortexflag.URLValue{URL: &url.URL{Host: "string"}}}}...)
require.NoError(t, err)
l.Chan() <- api.Entry{Labels: model.LabelSet{"foo": "bar"}, Entry: logproto.Entry{Timestamp: time.Now(), Line: "entry"}}
l.Stop()

}
5 changes: 3 additions & 2 deletions pkg/promtail/client/multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sync"

"github.com/go-kit/kit/log"
"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/loki/pkg/promtail/api"
"github.com/grafana/loki/pkg/util/flagext"
Expand All @@ -20,7 +21,7 @@ type MultiClient struct {
}

// NewMulti creates a new client
func NewMulti(logger log.Logger, externalLabels flagext.LabelSet, cfgs ...Config) (Client, error) {
func NewMulti(reg prometheus.Registerer, logger log.Logger, externalLabels flagext.LabelSet, cfgs ...Config) (Client, error) {
if len(cfgs) == 0 {
return nil, errors.New("at least one client config should be provided")
}
Expand All @@ -34,7 +35,7 @@ func NewMulti(logger log.Logger, externalLabels flagext.LabelSet, cfgs ...Config
// not typically the order of precedence, the assumption here is someone providing a specific config in
// yaml is doing so explicitly to make a key specific to a client.
cfg.ExternalLabels = flagext.LabelSet{LabelSet: externalLabels.Merge(cfg.ExternalLabels.LabelSet)}
client, err := New(cfg, logger)
client, err := New(reg, cfg, logger)
if err != nil {
return nil, err
}
Expand Down
6 changes: 2 additions & 4 deletions pkg/promtail/client/multi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
)

func TestNewMulti(t *testing.T) {
_, err := NewMulti(util.Logger, lokiflag.LabelSet{}, []Config{}...)
_, err := NewMulti(nil, util.Logger, lokiflag.LabelSet{}, []Config{}...)
if err == nil {
t.Fatal("expected err but got nil")
}
Expand All @@ -37,7 +37,7 @@ func TestNewMulti(t *testing.T) {
ExternalLabels: lokiflag.LabelSet{LabelSet: model.LabelSet{"hi": "there"}},
}

clients, err := NewMulti(util.Logger, lokiflag.LabelSet{LabelSet: model.LabelSet{"order": "command"}}, cc1, cc2)
clients, err := NewMulti(nil, util.Logger, lokiflag.LabelSet{LabelSet: model.LabelSet{"order": "command"}}, cc1, cc2)
if err != nil {
t.Fatalf("expected err: nil got:%v", err)
}
Expand Down Expand Up @@ -98,7 +98,6 @@ func TestMultiClient_Stop(t *testing.T) {
}

func TestMultiClient_Handle(t *testing.T) {

f := fake.New(func() {})
clients := []Client{f, f, f, f, f, f}
m := &MultiClient{
Expand All @@ -114,5 +113,4 @@ func TestMultiClient_Handle(t *testing.T) {
if len(f.Received()) != len(clients) {
t.Fatal("missing handle call")
}

}
Loading

0 comments on commit 628c1fa

Please sign in to comment.