From 2f5485dbb50ab3aa1eb3e0e45e918a1c2070aca3 Mon Sep 17 00:00:00 2001
From: Trevor Whitney
Date: Fri, 2 Aug 2024 08:45:57 -0600
Subject: [PATCH] feat: improve async tee request processing

---
 pkg/distributor/distributor.go  |  2 +
 pkg/distributor/tee.go          |  7 +++
 pkg/ingester-rf1/tee.go         |  4 ++
 pkg/pattern/aggregation/push.go |  4 --
 pkg/pattern/ingester.go         | 49 +++++++++++++++++--
 pkg/pattern/tee.go              | 84 +++++++++++++++++++++++++------
 6 files changed, 124 insertions(+), 26 deletions(-)

diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go
index 1fb48b20a0b26..2d740a9f03fae 100644
--- a/pkg/distributor/distributor.go
+++ b/pkg/distributor/distributor.go
@@ -304,6 +304,8 @@ func (d *Distributor) running(ctx context.Context) error {
 }
 
 func (d *Distributor) stopping(_ error) error {
+	d.tee.Stop()
+
 	return services.StopManagerAndAwaitStopped(context.Background(), d.subservices)
 }
 
diff --git a/pkg/distributor/tee.go b/pkg/distributor/tee.go
index 04acb1e22c0df..1905f91b4f81f 100644
--- a/pkg/distributor/tee.go
+++ b/pkg/distributor/tee.go
@@ -3,6 +3,7 @@ package distributor
 
 // Tee implementations can duplicate the log streams to another endpoint.
 type Tee interface {
 	Duplicate(tenant string, streams []KeyedStream)
+	Stop()
 }
 
 // WrapTee wraps a new Tee around an existing Tee.
@@ -25,3 +26,9 @@ func (m *multiTee) Duplicate(tenant string, streams []KeyedStream) {
 		tee.Duplicate(tenant, streams)
 	}
 }
+
+func (m *multiTee) Stop() {
+	for _, tee := range m.tees {
+		tee.Stop()
+	}
+}
diff --git a/pkg/ingester-rf1/tee.go b/pkg/ingester-rf1/tee.go
index 14d5aa87da3d0..b2e2a6018903c 100644
--- a/pkg/ingester-rf1/tee.go
+++ b/pkg/ingester-rf1/tee.go
@@ -86,3 +86,7 @@ func (t *Tee) sendStream(tenant string, stream distributor.KeyedStream) error {
 	t.ingesterAppends.WithLabelValues(addr, "success").Inc()
 	return nil
 }
+
+func (t *Tee) Stop() {
+	// noop
+}
diff --git a/pkg/pattern/aggregation/push.go b/pkg/pattern/aggregation/push.go
index 94531814f42ca..bf47cc1acf7a9 100644
--- a/pkg/pattern/aggregation/push.go
+++ b/pkg/pattern/aggregation/push.go
@@ -58,7 +58,6 @@ type Push struct {
 
 	// shutdown channels
 	quit chan struct{}
-	done chan struct{}
 
 	// auth
 	username, password string
@@ -135,7 +134,6 @@ func NewPush(
 		password: password,
 		logger:   logger,
 		quit:     make(chan struct{}),
-		done:     make(chan struct{}),
 		backoff:  backoffCfg,
 		entries: entries{
 			entries: make([]entry, 0),
@@ -155,7 +153,6 @@ func (p *Push) WriteEntry(ts time.Time, e string, lbls labels.Labels) {
 func (p *Push) Stop() {
 	if p.quit != nil {
 		close(p.quit)
-		<-p.done
 		p.quit = nil
 	}
 }
@@ -215,7 +212,6 @@ func (p *Push) run(pushPeriod time.Duration) {
 
 	defer func() {
 		pushTicker.Stop()
-		close(p.done)
 	}()
 
 	for {
diff --git a/pkg/pattern/ingester.go b/pkg/pattern/ingester.go
index 29cf26d5a1d30..79627f5abdda9 100644
--- a/pkg/pattern/ingester.go
+++ b/pkg/pattern/ingester.go
@@ -41,6 +41,8 @@ type Config struct {
 	MaxClusters       int                `yaml:"max_clusters,omitempty" doc:"description=The maximum number of detected pattern clusters that can be created by streams."`
 	MaxEvictionRatio  float64            `yaml:"max_eviction_ratio,omitempty" doc:"description=The maximum eviction ratio of patterns per stream. Once that ratio is reached, the stream will be throttled for pattern detection."`
 	MetricAggregation aggregation.Config `yaml:"metric_aggregation,omitempty" doc:"description=Configures the metric aggregation and storage behavior of the pattern ingester."`
+	TeeParallelism    int                `yaml:"tee_parallelism,omitempty" doc:"description=The number of parallel goroutines to use for forwarding requests to the pattern ingester."`
+	TeeBufferSize     int                `yaml:"tee_buffer_size,omitempty" doc:"description=Maximum number of pending teed requests to pattern ingesters. If the buffer is full, the request is dropped."`
 
 	// For testing.
 	factory ring_client.PoolFactory `yaml:"-"`
@@ -51,11 +53,48 @@ func (cfg *Config) RegisterFlags(fs *flag.FlagSet) {
 	cfg.LifecyclerConfig.RegisterFlagsWithPrefix("pattern-ingester.", fs, util_log.Logger)
 	cfg.ClientConfig.RegisterFlags(fs)
 	cfg.MetricAggregation.RegisterFlagsWithPrefix(fs, "pattern-ingester.")
-	fs.BoolVar(&cfg.Enabled, "pattern-ingester.enabled", false, "Flag to enable or disable the usage of the pattern-ingester component.")
-	fs.IntVar(&cfg.ConcurrentFlushes, "pattern-ingester.concurrent-flushes", 32, "How many flushes can happen concurrently from each stream.")
-	fs.DurationVar(&cfg.FlushCheckPeriod, "pattern-ingester.flush-check-period", 1*time.Minute, "How often should the ingester see if there are any blocks to flush. The first flush check is delayed by a random time up to 0.8x the flush check period. Additionally, there is +/- 1% jitter added to the interval.")
-	fs.IntVar(&cfg.MaxClusters, "pattern-ingester.max-clusters", drain.DefaultConfig().MaxClusters, "The maximum number of detected pattern clusters that can be created by the pattern ingester.")
-	fs.Float64Var(&cfg.MaxEvictionRatio, "pattern-ingester.max-eviction-ratio", drain.DefaultConfig().MaxEvictionRatio, "The maximum eviction ratio of patterns per stream. Once that ratio is reached, the stream will be throttled for pattern detection.")
+	fs.BoolVar(
+		&cfg.Enabled,
+		"pattern-ingester.enabled",
+		false,
+		"Flag to enable or disable the usage of the pattern-ingester component.",
+	)
+	fs.IntVar(
+		&cfg.ConcurrentFlushes,
+		"pattern-ingester.concurrent-flushes",
+		32,
+		"How many flushes can happen concurrently from each stream.",
+	)
+	fs.DurationVar(
+		&cfg.FlushCheckPeriod,
+		"pattern-ingester.flush-check-period",
+		1*time.Minute,
+		"How often should the ingester see if there are any blocks to flush. The first flush check is delayed by a random time up to 0.8x the flush check period. Additionally, there is +/- 1% jitter added to the interval.",
+	)
+	fs.IntVar(
+		&cfg.MaxClusters,
+		"pattern-ingester.max-clusters",
+		drain.DefaultConfig().MaxClusters,
+		"The maximum number of detected pattern clusters that can be created by the pattern ingester.",
+	)
+	fs.Float64Var(
+		&cfg.MaxEvictionRatio,
+		"pattern-ingester.max-eviction-ratio",
+		drain.DefaultConfig().MaxEvictionRatio,
+		"The maximum eviction ratio of patterns per stream. Once that ratio is reached, the stream will be throttled for pattern detection.",
+	)
+	fs.IntVar(
+		&cfg.TeeParallelism,
+		"pattern-ingester.tee-parallelism",
+		5,
+		"The number of parallel goroutines to use for forwarding requests to the pattern ingester.",
+	)
+	fs.IntVar(
+		&cfg.TeeBufferSize,
+		"pattern-ingester.tee-buffer-size",
+		100,
+		"Maximum number of pending teed requests to pattern ingesters. If the buffer is full, the request is dropped.",
+	)
 }
 
 func (cfg *Config) Validate() error {
diff --git a/pkg/pattern/tee.go b/pkg/pattern/tee.go
index bd9387a560659..45715d1ac57ae 100644
--- a/pkg/pattern/tee.go
+++ b/pkg/pattern/tee.go
@@ -22,6 +22,18 @@ type Tee struct {
 
 	ingesterAppends         *prometheus.CounterVec
 	fallbackIngesterAppends *prometheus.CounterVec
+
+	teedRequests *prometheus.CounterVec
+
+	requestCh chan request
+
+	// shutdown channel
+	quit chan struct{}
+}
+
+type request struct {
+	tenant string
+	stream distributor.KeyedStream
 }
 
 func NewTee(
@@ -43,34 +55,53 @@ func NewTee(
 		fallbackIngesterAppends: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
 			Name: "pattern_ingester_fallback_appends_total",
 			Help: "The total number of batch appends sent to fallback pattern ingesters, for not owned streams.",
 		}, []string{"ingester", "status"}),
+		teedRequests: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
+			Name: "pattern_ingester_teed_requests_total",
+			Help: "The total number of requests teed to the pattern ingesters, by tenant and status (queued or dropped).",
+		}, []string{"tenant", "status"}),
 		cfg:        cfg,
 		ringClient: ringClient,
+		requestCh:  make(chan request, cfg.TeeBufferSize),
+		quit:       make(chan struct{}),
+	}
+
+	for i := 0; i < cfg.TeeParallelism; i++ {
+		go t.run()
 	}
 
 	return t, nil
 }
 
-// Duplicate Implements distributor.Tee which is used to tee distributor requests to pattern ingesters.
-func (t *Tee) Duplicate(tenant string, streams []distributor.KeyedStream) {
-	for idx := range streams {
-		go func(stream distributor.KeyedStream) {
-			if err := t.sendStream(tenant, stream); err != nil {
+func (t *Tee) run() {
+	for {
+		select {
+		case <-t.quit:
+			return
+		case req := <-t.requestCh:
+			ctx, cancel := context.WithTimeout(
+				user.InjectOrgID(context.Background(), req.tenant),
+				t.cfg.ClientConfig.RemoteTimeout,
+			)
+
+			if err := t.sendStream(ctx, req.tenant, req.stream); err != nil {
 				level.Error(t.logger).Log("msg", "failed to send stream to pattern ingester", "err", err)
 			}
-		}(streams[idx])
+			cancel() // cancel eagerly: a defer here would only run when the goroutine exits, leaking a timer per request
+		}
 	}
 }
 
-func (t *Tee) sendStream(tenant string, stream distributor.KeyedStream) error {
-	err := t.sendOwnedStream(tenant, stream)
+func (t *Tee) sendStream(ctx context.Context, tenant string, stream distributor.KeyedStream) error {
+	err := t.sendOwnedStream(ctx, tenant, stream)
 	if err == nil {
+		// Success, return early
 		return nil
 	}
 
 	// Pattern ingesters serve 2 functions, processing patterns and aggregating metrics.
 	// Only owned streams are processed for patterns, however any pattern ingester can
 	// aggregate metrics for any stream. Therefore, if we can't send the owned stream,
-	// try to send it to any pattern ingester so we at least capture the metrics.
+	// try to forward the request to any pattern ingester so we at least capture the metrics.
 	replicationSet, err := t.ringClient.Ring().GetAllHealthy(ring.Read)
 	if replicationSet.Instances == nil {
 		return errors.New("no instances found")
 	}
@@ -86,11 +117,6 @@ func (t *Tee) sendStream(tenant string, stream distributor.KeyedStream) error {
 		},
 	}
 
-	ctx, cancel := context.WithTimeout(
-		user.InjectOrgID(context.Background(), tenant),
-		t.cfg.ClientConfig.RemoteTimeout,
-	)
-	defer cancel()
 	_, err = client.(logproto.PatternClient).Push(ctx, req)
 	if err != nil {
 		t.fallbackIngesterAppends.WithLabelValues(addr, "fail").Inc()
@@ -105,7 +131,7 @@ func (t *Tee) sendStream(tenant string, stream distributor.KeyedStream) error {
 	return err
 }
 
-func (t *Tee) sendOwnedStream(tenant string, stream distributor.KeyedStream) error {
+func (t *Tee) sendOwnedStream(ctx context.Context, tenant string, stream distributor.KeyedStream) error {
 	var descs [1]ring.InstanceDesc
 	replicationSet, err := t.ringClient.Ring().Get(stream.HashKey, ring.WriteNoExtend, descs[:0], nil, nil)
 	if err != nil {
@@ -125,8 +151,6 @@ func (t *Tee) sendOwnedStream(tenant string, stream distributor.KeyedStream) err
 		},
 	}
 
-	ctx, cancel := context.WithTimeout(user.InjectOrgID(context.Background(), tenant), t.cfg.ClientConfig.RemoteTimeout)
-	defer cancel()
 	_, err = client.(logproto.PatternClient).Push(ctx, req)
 	if err != nil {
 		t.ingesterAppends.WithLabelValues(addr, "fail").Inc()
@@ -135,3 +159,29 @@ func (t *Tee) sendOwnedStream(tenant string, stream distributor.KeyedStream) err
 	t.ingesterAppends.WithLabelValues(addr, "success").Inc()
 	return nil
 }
+
+// Duplicate implements distributor.Tee, which is used to tee distributor requests to pattern ingesters.
+func (t *Tee) Duplicate(tenant string, streams []distributor.KeyedStream) {
+	for idx := range streams {
+		req := request{
+			tenant: tenant,
+			stream: streams[idx],
+		}
+
+		// We need to prioritize protecting distributors to prevent bigger problems for the
+		// system, so we respond to backpressure by dropping requests when the channel is full.
+		// The send below never blocks, so there is no need for a goroutine per stream.
+		select {
+		case t.requestCh <- req:
+			t.teedRequests.WithLabelValues(tenant, "queued").Inc()
+		default:
+			t.teedRequests.WithLabelValues(tenant, "dropped").Inc()
+		}
+	}
+}
+
+// Stop signals the goroutines draining the request channel to exit. Requests still
+// queued are dropped; in-flight pushes are bounded by their per-request timeout.
+func (t *Tee) Stop() {
+	close(t.quit)
+}
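
Note on the design: the patch replaces Duplicate's goroutine-per-stream fan-out with a fixed pool of workers draining a bounded channel, trading delivery guarantees for distributor protection. Stripped of Loki specifics, the shape of that pattern looks like the following sketch (boundedTee, newBoundedTee, and the string payload are illustrative stand-ins, not Loki APIs):

package main

import (
	"fmt"
	"sync"
	"time"
)

// boundedTee mirrors the patch's structure: a buffered request channel
// drained by a fixed number of worker goroutines.
type boundedTee struct {
	requests chan string
	quit     chan struct{}
	wg       sync.WaitGroup
}

func newBoundedTee(parallelism, bufferSize int) *boundedTee {
	t := &boundedTee{
		requests: make(chan string, bufferSize),
		quit:     make(chan struct{}),
	}
	for i := 0; i < parallelism; i++ {
		t.wg.Add(1)
		go t.run()
	}
	return t
}

func (t *boundedTee) run() {
	defer t.wg.Done()
	for {
		select {
		case <-t.quit:
			return
		case req := <-t.requests:
			time.Sleep(10 * time.Millisecond) // stand-in for a remote push
			fmt.Println("sent:", req)
		}
	}
}

// duplicate never blocks: when the buffer is full, the request is dropped
// so the caller (the distributor, in the patch) is never backpressured.
func (t *boundedTee) duplicate(req string) bool {
	select {
	case t.requests <- req:
		return true
	default:
		return false
	}
}

func (t *boundedTee) stop() {
	close(t.quit)
	t.wg.Wait()
}

func main() {
	tee := newBoundedTee(2, 4)
	for i := 0; i < 10; i++ {
		if !tee.duplicate(fmt.Sprintf("stream-%d", i)) {
			fmt.Println("dropped:", i)
		}
	}
	tee.stop()
}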
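
One correctness detail in run() deserves emphasis: context.WithTimeout must be cancelled once per iteration. A deferred cancel inside a for loop does not run until the goroutine returns, so every processed request would otherwise hold its timeout timer until shutdown. A minimal standalone illustration of the safe shape (worker and handle are hypothetical names, standard library only):

package main

import (
	"context"
	"fmt"
	"time"
)

func worker(reqs <-chan string, quit <-chan struct{}) {
	for {
		select {
		case <-quit:
			return
		case req := <-reqs:
			// Per-request timeout...
			ctx, cancel := context.WithTimeout(context.Background(), time.Second)
			handle(ctx, req)
			// ...released immediately. Deferring cancel here would leak one
			// timer per request until worker returns.
			cancel()
		}
	}
}

func handle(ctx context.Context, req string) {
	select {
	case <-ctx.Done():
		fmt.Println("timed out:", req)
	case <-time.After(10 * time.Millisecond): // stand-in for a remote call
		fmt.Println("handled:", req)
	}
}

func main() {
	reqs := make(chan string, 1)
	quit := make(chan struct{})
	done := make(chan struct{})
	go func() { worker(reqs, quit); close(done) }()
	reqs <- "stream-1"
	time.Sleep(50 * time.Millisecond)
	close(quit)
	<-done
}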