Skip to content

Commit

Permalink
feat: add loki_ingester_rf1_segment_age_seconds metric
Browse files Browse the repository at this point in the history
This commit adds a new metric loki_ingester_rf1_segment_age_seconds.
It also cleans up a lot of the code that is used to report metrics
for segments and adds a new SegmentsStats struct to get data from
a SegmentWriter.
  • Loading branch information
grobinson-grafana committed Jul 24, 2024
1 parent 4f534d7 commit 45cca9d
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 124 deletions.
10 changes: 5 additions & 5 deletions pkg/ingester-rf1/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,9 @@ func (i *Ingester) flush(l log.Logger, j int, it *wal.PendingSegment) error {

func (i *Ingester) flushSegment(ctx context.Context, j int, w *wal.SegmentWriter) error {
start := time.Now()
defer func() {
i.metrics.flushDuration.Observe(time.Since(start).Seconds())
w.ReportMetrics()
}()

i.metrics.flushesTotal.Add(1)
defer i.metrics.flushDuration.Observe(time.Since(start).Seconds())

buf := i.flushBuffers[j]
defer buf.Reset()
Expand All @@ -111,6 +108,9 @@ func (i *Ingester) flushSegment(ctx context.Context, j int, w *wal.SegmentWriter
return err
}

stats := wal.GetSegmentStats(w, time.Now())
wal.ReportSegmentStats(stats, i.metrics.segmentMetrics)

id := ulid.MustNew(ulid.Timestamp(time.Now()), rand.Reader).String()
if err := i.store.PutObject(ctx, fmt.Sprintf("loki-v2/wal/anon/"+id), buf); err != nil {
i.metrics.flushFailuresTotal.Inc()
Expand All @@ -121,7 +121,7 @@ func (i *Ingester) flushSegment(ctx context.Context, j int, w *wal.SegmentWriter
Block: w.Meta(id),
}); err != nil {
i.metrics.flushFailuresTotal.Inc()
return fmt.Errorf("metastore add block: %w", err)
return fmt.Errorf("failed to update metastore: %w", err)
}

return nil
Expand Down
55 changes: 24 additions & 31 deletions pkg/ingester-rf1/metrics.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,38 @@
package ingesterrf1

import (
"github.com/grafana/loki/v3/pkg/storage/wal"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

type flushMetrics struct {
type ingesterMetrics struct {
autoForgetUnhealthyIngestersTotal prometheus.Counter
limiterEnabled prometheus.Gauge
// Shutdown marker for ingester scale down.
shutdownMarker prometheus.Gauge
flushesTotal prometheus.Counter
flushFailuresTotal prometheus.Counter
flushQueues prometheus.Gauge
flushDuration prometheus.Histogram
flushSizeBytes prometheus.Histogram
flushSize prometheus.Histogram
segmentMetrics *wal.SegmentMetrics
}

func newFlushMetrics(r prometheus.Registerer) *flushMetrics {
return &flushMetrics{
func newIngesterMetrics(r prometheus.Registerer) *ingesterMetrics {
return &ingesterMetrics{
autoForgetUnhealthyIngestersTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "loki_ingester_rf1_autoforget_unhealthy_ingesters_total",
Help: "Total number of ingesters automatically forgotten.",
}),
limiterEnabled: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Name: "loki_ingester_rf1_limiter_enabled",
Help: "1 if the limiter is enabled, otherwise 0.",
}),
shutdownMarker: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Name: "loki_ingester_rf1_shutdown_marker",
Help: "1 if prepare shutdown has been called, 0 otherwise.",
}),
flushesTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "loki_ingester_rf1_flushes_total",
Help: "The total number of flushes.",
Expand All @@ -33,37 +51,12 @@ func newFlushMetrics(r prometheus.Registerer) *flushMetrics {
Buckets: prometheus.ExponentialBuckets(0.001, 4, 8),
NativeHistogramBucketFactor: 1.1,
}),
flushSizeBytes: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
flushSize: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
Name: "loki_ingester_rf1_flush_size_bytes",
Help: "The flush size (as written to object storage).",
Buckets: prometheus.ExponentialBuckets(100, 10, 8),
NativeHistogramBucketFactor: 1.1,
}),
}
}

type ingesterMetrics struct {
autoForgetUnhealthyIngestersTotal prometheus.Counter
limiterEnabled prometheus.Gauge
// Shutdown marker for ingester scale down.
shutdownMarker prometheus.Gauge
*flushMetrics
}

func newIngesterMetrics(r prometheus.Registerer) *ingesterMetrics {
return &ingesterMetrics{
autoForgetUnhealthyIngestersTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "loki_ingester_rf1_autoforget_unhealthy_ingesters_total",
Help: "Total number of ingesters automatically forgotten.",
}),
limiterEnabled: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Name: "loki_ingester_rf1_limiter_enabled",
Help: "1 if the limiter is enabled, otherwise 0.",
}),
shutdownMarker: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Name: "loki_ingester_rf1_shutdown_marker",
Help: "1 if prepare shutdown has been called, 0 otherwise.",
}),
flushMetrics: newFlushMetrics(r),
segmentMetrics: wal.NewSegmentMetrics(r),
}
}
32 changes: 7 additions & 25 deletions pkg/storage/wal/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,6 @@ import (
"github.com/grafana/loki/v3/pkg/logproto"
)

const (
DefaultMaxAge = 500 * time.Millisecond
DefaultMaxSegments = 10
DefaultMaxSegmentSize = 8 * 1024 * 1024 // 8MB.
)

var (
// ErrClosed is returned when the WAL is closed. It is a permanent error
// as once closed, a WAL cannot be re-opened.
Expand Down Expand Up @@ -109,31 +103,24 @@ type Manager struct {
clock quartz.Clock
}

// segment is similar to PendingSegment, however it is an internal struct used
// in the available and pending lists. It contains a single-use result that is
// returned to callers appending to the WAL and a re-usable segment that is reset
// after each flush.
// segment is an internal struct used in the available and pending lists. It
// contains a single-use result that is returned to callers appending to the
// WAL and a re-usable segment that is reset after each flush.
type segment struct {
r *AppendResult
w *SegmentWriter

// moved is the time the segment was moved to the pending list. It is used
// to calculate the age of the segment. A segment is moved when it has
// exceeded the maximum age or the maximum size.
moved time.Time
}

// PendingSegment contains a result and the segment to be flushed.
type PendingSegment struct {
Result *AppendResult
Writer *SegmentWriter
Moved time.Time
}

func NewManager(cfg Config, metrics *Metrics) (*Manager, error) {
func NewManager(cfg Config, metrics *ManagerMetrics) (*Manager, error) {
m := Manager{
cfg: cfg,
metrics: metrics.ManagerMetrics,
metrics: metrics,
available: list.New(),
pending: list.New(),
clock: quartz.NewReal(),
Expand All @@ -142,7 +129,7 @@ func NewManager(cfg Config, metrics *Metrics) (*Manager, error) {
m.metrics.NumPending.Set(0)
m.metrics.NumFlushing.Set(0)
for i := int64(0); i < cfg.MaxSegments; i++ {
w, err := NewWalSegmentWriter(metrics.SegmentMetrics)
w, err := NewWalSegmentWriter()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -205,11 +192,7 @@ func (m *Manager) NextPending() (*PendingSegment, error) {
m.pending.Remove(el)
m.metrics.NumPending.Dec()
m.metrics.NumFlushing.Inc()
return &PendingSegment{
Result: s.r,
Writer: s.w,
Moved: s.moved,
}, nil
return &PendingSegment{Result: s.r, Writer: s.w}, nil
}

// Put resets the segment and puts it back in the available list to accept
Expand All @@ -229,7 +212,6 @@ func (m *Manager) Put(s *PendingSegment) {
// move the element from the available list to the pending list and sets the
// relevant metrics.
func (m *Manager) move(el *list.Element, s *segment) {
s.moved = m.clock.Now()
m.pending.PushBack(s)
m.metrics.NumPending.Inc()
m.available.Remove(el)
Expand Down
28 changes: 14 additions & 14 deletions pkg/storage/wal/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestManager_Append(t *testing.T) {
MaxAge: 30 * time.Second,
MaxSegments: 1,
MaxSegmentSize: 1024, // 1KB
}, NewMetrics(nil))
}, NewManagerMetrics(nil))
require.NoError(t, err)

// Append some data.
Expand Down Expand Up @@ -59,7 +59,7 @@ func TestManager_AppendFailed(t *testing.T) {
MaxAge: 30 * time.Second,
MaxSegments: 1,
MaxSegmentSize: 1024, // 1KB
}, NewMetrics(nil))
}, NewManagerMetrics(nil))
require.NoError(t, err)

// Append some data.
Expand Down Expand Up @@ -92,7 +92,7 @@ func TestManager_AppendFailedWALClosed(t *testing.T) {
MaxAge: 30 * time.Second,
MaxSegments: 10,
MaxSegmentSize: 1024, // 1KB
}, NewMetrics(nil))
}, NewManagerMetrics(nil))
require.NoError(t, err)

// Append some data.
Expand Down Expand Up @@ -126,7 +126,7 @@ func TestManager_AppendFailedWALFull(t *testing.T) {
MaxAge: 30 * time.Second,
MaxSegments: 10,
MaxSegmentSize: 1024, // 1KB
}, NewMetrics(nil))
}, NewManagerMetrics(nil))
require.NoError(t, err)

// Should be able to write 100KB of data, 10KB per segment.
Expand Down Expand Up @@ -161,7 +161,7 @@ func TestManager_AppendMaxAgeExceeded(t *testing.T) {
MaxAge: 100 * time.Millisecond,
MaxSegments: 1,
MaxSegmentSize: 8 * 1024 * 1024, // 8MB
}, NewMetrics(nil))
}, NewManagerMetrics(nil))
require.NoError(t, err)

// Create a mock clock.
Expand Down Expand Up @@ -208,7 +208,7 @@ func TestManager_AppendMaxSizeExceeded(t *testing.T) {
MaxAge: 30 * time.Second,
MaxSegments: 1,
MaxSegmentSize: 1024, // 1KB
}, NewMetrics(nil))
}, NewManagerMetrics(nil))
require.NoError(t, err)

// Append 512B of data.
Expand Down Expand Up @@ -250,7 +250,7 @@ func TestManager_NextPending(t *testing.T) {
MaxAge: 30 * time.Second,
MaxSegments: 1,
MaxSegmentSize: 1024, // 1KB
}, NewMetrics(nil))
}, NewManagerMetrics(nil))
require.NoError(t, err)

// There should be no segments waiting to be flushed as no data has been
Expand Down Expand Up @@ -286,7 +286,7 @@ func TestManager_NextPendingAge(t *testing.T) {
MaxAge: 100 * time.Millisecond,
MaxSegments: 1,
MaxSegmentSize: 1024, // 1KB
}, NewMetrics(nil))
}, NewManagerMetrics(nil))
require.NoError(t, err)

// Create a mock clock.
Expand All @@ -311,7 +311,7 @@ func TestManager_NextPendingAge(t *testing.T) {
s, err := m.NextPending()
require.NoError(t, err)
require.NotNil(t, s)
require.Equal(t, 100*time.Millisecond, s.Writer.Age(s.Moved))
require.Equal(t, 100*time.Millisecond, s.Writer.Age(clock.Now()))
m.Put(s)

// Append 1KB of data using two separate append requests, 1ms apart.
Expand Down Expand Up @@ -342,15 +342,15 @@ func TestManager_NextPendingAge(t *testing.T) {
s, err = m.NextPending()
require.NoError(t, err)
require.NotNil(t, s)
require.Equal(t, time.Millisecond, s.Writer.Age(s.Moved))
require.Equal(t, time.Millisecond, s.Writer.Age(clock.Now()))
}

func TestManager_NextPendingMaxAgeExceeded(t *testing.T) {
m, err := NewManager(Config{
MaxAge: 100 * time.Millisecond,
MaxSegments: 1,
MaxSegmentSize: 1024, // 1KB
}, NewMetrics(nil))
}, NewManagerMetrics(nil))
require.NoError(t, err)

// Create a mock clock.
Expand Down Expand Up @@ -392,7 +392,7 @@ func TestManager_NextPendingWALClosed(t *testing.T) {
MaxAge: 30 * time.Second,
MaxSegments: 1,
MaxSegmentSize: 1024, // 1KB
}, NewMetrics(nil))
}, NewManagerMetrics(nil))
require.NoError(t, err)

// Append some data.
Expand Down Expand Up @@ -435,7 +435,7 @@ func TestManager_Put(t *testing.T) {
MaxAge: 30 * time.Second,
MaxSegments: 1,
MaxSegmentSize: 1024, // 1KB
}, NewMetrics(nil))
}, NewManagerMetrics(nil))
require.NoError(t, err)

// There should be 1 available and 0 pending segments.
Expand Down Expand Up @@ -482,7 +482,7 @@ func TestManager_Metrics(t *testing.T) {
m, err := NewManager(Config{
MaxSegments: 1,
MaxSegmentSize: 1024, // 1KB
}, NewMetrics(r))
}, NewManagerMetrics(r))
require.NoError(t, err)

metricNames := []string{
Expand Down
Loading

0 comments on commit 45cca9d

Please sign in to comment.