diff --git a/pkg/storage/ingest/pusher.go b/pkg/storage/ingest/pusher.go
index 6be600726dc..27395161941 100644
--- a/pkg/storage/ingest/pusher.go
+++ b/pkg/storage/ingest/pusher.go
@@ -24,21 +24,28 @@ import (
 	"github.com/grafana/mimir/pkg/util/spanlogger"
 )
 
+const shardForSeriesBuffer = 2000 // TODO dimitarvdimitrov 2000 is arbitrary; the idea is that we don't block the goroutine calling PushToStorage while we're flushing. A linked list with a sync.Cond or something different would also work
+
 type Pusher interface {
 	PushToStorage(context.Context, *mimirpb.WriteRequest) error
 }
 
 type PusherCloser interface {
-	Pusher
+	PushToStorage(context.Context, *mimirpb.WriteRequest) error
+	// Calls to Close are safe and will not be made concurrently.
 	Close() []error
 }
 
+// pusherConsumer receives records from Kafka and pushes them to the storage.
+// Each time a batch of records is received from Kafka, we instantiate a new pusherConsumer. This ensures we can retry the batch if necessary and know whether we have completed it or not.
type pusherConsumer struct {
 	fallbackClientErrSampler *util_log.Sampler
 	metrics                  *pusherConsumerMetrics
 	logger                   log.Logger
 
-	pusher PusherCloser
+	kafkaConfig KafkaConfig
+
+	pusher Pusher
 }
 
 type pusherConsumerMetrics struct {
@@ -49,6 +56,7 @@ type pusherConsumerMetrics struct {
 	totalRequests prometheus.Counter
 }
 
+// newPusherConsumerMetrics creates a new pusherConsumerMetrics instance.
 func newPusherConsumerMetrics(reg prometheus.Registerer) *pusherConsumerMetrics {
 	errRequestsCounter := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
 		Name: "cortex_ingest_storage_reader_records_failed_total",
@@ -78,20 +86,14 @@ func newPusherConsumerMetrics(reg prometheus.Registerer) *pusherConsumerMetrics
 	}
 }
 
+// newPusherConsumer creates a new pusherConsumer instance.
 func newPusherConsumer(pusher Pusher, kafkaCfg KafkaConfig, metrics *pusherConsumerMetrics, logger log.Logger) *pusherConsumer {
-	var p PusherCloser
-	if kafkaCfg.ReplayShards == 0 {
-		p = newNoopPusherCloser(metrics, pusher)
-	} else {
-		p = newMultiTenantPusher(metrics, pusher, kafkaCfg.ReplayShards, kafkaCfg.BatchSize)
-	}
-
 	return &pusherConsumer{
+		pusher:                   pusher,
+		kafkaConfig:              kafkaCfg,
 		metrics:                  metrics,
 		logger:                   logger,
 		fallbackClientErrSampler: util_log.NewSampler(kafkaCfg.FallbackClientErrorSampleRate),
-
-		pusher: p,
 	}
 }
 
@@ -147,6 +149,7 @@ func (c pusherConsumer) Consume(ctx context.Context, records []record) error {
 		}
 	}(ctx, records, recordsChannel)
 
+	writer := c.newStorageWriter()
 	for r := range recordsChannel {
 		if r.err != nil {
 			level.Error(c.logger).Log("msg", "failed to parse write request; skipping", "err", r.err)
@@ -154,7 +157,7 @@ func (c pusherConsumer) Consume(ctx context.Context, records []record) error {
 		}
 
 		// If we get an error at any point, we need to stop processing the records. They will be retried at some point.
-		err := c.pushToStorage(r.ctx, r.tenantID, r.WriteRequest)
+		err := c.pushToStorage(r.ctx, r.tenantID, r.WriteRequest, writer)
 		if err != nil {
 			cancel(cancellation.NewErrorf("error while pushing to storage")) // Stop the unmarshalling goroutine.
 			return fmt.Errorf("consuming record at index %d for tenant %s: %w", r.index, r.tenantID, err)
@@ -162,12 +165,11 @@ func (c pusherConsumer) Consume(ctx context.Context, records []record) error {
 	}
 
 	cancel(cancellation.NewErrorf("done unmarshalling records"))
-	return nil
-}
 
-func (c pusherConsumer) Close(ctx context.Context) error {
+	// We need to tell the storage writer that we're done and no more records are coming.
 	spanLog := spanlogger.FromContext(ctx, log.NewNopLogger())
-	errs := c.pusher.Close()
+	errs := writer.Close()
 	for eIdx := 0; eIdx < len(errs); eIdx++ {
 		err := errs[eIdx]
 		isServerErr := c.handlePushErr(ctx, "TODO", err, spanLog)
@@ -177,10 +179,19 @@ func (c pusherConsumer) Close(ctx context.Context) error {
 			eIdx--
 		}
 	}
+
 	return multierror.New(errs...).Err()
 }
 
-func (c pusherConsumer) pushToStorage(ctx context.Context, tenantID string, req *mimirpb.WriteRequest) error {
+func (c pusherConsumer) newStorageWriter() PusherCloser {
+	if c.kafkaConfig.ReplayShards == 0 {
+		return newSequentialStoragePusher(c.metrics, c.pusher)
+	}
+
+	return newParallelStoragePusher(c.metrics, c.pusher, c.kafkaConfig.ReplayShards, c.kafkaConfig.BatchSize, c.logger)
+}
+
+func (c pusherConsumer) pushToStorage(ctx context.Context, tenantID string, req *mimirpb.WriteRequest, writer PusherCloser) error {
 	spanLog, ctx := spanlogger.NewWithLogger(ctx, c.logger, "pusherConsumer.pushToStorage")
 	defer spanLog.Finish()
 
@@ -188,7 +199,7 @@ func (c pusherConsumer) pushToStorage(ctx context.Context, tenantID string, req
 	// Note that the implementation of the Pusher expects the tenantID to be in the context.
 	ctx = user.InjectOrgID(ctx, tenantID)
 
-	err := c.pusher.PushToStorage(ctx, req)
+	err := writer.PushToStorage(ctx, req)
 
 	// TODO dimitarvdimitrov processing time is flawed because it's only counting enqueuing time, not processing time.
 	c.metrics.processingTimeSeconds.Observe(time.Since(processingStart).Seconds())
@@ -241,163 +252,299 @@ func (c pusherConsumer) shouldLogClientError(ctx context.Context, err error) (bo
 	return optional.ShouldLog(ctx)
 }
 
-type multiTenantPusher struct {
+// sequentialStoragePusher receives mimirpb.WriteRequests and pushes them to the storage one by one.
+type sequentialStoragePusher struct {
 	metrics *pusherConsumerMetrics
 
-	pushers        map[string]*shardingPusher
+	pusher Pusher
+}
+
+// newSequentialStoragePusher creates a new sequentialStoragePusher instance.
+func newSequentialStoragePusher(metrics *pusherConsumerMetrics, pusher Pusher) sequentialStoragePusher {
+	return sequentialStoragePusher{
+		metrics: metrics,
+		pusher:  pusher,
+	}
+}
+
+// PushToStorage implements the PusherCloser interface.
+func (ssp sequentialStoragePusher) PushToStorage(ctx context.Context, wr *mimirpb.WriteRequest) error {
+	// TODO: What about time??
+	ssp.metrics.numTimeSeriesPerFlush.Observe(float64(len(wr.Timeseries)))
+	return ssp.pusher.PushToStorage(ctx, wr)
+}
+
+// Close implements the PusherCloser interface.
+func (ssp sequentialStoragePusher) Close() []error {
+	return nil
+}
+
+// parallelStoragePusher receives WriteRequests and pushes them to the storage in parallel.
+// The parallelism is two-tiered: we first parallelize by tenantID and then by series.
+type parallelStoragePusher struct {
+	metrics *pusherConsumerMetrics
+	logger  log.Logger
+
+	pushers        map[string]*parallelStorageShards
 	upstreamPusher Pusher
 	numShards      int
 	batchSize      int
 }
 
-func (c multiTenantPusher) PushToStorage(ctx context.Context, request *mimirpb.WriteRequest) error {
-	user, _ := user.ExtractOrgID(ctx)
-	return c.pusher(user).PushToStorage(ctx, request)
-}
-
-// TODO dimitarvdimitrov rename because this is multi-tenant sharding pusher
-func newMultiTenantPusher(metrics *pusherConsumerMetrics, upstream Pusher, numShards int, batchSize int) *multiTenantPusher {
-	return &multiTenantPusher{
-		pushers:        make(map[string]*shardingPusher),
-		upstreamPusher: upstream,
+// newParallelStoragePusher creates a new parallelStoragePusher instance.
+func newParallelStoragePusher(metrics *pusherConsumerMetrics, pusher Pusher, numShards int, batchSize int, logger log.Logger) *parallelStoragePusher {
+	return &parallelStoragePusher{
+		logger:         log.With(logger, "component", "parallel-storage-pusher"),
+		pushers:        make(map[string]*parallelStorageShards),
+		upstreamPusher: pusher,
 		numShards:      numShards,
 		batchSize:      batchSize,
 		metrics:        metrics,
 	}
 }
 
-func (c multiTenantPusher) pusher(userID string) *shardingPusher {
-	if p := c.pushers[userID]; p != nil {
-		return p
+// PushToStorage implements the PusherCloser interface.
+func (c parallelStoragePusher) PushToStorage(ctx context.Context, wr *mimirpb.WriteRequest) error {
+	userID, err := user.ExtractOrgID(ctx)
+	if err != nil {
+		level.Error(c.logger).Log("msg", "failed to extract tenant ID from context", "err", err)
 	}
-	const shardingPusherBuffer = 2000 // TODO dimitarvdimitrov 2000 is arbitrary; the idea is that we don't block the goroutine calling PushToStorage while we're flushing. A linked list with a sync.Cond or something different would also work
-	p := newShardingPusher(c.metrics.numTimeSeriesPerFlush, c.numShards, c.batchSize, shardingPusherBuffer, c.upstreamPusher) // TODO dimitarvdimitrov this ok or do we need to inject a factory here too?
-	c.pushers[userID] = p
-	return p
+
+	shards := c.shardsFor(userID)
+	return shards.ShardWriteRequest(ctx, wr)
 }
 
-func (c multiTenantPusher) Close() []error {
+// Close implements the PusherCloser interface.
+func (c parallelStoragePusher) Close() []error {
 	var errs multierror.MultiError
 	for _, p := range c.pushers {
-		errs.Add(p.close())
+		errs.Add(p.Stop())
 	}
 	clear(c.pushers)
 	return errs
 }
 
-type shardingPusher struct {
-	numShards      int
-	shards         []chan flushableWriteRequest
-	unfilledShards []flushableWriteRequest
-	upstream       Pusher
-	wg             *sync.WaitGroup
-	errs           chan error
-	batchSize      int
+// shardsFor returns the parallelStorageShards for the given userID. Once created, the same shards are reused for the same userID.
+// We create a separate set of shards for each tenantID to parallelize the writes.
+func (c parallelStoragePusher) shardsFor(userID string) *parallelStorageShards {
+	if p := c.pushers[userID]; p != nil {
+		return p
+	}
+	// Use the same hashing function that's used for stripes in the TSDB. That way we make use of the low-contention property of stripes.
+	hashLabels := labels.Labels.Hash
+	p := newParallelStorageShards(c.metrics.numTimeSeriesPerFlush, c.numShards, c.batchSize, shardForSeriesBuffer, c.upstreamPusher, hashLabels)
+	c.pushers[userID] = p
+	return p
+}
+
+type labelsHashFunc func(labels.Labels) uint64
+
+// parallelStorageShards is a collection of shards used to parallelize the writes to the storage by series.
+// Each series is hashed to a shard that contains its own batchingQueue.
+type parallelStorageShards struct {
 	numTimeSeriesPerFlush prometheus.Histogram
+
+	pusher     Pusher
+	hashLabels labelsHashFunc
+
+	numShards int
+	batchSize int
+	capacity  int
+
+	wg     *sync.WaitGroup
+	shards []*batchingQueue
 }
 
-// TODO dimitarvdimitrov if this is expensive, consider having this long-lived and not Close()-ing and recreating it on every fetch, but instead calling something like Flush() on it.
-func newShardingPusher(numTimeSeriesPerFlush prometheus.Histogram, numShards int, batchSize int, buffer int, upstream Pusher) *shardingPusher {
-	pusher := &shardingPusher{
+// flushableWriteRequest is a WriteRequest that can be flushed to the storage. It represents the current batch of time series to be flushed.
+type flushableWriteRequest struct {
+	*mimirpb.WriteRequest
+	context.Context
+}
+
+// newParallelStorageShards creates a new parallelStorageShards instance.
+func newParallelStorageShards(numTimeSeriesPerFlush prometheus.Histogram, numShards int, batchSize int, capacity int, pusher Pusher, hashLabels labelsHashFunc) *parallelStorageShards {
+	p := &parallelStorageShards{
 		numShards:             numShards,
-		upstream:              upstream,
+		pusher:                pusher,
+		hashLabels:            hashLabels,
+		capacity:              capacity,
 		numTimeSeriesPerFlush: numTimeSeriesPerFlush,
 		batchSize:             batchSize,
 		wg:                    &sync.WaitGroup{},
-		errs:                  make(chan error, numShards),
-		unfilledShards:        make([]flushableWriteRequest, numShards),
 	}
 
-	shards := make([]chan flushableWriteRequest, numShards)
-	pusher.wg.Add(numShards)
-	for i := range shards {
-		pusher.unfilledShards[i].WriteRequest = &mimirpb.WriteRequest{Timeseries: mimirpb.PreallocTimeseriesSliceFromPool()}
-		shards[i] = make(chan flushableWriteRequest, buffer)
-		go pusher.runShard(shards[i])
-	}
-	go func() {
-		pusher.wg.Wait()
-		close(pusher.errs)
-	}()
-	pusher.shards = shards
-	return pusher
+	p.start()
+
+	return p
 }
 
-func (p *shardingPusher) PushToStorage(ctx context.Context, request *mimirpb.WriteRequest) error {
+// ShardWriteRequest hashes each time series in the write request and sends it to the appropriate shard, where it is handled by that shard's batchingQueue.
+func (p *parallelStorageShards) ShardWriteRequest(ctx context.Context, request *mimirpb.WriteRequest) error {
 	var (
 		builder         labels.ScratchBuilder
 		nonCopiedLabels labels.Labels
 		errs            multierror.MultiError
 	)
+
 	for _, ts := range request.Timeseries {
 		mimirpb.FromLabelAdaptersOverwriteLabels(&builder, ts.Labels, &nonCopiedLabels)
-		shard := nonCopiedLabels.Hash() % uint64(p.numShards)
-
-		s := p.unfilledShards[shard]
-		// TODO dimitarvdimitrov support metadata and the rest of the fields; perhaps cut a new request for different values of SkipLabelNameValidation?
-		s.Timeseries = append(s.Timeseries, ts)
-		s.Context = ctx // retain the last context in case we have to flush it when closing shardingPusher
+		shard := p.hashLabels(nonCopiedLabels) % uint64(p.numShards)
+
+		// TODO: Add metrics to measure how long items sit in the queue before they are flushed.
+		if err := p.shards[shard].AddToBatch(ctx, ts); err != nil {
+			// TODO: Technically, we should determine at this point what type of error it is and abort the whole push if it's a server error.
+			// We'll do that in the next PR as otherwise it's too many changes right now.
+			if !mimirpb.IsClientError(err) {
+				return err
+			}
 
-		if len(s.Timeseries) < p.batchSize {
-			continue
-		}
-		p.unfilledShards[shard] = flushableWriteRequest{
-			WriteRequest: &mimirpb.WriteRequest{Timeseries: mimirpb.PreallocTimeseriesSliceFromPool()},
+			errs.Add(err)
 		}
+	}
 
-	tryPush:
-		for {
-			select {
-			case p.shards[shard] <- s:
-				break tryPush
-			case err := <-p.errs:
-				// we might have to first unblock the shard before we can flush to it
-				errs.Add(err)
-			}
-		}
+	// We might have some data left in some of the shards' queues, but it will be flushed eventually once Stop is called, when we're certain that no more data is coming.
+	// Return whatever errors we have now; we'll call Stop eventually and collect the rest.
+	return errs.Err()
+}
 
-		// drain errors to avoid blocking the shard loop
-	drainErrs:
-		for {
-			select {
-			case err := <-p.errs:
-				errs.Add(err)
-			default:
-				break drainErrs
-			}
-		}
-	}
+// Stop stops all the shards and waits for them to finish.
+func (p *parallelStorageShards) Stop() error {
+	var errs multierror.MultiError
+
+	for _, shard := range p.shards {
+		errs.Add(shard.Close())
+	}
+
+	p.wg.Wait()
+
+	return errs.Err()
 }
 
-type flushableWriteRequest struct {
-	*mimirpb.WriteRequest
-	context.Context
+// start starts the shards, each in its own goroutine.
+func (p *parallelStorageShards) start() {
+	shards := make([]*batchingQueue, p.numShards)
+	p.wg.Add(p.numShards)
+
+	for i := range shards {
+		shards[i] = newBatchingQueue(p.capacity, p.batchSize)
+		go p.run(shards[i])
+	}
+
+	p.shards = shards
 }
 
-func (p *shardingPusher) runShard(toFlush chan flushableWriteRequest) {
+// run runs the batchingQueue for the shard.
+func (p *parallelStorageShards) run(queue *batchingQueue) {
 	defer p.wg.Done()
-	for wr := range toFlush {
+	defer queue.Done()
+
+	for wr := range queue.Channel() {
 		p.numTimeSeriesPerFlush.Observe(float64(len(wr.WriteRequest.Timeseries)))
-		err := p.upstream.PushToStorage(wr.Context, wr.WriteRequest)
+		err := p.pusher.PushToStorage(wr.Context, wr.WriteRequest)
 		if err != nil {
-			p.errs <- err
+			queue.ErrorChannel() <- err
 		}
 	}
 }
 
-func (p *shardingPusher) close() error {
-	for shard, wr := range p.unfilledShards {
-		if len(wr.Timeseries) > 0 {
-			p.shards[shard] <- wr
-		}
+// batchingQueue is a queue that batches the incoming time series according to the batch size.
+// Once the batch size is reached, the batch is pushed to a channel which can be accessed through the Channel() method.
+type batchingQueue struct {
+	ch    chan flushableWriteRequest
+	errCh chan error
+	done  chan struct{}
+
+	currentBatch flushableWriteRequest
+	batchSize    int
+}
+
+// newBatchingQueue creates a new batchingQueue instance.
+func newBatchingQueue(capacity int, batchSize int) *batchingQueue {
+	return &batchingQueue{
+		ch:           make(chan flushableWriteRequest, capacity),
+		errCh:        make(chan error, capacity),
+		done:         make(chan struct{}),
+		currentBatch: flushableWriteRequest{WriteRequest: &mimirpb.WriteRequest{Timeseries: mimirpb.PreallocTimeseriesSliceFromPool()}},
+		batchSize:    batchSize,
 	}
-	for _, shard := range p.shards {
-		close(shard)
+}
+
+// AddToBatch adds a time series to the current batch. If the batch size is reached, the batch is pushed to the Channel().
+// If errors were reported for previously flushed batches, they are returned, but the current batch is still pushed.
+func (q *batchingQueue) AddToBatch(ctx context.Context, ts mimirpb.PreallocTimeseries) error {
+	q.currentBatch.Timeseries = append(q.currentBatch.Timeseries, ts)
+	q.currentBatch.Context = ctx
+
+	if len(q.currentBatch.Timeseries) >= q.batchSize {
+		if err := q.push(); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// Close closes the batchingQueue. It pushes the current batch to the channel if it's not empty,
+// and then closes the channel.
+func (q *batchingQueue) Close() error {
 	var errs multierror.MultiError
-	for err := range p.errs {
-		errs.Add(err)
+	if len(q.currentBatch.Timeseries) > 0 {
+		if err := q.push(); err != nil {
+			errs.Add(err)
+		}
 	}
+
+	close(q.ch)
+	<-q.done
+
+	errs = append(errs, q.collectErrors()...)
+	close(q.errCh)
 	return errs.Err()
 }
+
+// Channel returns the channel where the batches are pushed.
+func (q *batchingQueue) Channel() <-chan flushableWriteRequest {
+	return q.ch
+}
+
+// ErrorChannel returns the channel where errors are pushed.
+func (q *batchingQueue) ErrorChannel() chan<- error {
+	return q.errCh
+}
+
+// Done signals the queue that there is no more data coming for both the channel and the error channel.
+// It is necessary to ensure we don't close the channel before all the data is flushed.
+func (q *batchingQueue) Done() {
+	close(q.done)
+}
+
+// push pushes the current batch to the channel and resets the current batch.
+// It also collects any errors that might have occurred while pushing earlier batches.
+func (q *batchingQueue) push() error {
+	errs := q.collectErrors()
+
+	q.ch <- q.currentBatch
+	q.resetCurrentBatch()
+
+	return errs.Err()
+}
+
+// resetCurrentBatch resets the current batch to an empty state.
+func (q *batchingQueue) resetCurrentBatch() {
+	q.currentBatch = flushableWriteRequest{
+		WriteRequest: &mimirpb.WriteRequest{Timeseries: mimirpb.PreallocTimeseriesSliceFromPool()},
+	}
+}
+
+// collectErrors drains any errors reported so far without blocking.
+func (q *batchingQueue) collectErrors() multierror.MultiError {
+	var errs multierror.MultiError
+
+	for {
+		select {
+		case err := <-q.errCh:
+			errs.Add(err)
+		default:
+			return errs
+		}
+	}
+}
diff --git a/pkg/storage/ingest/pusher_test.go b/pkg/storage/ingest/pusher_test.go
index c70d16bc157..fabd5da3860 100644
--- a/pkg/storage/ingest/pusher_test.go
+++ b/pkg/storage/ingest/pusher_test.go
@@ -7,6 +7,7 @@ import (
 	"fmt"
 	"strings"
 	"testing"
+	"time"
 
 	"github.com/go-kit/log"
 	"github.com/gogo/status"
@@ -17,6 +18,7 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/prometheus/client_golang/prometheus/testutil"
+	"github.com/prometheus/prometheus/model/labels"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
@@ -392,8 +394,7 @@ func (m *mockPusher) PushToStorage(ctx context.Context, request *mimirpb.WriteRe
 	return args.Error(0)
 }
 
-func TestShardingPusher(t *testing.T) {
-	t.Skipf("skipping because this is producing different results on the CI than locally because of the Prometheus label hashing")
+func TestParallelStorageShards_ShardWriteRequest(t *testing.T) {
 	noopHistogram := promauto.With(prometheus.NewRegistry()).NewHistogram(prometheus.HistogramOpts{Name: "noop", NativeHistogramBucketFactor: 1.1})
 
 	testCases := map[string]struct {
@@ -565,6 +566,9 @@
 			{Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseries("series_3_8")}},
 			{Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseries("series_3_8")}},
+
+			{Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseries("series_3_8")}},
+			{Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseries("series_3_8")}},
 		},
 		expectedErrsCount: 1, // at least one of those should fail because the first flush failed
@@ -581,8 +585,12 @@
 			{Timeseries: []mimirpb.PreallocTimeseries{
 				mockPreallocTimeseries("series_3_8"),
 				mockPreallocTimeseries("series_3_8"),
 			}},
+			{Timeseries: []mimirpb.PreallocTimeseries{
+				mockPreallocTimeseries("series_3_8"),
+				mockPreallocTimeseries("series_3_8"),
+			}},
 		},
-		upstreamPushErrs: []error{assert.AnError, nil, nil},
+		upstreamPushErrs: []error{assert.AnError, nil, nil, nil},
 		expectedCloseErr: nil,
 	},
 }
@@ -601,19 +609,19 @@
 		pusher := &mockPusher{}
 		// run with a buffer of one, so some of the tests can fill the buffer and test the error handling
 		const buffer = 1
-		shardingP := newShardingPusher(noopHistogram, tc.shardCount, tc.batchSize, buffer, pusher)
+		shardingP := newParallelStorageShards(noopHistogram, tc.shardCount, tc.batchSize, buffer, pusher, labels.StableHash)
 
 		for i, req := range tc.expectedUpstreamPushes {
 			pusher.On("PushToStorage", mock.Anything, req).Return(tc.upstreamPushErrs[i])
 		}
 		var actualPushErrs []error
 		for _, req := range tc.requests {
-			err := shardingP.PushToStorage(context.Background(), req)
+			err := shardingP.ShardWriteRequest(context.Background(), req)
 			actualPushErrs = append(actualPushErrs, err)
 		}
 
 		if len(tc.expectedErrs) > 0 {
-			assert.Equal(t, tc.expectedErrs, actualPushErrs)
+			require.Equal(t, tc.expectedErrs, actualPushErrs)
 		} else {
 			receivedErrs := 0
 			for _, err := range actualPushErrs {
@@ -621,13 +629,195 @@
 					receivedErrs++
 				}
 			}
-			assert.Equalf(t, tc.expectedErrsCount, receivedErrs, "received %d errors instead of %d: %v", receivedErrs, tc.expectedErrsCount, actualPushErrs)
+			require.Equalf(t, tc.expectedErrsCount, receivedErrs, "received %d errors instead of %d: %v", receivedErrs, tc.expectedErrsCount, actualPushErrs)
 		}
 
-		closeErr := shardingP.close()
-		assert.ErrorIs(t, closeErr, tc.expectedCloseErr)
+		closeErr := shardingP.Stop()
+		require.ErrorIs(t, closeErr, tc.expectedCloseErr)
 		pusher.AssertNumberOfCalls(t, "PushToStorage", len(tc.expectedUpstreamPushes))
 		pusher.AssertExpectations(t)
 	})
 }
}
+
+func TestBatchingQueue(t *testing.T) {
+	capacity := 5
+	batchSize := 3
+
+	series1 := mockPreallocTimeseries("series_1")
+	series2 := mockPreallocTimeseries("series_2")
+
+	series := []mimirpb.PreallocTimeseries{series1, series2}
+
+	t.Run("batch not flushed because batch size is 3 and we have 2 items in the queue", func(t *testing.T) {
+		queue := setupQueue(t, capacity, batchSize, series)
+
+		select {
+		case <-queue.Channel():
+			t.Fatal("expected batch to not be flushed")
+		case <-time.After(100 * time.Millisecond):
+		}
+	})
+
+	t.Run("batch flushed because batch size is 3 and we have 3 items in the queue", func(t *testing.T) {
+		queue := setupQueue(t, capacity, batchSize, series)
+
+		series3 := mockPreallocTimeseries("series_3")
+		require.NoError(t, queue.AddToBatch(context.Background(), series3))
+
+		select {
+		case batch := <-queue.Channel():
+			require.Len(t, batch.WriteRequest.Timeseries, 3)
+			require.Equal(t, series1, batch.WriteRequest.Timeseries[0])
+			require.Equal(t, series2, batch.WriteRequest.Timeseries[1])
+			require.Equal(t, series3, batch.WriteRequest.Timeseries[2])
+		case <-time.After(time.Second):
+			t.Fatal("expected batch to be flushed")
+		}
+
+		// After the batch is flushed, the queue should be empty.
+		require.Len(t, queue.currentBatch.Timeseries, 0)
+	})
+
+	t.Run("if you close the queue with items in the queue, the queue should flush the items", func(t *testing.T) {
+		queue := setupQueue(t, capacity, batchSize, series)
+
+		// Channel is empty.
+		select {
+		case <-queue.Channel():
+			t.Fatal("expected batch to not be flushed")
+		case <-time.After(100 * time.Millisecond):
+		}
+
+		// Read in a separate goroutine as when we close the queue, the channel will be closed.
+		var batch flushableWriteRequest
+		go func() {
+			defer queue.Done()
+			for b := range queue.Channel() {
+				batch = b
+				queue.ErrorChannel() <- nil
+			}
+		}()
+
+		// Close the queue, and the items should be flushed.
+		require.NoError(t, queue.Close())
+
+		require.Len(t, batch.WriteRequest.Timeseries, 2)
+		require.Equal(t, series1, batch.WriteRequest.Timeseries[0])
+		require.Equal(t, series2, batch.WriteRequest.Timeseries[1])
+	})
+
+	t.Run("test queue capacity", func(t *testing.T) {
+		queue := setupQueue(t, capacity, batchSize, nil)
+
+		// Queue channel is empty because no items have been added to the current batch yet.
+		require.Len(t, queue.ch, 0)
+		require.Len(t, queue.currentBatch.Timeseries, 0)
+
+		// Add items to the queue until it's full.
+		for i := 0; i < capacity*batchSize; i++ {
+			s := mockPreallocTimeseries(fmt.Sprintf("series_%d", i))
+			require.NoError(t, queue.AddToBatch(context.Background(), s))
+		}
+
+		// We should have 5 items in the queue channel and 0 items in the currentBatch.
+		require.Len(t, queue.ch, 5)
+		require.Len(t, queue.currentBatch.Timeseries, 0)
+
+		// Read one item to free up space in the queue.
+		batch := <-queue.Channel()
+		require.Len(t, batch.WriteRequest.Timeseries, 3)
+
+		// Queue should have 4 items now and the currentBatch remains the same.
+		require.Len(t, queue.ch, 4)
+		require.Len(t, queue.currentBatch.Timeseries, 0)
+
+		// Add three more items to fill up the queue again; this shouldn't block.
+		s := mockPreallocTimeseries("series_100")
+		require.NoError(t, queue.AddToBatch(context.Background(), s))
+		require.NoError(t, queue.AddToBatch(context.Background(), s))
+		require.NoError(t, queue.AddToBatch(context.Background(), s))
+
+		require.Len(t, queue.ch, 5)
+		require.Len(t, queue.currentBatch.Timeseries, 0)
+	})
+}
+
+func TestBatchingQueue_ErrorHandling(t *testing.T) {
+	capacity := 2
+	batchSize := 2
+	series1 := mockPreallocTimeseries("series_1")
+	series2 := mockPreallocTimeseries("series_2")
+
+	t.Run("AddToBatch returns all errors and pushes the batch when the batch is filled", func(t *testing.T) {
+		queue := setupQueue(t, capacity, batchSize, nil)
+		ctx := context.Background()
+
+		// Push 1 series so that the next push will complete the batch.
+		require.NoError(t, queue.AddToBatch(ctx, series2))
+
+		// Push errors to fill the error channel.
+		queue.ErrorChannel() <- fmt.Errorf("mock error 1")
+		queue.ErrorChannel() <- fmt.Errorf("mock error 2")
+
+		// AddToBatch should return an error now.
+		err := queue.AddToBatch(ctx, series2)
+		assert.Equal(t, "2 errors: mock error 1; mock error 2", err.Error())
+		// The batch should also have been pushed.
+		select {
+		case batch := <-queue.Channel():
+			require.Equal(t, series2, batch.WriteRequest.Timeseries[0])
+			require.Equal(t, series2, batch.WriteRequest.Timeseries[1])
+		default:
+			t.Fatal("expected batch to be flushed")
+		}
+
+		// AddToBatch should work again.
+		require.NoError(t, queue.AddToBatch(ctx, series2))
+		require.NoError(t, queue.AddToBatch(ctx, series2))
+	})
+
+	t.Run("Any errors pushed after last AddToBatch call are received on Close", func(t *testing.T) {
+		queue := setupQueue(t, capacity, batchSize, nil)
+		ctx := context.Background()
+
+		// Add a series to the batch, but make sure nothing is pushed.
+		require.NoError(t, queue.AddToBatch(ctx, series1))
+
+		select {
+		case <-queue.Channel():
+			t.Fatal("expected batch to not be flushed")
+		default:
+		}
+
+		// Push multiple errors.
+		queue.ErrorChannel() <- fmt.Errorf("mock error 1")
+		queue.ErrorChannel() <- fmt.Errorf("mock error 2")
+
+		// Call Done and Close on the queue.
+		queue.Done()
+		err := queue.Close()
+		require.Error(t, err)
+		assert.Equal(t, "2 errors: mock error 1; mock error 2", err.Error())
+
+		// The batch is also pushed.
+		select {
+		case batch := <-queue.Channel():
+			require.Equal(t, series1, batch.WriteRequest.Timeseries[0])
+		default:
+			t.Fatal("expected batch to be flushed")
+		}
+	})
+}
+
+func setupQueue(t *testing.T, capacity, batchSize int, series []mimirpb.PreallocTimeseries) *batchingQueue {
+	t.Helper()
+
+	queue := newBatchingQueue(capacity, batchSize)
+
+	for _, s := range series {
+		require.NoError(t, queue.AddToBatch(context.Background(), s))
+	}
+
+	return queue
+}
diff --git a/pkg/storage/ingest/reader.go b/pkg/storage/ingest/reader.go
index c77f8119470..97c77dd9f89 100644
--- a/pkg/storage/ingest/reader.go
+++ b/pkg/storage/ingest/reader.go
@@ -35,7 +35,6 @@ import (
 	"go.opentelemetry.io/otel/propagation"
 	"go.uber.org/atomic"
 
-	"github.com/grafana/mimir/pkg/mimirpb"
 	"github.com/grafana/mimir/pkg/util/spanlogger"
 )
 
@@ -60,8 +59,6 @@ type record struct {
 }
 
 type recordConsumer interface {
-	Close(context.Context) error
-
 	// Consume consumes the given records in the order they are provided. We need this because samples that will be ingested also need to be in order, to avoid ingesting samples out of order.
 	// The function is expected to be idempotent and incremental, meaning that it can be called multiple times with the same records, and it won't respond to context cancellation.
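
The idempotence requirement above is what lets the reader simply re-invoke Consume with the same records after a failure. Below is a minimal, self-contained sketch of that interaction; record, recordConsumer, and consumeWithRetries are simplified stand-ins for the types in this diff, not the actual Mimir implementation:

package main

import (
	"context"
	"errors"
	"fmt"
)

// record is a simplified stand-in for the Kafka record type above.
type record struct{ content string }

type recordConsumer interface {
	// Consume must be idempotent: it can be re-invoked with the same records after a failure.
	Consume(ctx context.Context, records []record) error
}

type consumerFunc func(ctx context.Context, records []record) error

func (c consumerFunc) Consume(ctx context.Context, records []record) error { return c(ctx, records) }

// consumeWithRetries mirrors the shape of consumeFetches: on error it retries
// the whole batch, which is only safe because Consume is idempotent.
func consumeWithRetries(ctx context.Context, consumer recordConsumer, records []record) error {
	for attempt := 1; ; attempt++ {
		// The batch must not be interrupted halfway through, so the real code
		// also consumes with context.WithoutCancel(ctx).
		err := consumer.Consume(context.WithoutCancel(ctx), records)
		if err == nil {
			return nil
		}
		if ctx.Err() != nil {
			return ctx.Err() // Give up only once the caller's context is done.
		}
		fmt.Printf("attempt %d failed: %v; retrying\n", attempt, err)
	}
}

func main() {
	failures := 2
	flaky := consumerFunc(func(_ context.Context, _ []record) error {
		if failures > 0 {
			failures--
			return errors.New("transient ingestion error")
		}
		return nil
	})
	_ = consumeWithRetries(context.Background(), flaky, []record{{content: "sample"}})
}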
@@ -103,28 +100,6 @@ func (c consumerFactoryFunc) consumer() recordConsumer {
 	return c()
 }
 
-type noopPusherCloser struct {
-	metrics *pusherConsumerMetrics
-
-	Pusher
-}
-
-func newNoopPusherCloser(metrics *pusherConsumerMetrics, pusher Pusher) noopPusherCloser {
-	return noopPusherCloser{
-		metrics: metrics,
-		Pusher:  pusher,
-	}
-}
-
-func (c noopPusherCloser) PushToStorage(ctx context.Context, wr *mimirpb.WriteRequest) error {
-	c.metrics.numTimeSeriesPerFlush.Observe(float64(len(wr.Timeseries)))
-	return c.Pusher.PushToStorage(ctx, wr)
-}
-
-func (noopPusherCloser) Close() []error {
-	return nil
-}
-
 func NewPartitionReaderForPusher(kafkaCfg KafkaConfig, partitionID int32, instanceID string, pusher Pusher, logger log.Logger, reg prometheus.Registerer) (*PartitionReader, error) {
 	metrics := newPusherConsumerMetrics(reg)
 	factory := consumerFactoryFunc(func() recordConsumer {
@@ -495,10 +470,7 @@ func (r *PartitionReader) consumeFetches(ctx context.Context, fetches kgo.Fetche
 		consumeCtx := context.WithoutCancel(ctx)
 		err := consumer.Consume(consumeCtx, records)
 		if err == nil {
-			err = consumer.Close(consumeCtx)
-			if err == nil {
-				break
-			}
+			break
 		}
 		level.Error(r.logger).Log(
 			"msg", "encountered error while ingesting data from Kafka; should retry",
diff --git a/pkg/storage/ingest/reader_test.go b/pkg/storage/ingest/reader_test.go
index 87ac8017a66..7a10ed018d5 100644
--- a/pkg/storage/ingest/reader_test.go
+++ b/pkg/storage/ingest/reader_test.go
@@ -1975,11 +1975,6 @@ func newTestConsumer(capacity int) testConsumer {
 	}
 }
 
-func (t testConsumer) Close(context.Context) error {
-	// We don't close the channel because the same consumer can be reused to consume multiple fetches.
-	return nil
-}
-
 func (t testConsumer) Consume(ctx context.Context, records []record) error {
 	for _, r := range records {
 		select {
@@ -2021,10 +2016,6 @@ func (t testConsumer) waitRecords(numRecords int, waitTimeout, drainPeriod time.
 
 type consumerFunc func(ctx context.Context, records []record) error
 
-func (consumerFunc) Close(context.Context) error {
-	return nil
-}
-
 func (c consumerFunc) Consume(ctx context.Context, records []record) error {
 	return c(ctx, records)
 }
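
Taken together, the new pusher.go types implement a batch-and-flush protocol: AddToBatch fills a per-shard batch, full batches are handed to a consumer goroutine via Channel, errors flow back on ErrorChannel, and Done/Close coordinate shutdown. Here is a minimal, self-contained sketch of the same protocol with the Mimir-specific types swapped for a plain string payload; all names below are illustrative, not part of the patch:

package main

import (
	"fmt"
	"sync"
)

// queue batches items and flushes them to a channel once batchSize is reached,
// mirroring the AddToBatch/Channel/Done/Close protocol of batchingQueue.
type queue struct {
	ch        chan []string
	errCh     chan error
	done      chan struct{}
	current   []string
	batchSize int
}

func newQueue(capacity, batchSize int) *queue {
	return &queue{
		ch:        make(chan []string, capacity),
		errCh:     make(chan error, capacity),
		done:      make(chan struct{}),
		batchSize: batchSize,
	}
}

func (q *queue) add(item string) error {
	q.current = append(q.current, item)
	if len(q.current) >= q.batchSize {
		return q.push()
	}
	return nil
}

// push hands the current batch to the consumer and surfaces any error the
// consumer reported for a previously flushed batch.
func (q *queue) push() error {
	err := q.collectErr()
	q.ch <- q.current
	q.current = nil
	return err
}

func (q *queue) collectErr() error {
	select {
	case err := <-q.errCh:
		return err
	default:
		return nil
	}
}

// close flushes the remaining partial batch, closes the batch channel, and
// waits for the consumer to signal done — the same ordering batchingQueue.Close relies on.
func (q *queue) close() error {
	var err error
	if len(q.current) > 0 {
		err = q.push()
	}
	close(q.ch)
	<-q.done
	return err
}

func main() {
	q := newQueue(4, 3)
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer close(q.done) // Done(): tells close() that all batches were handled.
		for batch := range q.ch {
			fmt.Println("flushing batch:", batch)
		}
	}()

	// Seven items with batchSize 3: two full batches flush on add,
	// the final partial batch flushes on close.
	for i := 0; i < 7; i++ {
		if err := q.add(fmt.Sprintf("series_%d", i)); err != nil {
			fmt.Println("flush error:", err)
		}
	}
	if err := q.close(); err != nil {
		fmt.Println("close error:", err)
	}
	wg.Wait()
}

The ordering in close() — flush, close the batch channel, then wait on done — is why batchingQueue.Done must only be called after the consumer has drained Channel; otherwise Close could return before the final batch reaches storage.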