Commit a55e417: Pairing feedback

Signed-off-by: gotjosh <josue.abreu@gmail.com>
gotjosh committed Sep 19, 2024
1 parent 99ea756 commit a55e417
Showing 2 changed files with 124 additions and 30 deletions.
72 changes: 46 additions & 26 deletions pkg/storage/ingest/pusher.go
@@ -392,6 +392,12 @@ func (p *parallelStorageShards) ShardWriteRequest(ctx context.Context, request *

// TODO: Add metrics to measure how long items sit in the queue before they are flushed.
if err := p.shards[shard].AddToBatch(ctx, ts); err != nil {
// TODO: Technically, we should determine at this point what type of error it is and abort the whole push if it's a server error.
// We'll do that in the next PR as otherwise it's too many changes right now.
if !mimirpb.IsClientError(err) {
return err
}

errs.Add(err)
}
}
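
The TODO in this hunk describes the intended follow-up: keep collecting per-series client errors, but abort the whole push as soon as a server error appears. The sketch below illustrates that split in isolation; pushAll, pushShard and isClientErr are hypothetical stand-ins, not part of this commit or of the Mimir code base.

package main

import (
	"errors"
	"fmt"
)

// pushAll pushes every batch. A server-side error aborts the whole push
// immediately; client-side errors are collected and returned at the end.
func pushAll(batches [][]string, pushShard func([]string) error, isClientErr func(error) bool) error {
	var clientErrs []error
	for _, batch := range batches {
		if err := pushShard(batch); err != nil {
			if !isClientErr(err) {
				// Server error: stop here and surface it to the caller.
				return err
			}
			// Client error (e.g. a bad sample): remember it and keep pushing.
			clientErrs = append(clientErrs, err)
		}
	}
	return errors.Join(clientErrs...)
}

func main() {
	errBadSample := errors.New("bad sample")

	pushShard := func(batch []string) error {
		if batch[0] == "bad" {
			return errBadSample
		}
		return nil
	}
	isClientErr := func(err error) bool { return errors.Is(err, errBadSample) }

	err := pushAll([][]string{{"ok"}, {"bad"}, {"ok"}}, pushShard, isClientErr)
	fmt.Println(err) // "bad sample": collected, but all three batches were attempted
}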
@@ -456,27 +462,23 @@ type batchingQueue struct {
func newBatchingQueue(capacity int, batchSize int) *batchingQueue {
return &batchingQueue{
ch: make(chan flushableWriteRequest, capacity),
- errCh: make(chan error, 1),
+ errCh: make(chan error, capacity),
done: make(chan struct{}),
currentBatch: flushableWriteRequest{WriteRequest: &mimirpb.WriteRequest{Timeseries: mimirpb.PreallocTimeseriesSliceFromPool()}},
batchSize: batchSize,
}
}

// AddToBatch adds a time series to the current batch. If the batch size is reached, the batch is pushed to the Channel().
+ // If an error occurred while pushing an earlier batch, it returns that error, but still pushes the current batch.
func (q *batchingQueue) AddToBatch(ctx context.Context, ts mimirpb.PreallocTimeseries) error {
- s := &q.currentBatch
- s.Timeseries = append(s.Timeseries, ts)
- s.Context = ctx
+ q.currentBatch.Timeseries = append(q.currentBatch.Timeseries, ts)
+ q.currentBatch.Context = ctx

- if len(s.Timeseries) >= q.batchSize {
- if err := q.push(*s); err != nil {
+ if len(q.currentBatch.Timeseries) >= q.batchSize {
+ if err := q.push(); err != nil {
return err
}

- q.currentBatch = flushableWriteRequest{
- WriteRequest: &mimirpb.WriteRequest{Timeseries: mimirpb.PreallocTimeseriesSliceFromPool()},
- }
}

return nil
@@ -485,23 +487,19 @@ func (q *batchingQueue) AddToBatch(ctx context.Context, ts mimirpb.PreallocTimes
// Close closes the batchingQueue. It pushes the current batch to the channel if it's not empty,
// and then closes the channel.
func (q *batchingQueue) Close() error {
+ var errs multierror.MultiError
if len(q.currentBatch.Timeseries) > 0 {
- if err := q.push(q.currentBatch); err != nil {
- return err
+ if err := q.push(); err != nil {
+ errs.Add(err)
}
}

close(q.ch)
<-q.done

- select {
- case err := <-q.errCh:
- close(q.errCh)
- return err
- default:
- close(q.errCh)
- return nil
- }
+ errs = append(errs, q.collectErrors()...)
+ close(q.errCh)
+ return errs.Err()
}

// Channel returns the channel where the batches are pushed.
@@ -520,11 +518,33 @@ func (q *batchingQueue) Done() {
close(q.done)
}

- func (q *batchingQueue) push(fwr flushableWriteRequest) error {
- select {
- case q.ch <- fwr:
- return nil
- case err := <-q.errCh:
- return err
// push pushes the current batch to the channel and resets the current batch.
// It also collects any errors that might have occurred while pushing the batch.
func (q *batchingQueue) push() error {
errs := q.collectErrors()

q.ch <- q.currentBatch
q.resetCurrentBatch()

return errs.Err()
}

// resetCurrentBatch resets the current batch to an empty state.
func (q *batchingQueue) resetCurrentBatch() {
q.currentBatch = flushableWriteRequest{
WriteRequest: &mimirpb.WriteRequest{Timeseries: mimirpb.PreallocTimeseriesSliceFromPool()},
}
}

func (q *batchingQueue) collectErrors() multierror.MultiError {
var errs multierror.MultiError

for {
select {
case err := <-q.errCh:
errs.Add(err)
default:
return errs
}
}
}
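
collectErrors above relies on the standard non-blocking drain idiom: a select with a default case, run in a loop, empties whatever is currently buffered in the error channel without ever stalling the caller. Below is a minimal standalone sketch of just that idiom; drainErrors and the use of errors.Join are illustrative and not part of this commit.

package main

import (
	"errors"
	"fmt"
)

// drainErrors empties errCh of whatever is buffered right now and returns the
// errors joined; the default case keeps the select from ever blocking.
func drainErrors(errCh chan error) error {
	var errs []error
	for {
		select {
		case err := <-errCh:
			errs = append(errs, err)
		default:
			return errors.Join(errs...)
		}
	}
}

func main() {
	errCh := make(chan error, 4) // buffered, like errCh in batchingQueue
	errCh <- errors.New("flush of batch 1 failed")
	errCh <- errors.New("flush of batch 2 failed")

	fmt.Println(drainErrors(errCh)) // both buffered errors, joined
	fmt.Println(drainErrors(errCh)) // <nil>: channel already drained
}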
82 changes: 78 additions & 4 deletions pkg/storage/ingest/pusher_test.go
@@ -566,6 +566,9 @@ func TestParallelStorageShards_ShardWriteRequest(t *testing.T) {

{Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseries("series_3_8")}},
{Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseries("series_3_8")}},

{Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseries("series_3_8")}},
{Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseries("series_3_8")}},
},
expectedErrsCount: 1, // at least one of those should fail because the first flush failed

@@ -582,8 +585,12 @@ func TestParallelStorageShards_ShardWriteRequest(t *testing.T) {
mockPreallocTimeseries("series_3_8"),
mockPreallocTimeseries("series_3_8"),
}},
{Timeseries: []mimirpb.PreallocTimeseries{
mockPreallocTimeseries("series_3_8"),
mockPreallocTimeseries("series_3_8"),
}},
},
- upstreamPushErrs: []error{assert.AnError, nil, nil},
+ upstreamPushErrs: []error{assert.AnError, nil, nil, nil},
expectedCloseErr: nil,
},
}
@@ -614,19 +621,19 @@ func TestParallelStorageShards_ShardWriteRequest(t *testing.T) {
}

if len(tc.expectedErrs) > 0 {
- assert.Equal(t, tc.expectedErrs, actualPushErrs)
+ require.Equal(t, tc.expectedErrs, actualPushErrs)
} else {
receivedErrs := 0
for _, err := range actualPushErrs {
if err != nil {
receivedErrs++
}
}
- assert.Equalf(t, tc.expectedErrsCount, receivedErrs, "received %d errors instead of %d: %v", receivedErrs, tc.expectedErrsCount, actualPushErrs)
+ require.Equalf(t, tc.expectedErrsCount, receivedErrs, "received %d errors instead of %d: %v", receivedErrs, tc.expectedErrsCount, actualPushErrs)
}

closeErr := shardingP.Stop()
- assert.ErrorIs(t, closeErr, tc.expectedCloseErr)
+ require.ErrorIs(t, closeErr, tc.expectedCloseErr)
pusher.AssertNumberOfCalls(t, "PushToStorage", len(tc.expectedUpstreamPushes))
pusher.AssertExpectations(t)
})
@@ -736,6 +743,73 @@ func TestBatchingQueue(t *testing.T) {
})
}

func TestBatchingQueue_ErrorHandling(t *testing.T) {
capacity := 2
batchSize := 2
series1 := mockPreallocTimeseries("series_1")
series2 := mockPreallocTimeseries("series_2")

t.Run("AddToBatch returns all errors and it pushes the batch when the batch is filled ", func(t *testing.T) {
queue := setupQueue(t, capacity, batchSize, nil)
ctx := context.Background()

// Push 1 series so that the next push will complete the batch.
require.NoError(t, queue.AddToBatch(ctx, series2))

// Push an error to fill the error channel.
queue.ErrorChannel() <- fmt.Errorf("mock error 1")
queue.ErrorChannel() <- fmt.Errorf("mock error 2")

// AddToBatch should return an error now.
err := queue.AddToBatch(ctx, series2)
assert.Equal(t, "2 errors: mock error 1; mock error 2", err.Error())
// The batch should also have been pushed.
select {
case batch := <-queue.Channel():
require.Equal(t, series2, batch.WriteRequest.Timeseries[0])
require.Equal(t, series2, batch.WriteRequest.Timeseries[1])
default:
t.Fatal("expected batch to be flushed")
}

// AddToBatch should work again.
require.NoError(t, queue.AddToBatch(ctx, series2))
require.NoError(t, queue.AddToBatch(ctx, series2))
})

t.Run("Any errors pushed after last AddToBatch call are received on Close", func(t *testing.T) {
queue := setupQueue(t, capacity, batchSize, nil)
ctx := context.Background()

// Add a series to the batch, but make sure nothing is pushed.
require.NoError(t, queue.AddToBatch(ctx, series1))

select {
case <-queue.Channel():
t.Fatal("expected batch to not be flushed")
default:
}

// Push multiple errors
queue.ErrorChannel() <- fmt.Errorf("mock error 1")
queue.ErrorChannel() <- fmt.Errorf("mock error 2")

// Close and Done on the queue.
queue.Done()
err := queue.Close()
require.Error(t, err)
assert.Equal(t, "2 errors: mock error 1; mock error 2", err.Error())

// Batch is also pushed.
select {
case batch := <-queue.Channel():
require.Equal(t, series1, batch.WriteRequest.Timeseries[0])
default:
t.Fatal("expected batch to be flushed")
}
})
}

func setupQueue(t *testing.T, capacity, batchSize int, series []mimirpb.PreallocTimeseries) *batchingQueue {
t.Helper()

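Taken together, the new tests pin down the queue's deferred-error contract: a flush failure reported by the consumer does not surface immediately, but on the next AddToBatch call that fills a batch, or at the latest on Close. The toy implementation below reproduces that contract end to end with plain strings and the standard library only; toyQueue and its methods are illustrative stand-ins, not the Mimir batchingQueue.

package main

import (
	"errors"
	"fmt"
)

// toyQueue mirrors the shape of the batching queue above: batches go out on
// ch, the consumer reports flush failures on errCh, done signals that the
// consumer has finished.
type toyQueue struct {
	ch    chan []string
	errCh chan error
	done  chan struct{}
	batch []string
	size  int
}

func newToyQueue(capacity, batchSize int) *toyQueue {
	return &toyQueue{
		ch:    make(chan []string, capacity),
		errCh: make(chan error, capacity), // buffered so the consumer never blocks when reporting
		done:  make(chan struct{}),
		size:  batchSize,
	}
}

// drain collects whatever errors have been reported so far, without blocking.
func (q *toyQueue) drain() []error {
	var errs []error
	for {
		select {
		case err := <-q.errCh:
			errs = append(errs, err)
		default:
			return errs
		}
	}
}

// add appends to the current batch; once the batch is full it is pushed, and
// any errors reported for earlier batches are returned. The batch is pushed
// even when errors are returned.
func (q *toyQueue) add(s string) error {
	q.batch = append(q.batch, s)
	if len(q.batch) < q.size {
		return nil
	}
	errs := q.drain()
	q.ch <- q.batch
	q.batch = nil
	return errors.Join(errs...)
}

// closeQueue flushes any leftover batch, waits for the consumer to finish and
// returns every error reported in the meantime.
func (q *toyQueue) closeQueue() error {
	var errs []error
	if len(q.batch) > 0 {
		errs = append(errs, q.drain()...)
		q.ch <- q.batch
		q.batch = nil
	}
	close(q.ch)
	<-q.done
	errs = append(errs, q.drain()...)
	return errors.Join(errs...)
}

func main() {
	q := newToyQueue(2, 2)

	// Simulate a consumer that failed to flush two earlier batches. Normally a
	// consumer goroutine reading q.ch would report these and close q.done when
	// finished; we do both by hand to keep the example deterministic.
	q.errCh <- errors.New("flush of batch 1 failed")
	q.errCh <- errors.New("flush of batch 2 failed")
	close(q.done)

	fmt.Println(q.add("series_1")) // <nil>: batch not full, nothing pushed yet
	fmt.Println(q.add("series_2")) // both buffered errors surface here; the batch is still pushed
	fmt.Println(q.closeQueue())    // <nil>: no further errors were reported
}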
