Commit 7208d9c

Add new metrics for compressed size, streams and tenants

grobinson-grafana committed Jul 15, 2024
1 parent e1dee72
Showing 7 changed files with 150 additions and 78 deletions.
12 changes: 5 additions & 7 deletions pkg/ingester-rf1/flush.go
@@ -145,20 +145,18 @@ func (i *Ingester) flushOp(l log.Logger, op *flushOp) error {
 // If the flush isn't successful, the operation for this userID is requeued allowing this and all other unflushed
 // segments to have another opportunity to be flushed.
 func (i *Ingester) flushSegment(ctx context.Context, ch *wal.SegmentWriter) error {
-	reader := ch.Reader()
-	defer runutil.CloseWithLogOnErr(util_log.Logger, reader, "flushSegment")
-	newUlid := ulid.MustNew(ulid.Timestamp(time.Now()), rand.Reader)
+	id := ulid.MustNew(ulid.Timestamp(time.Now()), rand.Reader)
+	r := ch.Reader()
 
 	start := time.Now()
 	defer func() {
+		runutil.CloseWithLogOnErr(util_log.Logger, r, "flushSegment")
 		i.metrics.flushDuration.Observe(time.Since(start).Seconds())
+		ch.Observe()
 	}()
 
-	// TODO: observe flush size, not just segment size
-	i.metrics.segmentSize.Observe(float64(ch.InputSize()))
-
 	i.metrics.flushesTotal.Add(1)
-	if err := i.store.PutObject(ctx, fmt.Sprintf("loki-v2/wal/anon/"+newUlid.String()), reader); err != nil {
+	if err := i.store.PutObject(ctx, fmt.Sprintf("loki-v2/wal/anon/"+id.String()), r); err != nil {
		i.metrics.flushFailuresTotal.Inc()
		return fmt.Errorf("store put chunk: %w", err)
	}
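
The flush path now creates the ULID before opening the reader, closes the reader in the same deferred block that records the flush duration, and calls ch.Observe() so per-segment stats (streams, tenants, sizes) are captured once per flush, success or failure. Below is a minimal, self-contained sketch of that deferred-observation pattern; the segment type and its fields are hypothetical stand-ins, not Loki's:

package main

import (
	"fmt"
	"time"
)

// segment is a stand-in for wal.SegmentWriter.
type segment struct{ streams int }

// Observe stands in for SegmentWriter.Observe, which records
// segment-level stats at flush time.
func (s *segment) Observe() { fmt.Println("observed", s.streams, "streams") }

func flush(s *segment) error {
	start := time.Now()
	defer func() {
		// Runs whether or not the flush succeeds, so duration and
		// segment stats are recorded for failed flushes too.
		fmt.Println("flush took", time.Since(start))
		s.Observe()
	}()
	// ... write the segment to object storage here ...
	return nil
}

func main() { _ = flush(&segment{streams: 3}) }
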
2 changes: 1 addition & 1 deletion pkg/ingester-rf1/ingester.go
@@ -251,7 +251,7 @@ func New(cfg Config, clientConfig client.Config,
 	}
 	compressionStats.Set(cfg.ChunkEncoding)
 	targetSizeStats.Set(int64(cfg.TargetChunkSize))
-	metrics := newIngesterMetrics(registerer, metricsNamespace)
+	metrics := newIngesterMetrics(registerer)
 
 	walManager, err := wal.NewManager(wal.Config{
 		MaxAge: wal.DefaultMaxAge,
71 changes: 37 additions & 34 deletions pkg/ingester-rf1/metrics.go
@@ -5,62 +5,65 @@ import (
 	"github.com/prometheus/client_golang/prometheus/promauto"
 )
 
-type ingesterMetrics struct {
-	autoForgetUnhealthyIngestersTotal prometheus.Counter
-	flushDuration                     prometheus.Histogram
-	flushFailuresTotal                prometheus.Counter
-	flushesTotal                      prometheus.Counter
-	flushQueues                       prometheus.Gauge
-	flushSize                         prometheus.Histogram
-	limiterEnabled                    prometheus.Gauge
-	segmentSize                       prometheus.Histogram
-	// Shutdown marker for ingester scale down.
-	shutdownMarker prometheus.Gauge
+type flushMetrics struct {
+	flushesTotal       prometheus.Counter
+	flushFailuresTotal prometheus.Counter
+	flushQueues        prometheus.Gauge
+	flushDuration      prometheus.Histogram
+	flushSizeBytes     prometheus.Histogram
 }
 
-func newIngesterMetrics(r prometheus.Registerer, metricsNamespace string) *ingesterMetrics {
-	return &ingesterMetrics{
-		autoForgetUnhealthyIngestersTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
-			Name: "loki_ingester_rf1_autoforget_unhealthy_ingesters_total",
-			Help: "Total number of ingesters automatically forgotten",
-		}),
-		flushDuration: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
-			Name:                        "loki_ingester_rf1_flush_duration_seconds",
-			Help:                        "The flush duration (in seconds)",
-			Buckets:                     prometheus.ExponentialBuckets(0.001, 4, 8),
-			NativeHistogramBucketFactor: 1.1,
+func newFlushMetrics(r prometheus.Registerer) *flushMetrics {
+	return &flushMetrics{
+		flushesTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
+			Name: "loki_ingester_rf1_flushes_total",
+			Help: "The total number of flushes.",
 		}),
 		flushFailuresTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
 			Name: "loki_ingester_rf1_flush_failures_total",
 			Help: "The total number of failed flushes.",
 		}),
-		flushesTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
-			Name: "loki_ingester_rf1_flushes_total",
-			Help: "The total number of flushes.",
-		}),
 		flushQueues: promauto.With(r).NewGauge(prometheus.GaugeOpts{
 			Name: "loki_ingester_rf1_flush_queues",
 			Help: "The total number of flush queues.",
 		}),
-		flushSize: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
+		flushDuration: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
+			Name:                        "loki_ingester_rf1_flush_duration_seconds",
+			Help:                        "The flush duration (in seconds).",
+			Buckets:                     prometheus.ExponentialBuckets(0.001, 4, 8),
+			NativeHistogramBucketFactor: 1.1,
+		}),
+		flushSizeBytes: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
 			Name:                        "loki_ingester_rf1_flush_size_bytes",
 			Help:                        "The flush size (as written to object storage).",
 			Buckets:                     prometheus.ExponentialBuckets(100, 10, 8),
 			NativeHistogramBucketFactor: 1.1,
 		}),
+	}
+}
+
+type ingesterMetrics struct {
+	autoForgetUnhealthyIngestersTotal prometheus.Counter
+	limiterEnabled                    prometheus.Gauge
+	// Shutdown marker for ingester scale down.
+	shutdownMarker prometheus.Gauge
+	*flushMetrics
+}
+
+func newIngesterMetrics(r prometheus.Registerer) *ingesterMetrics {
+	return &ingesterMetrics{
+		autoForgetUnhealthyIngestersTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
+			Name: "loki_ingester_rf1_autoforget_unhealthy_ingesters_total",
+			Help: "Total number of ingesters automatically forgotten.",
+		}),
 		limiterEnabled: promauto.With(r).NewGauge(prometheus.GaugeOpts{
 			Name: "loki_ingester_rf1_limiter_enabled",
 			Help: "1 if the limiter is enabled, otherwise 0.",
 		}),
-		segmentSize: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
-			Name:                        "loki_ingester_rf1_segment_size_bytes",
-			Help:                        "The segment size (uncompressed in memory).",
-			Buckets:                     prometheus.ExponentialBuckets(100, 10, 8),
-			NativeHistogramBucketFactor: 1.1,
-		}),
 		shutdownMarker: promauto.With(r).NewGauge(prometheus.GaugeOpts{
 			Name: "loki_ingester_rf1_shutdown_marker",
-			Help: "1 if prepare shutdown has been called, 0 otherwise",
+			Help: "1 if prepare shutdown has been called, 0 otherwise.",
 		}),
+		flushMetrics: newFlushMetrics(r),
 	}
 }
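
The flush-related metrics now live in flushMetrics and are embedded as *flushMetrics in ingesterMetrics. Go promotes fields of an embedded struct, which is why call sites such as i.metrics.flushesTotal.Add(1) in flush.go compile unchanged. A minimal sketch of that promotion, with simplified field types standing in for the Prometheus ones:

package main

import "fmt"

type flushMetrics struct {
	flushesTotal int // stands in for prometheus.Counter
}

type ingesterMetrics struct {
	shutdownMarker bool // stands in for prometheus.Gauge
	*flushMetrics
}

func main() {
	m := ingesterMetrics{flushMetrics: &flushMetrics{}}
	// flushesTotal is promoted from the embedded *flushMetrics, so the
	// field reads as if it were declared on ingesterMetrics itself.
	m.flushesTotal++
	fmt.Println(m.flushesTotal) // 1
}
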
31 changes: 3 additions & 28 deletions pkg/storage/wal/manager.go
@@ -6,8 +6,6 @@ import (
 	"sync"
 	"time"
 
-	"github.com/prometheus/client_golang/prometheus"
-	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/prometheus/prometheus/model/labels"
 
 	"github.com/grafana/loki/v3/pkg/logproto"
@@ -83,29 +81,6 @@ type Config struct {
 	MaxSegmentSize int64
 }
 
-type Metrics struct {
-	NumAvailable prometheus.Gauge
-	NumPending   prometheus.Gauge
-	NumFlushing  prometheus.Gauge
-}
-
-func NewMetrics(r prometheus.Registerer) *Metrics {
-	return &Metrics{
-		NumAvailable: promauto.With(r).NewGauge(prometheus.GaugeOpts{
-			Name: "wal_segments_available",
-			Help: "The number of WAL segments accepting writes.",
-		}),
-		NumPending: promauto.With(r).NewGauge(prometheus.GaugeOpts{
-			Name: "wal_segments_pending",
-			Help: "The number of WAL segments waiting to be flushed.",
-		}),
-		NumFlushing: promauto.With(r).NewGauge(prometheus.GaugeOpts{
-			Name: "wal_segments_flushing",
-			Help: "The number of WAL segments being flushed.",
-		}),
-	}
-}
-
 // Manager buffers segments in memory, and keeps track of which segments are
 // available and which are waiting to be flushed. The maximum number of
 // segments that can be buffered in memory, and their maximum age and maximum
@@ -123,7 +98,7 @@ func NewMetrics(r prometheus.Registerer) *Metrics {
 // and avoid congestion collapse due to excessive timeouts and retries.
 type Manager struct {
 	cfg     Config
-	metrics *Metrics
+	metrics *ManagerMetrics
 
 	// available is a list of segments that are available and accepting data.
 	// All segments other than the segment at the front of the list are empty,
@@ -163,15 +138,15 @@ type PendingItem struct {
 func NewManager(cfg Config, metrics *Metrics) (*Manager, error) {
 	m := Manager{
 		cfg:       cfg,
-		metrics:   metrics,
+		metrics:   metrics.ManagerMetrics,
 		available: list.New(),
 		pending:   list.New(),
 		shutdown:  make(chan struct{}),
 	}
 	m.metrics.NumPending.Set(0)
 	m.metrics.NumFlushing.Set(0)
 	for i := int64(0); i < cfg.MaxSegments; i++ {
-		w, err := NewWalSegmentWriter()
+		w, err := NewWalSegmentWriter(metrics.SegmentMetrics)
		if err != nil {
			return nil, err
		}
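
Manager now keeps only the *ManagerMetrics half of the bundle and threads SegmentMetrics into every SegmentWriter it creates, while NewManager continues to accept the combined *Metrics. A sketch of the resulting wiring, assuming the package path from this commit; the config values are illustrative and error handling is minimal:

package main

import (
	"github.com/prometheus/client_golang/prometheus"

	"github.com/grafana/loki/v3/pkg/storage/wal"
)

func main() {
	reg := prometheus.NewRegistry()
	// NewMetrics bundles ManagerMetrics and SegmentMetrics; NewManager
	// keeps the former and hands the latter to each SegmentWriter.
	m, err := wal.NewManager(wal.Config{
		MaxAge:         wal.DefaultMaxAge,
		MaxSegments:    10,      // illustrative
		MaxSegmentSize: 8 << 20, // 8 MiB, illustrative
	}, wal.NewMetrics(reg))
	if err != nil {
		panic(err)
	}
	_ = m
}
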
77 changes: 77 additions & 0 deletions pkg/storage/wal/metrics.go
@@ -0,0 +1,77 @@
+package wal
+
+import (
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+)
+
+type ManagerMetrics struct {
+	NumAvailable prometheus.Gauge
+	NumPending   prometheus.Gauge
+	NumFlushing  prometheus.Gauge
+}
+
+func NewManagerMetrics(r prometheus.Registerer) *ManagerMetrics {
+	return &ManagerMetrics{
+		NumAvailable: promauto.With(r).NewGauge(prometheus.GaugeOpts{
+			Name: "wal_segments_available",
+			Help: "The number of WAL segments accepting writes.",
+		}),
+		NumPending: promauto.With(r).NewGauge(prometheus.GaugeOpts{
+			Name: "wal_segments_pending",
+			Help: "The number of WAL segments waiting to be flushed.",
+		}),
+		NumFlushing: promauto.With(r).NewGauge(prometheus.GaugeOpts{
+			Name: "wal_segments_flushing",
+			Help: "The number of WAL segments being flushed.",
+		}),
+	}
+}
+
+type SegmentMetrics struct {
+	outputSizeBytes prometheus.Histogram
+	inputSizeBytes  prometheus.Histogram
+	streams         prometheus.Histogram
+	tenants         prometheus.Histogram
+}
+
+func NewSegmentMetrics(r prometheus.Registerer) *SegmentMetrics {
+	return &SegmentMetrics{
+		outputSizeBytes: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
+			Name:                        "loki_ingester_rf1_segment_output_size_bytes",
+			Help:                        "The segment size as written to disk (compressed).",
+			Buckets:                     prometheus.ExponentialBuckets(100, 10, 8),
+			NativeHistogramBucketFactor: 1.1,
+		}),
+		inputSizeBytes: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
+			Name:                        "loki_ingester_rf1_segment_input_size_bytes",
+			Help:                        "The segment size (uncompressed).",
+			Buckets:                     prometheus.ExponentialBuckets(100, 10, 8),
+			NativeHistogramBucketFactor: 1.1,
+		}),
+		streams: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
+			Name:                        "loki_ingester_rf1_per_segment_streams",
+			Help:                        "The number of streams per segment.",
+			Buckets:                     prometheus.ExponentialBuckets(1, 2, 10),
+			NativeHistogramBucketFactor: 1.1,
+		}),
+		tenants: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
+			Name:                        "loki_ingester_rf1_per_segment_tenants",
+			Help:                        "The number of tenants per segment.",
+			Buckets:                     prometheus.ExponentialBuckets(1, 2, 10),
+			NativeHistogramBucketFactor: 1.1,
+		}),
+	}
+}
+
+type Metrics struct {
+	SegmentMetrics *SegmentMetrics
+	ManagerMetrics *ManagerMetrics
+}
+
+func NewMetrics(r prometheus.Registerer) *Metrics {
+	return &Metrics{
+		ManagerMetrics: NewManagerMetrics(r),
+		SegmentMetrics: NewSegmentMetrics(r),
+	}
+}
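
One naming split carried over from the old code: the manager gauges keep their unprefixed names (wal_segments_available, wal_segments_pending, wal_segments_flushing) while the new segment histograms use the loki_ingester_rf1_ prefix. A quick way to sanity-check that the whole bundle registers without collisions, using only core client_golang APIs (the package path is the one from this commit):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"

	"github.com/grafana/loki/v3/pkg/storage/wal"
)

func main() {
	// The pedantic registry enables extra consistency checks at gather time.
	reg := prometheus.NewPedanticRegistry()
	_ = wal.NewMetrics(reg) // registers manager gauges and segment histograms
	mfs, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range mfs {
		fmt.Println(mf.GetName())
	}
}
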
21 changes: 20 additions & 1 deletion pkg/storage/wal/segment.go
@@ -46,8 +46,10 @@ type streamID struct {
 }
 
 type SegmentWriter struct {
+	metrics        *SegmentMetrics
 	streams        map[streamID]*streamSegment
 	buf1           encoding.Encbuf
+	outputSize     atomic.Int64
 	inputSize      atomic.Int64
 	idxWriter      *index.Writer
 	consistencyMtx *sync.RWMutex
@@ -76,12 +78,13 @@ func (s *streamSegment) WriteTo(w io.Writer) (n int64, err error) {
 }
 
 // NewWalSegmentWriter creates a new WalSegmentWriter.
-func NewWalSegmentWriter() (*SegmentWriter, error) {
+func NewWalSegmentWriter(m *SegmentMetrics) (*SegmentWriter, error) {
 	idxWriter, err := index.NewWriter()
 	if err != nil {
 		return nil, err
 	}
 	return &SegmentWriter{
+		metrics:   m,
 		streams:   make(map[streamID]*streamSegment, 64),
 		buf1:      encoding.EncWith(make([]byte, 0, 4)),
 		idxWriter: idxWriter,
@@ -144,6 +147,20 @@ func (b *SegmentWriter) Append(tenantID, labelsString string, lbls labels.Labels
 	}
 }
 
+func (b *SegmentWriter) Observe() {
+	b.consistencyMtx.Lock()
+	defer b.consistencyMtx.Unlock()
+
+	b.metrics.streams.Observe(float64(len(b.streams)))
+	tenants := make(map[string]struct{}, len(b.streams))
+	for _, s := range b.streams {
+		tenants[s.tenantID] = struct{}{}
+	}
+	b.metrics.tenants.Observe(float64(len(tenants)))
+	b.metrics.inputSizeBytes.Observe(float64(b.inputSize.Load()))
+	b.metrics.outputSizeBytes.Observe(float64(b.outputSize.Load()))
+}
+
 func (b *SegmentWriter) WriteTo(w io.Writer) (int64, error) {
 	var (
 		total int64
@@ -262,6 +279,8 @@ func (b *SegmentWriter) WriteTo(w io.Writer) (int64, error) {
 	}
 	total += int64(n)
 
+	b.outputSize.Store(total)
+
 	return total, nil
 }
 
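
Observe holds the consistency lock so the stream map is not read mid-append, and the tenant count is derived rather than stored: stream IDs are per tenant, so deduplicating their tenant IDs into a set yields the count. A standalone sketch of that dedup step:

package main

import "fmt"

type stream struct{ tenantID string }

// countTenants is a hypothetical helper mirroring the dedup inside Observe.
func countTenants(streams []stream) int {
	seen := make(map[string]struct{}, len(streams))
	for _, s := range streams {
		seen[s.tenantID] = struct{}{} // empty struct values occupy no space
	}
	return len(seen)
}

func main() {
	fmt.Println(countTenants([]stream{{"a"}, {"b"}, {"a"}})) // prints 2
}
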
14 changes: 7 additions & 7 deletions pkg/storage/wal/segment_test.go
@@ -105,7 +105,7 @@ func TestWalSegmentWriter_Append(t *testing.T) {
 		t.Run(tt.name, func(t *testing.T) {
 			t.Parallel()
 			// Create a new WalSegmentWriter
-			w, err := NewWalSegmentWriter()
+			w, err := NewWalSegmentWriter(NewSegmentMetrics(nil))
 			require.NoError(t, err)
 			// Append the entries
 			for _, batch := range tt.batches {
@@ -168,7 +168,7 @@ func BenchmarkConcurrentAppends(t *testing.B) {
 	t.ResetTimer()
 	for i := 0; i < t.N; i++ {
 		var err error
-		w, err = NewWalSegmentWriter()
+		w, err = NewWalSegmentWriter(NewSegmentMetrics(nil))
 		require.NoError(t, err)
 
 		for _, lbl := range lbls {
@@ -197,7 +197,7 @@ func TestConcurrentAppends(t *testing.T) {
 	}
 	dst := bytes.NewBuffer(nil)
 
-	w, err := NewWalSegmentWriter()
+	w, err := NewWalSegmentWriter(NewSegmentMetrics(nil))
 	require.NoError(t, err)
 	var wg sync.WaitGroup
 	workChan := make(chan *appendArgs, 100)
@@ -289,7 +289,7 @@ func TestConcurrentAppends(t *testing.T) {
 }
 
 func TestMultiTenantWrite(t *testing.T) {
-	w, err := NewWalSegmentWriter()
+	w, err := NewWalSegmentWriter(NewSegmentMetrics(nil))
 	require.NoError(t, err)
 	dst := bytes.NewBuffer(nil)
 
@@ -359,7 +359,7 @@ func TestCompression(t *testing.T) {
 }
 
 func testCompression(t *testing.T, maxInputSize int64) {
-	w, err := NewWalSegmentWriter()
+	w, err := NewWalSegmentWriter(NewSegmentMetrics(nil))
 	require.NoError(t, err)
 	dst := bytes.NewBuffer(nil)
 	files := testdata.Files()
@@ -416,7 +416,7 @@ }
 }
 
 func TestReset(t *testing.T) {
-	w, err := NewWalSegmentWriter()
+	w, err := NewWalSegmentWriter(NewSegmentMetrics(nil))
 	require.NoError(t, err)
 	dst := bytes.NewBuffer(nil)
 
@@ -490,7 +490,7 @@ func BenchmarkWrites(b *testing.B) {
 
 	dst := bytes.NewBuffer(make([]byte, 0, inputSize))
 
-	writer, err := NewWalSegmentWriter()
+	writer, err := NewWalSegmentWriter(NewSegmentMetrics(nil))
 	require.NoError(b, err)
 
 	for _, d := range data {
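
Every test now constructs the writer with NewSegmentMetrics(nil). That works because promauto.With(nil) builds collectors without registering them anywhere, so repeated construction across parallel tests cannot trigger duplicate-registration panics. A minimal demonstration:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

func main() {
	for i := 0; i < 2; i++ {
		// With a real registry, the second iteration would panic with an
		// AlreadyRegisteredError; with a nil registerer it is fine.
		h := promauto.With(nil).NewHistogram(prometheus.HistogramOpts{
			Name: "example_size_bytes",
			Help: "An example histogram.",
		})
		h.Observe(42)
	}
	fmt.Println("constructed twice without panicking")
}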
