diff --git a/pkg/storage/ingest/pusher.go b/pkg/storage/ingest/pusher.go
index 6cfa0fec0f..2739516194 100644
--- a/pkg/storage/ingest/pusher.go
+++ b/pkg/storage/ingest/pusher.go
@@ -392,6 +392,12 @@ func (p *parallelStorageShards) ShardWriteRequest(ctx context.Context, request *
 		// TODO: Add metrics to measure how long items sit in the queue before they are flushed.
 		if err := p.shards[shard].AddToBatch(ctx, ts); err != nil {
+			// TODO: Technically, we should determine at this point what type of error it is and abort the whole push if it's a server error.
+			// We'll do that in the next PR; otherwise this one becomes too large.
+			if !mimirpb.IsClientError(err) {
+				return err
+			}
+
 			errs.Add(err)
 		}
 	}
@@ -456,7 +462,7 @@ type batchingQueue struct {
 func newBatchingQueue(capacity int, batchSize int) *batchingQueue {
 	return &batchingQueue{
 		ch:           make(chan flushableWriteRequest, capacity),
-		errCh:        make(chan error, 1),
+		errCh:        make(chan error, capacity),
 		done:         make(chan struct{}),
 		currentBatch: flushableWriteRequest{WriteRequest: &mimirpb.WriteRequest{Timeseries: mimirpb.PreallocTimeseriesSliceFromPool()}},
 		batchSize:    batchSize,
@@ -464,19 +470,15 @@ func newBatchingQueue(capacity int, batchSize int) *batchingQueue {
 // AddToBatch adds a time series to the current batch. If the batch size is reached, the batch is pushed to the Channel().
+// If errors were reported for previously pushed batches, they are returned; the current batch is still pushed.
 func (q *batchingQueue) AddToBatch(ctx context.Context, ts mimirpb.PreallocTimeseries) error {
-	s := &q.currentBatch
-	s.Timeseries = append(s.Timeseries, ts)
-	s.Context = ctx
+	q.currentBatch.Timeseries = append(q.currentBatch.Timeseries, ts)
+	q.currentBatch.Context = ctx

-	if len(s.Timeseries) >= q.batchSize {
-		if err := q.push(*s); err != nil {
+	if len(q.currentBatch.Timeseries) >= q.batchSize {
+		if err := q.push(); err != nil {
 			return err
 		}
-
-		q.currentBatch = flushableWriteRequest{
-			WriteRequest: &mimirpb.WriteRequest{Timeseries: mimirpb.PreallocTimeseriesSliceFromPool()},
-		}
 	}

 	return nil
@@ -485,23 +487,19 @@ func (q *batchingQueue) AddToBatch(ctx context.Context, ts mimirpb.PreallocTimes
 // Close closes the batchingQueue. It pushes the current batch to the channel if it's not empty,
 // and then closes the channel.
 func (q *batchingQueue) Close() error {
+	var errs multierror.MultiError
 	if len(q.currentBatch.Timeseries) > 0 {
-		if err := q.push(q.currentBatch); err != nil {
-			return err
+		if err := q.push(); err != nil {
+			errs.Add(err)
 		}
 	}

 	close(q.ch)
 	<-q.done

-	select {
-	case err := <-q.errCh:
-		close(q.errCh)
-		return err
-	default:
-		close(q.errCh)
-		return nil
-	}
+	errs = append(errs, q.collectErrors()...)
+	close(q.errCh)
+	return errs.Err()
 }

 // Channel returns the channel where the batches are pushed.
@@ -520,11 +518,33 @@ func (q *batchingQueue) Done() {
 	close(q.done)
 }

-func (q *batchingQueue) push(fwr flushableWriteRequest) error {
-	select {
-	case q.ch <- fwr:
-		return nil
-	case err := <-q.errCh:
-		return err
+// push pushes the current batch to the channel and resets the current batch.
+// It also collects any errors that occurred while flushing previously pushed batches.
+func (q *batchingQueue) push() error {
+	errs := q.collectErrors()
+
+	q.ch <- q.currentBatch
+	q.resetCurrentBatch()
+
+	return errs.Err()
+}
+
+// resetCurrentBatch resets the current batch to an empty state.
+func (q *batchingQueue) resetCurrentBatch() {
+	q.currentBatch = flushableWriteRequest{
+		WriteRequest: &mimirpb.WriteRequest{Timeseries: mimirpb.PreallocTimeseriesSliceFromPool()},
+	}
+}
+
+func (q *batchingQueue) collectErrors() multierror.MultiError {
+	var errs multierror.MultiError
+
+	for {
+		select {
+		case err := <-q.errCh:
+			errs.Add(err)
+		default:
+			return errs
+		}
+	}
 }
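
The non-blocking drain in collectErrors is the standard Go select-with-default idiom: each iteration either receives a buffered error or hits the default case and returns, so the loop never blocks once errCh is empty. The errCh resize from 1 to capacity follows from the same rework: push no longer consumes errors while it is blocked sending a batch, so the consumer side needs enough buffer to report one failure per in-flight batch without blocking. A minimal, self-contained sketch of the idiom; the names are illustrative, not the Mimir types:

    package main

    import (
        "errors"
        "fmt"
    )

    // drain empties errCh without blocking: the default case fires as soon
    // as no buffered error is left, which is the same shape as collectErrors.
    func drain(errCh chan error) []error {
        var errs []error
        for {
            select {
            case err := <-errCh:
                errs = append(errs, err)
            default:
                return errs
            }
        }
    }

    func main() {
        // Buffered like the queue's errCh: one slot per in-flight batch,
        // so a consumer can report each failure without blocking.
        errCh := make(chan error, 4)
        errCh <- errors.New("flush failed: batch 1")
        errCh <- errors.New("flush failed: batch 2")

        fmt.Println(drain(errCh)) // both buffered errors, in order
        fmt.Println(drain(errCh)) // empty channel: returns immediately
    }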
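
The TODO in ShardWriteRequest describes the intended follow-up: abort the whole push as soon as a server error shows up, and only accumulate client errors. A hedged sketch of that policy; isClientError and the error values are stand-ins for mimirpb.IsClientError and real ingestion errors, and the "N errors: ..." output merely echoes the multierror formatting asserted in the tests below:

    package main

    import (
        "errors"
        "fmt"
        "strings"
    )

    // errBadData marks errors to accumulate rather than abort on; it stands
    // in for whatever mimirpb.IsClientError recognizes as a client error.
    var errBadData = errors.New("bad data")

    func isClientError(err error) bool { return errors.Is(err, errBadData) }

    // pushAll fails fast on the first server error and accumulates client
    // errors, mirroring the policy the TODO sketches for ShardWriteRequest.
    func pushAll(results []error) error {
        var msgs []string
        for _, err := range results {
            switch {
            case err == nil:
            case !isClientError(err):
                return err // server error: abort the whole push
            default:
                msgs = append(msgs, err.Error()) // client error: keep going
            }
        }
        if len(msgs) == 0 {
            return nil
        }
        return fmt.Errorf("%d errors: %s", len(msgs), strings.Join(msgs, "; "))
    }

    func main() {
        tooOld := fmt.Errorf("sample too old: %w", errBadData)
        fmt.Println(pushAll([]error{tooOld, nil, tooOld}))
        fmt.Println(pushAll([]error{errors.New("ingester unavailable")}))
    }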
diff --git a/pkg/storage/ingest/pusher_test.go b/pkg/storage/ingest/pusher_test.go
index 9ade90400b..fabd5da386 100644
--- a/pkg/storage/ingest/pusher_test.go
+++ b/pkg/storage/ingest/pusher_test.go
@@ -566,6 +566,9 @@ func TestParallelStorageShards_ShardWriteRequest(t *testing.T) {
 				{Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseries("series_3_8")}},
 				{Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseries("series_3_8")}},
+
+				{Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseries("series_3_8")}},
+				{Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseries("series_3_8")}},
 			},
 			expectedErrsCount: 1, // at least one of those should fail because the first flush failed
@@ -582,8 +585,12 @@ func TestParallelStorageShards_ShardWriteRequest(t *testing.T) {
 				{Timeseries: []mimirpb.PreallocTimeseries{
 					mockPreallocTimeseries("series_3_8"),
 					mockPreallocTimeseries("series_3_8"),
 				}},
+				{Timeseries: []mimirpb.PreallocTimeseries{
+					mockPreallocTimeseries("series_3_8"),
+					mockPreallocTimeseries("series_3_8"),
+				}},
 			},
-			upstreamPushErrs: []error{assert.AnError, nil, nil},
+			upstreamPushErrs: []error{assert.AnError, nil, nil, nil},
 			expectedCloseErr: nil,
 		},
 	}
@@ -614,7 +621,7 @@ func TestParallelStorageShards_ShardWriteRequest(t *testing.T) {
 			}

 			if len(tc.expectedErrs) > 0 {
-				assert.Equal(t, tc.expectedErrs, actualPushErrs)
+				require.Equal(t, tc.expectedErrs, actualPushErrs)
 			} else {
 				receivedErrs := 0
 				for _, err := range actualPushErrs {
@@ -622,11 +629,11 @@ func TestParallelStorageShards_ShardWriteRequest(t *testing.T) {
 						receivedErrs++
 					}
 				}
-				assert.Equalf(t, tc.expectedErrsCount, receivedErrs, "received %d errors instead of %d: %v", receivedErrs, tc.expectedErrsCount, actualPushErrs)
+				require.Equalf(t, tc.expectedErrsCount, receivedErrs, "received %d errors instead of %d: %v", receivedErrs, tc.expectedErrsCount, actualPushErrs)
 			}

 			closeErr := shardingP.Stop()
-			assert.ErrorIs(t, closeErr, tc.expectedCloseErr)
+			require.ErrorIs(t, closeErr, tc.expectedCloseErr)
 			pusher.AssertNumberOfCalls(t, "PushToStorage", len(tc.expectedUpstreamPushes))
 			pusher.AssertExpectations(t)
 		})
@@ -736,6 +743,73 @@ func TestBatchingQueue(t *testing.T) {
 	})
 }

+func TestBatchingQueue_ErrorHandling(t *testing.T) {
+	capacity := 2
+	batchSize := 2
+	series1 := mockPreallocTimeseries("series_1")
+	series2 := mockPreallocTimeseries("series_2")
+
+	t.Run("AddToBatch returns all pending errors and pushes the batch when the batch is filled", func(t *testing.T) {
+		queue := setupQueue(t, capacity, batchSize, nil)
+		ctx := context.Background()
+
+		// Push 1 series so that the next push will complete the batch.
+		require.NoError(t, queue.AddToBatch(ctx, series2))
+
+		// Push errors to fill the error channel.
+		queue.ErrorChannel() <- fmt.Errorf("mock error 1")
+		queue.ErrorChannel() <- fmt.Errorf("mock error 2")
+
+		// AddToBatch should return an error now.
+		err := queue.AddToBatch(ctx, series2)
+		assert.Equal(t, "2 errors: mock error 1; mock error 2", err.Error())
+		// The batch should also have been pushed.
+		select {
+		case batch := <-queue.Channel():
+			require.Equal(t, series2, batch.WriteRequest.Timeseries[0])
+			require.Equal(t, series2, batch.WriteRequest.Timeseries[1])
+		default:
+			t.Fatal("expected batch to be flushed")
+		}
+
+		// AddToBatch should work again.
+		require.NoError(t, queue.AddToBatch(ctx, series2))
+		require.NoError(t, queue.AddToBatch(ctx, series2))
+	})
+
+	t.Run("Any errors pushed after the last AddToBatch call are received on Close", func(t *testing.T) {
+		queue := setupQueue(t, capacity, batchSize, nil)
+		ctx := context.Background()
+
+		// Add a series to the batch, but make sure nothing is pushed.
+		require.NoError(t, queue.AddToBatch(ctx, series1))
+
+		select {
+		case <-queue.Channel():
+			t.Fatal("expected batch to not be flushed")
+		default:
+		}
+
+		// Push multiple errors.
+		queue.ErrorChannel() <- fmt.Errorf("mock error 1")
+		queue.ErrorChannel() <- fmt.Errorf("mock error 2")
+
+		// Call Done and Close on the queue.
+		queue.Done()
+		err := queue.Close()
+		require.Error(t, err)
+		assert.Equal(t, "2 errors: mock error 1; mock error 2", err.Error())
+
+		// The batch is also pushed.
+		select {
+		case batch := <-queue.Channel():
+			require.Equal(t, series1, batch.WriteRequest.Timeseries[0])
+		default:
+			t.Fatal("expected batch to be flushed")
+		}
+	})
+}
+
 func setupQueue(t *testing.T, capacity, batchSize int, series []mimirpb.PreallocTimeseries) *batchingQueue {
 	t.Helper()
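
The new TestBatchingQueue_ErrorHandling cases drive the queue from both ends the way parallelStorageShards does: a consumer drains Channel(), reports upstream failures on ErrorChannel(), and signals completion via Done(); the producer then sees those failures on its next AddToBatch or on Close. A self-contained sketch of that contract; toyQueue and its string batches are illustrative stand-ins, not the real types:

    package main

    import (
        "errors"
        "fmt"
    )

    // toyQueue mimics the two channels of batchingQueue plus its done signal.
    type toyQueue struct {
        ch    chan string // stands in for flushableWriteRequest batches
        errCh chan error
        done  chan struct{}
    }

    func main() {
        q := &toyQueue{
            ch:    make(chan string, 2),
            errCh: make(chan error, 2),
            done:  make(chan struct{}),
        }

        // Consumer: drain batches and report failures back, as the shard
        // goroutines do before marking themselves done.
        go func() {
            defer close(q.done)
            for batch := range q.ch {
                if batch == "bad" { // pretend the upstream push failed
                    q.errCh <- errors.New("failed to push " + batch)
                }
            }
        }()

        // Producer: push batches, then shut down the way Close does.
        q.ch <- "good"
        q.ch <- "bad"
        close(q.ch)
        <-q.done

        // Drain the reported errors, like Close -> collectErrors.
        close(q.errCh)
        for err := range q.errCh {
            fmt.Println(err) // failed to push bad
        }
    }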