From 68126abb83d8b5a0eebcfd01238f8f2a364c8df7 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Thu, 29 Oct 2020 18:49:35 +0530 Subject: [PATCH 1/6] Fail ingestion when a flush is going on Signed-off-by: Ganesh Vernekar --- CHANGELOG.md | 1 + pkg/ingester/ingester_v2.go | 15 +++++++++++++ pkg/ingester/ingester_v2_test.go | 36 ++++++++++++++++++++++++++++++++ 3 files changed, 52 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index b2372b4034..7f38e5e9c0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ - `cortex_ingester_tsdb_head_truncations_total` - `cortex_ingester_tsdb_head_gc_duration_seconds` * [ENHANCEMENT] Added `cortex_alertmanager_config_hash` metric to expose hash of Alertmanager Config loaded per user. #3388 +* [BUGFIX] Blocks storage: Fix the race between ingestion and `/flush` call resulting in overlapping blocks. #3422 ## 1.5.0 in progress diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index 49ca96438a..1fa03b053f 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -54,6 +54,9 @@ type userTSDB struct { seriesInMetric *metricCounter limiter *Limiter + flushInProgress atomic.Bool + pushesInFlight sync.WaitGroup + // Used to detect idle TSDBs. lastUpdate *atomic.Int64 @@ -501,6 +504,12 @@ func (i *Ingester) v2Push(ctx context.Context, req *client.WriteRequest) (*clien } i.userStatesMtx.RUnlock() + if db.flushInProgress.Load() { + return &client.WriteResponse{}, httpgrpc.Errorf(http.StatusServiceUnavailable, wrapWithUser(errors.New("flush in progress"), userID).Error()) + } + db.pushesInFlight.Add(1) + defer db.pushesInFlight.Done() + // Given metadata is a best-effort approach, and we don't halt on errors // process it before samples. Otherwise, we risk returning an error before ingestion. i.pushMetadata(ctx, userID, req.GetMetadata()) @@ -1361,6 +1370,12 @@ func (i *Ingester) compactBlocks(ctx context.Context, force bool) { switch { case force: reason = "forced" + + userDB.flushInProgress.Store(true) + defer userDB.flushInProgress.Store(false) + // Ingestion of samples in parallel with forced compaction can lead to overlapping blocks. + // So we wait for existing in-flight requests to finish. Future push requests would fail until compaction is over. + userDB.pushesInFlight.Wait() err = userDB.compactHead(i.cfg.BlocksStorageConfig.TSDB.BlockRanges[0].Milliseconds()) case i.cfg.BlocksStorageConfig.TSDB.HeadCompactionIdleTimeout > 0 && userDB.isIdle(time.Now(), i.cfg.BlocksStorageConfig.TSDB.HeadCompactionIdleTimeout): diff --git a/pkg/ingester/ingester_v2_test.go b/pkg/ingester/ingester_v2_test.go index 464991d94c..f796745e3a 100644 --- a/pkg/ingester/ingester_v2_test.go +++ b/pkg/ingester/ingester_v2_test.go @@ -2422,3 +2422,39 @@ func TestIngesterNotDeleteUnshippedBlocks(t *testing.T) { require.NotEqual(t, b.Meta().ULID, newBlocks2[1].Meta().ULID) } } + +func TestIngesterPushErrorDuringFlush(t *testing.T) { + i, cleanup, err := newIngesterMockWithTSDBStorage(defaultIngesterTestConfig(), nil) + require.NoError(t, err) + t.Cleanup(cleanup) + + require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) + t.Cleanup(func() { + _ = services.StopAndAwaitTerminated(context.Background(), i) + }) + + // Wait until it's ACTIVE + test.Poll(t, 10*time.Millisecond, ring.ACTIVE, func() interface{} { + return i.lifecycler.GetState() + }) + + // Push a sample, it should succeed. + pushSingleSample(t, i) + + // We mock a flushing by setting the boolean. + db := i.getTSDB(userID) + require.NotNil(t, db) + require.False(t, db.flushInProgress.Load()) + db.flushInProgress.Store(true) + + // Ingestion should fail with a 503. + req, _, _ := mockWriteRequest(labels.Labels{{Name: labels.MetricName, Value: "test"}}, 0, util.TimeToMillis(time.Now())) + ctx := user.InjectOrgID(context.Background(), userID) + _, err = i.v2Push(ctx, req) + require.Equal(t, httpgrpc.Errorf(http.StatusServiceUnavailable, wrapWithUser(errors.New("flush in progress"), userID).Error()), err) + + // Ingestion is successful after a flush. + require.True(t, db.flushInProgress.Load()) + db.flushInProgress.Store(false) + pushSingleSample(t, i) +} From 484ebd3a32d2cba4963a739be00ef8323e8e82d8 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Fri, 30 Oct 2020 15:01:26 +0530 Subject: [PATCH 2/6] Fix the race Signed-off-by: Ganesh Vernekar --- pkg/ingester/ingester_v2.go | 44 +++++++++++++++++++++++--------- pkg/ingester/ingester_v2_test.go | 12 ++++----- 2 files changed, 38 insertions(+), 18 deletions(-) diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index 1fa03b053f..46f98945a2 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -54,8 +54,9 @@ type userTSDB struct { seriesInMetric *metricCounter limiter *Limiter - flushInProgress atomic.Bool - pushesInFlight sync.WaitGroup + forcedCompactionInProgress bool + forcedCompactionInProgressMtx sync.RWMutex + pushesInFlight sync.WaitGroup // Used to detect idle TSDBs. lastUpdate *atomic.Int64 @@ -96,6 +97,16 @@ func (u *userTSDB) Compact() error { // compactHead compacts the Head block at specified block durations avoiding a single huge block. func (u *userTSDB) compactHead(blockDuration int64) error { + u.forcedCompactionInProgressMtx.Lock() + u.forcedCompactionInProgress = true + u.forcedCompactionInProgressMtx.Unlock() + defer func() { + u.forcedCompactionInProgress = false + }() + // Ingestion of samples in parallel with forced compaction can lead to overlapping blocks. + // So we wait for existing in-flight requests to finish. Future push requests would fail until compaction is over. + u.pushesInFlight.Wait() + h := u.Head() minTime, maxTime := h.MinTime(), h.MaxTime() @@ -504,11 +515,10 @@ func (i *Ingester) v2Push(ctx context.Context, req *client.WriteRequest) (*clien } i.userStatesMtx.RUnlock() - if db.flushInProgress.Load() { - return &client.WriteResponse{}, httpgrpc.Errorf(http.StatusServiceUnavailable, wrapWithUser(errors.New("flush in progress"), userID).Error()) + if err := db.acquireAppendLock(); err != nil { + return &client.WriteResponse{}, httpgrpc.Errorf(http.StatusServiceUnavailable, wrapWithUser(err, userID).Error()) } - db.pushesInFlight.Add(1) - defer db.pushesInFlight.Done() + defer db.releaseAppendLock() // Given metadata is a best-effort approach, and we don't halt on errors // process it before samples. Otherwise, we risk returning an error before ingestion. @@ -660,6 +670,22 @@ func (i *Ingester) v2Push(ctx context.Context, req *client.WriteRequest) (*clien return &client.WriteResponse{}, nil } +func (u *userTSDB) acquireAppendLock() error { + u.forcedCompactionInProgressMtx.RLock() + defer u.forcedCompactionInProgressMtx.RUnlock() + + if u.forcedCompactionInProgress { + return errors.New("forced compaction in progress") + } + + u.pushesInFlight.Add(1) + return nil +} + +func (u *userTSDB) releaseAppendLock() { + u.pushesInFlight.Done() +} + func (i *Ingester) v2Query(ctx context.Context, req *client.QueryRequest) (*client.QueryResponse, error) { userID, err := user.ExtractOrgID(ctx) if err != nil { @@ -1370,12 +1396,6 @@ func (i *Ingester) compactBlocks(ctx context.Context, force bool) { switch { case force: reason = "forced" - - userDB.flushInProgress.Store(true) - defer userDB.flushInProgress.Store(false) - // Ingestion of samples in parallel with forced compaction can lead to overlapping blocks. - // So we wait for existing in-flight requests to finish. Future push requests would fail until compaction is over. - userDB.pushesInFlight.Wait() err = userDB.compactHead(i.cfg.BlocksStorageConfig.TSDB.BlockRanges[0].Milliseconds()) case i.cfg.BlocksStorageConfig.TSDB.HeadCompactionIdleTimeout > 0 && userDB.isIdle(time.Now(), i.cfg.BlocksStorageConfig.TSDB.HeadCompactionIdleTimeout): diff --git a/pkg/ingester/ingester_v2_test.go b/pkg/ingester/ingester_v2_test.go index f796745e3a..f6ed5a8115 100644 --- a/pkg/ingester/ingester_v2_test.go +++ b/pkg/ingester/ingester_v2_test.go @@ -2423,7 +2423,7 @@ func TestIngesterNotDeleteUnshippedBlocks(t *testing.T) { } } -func TestIngesterPushErrorDuringFlush(t *testing.T) { +func TestIngesterPushErrorDuringForcedCompaction(t *testing.T) { i, cleanup, err := newIngesterMockWithTSDBStorage(defaultIngesterTestConfig(), nil) require.NoError(t, err) t.Cleanup(cleanup) @@ -2444,17 +2444,17 @@ func TestIngesterPushErrorDuringFlush(t *testing.T) { // We mock a flushing by setting the boolean. db := i.getTSDB(userID) require.NotNil(t, db) - require.False(t, db.flushInProgress.Load()) - db.flushInProgress.Store(true) + require.False(t, db.forcedCompactionInProgress) + db.forcedCompactionInProgress = true // Ingestion should fail with a 503. req, _, _ := mockWriteRequest(labels.Labels{{Name: labels.MetricName, Value: "test"}}, 0, util.TimeToMillis(time.Now())) ctx := user.InjectOrgID(context.Background(), userID) _, err = i.v2Push(ctx, req) - require.Equal(t, httpgrpc.Errorf(http.StatusServiceUnavailable, wrapWithUser(errors.New("flush in progress"), userID).Error()), err) + require.Equal(t, httpgrpc.Errorf(http.StatusServiceUnavailable, wrapWithUser(errors.New("forced compaction in progress"), userID).Error()), err) // Ingestion is successful after a flush. - require.True(t, db.flushInProgress.Load()) - db.flushInProgress.Store(false) + require.True(t, db.forcedCompactionInProgress) + db.forcedCompactionInProgress = false pushSingleSample(t, i) } From 84b030964584b5fd1d54885567a66dfa9c685ee8 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Fri, 30 Oct 2020 15:32:39 +0530 Subject: [PATCH 3/6] Fix review comments Signed-off-by: Ganesh Vernekar --- pkg/ingester/ingester_v2.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index 46f98945a2..6287466e93 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -101,7 +101,9 @@ func (u *userTSDB) compactHead(blockDuration int64) error { u.forcedCompactionInProgress = true u.forcedCompactionInProgressMtx.Unlock() defer func() { + u.forcedCompactionInProgressMtx.Lock() u.forcedCompactionInProgress = false + u.forcedCompactionInProgressMtx.Unlock() }() // Ingestion of samples in parallel with forced compaction can lead to overlapping blocks. // So we wait for existing in-flight requests to finish. Future push requests would fail until compaction is over. From 70958ebd281bf8226478f3ea9597c94042392425 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Tue, 3 Nov 2020 12:09:08 +0530 Subject: [PATCH 4/6] Fix review comments and add another test Signed-off-by: Ganesh Vernekar --- pkg/ingester/ingester_v2.go | 5 +-- pkg/ingester/ingester_v2_test.go | 57 ++++++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 2 deletions(-) diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index 6287466e93..8a8adf1c55 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -54,9 +54,10 @@ type userTSDB struct { seriesInMetric *metricCounter limiter *Limiter - forcedCompactionInProgress bool forcedCompactionInProgressMtx sync.RWMutex - pushesInFlight sync.WaitGroup + forcedCompactionInProgress bool + + pushesInFlight sync.WaitGroup // Used to detect idle TSDBs. lastUpdate *atomic.Int64 diff --git a/pkg/ingester/ingester_v2_test.go b/pkg/ingester/ingester_v2_test.go index f6ed5a8115..871c575f7b 100644 --- a/pkg/ingester/ingester_v2_test.go +++ b/pkg/ingester/ingester_v2_test.go @@ -2458,3 +2458,60 @@ func TestIngesterPushErrorDuringForcedCompaction(t *testing.T) { db.forcedCompactionInProgress = false pushSingleSample(t, i) } + +func TestIngesterNoFlushWithInFlightRequest(t *testing.T) { + registry := prometheus.NewRegistry() + i, cleanup, err := newIngesterMockWithTSDBStorage(defaultIngesterTestConfig(), registry) + require.NoError(t, err) + t.Cleanup(cleanup) + + require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) + t.Cleanup(func() { + _ = services.StopAndAwaitTerminated(context.Background(), i) + }) + + // Wait until it's ACTIVE + test.Poll(t, 10*time.Millisecond, ring.ACTIVE, func() interface{} { + return i.lifecycler.GetState() + }) + + // Push few samples. + for j := 0; j < 5; j++ { + pushSingleSample(t, i) + } + + // Verifying that compaction won't happen when a request is in flight. + + // This mocks a request in flight. + db := i.getTSDB(userID) + require.NoError(t, db.acquireAppendLock()) + + // Flush handler only triggers compactions, but doesn't wait for them to finish. + i.FlushHandler(httptest.NewRecorder(), httptest.NewRequest("POST", "/flush", nil)) + + // Flushing should not have succeeded even after 5 seconds. + time.Sleep(5 * time.Second) + require.NoError(t, testutil.GatherAndCompare(registry, strings.NewReader(` + # HELP cortex_ingester_tsdb_compactions_total Total number of TSDB compactions that were executed. + # TYPE cortex_ingester_tsdb_compactions_total counter + cortex_ingester_tsdb_compactions_total 0 + `), "cortex_ingester_tsdb_compactions_total")) + + // No requests in flight after this. + db.releaseAppendLock() + + // Let's wait for a moment for flush to finish, and then verify. + test.Poll(t, 5*time.Second, uint64(0), func() interface{} { + db := i.getTSDB(userID) + if db == nil { + return false + } + return db.Head().NumSeries() + }) + + require.NoError(t, testutil.GatherAndCompare(registry, strings.NewReader(` + # HELP cortex_ingester_tsdb_compactions_total Total number of TSDB compactions that were executed. + # TYPE cortex_ingester_tsdb_compactions_total counter + cortex_ingester_tsdb_compactions_total 1 + `), "cortex_ingester_tsdb_compactions_total")) +} From 34a94ae68de17f8057b3eb09c0e5d6bd9b3d3731 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Tue, 3 Nov 2020 13:28:30 +0530 Subject: [PATCH 5/6] Fix review comments Signed-off-by: Ganesh Vernekar --- pkg/ingester/ingester_v2_test.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/pkg/ingester/ingester_v2_test.go b/pkg/ingester/ingester_v2_test.go index 871c575f7b..01f4c3724f 100644 --- a/pkg/ingester/ingester_v2_test.go +++ b/pkg/ingester/ingester_v2_test.go @@ -1765,7 +1765,7 @@ func TestIngester_shipBlocks(t *testing.T) { defer cleanup() // Wait until it's ACTIVE - test.Poll(t, 10*time.Millisecond, ring.ACTIVE, func() interface{} { + test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} { return i.lifecycler.GetState() }) @@ -1953,7 +1953,7 @@ func TestIngester_flushing(t *testing.T) { }) // Wait until it's ACTIVE - test.Poll(t, 10*time.Millisecond, ring.ACTIVE, func() interface{} { + test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} { return i.lifecycler.GetState() }) @@ -1983,7 +1983,7 @@ func TestIngester_ForFlush(t *testing.T) { }) // Wait until it's ACTIVE - test.Poll(t, 10*time.Millisecond, ring.ACTIVE, func() interface{} { + test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} { return i.lifecycler.GetState() }) @@ -2162,7 +2162,7 @@ func TestIngesterCompactIdleBlock(t *testing.T) { }) // Wait until it's ACTIVE - test.Poll(t, 10*time.Millisecond, ring.ACTIVE, func() interface{} { + test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} { return i.lifecycler.GetState() }) @@ -2321,7 +2321,7 @@ func TestIngester_CloseTSDBsOnShutdown(t *testing.T) { }) // Wait until it's ACTIVE - test.Poll(t, 10*time.Millisecond, ring.ACTIVE, func() interface{} { + test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} { return i.lifecycler.GetState() }) @@ -2358,7 +2358,7 @@ func TestIngesterNotDeleteUnshippedBlocks(t *testing.T) { }) // Wait until it's ACTIVE - test.Poll(t, 10*time.Millisecond, ring.ACTIVE, func() interface{} { + test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} { return i.lifecycler.GetState() }) @@ -2434,7 +2434,7 @@ func TestIngesterPushErrorDuringForcedCompaction(t *testing.T) { }) // Wait until it's ACTIVE - test.Poll(t, 10*time.Millisecond, ring.ACTIVE, func() interface{} { + test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} { return i.lifecycler.GetState() }) @@ -2471,7 +2471,7 @@ func TestIngesterNoFlushWithInFlightRequest(t *testing.T) { }) // Wait until it's ACTIVE - test.Poll(t, 10*time.Millisecond, ring.ACTIVE, func() interface{} { + test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} { return i.lifecycler.GetState() }) @@ -2500,7 +2500,7 @@ func TestIngesterNoFlushWithInFlightRequest(t *testing.T) { // No requests in flight after this. db.releaseAppendLock() - // Let's wait for a moment for flush to finish, and then verify. + // Let's wait until all head series have been flushed. test.Poll(t, 5*time.Second, uint64(0), func() interface{} { db := i.getTSDB(userID) if db == nil { From 17131df1db002ed7e964354561267b76642c19ef Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Tue, 3 Nov 2020 13:40:27 +0530 Subject: [PATCH 6/6] Fix more review comments Signed-off-by: Ganesh Vernekar --- pkg/ingester/ingester_v2_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/ingester/ingester_v2_test.go b/pkg/ingester/ingester_v2_test.go index 01f4c3724f..647070b524 100644 --- a/pkg/ingester/ingester_v2_test.go +++ b/pkg/ingester/ingester_v2_test.go @@ -2444,8 +2444,10 @@ func TestIngesterPushErrorDuringForcedCompaction(t *testing.T) { // We mock a flushing by setting the boolean. db := i.getTSDB(userID) require.NotNil(t, db) + db.forcedCompactionInProgressMtx.Lock() require.False(t, db.forcedCompactionInProgress) db.forcedCompactionInProgress = true + db.forcedCompactionInProgressMtx.Unlock() // Ingestion should fail with a 503. req, _, _ := mockWriteRequest(labels.Labels{{Name: labels.MetricName, Value: "test"}}, 0, util.TimeToMillis(time.Now())) @@ -2454,8 +2456,10 @@ func TestIngesterPushErrorDuringForcedCompaction(t *testing.T) { require.Equal(t, httpgrpc.Errorf(http.StatusServiceUnavailable, wrapWithUser(errors.New("forced compaction in progress"), userID).Error()), err) // Ingestion is successful after a flush. + db.forcedCompactionInProgressMtx.Lock() require.True(t, db.forcedCompactionInProgress) db.forcedCompactionInProgress = false + db.forcedCompactionInProgressMtx.Unlock() pushSingleSample(t, i) }