Merge branch 'main' into docs-send-data-alloy
JStickler authored Jul 25, 2024
Parents: 2e71b0d + 250f4cf · Commit: 41aa756
Showing 15 changed files with 133 additions and 134 deletions.
2 changes: 1 addition & 1 deletion docs/sources/setup/install/helm/reference.md
@@ -664,7 +664,7 @@ null
<tr>
<td>backend.targetModule</td>
<td>string</td>
<td>Comma-separated list of Loki modules to load for the read</td>
<td>Comma-separated list of Loki modules to load for the backend</td>
<td><pre lang="json">
"backend"
</pre>
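For context, the table entry above corresponds to a Helm value along these lines; a minimal sketch, with the key path assumed from the table's `backend.targetModule` entry rather than taken from the chart itself:

backend:
  # Comma-separated list of Loki modules to load for the backend target.
  targetModule: "backend"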
4 changes: 2 additions & 2 deletions docs/sources/shared/configuration.md
@@ -115,8 +115,8 @@ Pass the `-config.expand-env` flag at the command line to enable this way of set
[querier: <querier>]

querier_rf1:
# Enable the RF1 querier. If set, replaces the usual querier with a RF-1
# querier when using 'ALL' target.
# Enable the RF1 querier. If set, replaces the usual querier with an RF-1
# querier.
# CLI flag: -querier-rf1.enabled
[enabled: <boolean> | default = false]

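In a Loki configuration file, the documented option would appear roughly as follows; a minimal sketch based on the snippet above (the matching CLI flag is -querier-rf1.enabled):

querier_rf1:
  # Replace the usual querier with the RF-1 querier (default: false).
  enabled: true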
10 changes: 5 additions & 5 deletions pkg/ingester-rf1/flush.go
@@ -97,12 +97,9 @@ func (i *Ingester) flush(l log.Logger, j int, it *wal.PendingSegment) error {

func (i *Ingester) flushSegment(ctx context.Context, j int, w *wal.SegmentWriter) error {
start := time.Now()
defer func() {
i.metrics.flushDuration.Observe(time.Since(start).Seconds())
w.ReportMetrics()
}()

i.metrics.flushesTotal.Add(1)
defer func() { i.metrics.flushDuration.Observe(time.Since(start).Seconds()) }()

buf := i.flushBuffers[j]
defer buf.Reset()
@@ -111,6 +108,9 @@ func (i *Ingester) flushSegment(ctx context.Context, j int, w *wal.SegmentWriter
return err
}

stats := wal.GetSegmentStats(w, time.Now())
wal.ReportSegmentStats(stats, i.metrics.segmentMetrics)

id := ulid.MustNew(ulid.Timestamp(time.Now()), rand.Reader).String()
if err := i.store.PutObject(ctx, fmt.Sprintf("loki-v2/wal/anon/"+id), buf); err != nil {
i.metrics.flushFailuresTotal.Inc()
@@ -121,7 +121,7 @@ func (i *Ingester) flushSegment(ctx context.Context, j int, w *wal.SegmentWriter
Block: w.Meta(id),
}); err != nil {
i.metrics.flushFailuresTotal.Inc()
return fmt.Errorf("metastore add block: %w", err)
return fmt.Errorf("failed to update metastore: %w", err)
}

return nil
2 changes: 1 addition & 1 deletion pkg/ingester-rf1/ingester.go
@@ -244,7 +244,7 @@ func New(cfg Config, clientConfig client.Config,
MaxAge: cfg.MaxSegmentAge,
MaxSegments: int64(cfg.MaxSegments),
MaxSegmentSize: int64(cfg.MaxSegmentSize),
}, wal.NewMetrics(registerer))
}, wal.NewManagerMetrics(registerer))
if err != nil {
return nil, err
}
4 changes: 1 addition & 3 deletions pkg/ingester-rf1/metastore/client/client.go
@@ -13,7 +13,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc"

metastorepb "github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/metastorepb"
"github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/metastorepb"
"github.com/grafana/loki/v3/pkg/util/constants"
)

@@ -106,12 +106,10 @@ const grpcServiceConfig = `{
func instrumentation(latency *prometheus.HistogramVec) ([]grpc.UnaryClientInterceptor, []grpc.StreamClientInterceptor) {
var unaryInterceptors []grpc.UnaryClientInterceptor
unaryInterceptors = append(unaryInterceptors, otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()))
unaryInterceptors = append(unaryInterceptors, middleware.ClientUserHeaderInterceptor)
unaryInterceptors = append(unaryInterceptors, middleware.UnaryClientInstrumentInterceptor(latency))

var streamInterceptors []grpc.StreamClientInterceptor
streamInterceptors = append(streamInterceptors, otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer()))
streamInterceptors = append(streamInterceptors, middleware.StreamClientUserHeaderInterceptor)
streamInterceptors = append(streamInterceptors, middleware.StreamClientInstrumentInterceptor(latency))

return unaryInterceptors, streamInterceptors
56 changes: 25 additions & 31 deletions pkg/ingester-rf1/metrics.go
@@ -3,18 +3,37 @@ package ingesterrf1
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/grafana/loki/v3/pkg/storage/wal"
)

type flushMetrics struct {
type ingesterMetrics struct {
autoForgetUnhealthyIngestersTotal prometheus.Counter
limiterEnabled prometheus.Gauge
// Shutdown marker for ingester scale down.
shutdownMarker prometheus.Gauge
flushesTotal prometheus.Counter
flushFailuresTotal prometheus.Counter
flushQueues prometheus.Gauge
flushDuration prometheus.Histogram
flushSizeBytes prometheus.Histogram
flushSize prometheus.Histogram
segmentMetrics *wal.SegmentMetrics
}

func newFlushMetrics(r prometheus.Registerer) *flushMetrics {
return &flushMetrics{
func newIngesterMetrics(r prometheus.Registerer) *ingesterMetrics {
return &ingesterMetrics{
autoForgetUnhealthyIngestersTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "loki_ingester_rf1_autoforget_unhealthy_ingesters_total",
Help: "Total number of ingesters automatically forgotten.",
}),
limiterEnabled: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Name: "loki_ingester_rf1_limiter_enabled",
Help: "1 if the limiter is enabled, otherwise 0.",
}),
shutdownMarker: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Name: "loki_ingester_rf1_shutdown_marker",
Help: "1 if prepare shutdown has been called, 0 otherwise.",
}),
flushesTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "loki_ingester_rf1_flushes_total",
Help: "The total number of flushes.",
@@ -33,37 +52,12 @@ func newFlushMetrics(r prometheus.Registerer) *flushMetrics {
Buckets: prometheus.ExponentialBuckets(0.001, 4, 8),
NativeHistogramBucketFactor: 1.1,
}),
flushSizeBytes: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
flushSize: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
Name: "loki_ingester_rf1_flush_size_bytes",
Help: "The flush size (as written to object storage).",
Buckets: prometheus.ExponentialBuckets(100, 10, 8),
NativeHistogramBucketFactor: 1.1,
}),
}
}

type ingesterMetrics struct {
autoForgetUnhealthyIngestersTotal prometheus.Counter
limiterEnabled prometheus.Gauge
// Shutdown marker for ingester scale down.
shutdownMarker prometheus.Gauge
*flushMetrics
}

func newIngesterMetrics(r prometheus.Registerer) *ingesterMetrics {
return &ingesterMetrics{
autoForgetUnhealthyIngestersTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "loki_ingester_rf1_autoforget_unhealthy_ingesters_total",
Help: "Total number of ingesters automatically forgotten.",
}),
limiterEnabled: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Name: "loki_ingester_rf1_limiter_enabled",
Help: "1 if the limiter is enabled, otherwise 0.",
}),
shutdownMarker: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Name: "loki_ingester_rf1_shutdown_marker",
Help: "1 if prepare shutdown has been called, 0 otherwise.",
}),
flushMetrics: newFlushMetrics(r),
segmentMetrics: wal.NewSegmentMetrics(r),
}
}
2 changes: 1 addition & 1 deletion pkg/loki/loki.go
@@ -752,7 +752,7 @@ func (t *Loki) setupModuleManager() error {
Write: {Ingester, IngesterRF1, Distributor, PatternIngester},
Backend: {QueryScheduler, Ruler, Compactor, IndexGateway, BloomGateway, BloomCompactor},

All: {QueryScheduler, QueryFrontend, Querier, Ingester, IngesterRF1, PatternIngester, Distributor, Ruler, Compactor},
All: {QueryScheduler, QueryFrontend, Querier, Ingester, IngesterRF1, PatternIngester, Distributor, Ruler, Compactor, Metastore},
}

if t.Cfg.Querier.PerRequestLimitsEnabled {
6 changes: 6 additions & 0 deletions pkg/loki/modules.go
@@ -1814,6 +1814,12 @@ func (t *Loki) initAnalytics() (services.Service, error) {
}

func (t *Loki) initMetastore() (services.Service, error) {
if !t.Cfg.IngesterRF1.Enabled {
return nil, nil
}
if t.Cfg.isTarget(All) {
t.Cfg.MetastoreClient.MetastoreAddress = fmt.Sprintf("localhost:%s", t.Cfg.Server.GRPCListenAddress)
}
m, err := metastore.New(t.Cfg.Metastore, log.With(util_log.Logger, "component", "metastore"), prometheus.DefaultRegisterer, t.health)
if err != nil {
return nil, err
2 changes: 1 addition & 1 deletion pkg/querier-rf1/querier.go
@@ -57,7 +57,7 @@ type Config struct {
// RegisterFlags register flags.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.Engine.RegisterFlagsWithPrefix("querier-rf1", f)
f.BoolVar(&cfg.Enabled, "querier-rf1.enabled", false, "Enable the RF1 querier. If set, replaces the usual querier with a RF-1 querier when using 'ALL' target.")
f.BoolVar(&cfg.Enabled, "querier-rf1.enabled", false, "Enable the RF1 querier. If set, replaces the usual querier with an RF-1 querier.")
f.DurationVar(&cfg.ExtraQueryDelay, "querier-rf1.extra-query-delay", 0, "Time to wait before sending more than the minimum successful query requests.")
f.IntVar(&cfg.MaxConcurrent, "querier-rf1.max-concurrent", 4, "The maximum number of queries that can be simultaneously processed by the querier.")
f.BoolVar(&cfg.PerRequestLimitsEnabled, "querier-rf1.per-request-limits-enabled", false, "When true, querier limits sent via a header are enforced.")
32 changes: 7 additions & 25 deletions pkg/storage/wal/manager.go
@@ -12,12 +12,6 @@ import (
"github.com/grafana/loki/v3/pkg/logproto"
)

const (
DefaultMaxAge = 500 * time.Millisecond
DefaultMaxSegments = 10
DefaultMaxSegmentSize = 8 * 1024 * 1024 // 8MB.
)

var (
// ErrClosed is returned when the WAL is closed. It is a permanent error
// as once closed, a WAL cannot be re-opened.
@@ -109,31 +103,24 @@
clock quartz.Clock
}

// segment is similar to PendingSegment, however it is an internal struct used
// in the available and pending lists. It contains a single-use result that is
// returned to callers appending to the WAL and a re-usable segment that is reset
// after each flush.
// segment is an internal struct used in the available and pending lists. It
// contains a single-use result that is returned to callers appending to the
// WAL and a re-usable segment that is reset after each flush.
type segment struct {
r *AppendResult
w *SegmentWriter

// moved is the time the segment was moved to the pending list. It is used
// to calculate the age of the segment. A segment is moved when it has
// exceeded the maximum age or the maximum size.
moved time.Time
}

// PendingSegment contains a result and the segment to be flushed.
type PendingSegment struct {
Result *AppendResult
Writer *SegmentWriter
Moved time.Time
}

func NewManager(cfg Config, metrics *Metrics) (*Manager, error) {
func NewManager(cfg Config, metrics *ManagerMetrics) (*Manager, error) {
m := Manager{
cfg: cfg,
metrics: metrics.ManagerMetrics,
metrics: metrics,
available: list.New(),
pending: list.New(),
clock: quartz.NewReal(),
@@ -142,7 +129,7 @@ func NewManager(cfg Config, metrics *Metrics) (*Manager, error) {
m.metrics.NumPending.Set(0)
m.metrics.NumFlushing.Set(0)
for i := int64(0); i < cfg.MaxSegments; i++ {
w, err := NewWalSegmentWriter(metrics.SegmentMetrics)
w, err := NewWalSegmentWriter()
if err != nil {
return nil, err
}
@@ -205,11 +192,7 @@ func (m *Manager) NextPending() (*PendingSegment, error) {
m.pending.Remove(el)
m.metrics.NumPending.Dec()
m.metrics.NumFlushing.Inc()
return &PendingSegment{
Result: s.r,
Writer: s.w,
Moved: s.moved,
}, nil
return &PendingSegment{Result: s.r, Writer: s.w}, nil
}

// Put resets the segment and puts it back in the available list to accept
@@ -229,7 +212,6 @@ func (m *Manager) Put(s *PendingSegment) {
// move the element from the available list to the pending list and sets the
// relevant metrics.
func (m *Manager) move(el *list.Element, s *segment) {
s.moved = m.clock.Now()
m.pending.PushBack(s)
m.metrics.NumPending.Inc()
m.available.Remove(el)
