diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index f2e42bf71e22..2ccae2e7c874 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -61,7 +61,8 @@ const ( ringAutoForgetUnhealthyPeriods = 2 - levelLabel = "detected_level" + LevelLabel = "detected_level" + LogLevelUnknown = "unknown" logLevelDebug = "debug" logLevelInfo = "info" logLevelWarn = "warn" @@ -69,7 +70,6 @@ const ( logLevelFatal = "fatal" logLevelCritical = "critical" logLevelTrace = "trace" - logLevelUnknown = "unknown" ) var ( @@ -406,9 +406,9 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log } else { logLevel = detectLogLevelFromLogEntry(entry, structuredMetadata) } - if logLevel != logLevelUnknown && logLevel != "" { + if logLevel != LogLevelUnknown && logLevel != "" { entry.StructuredMetadata = append(entry.StructuredMetadata, logproto.LabelAdapter{ - Name: levelLabel, + Name: LevelLabel, Value: logLevel, }) } @@ -902,7 +902,7 @@ func detectLogLevelFromLogEntry(entry logproto.Entry, structuredMetadata labels. return logLevelInfo } if otlpSeverityNumber == int(plog.SeverityNumberUnspecified) { - return logLevelUnknown + return LogLevelUnknown } else if otlpSeverityNumber <= int(plog.SeverityNumberTrace4) { return logLevelTrace } else if otlpSeverityNumber <= int(plog.SeverityNumberDebug4) { @@ -916,7 +916,7 @@ func detectLogLevelFromLogEntry(entry logproto.Entry, structuredMetadata labels. } else if otlpSeverityNumber <= int(plog.SeverityNumberFatal4) { return logLevelFatal } - return logLevelUnknown + return LogLevelUnknown } return extractLogLevelFromLogLine(entry.Line) @@ -1016,5 +1016,5 @@ func detectLevelFromLogLine(log string) string { if strings.Contains(log, "debug:") || strings.Contains(log, "DEBUG:") { return logLevelDebug } - return logLevelUnknown + return LogLevelUnknown } diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index ff41e498a13c..404c1f01d6c4 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -30,9 +30,10 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/health/grpc_health_v1" - "github.com/grafana/loki/pkg/push" loghttp_push "github.com/grafana/loki/v3/pkg/loghttp/push" + "github.com/grafana/loki/pkg/push" + "github.com/grafana/loki/v3/pkg/ingester" "github.com/grafana/loki/v3/pkg/ingester/client" "github.com/grafana/loki/v3/pkg/logproto" @@ -1577,7 +1578,7 @@ func Test_DetectLogLevels(t *testing.T) { require.Equal(t, `{foo="bar"}`, topVal.Streams[0].Labels) require.Equal(t, push.LabelsAdapter{ { - Name: levelLabel, + Name: LevelLabel, Value: logLevelWarn, }, }, topVal.Streams[0].Entries[0].StructuredMetadata) @@ -1594,7 +1595,7 @@ func Test_DetectLogLevels(t *testing.T) { require.Equal(t, `{foo="bar", level="debug"}`, topVal.Streams[0].Labels) sm := topVal.Streams[0].Entries[0].StructuredMetadata require.Len(t, sm, 1) - require.Equal(t, sm[0].Name, levelLabel) + require.Equal(t, sm[0].Name, LevelLabel) require.Equal(t, sm[0].Value, logLevelDebug) }) @@ -1619,7 +1620,7 @@ func Test_DetectLogLevels(t *testing.T) { Name: "severity", Value: logLevelWarn, }, { - Name: levelLabel, + Name: LevelLabel, Value: logLevelWarn, }, }, sm) @@ -1662,7 +1663,7 @@ func Test_detectLogLevelFromLogEntry(t *testing.T) { entry: logproto.Entry{ Line: "foo", }, - expectedLogLevel: logLevelUnknown, + expectedLogLevel: LogLevelUnknown, }, { name: "non otlp with log level keywords in log line", @@ -1746,7 +1747,7 @@ func Test_detectLogLevelFromLogEntry(t *testing.T) { entry: logproto.Entry{ Line: `foo=bar msg="message with keyword but it should not get picked up" level=NA`, }, - expectedLogLevel: logLevelUnknown, + expectedLogLevel: LogLevelUnknown, }, { name: "logfmt log line with label Severity is allowed for level detection", @@ -1799,7 +1800,7 @@ func Benchmark_extractLogLevelFromLogLine(b *testing.B) { for i := 0; i < b.N; i++ { level := extractLogLevelFromLogLine(logLine) - require.Equal(b, logLevelUnknown, level) + require.Equal(b, LogLevelUnknown, level) } } diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index ffbfc23acb8e..bb7ff7b33bf8 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -353,7 +353,7 @@ type Loki struct { IngesterRF1 ingester_rf1.Interface IngesterRF1RingClient *ingester_rf1.RingClient PatternIngester *pattern.Ingester - PatternRingClient *pattern.RingClient + PatternRingClient pattern.RingClient Querier querier.Querier cacheGenerationLoader queryrangebase.CacheGenNumberLoader querierAPI *querier.QuerierAPI diff --git a/pkg/pattern/aggregation/config.go b/pkg/pattern/aggregation/config.go index 345e9ae8b620..c40509347519 100644 --- a/pkg/pattern/aggregation/config.go +++ b/pkg/pattern/aggregation/config.go @@ -10,15 +10,15 @@ import ( type Config struct { // TODO(twhitney): This needs to be a per-tenant config - Enabled bool `yaml:"enabled,omitempty" doc:"description=Whether the pattern ingester metric aggregation is enabled."` - DownsamplePeriod time.Duration `yaml:"downsample_period"` - LokiAddr string `yaml:"loki_address,omitempty" doc:"description=The address of the Loki instance to push aggregated metrics to."` - WriteTimeout time.Duration `yaml:"timeout,omitempty" doc:"description=The timeout for writing to Loki."` - PushPeriod time.Duration `yaml:"push_period,omitempty" doc:"description=How long to wait in between pushes to Loki."` - HTTPClientConfig config.HTTPClientConfig `yaml:"http_client_config,omitempty" doc:"description=The HTTP client configuration for pushing metrics to Loki."` - UseTLS bool `yaml:"use_tls,omitempty" doc:"description=Whether to use TLS for pushing metrics to Loki."` - BasicAuth BasicAuth `yaml:"basic_auth,omitempty" doc:"description=The basic auth configuration for pushing metrics to Loki."` - BackoffConfig backoff.Config `yaml:"backoff_config,omitempty" doc:"description=The backoff configuration for pushing metrics to Loki."` + Enabled bool `yaml:"enabled,omitempty" doc:"description=Whether the pattern ingester metric aggregation is enabled."` + DownsamplePeriod time.Duration `yaml:"downsample_period"` + LokiAddr string `yaml:"loki_address,omitempty" doc:"description=The address of the Loki instance to push aggregated metrics to."` + WriteTimeout time.Duration `yaml:"timeout,omitempty" doc:"description=The timeout for writing to Loki."` + PushPeriod time.Duration `yaml:"push_period,omitempty" doc:"description=How long to wait in between pushes to Loki."` + HTTPClientConfig config.HTTPClientConfig `yaml:"http_client_config,omitempty" doc:"description=The HTTP client configuration for pushing metrics to Loki."` + UseTLS bool `yaml:"use_tls,omitempty" doc:"description=Whether to use TLS for pushing metrics to Loki."` + BasicAuth BasicAuth `yaml:"basic_auth,omitempty" doc:"description=The basic auth configuration for pushing metrics to Loki."` + BackoffConfig backoff.Config `yaml:"backoff_config,omitempty" doc:"description=The backoff configuration for pushing metrics to Loki."` } // RegisterFlags registers pattern ingester related flags. diff --git a/pkg/pattern/aggregation/push.go b/pkg/pattern/aggregation/push.go index 73bf087f48fd..94531814f42c 100644 --- a/pkg/pattern/aggregation/push.go +++ b/pkg/pattern/aggregation/push.go @@ -11,12 +11,15 @@ import ( "sync" "time" + "github.com/dustin/go-humanize" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/golang/snappy" "github.com/prometheus/common/config" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" + "github.com/grafana/loki/v3/pkg/loghttp/push" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql/syntax" "github.com/grafana/loki/v3/pkg/util/build" @@ -307,3 +310,25 @@ func (p *Push) send(ctx context.Context, payload []byte) (int, error) { return status, err } + +func AggregatedMetricEntry( + ts model.Time, + totalBytes, totalCount uint64, + service string, + lbls labels.Labels, +) string { + byteString := humanize.Bytes(totalBytes) + base := fmt.Sprintf( + "ts=%d bytes=%s count=%d %s=%s", + ts.UnixNano(), + byteString, + totalCount, + push.LabelServiceName, service, + ) + + for _, l := range lbls { + base += fmt.Sprintf(" %s=%s", l.Name, l.Value) + } + + return base +} diff --git a/pkg/pattern/flush_test.go b/pkg/pattern/flush_test.go index 9ee4bd436992..14190ecc44cc 100644 --- a/pkg/pattern/flush_test.go +++ b/pkg/pattern/flush_test.go @@ -10,6 +10,7 @@ import ( "github.com/grafana/dskit/flagext" "github.com/grafana/dskit/kv" "github.com/grafana/dskit/ring" + ring_client "github.com/grafana/dskit/ring/client" "github.com/grafana/dskit/services" "github.com/grafana/dskit/user" "github.com/prometheus/prometheus/model/labels" @@ -22,7 +23,8 @@ import ( ) func TestSweepInstance(t *testing.T) { - ing, err := New(defaultIngesterTestConfig(t), "foo", nil, log.NewNopLogger()) + ringClient := &fakeRingClient{} + ing, err := New(defaultIngesterTestConfig(t), ringClient, "foo", nil, log.NewNopLogger()) require.NoError(t, err) defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck err = services.StartAndAwaitRunning(context.Background(), ing) @@ -95,3 +97,118 @@ func defaultIngesterTestConfig(t testing.TB) Config { return cfg } + +type fakeRingClient struct{} + +func (f *fakeRingClient) Pool() *ring_client.Pool { + panic("not implemented") +} + +func (f *fakeRingClient) StartAsync(_ context.Context) error { + panic("not implemented") +} + +func (f *fakeRingClient) AwaitRunning(_ context.Context) error { + panic("not implemented") +} + +func (f *fakeRingClient) StopAsync() { + panic("not implemented") +} + +func (f *fakeRingClient) AwaitTerminated(_ context.Context) error { + panic("not implemented") +} + +func (f *fakeRingClient) FailureCase() error { + panic("not implemented") +} + +func (f *fakeRingClient) State() services.State { + panic("not implemented") +} + +func (f *fakeRingClient) AddListener(_ services.Listener) { + panic("not implemented") +} + +func (f *fakeRingClient) Ring() ring.ReadRing { + return &fakeRing{} +} + +type fakeRing struct{} + +// InstancesWithTokensCount returns the number of instances in the ring that have tokens. +func (f *fakeRing) InstancesWithTokensCount() int { + panic("not implemented") // TODO: Implement +} + +// InstancesInZoneCount returns the number of instances in the ring that are registered in given zone. +func (f *fakeRing) InstancesInZoneCount(_ string) int { + panic("not implemented") // TODO: Implement +} + +// InstancesWithTokensInZoneCount returns the number of instances in the ring that are registered in given zone and have tokens. +func (f *fakeRing) InstancesWithTokensInZoneCount(_ string) int { + panic("not implemented") // TODO: Implement +} + +// ZonesCount returns the number of zones for which there's at least 1 instance registered in the ring. +func (f *fakeRing) ZonesCount() int { + panic("not implemented") // TODO: Implement +} + +func (f *fakeRing) Get( + _ uint32, + _ ring.Operation, + _ []ring.InstanceDesc, + _ []string, + _ []string, +) (ring.ReplicationSet, error) { + panic("not implemented") +} + +func (f *fakeRing) GetAllHealthy(_ ring.Operation) (ring.ReplicationSet, error) { + return ring.ReplicationSet{}, nil +} + +func (f *fakeRing) GetReplicationSetForOperation(_ ring.Operation) (ring.ReplicationSet, error) { + return ring.ReplicationSet{}, nil +} + +func (f *fakeRing) ReplicationFactor() int { + panic("not implemented") +} + +func (f *fakeRing) InstancesCount() int { + panic("not implemented") +} + +func (f *fakeRing) ShuffleShard(_ string, _ int) ring.ReadRing { + panic("not implemented") +} + +func (f *fakeRing) GetInstanceState(_ string) (ring.InstanceState, error) { + panic("not implemented") +} + +func (f *fakeRing) ShuffleShardWithLookback( + _ string, + _ int, + _ time.Duration, + _ time.Time, +) ring.ReadRing { + panic("not implemented") +} + +func (f *fakeRing) HasInstance(_ string) bool { + panic("not implemented") +} + +func (f *fakeRing) CleanupShuffleShardCache(_ string) { + panic("not implemented") +} + +func (f *fakeRing) GetTokenRangesForInstance(_ string) (ring.TokenRanges, error) { + panic("not implemented") +} diff --git a/pkg/pattern/ingester.go b/pkg/pattern/ingester.go index 9aaf49a4b9ca..c17620779128 100644 --- a/pkg/pattern/ingester.go +++ b/pkg/pattern/ingester.go @@ -68,7 +68,7 @@ func (cfg *Config) Validate() error { type Ingester struct { services.Service lifecycler *ring.Lifecycler - ringClient *RingClient + ringClient RingClient lifecyclerWatcher *services.FailureWatcher @@ -92,7 +92,7 @@ type Ingester struct { func New( cfg Config, - ringClient *RingClient, + ringClient RingClient, metricsNamespace string, registerer prometheus.Registerer, logger log.Logger, @@ -172,7 +172,7 @@ func (i *Ingester) stopping(_ error) error { flushQueue.Close() } i.flushQueuesDone.Wait() - i.stopWriters() + i.stopWriters() return err } @@ -204,7 +204,7 @@ func (i *Ingester) loop() { flushTicker := util.NewTickerWithJitter(i.cfg.FlushCheckPeriod, j) defer flushTicker.Stop() -if i.cfg.MetricAggregation.Enabled { + if i.cfg.MetricAggregation.Enabled { downsampleTicker := time.NewTimer(i.cfg.MetricAggregation.DownsamplePeriod) defer downsampleTicker.Stop() for { @@ -335,7 +335,7 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { / i.drainCfg, i.ringClient, i.lifecycler.ID, - writer, + writer, ) if err != nil { return nil, err diff --git a/pkg/pattern/ingester_querier.go b/pkg/pattern/ingester_querier.go index 2220a2ef41d8..9cc8a4502144 100644 --- a/pkg/pattern/ingester_querier.go +++ b/pkg/pattern/ingester_querier.go @@ -27,7 +27,7 @@ type IngesterQuerier struct { cfg Config logger log.Logger - ringClient *RingClient + ringClient RingClient registerer prometheus.Registerer ingesterQuerierMetrics *ingesterQuerierMetrics @@ -35,7 +35,7 @@ type IngesterQuerier struct { func NewIngesterQuerier( cfg Config, - ringClient *RingClient, + ringClient RingClient, metricsNamespace string, registerer prometheus.Registerer, logger log.Logger, @@ -128,7 +128,7 @@ func prunePatterns(resp *logproto.QueryPatternsResponse, minClusterSize int64, m // ForAllIngesters runs f, in parallel, for all ingesters func (q *IngesterQuerier) forAllIngesters(ctx context.Context, f func(context.Context, logproto.PatternClient) (interface{}, error)) ([]ResponseFromIngesters, error) { - replicationSet, err := q.ringClient.ring.GetAllHealthy(ring.Read) + replicationSet, err := q.ringClient.Ring().GetAllHealthy(ring.Read) if err != nil { return nil, err } @@ -149,7 +149,7 @@ func (q *IngesterQuerier) forGivenIngesters(ctx context.Context, replicationSet ingester := ingester i := i g.Go(func() error { - client, err := q.ringClient.pool.GetClientFor(ingester.Addr) + client, err := q.ringClient.Pool().GetClientFor(ingester.Addr) if err != nil { return err } diff --git a/pkg/pattern/ingester_test.go b/pkg/pattern/ingester_test.go index 90b1845a90c3..6d24ab3b4367 100644 --- a/pkg/pattern/ingester_test.go +++ b/pkg/pattern/ingester_test.go @@ -8,6 +8,7 @@ import ( "github.com/go-kit/log" "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/logproto" @@ -20,11 +21,20 @@ import ( func TestInstancePushQuery(t *testing.T) { lbs := labels.New(labels.Label{Name: "test", Value: "test"}) + ringClient := &fakeRingClient{} + ingesterID := "foo" + + mockWriter := &mockEntryWriter{} + mockWriter.On("WriteEntry", mock.Anything, mock.Anything, mock.Anything) + inst, err := newInstance( "foo", log.NewNopLogger(), newIngesterMetrics(nil, "test"), drain.DefaultConfig(), + ringClient, + ingesterID, + mockWriter, ) require.NoError(t, err) @@ -68,3 +78,15 @@ func TestInstancePushQuery(t *testing.T) { require.NoError(t, err) require.Equal(t, 2, len(res.Series)) } + +type mockEntryWriter struct { + mock.Mock +} + +func (m *mockEntryWriter) WriteEntry(ts time.Time, entry string, lbls labels.Labels) { + _ = m.Called(ts, entry, lbls) +} + +func (m *mockEntryWriter) Stop() { + _ = m.Called() +} diff --git a/pkg/pattern/instance.go b/pkg/pattern/instance.go index 64c3576d0cb3..5f33769dd370 100644 --- a/pkg/pattern/instance.go +++ b/pkg/pattern/instance.go @@ -7,7 +7,6 @@ import ( "net/http" "sync" - "github.com/dustin/go-humanize" "github.com/go-kit/log" "github.com/grafana/dskit/httpgrpc" "github.com/grafana/dskit/multierror" @@ -15,7 +14,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" - "github.com/grafana/loki/v3/pkg/detection" + "github.com/grafana/loki/v3/pkg/distributor" "github.com/grafana/loki/v3/pkg/ingester" "github.com/grafana/loki/v3/pkg/ingester/index" "github.com/grafana/loki/v3/pkg/loghttp/push" @@ -40,7 +39,7 @@ type instance struct { logger log.Logger metrics *ingesterMetrics drainCfg *drain.Config - ringClient *RingClient + ringClient RingClient ingesterID string aggMetricsLock sync.Mutex @@ -54,7 +53,15 @@ type aggregatedMetrics struct { count uint64 } -func newInstance(instanceID string, logger log.Logger, metrics *ingesterMetrics, drainCfg *drain.Config, ringClient *RingClient, ingesterID string, writer aggregation.EntryWriter) (*instance, error) { +func newInstance( + instanceID string, + logger log.Logger, + metrics *ingesterMetrics, + drainCfg *drain.Config, + ringClient RingClient, + ingesterID string, + writer aggregation.EntryWriter, +) (*instance, error) { index, err := index.NewBitPrefixWithShards(indexShards) if err != nil { return nil, err @@ -85,7 +92,7 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error { // All streams are observed for metrics i.Observe(reqStream.Labels, reqStream.Entries) - // But only owned streamd are processed for patterns + // But only owned streamed are processed for patterns ownedStream, err := i.isOwnedStream(i.ingesterID, reqStream.Labels) if err != nil { appendErr.Add(err) @@ -117,7 +124,7 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error { func (i *instance) isOwnedStream(ingesterID string, stream string) (bool, error) { var descs [1]ring.InstanceDesc - replicationSet, err := i.ringClient.ring.Get( + replicationSet, err := i.ringClient.Ring().Get( lokiring.TokenFor(i.instanceID, stream), ring.WriteNoExtend, descs[:0], @@ -293,7 +300,7 @@ func (i *instance) writeAggregatedMetrics( level := streamLbls.Get("level") if level == "" { - level = detection.LogLevelUnknown + level = distributor.LogLevelUnknown } newLbls := labels.Labels{ @@ -304,30 +311,8 @@ func (i *instance) writeAggregatedMetrics( if i.writer != nil { i.writer.WriteEntry( now.Time(), - AggregatedMetricEntry(now, totalBytes, totalCount, service, streamLbls), + aggregation.AggregatedMetricEntry(now, totalBytes, totalCount, service, streamLbls), newLbls, ) } } - -func AggregatedMetricEntry( - ts model.Time, - totalBytes, totalCount uint64, - service string, - lbls labels.Labels, -) string { - byteString := humanize.Bytes(totalBytes) - base := fmt.Sprintf( - "ts=%d bytes=%s count=%d %s=%s", - ts.UnixNano(), - byteString, - totalCount, - push.LabelServiceName, service, - ) - - for _, l := range lbls { - base += fmt.Sprintf(" %s=%s", l.Name, l.Value) - } - - return base -} diff --git a/pkg/pattern/ring_client.go b/pkg/pattern/ring_client.go index 3ceaf481a3b9..0d49cfe42379 100644 --- a/pkg/pattern/ring_client.go +++ b/pkg/pattern/ring_client.go @@ -13,7 +13,13 @@ import ( "github.com/grafana/loki/v3/pkg/pattern/clientpool" ) -type RingClient struct { +type RingClient interface { + services.Service + Ring() ring.ReadRing + Pool() *ring_client.Pool +} + +type ringClient struct { cfg Config logger log.Logger @@ -29,10 +35,10 @@ func NewRingClient( metricsNamespace string, registerer prometheus.Registerer, logger log.Logger, -) (*RingClient, error) { +) (RingClient, error) { var err error registerer = prometheus.WrapRegistererWithPrefix(metricsNamespace+"_", registerer) - ringClient := &RingClient{ + ringClient := &ringClient{ logger: log.With(logger, "component", "pattern-ring-client"), cfg: cfg, } @@ -59,19 +65,55 @@ func NewRingClient( return ringClient, nil } -func (q *RingClient) starting(ctx context.Context) error { - return services.StartManagerAndAwaitHealthy(ctx, q.subservices) +func (r *ringClient) starting(ctx context.Context) error { + return services.StartManagerAndAwaitHealthy(ctx, r.subservices) } -func (q *RingClient) running(ctx context.Context) error { +func (r *ringClient) running(ctx context.Context) error { select { case <-ctx.Done(): return nil - case err := <-q.subservicesWatcher.Chan(): + case err := <-r.subservicesWatcher.Chan(): return fmt.Errorf("pattern tee subservices failed: %w", err) } } -func (q *RingClient) stopping(_ error) error { - return services.StopManagerAndAwaitStopped(context.Background(), q.subservices) +func (r *ringClient) stopping(_ error) error { + return services.StopManagerAndAwaitStopped(context.Background(), r.subservices) +} + +func (r *ringClient) Ring() ring.ReadRing { + return r.ring +} + +func (r *ringClient) Pool() *ring_client.Pool { + return r.pool +} + +func (r *ringClient) StartAsync(ctx context.Context) error { + return r.ring.StartAsync(ctx) +} + +func (r *ringClient) AwaitRunning(ctx context.Context) error { + return r.ring.AwaitRunning(ctx) +} + +func (r *ringClient) StopAsync() { + r.ring.StopAsync() +} + +func (r *ringClient) AwaitTerminated(ctx context.Context) error { + return r.ring.AwaitTerminated(ctx) +} + +func (r *ringClient) FailureCase() error { + return r.ring.FailureCase() +} + +func (r *ringClient) State() services.State { + return r.ring.State() +} + +func (r *ringClient) AddListener(listener services.Listener) { + r.ring.AddListener(listener) } diff --git a/pkg/pattern/tee.go b/pkg/pattern/tee.go index dd8f36540582..bd9387a56065 100644 --- a/pkg/pattern/tee.go +++ b/pkg/pattern/tee.go @@ -18,7 +18,7 @@ import ( type Tee struct { cfg Config logger log.Logger - ringClient *RingClient + ringClient RingClient ingesterAppends *prometheus.CounterVec fallbackIngesterAppends *prometheus.CounterVec @@ -26,7 +26,7 @@ type Tee struct { func NewTee( cfg Config, - ringClient *RingClient, + ringClient RingClient, metricsNamespace string, registerer prometheus.Registerer, logger log.Logger, @@ -71,14 +71,14 @@ func (t *Tee) sendStream(tenant string, stream distributor.KeyedStream) error { // Only owned streams are processed for patterns, however any pattern ingester can // aggregate metrics for any stream. Therefore, if we can't send the owned stream, // try to send it to any pattern ingester so we at least capture the metrics. - replicationSet, err := t.ringClient.ring.GetAllHealthy(ring.Read) + replicationSet, err := t.ringClient.Ring().GetAllHealthy(ring.Read) if replicationSet.Instances == nil { return errors.New("no instances found") } for _, instance := range replicationSet.Instances { addr := instance.Addr - client, err := t.ringClient.pool.GetClientFor(addr) + client, err := t.ringClient.Pool().GetClientFor(addr) if err != nil { req := &logproto.PushRequest{ Streams: []logproto.Stream{ @@ -107,7 +107,7 @@ func (t *Tee) sendStream(tenant string, stream distributor.KeyedStream) error { func (t *Tee) sendOwnedStream(tenant string, stream distributor.KeyedStream) error { var descs [1]ring.InstanceDesc - replicationSet, err := t.ringClient.ring.Get(stream.HashKey, ring.WriteNoExtend, descs[:0], nil, nil) + replicationSet, err := t.ringClient.Ring().Get(stream.HashKey, ring.WriteNoExtend, descs[:0], nil, nil) if err != nil { return err } @@ -115,7 +115,7 @@ func (t *Tee) sendOwnedStream(tenant string, stream distributor.KeyedStream) err return errors.New("no instances found") } addr := replicationSet.Instances[0].Addr - client, err := t.ringClient.pool.GetClientFor(addr) + client, err := t.ringClient.Pool().GetClientFor(addr) if err != nil { return err }