diff --git a/CHANGELOG.md b/CHANGELOG.md index fdb3747dcb..6cd891ece6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ * [ENHANCEMENT] Added `cortex_alertmanager_config_hash` metric to expose hash of Alertmanager Config loaded per user. #3388 * [ENHANCEMENT] Query-Frontend / Query-Scheduler: New component called "Query-Scheduler" has been introduced. Query-Scheduler is simply a queue of requests, moved outside of Query-Frontend. This allows Query-Frontend to be scaled separately from number of queues. To make Query-Frontend and Querier use Query-Scheduler, they need to be started with `-frontend.scheduler-address` and `-querier.scheduler-address` options respectively. #3374 * [BUGFIX] Blocks storage ingester: fixed some cases leading to a TSDB WAL corruption after a partial write to disk. #3423 +* [BUGFIX] Blocks storage: Fix the race between ingestion and `/flush` call resulting in overlapping blocks. #3422 ## 1.5.0 in progress diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index 49ca96438a..8a8adf1c55 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -54,6 +54,11 @@ type userTSDB struct { seriesInMetric *metricCounter limiter *Limiter + forcedCompactionInProgressMtx sync.RWMutex + forcedCompactionInProgress bool + + pushesInFlight sync.WaitGroup + // Used to detect idle TSDBs. lastUpdate *atomic.Int64 @@ -93,6 +98,18 @@ func (u *userTSDB) Compact() error { // compactHead compacts the Head block at specified block durations avoiding a single huge block. func (u *userTSDB) compactHead(blockDuration int64) error { + u.forcedCompactionInProgressMtx.Lock() + u.forcedCompactionInProgress = true + u.forcedCompactionInProgressMtx.Unlock() + defer func() { + u.forcedCompactionInProgressMtx.Lock() + u.forcedCompactionInProgress = false + u.forcedCompactionInProgressMtx.Unlock() + }() + // Ingestion of samples in parallel with forced compaction can lead to overlapping blocks. + // So we wait for existing in-flight requests to finish. Future push requests would fail until compaction is over. + u.pushesInFlight.Wait() + h := u.Head() minTime, maxTime := h.MinTime(), h.MaxTime() @@ -501,6 +518,11 @@ func (i *Ingester) v2Push(ctx context.Context, req *client.WriteRequest) (*clien } i.userStatesMtx.RUnlock() + if err := db.acquireAppendLock(); err != nil { + return &client.WriteResponse{}, httpgrpc.Errorf(http.StatusServiceUnavailable, wrapWithUser(err, userID).Error()) + } + defer db.releaseAppendLock() + // Given metadata is a best-effort approach, and we don't halt on errors // process it before samples. Otherwise, we risk returning an error before ingestion. i.pushMetadata(ctx, userID, req.GetMetadata()) @@ -651,6 +673,22 @@ func (i *Ingester) v2Push(ctx context.Context, req *client.WriteRequest) (*clien return &client.WriteResponse{}, nil } +func (u *userTSDB) acquireAppendLock() error { + u.forcedCompactionInProgressMtx.RLock() + defer u.forcedCompactionInProgressMtx.RUnlock() + + if u.forcedCompactionInProgress { + return errors.New("forced compaction in progress") + } + + u.pushesInFlight.Add(1) + return nil +} + +func (u *userTSDB) releaseAppendLock() { + u.pushesInFlight.Done() +} + func (i *Ingester) v2Query(ctx context.Context, req *client.QueryRequest) (*client.QueryResponse, error) { userID, err := user.ExtractOrgID(ctx) if err != nil { diff --git a/pkg/ingester/ingester_v2_test.go b/pkg/ingester/ingester_v2_test.go index 464991d94c..647070b524 100644 --- a/pkg/ingester/ingester_v2_test.go +++ b/pkg/ingester/ingester_v2_test.go @@ -1765,7 +1765,7 @@ func TestIngester_shipBlocks(t *testing.T) { defer cleanup() // Wait until it's ACTIVE - test.Poll(t, 10*time.Millisecond, ring.ACTIVE, func() interface{} { + test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} { return i.lifecycler.GetState() }) @@ -1953,7 +1953,7 @@ func TestIngester_flushing(t *testing.T) { }) // Wait until it's ACTIVE - test.Poll(t, 10*time.Millisecond, ring.ACTIVE, func() interface{} { + test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} { return i.lifecycler.GetState() }) @@ -1983,7 +1983,7 @@ func TestIngester_ForFlush(t *testing.T) { }) // Wait until it's ACTIVE - test.Poll(t, 10*time.Millisecond, ring.ACTIVE, func() interface{} { + test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} { return i.lifecycler.GetState() }) @@ -2162,7 +2162,7 @@ func TestIngesterCompactIdleBlock(t *testing.T) { }) // Wait until it's ACTIVE - test.Poll(t, 10*time.Millisecond, ring.ACTIVE, func() interface{} { + test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} { return i.lifecycler.GetState() }) @@ -2321,7 +2321,7 @@ func TestIngester_CloseTSDBsOnShutdown(t *testing.T) { }) // Wait until it's ACTIVE - test.Poll(t, 10*time.Millisecond, ring.ACTIVE, func() interface{} { + test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} { return i.lifecycler.GetState() }) @@ -2358,7 +2358,7 @@ func TestIngesterNotDeleteUnshippedBlocks(t *testing.T) { }) // Wait until it's ACTIVE - test.Poll(t, 10*time.Millisecond, ring.ACTIVE, func() interface{} { + test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} { return i.lifecycler.GetState() }) @@ -2422,3 +2422,100 @@ func TestIngesterNotDeleteUnshippedBlocks(t *testing.T) { require.NotEqual(t, b.Meta().ULID, newBlocks2[1].Meta().ULID) } } + +func TestIngesterPushErrorDuringForcedCompaction(t *testing.T) { + i, cleanup, err := newIngesterMockWithTSDBStorage(defaultIngesterTestConfig(), nil) + require.NoError(t, err) + t.Cleanup(cleanup) + + require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) + t.Cleanup(func() { + _ = services.StopAndAwaitTerminated(context.Background(), i) + }) + + // Wait until it's ACTIVE + test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} { + return i.lifecycler.GetState() + }) + + // Push a sample, it should succeed. + pushSingleSample(t, i) + + // We mock a flushing by setting the boolean. + db := i.getTSDB(userID) + require.NotNil(t, db) + db.forcedCompactionInProgressMtx.Lock() + require.False(t, db.forcedCompactionInProgress) + db.forcedCompactionInProgress = true + db.forcedCompactionInProgressMtx.Unlock() + + // Ingestion should fail with a 503. + req, _, _ := mockWriteRequest(labels.Labels{{Name: labels.MetricName, Value: "test"}}, 0, util.TimeToMillis(time.Now())) + ctx := user.InjectOrgID(context.Background(), userID) + _, err = i.v2Push(ctx, req) + require.Equal(t, httpgrpc.Errorf(http.StatusServiceUnavailable, wrapWithUser(errors.New("forced compaction in progress"), userID).Error()), err) + + // Ingestion is successful after a flush. + db.forcedCompactionInProgressMtx.Lock() + require.True(t, db.forcedCompactionInProgress) + db.forcedCompactionInProgress = false + db.forcedCompactionInProgressMtx.Unlock() + pushSingleSample(t, i) +} + +func TestIngesterNoFlushWithInFlightRequest(t *testing.T) { + registry := prometheus.NewRegistry() + i, cleanup, err := newIngesterMockWithTSDBStorage(defaultIngesterTestConfig(), registry) + require.NoError(t, err) + t.Cleanup(cleanup) + + require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) + t.Cleanup(func() { + _ = services.StopAndAwaitTerminated(context.Background(), i) + }) + + // Wait until it's ACTIVE + test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} { + return i.lifecycler.GetState() + }) + + // Push few samples. + for j := 0; j < 5; j++ { + pushSingleSample(t, i) + } + + // Verifying that compaction won't happen when a request is in flight. + + // This mocks a request in flight. + db := i.getTSDB(userID) + require.NoError(t, db.acquireAppendLock()) + + // Flush handler only triggers compactions, but doesn't wait for them to finish. + i.FlushHandler(httptest.NewRecorder(), httptest.NewRequest("POST", "/flush", nil)) + + // Flushing should not have succeeded even after 5 seconds. + time.Sleep(5 * time.Second) + require.NoError(t, testutil.GatherAndCompare(registry, strings.NewReader(` + # HELP cortex_ingester_tsdb_compactions_total Total number of TSDB compactions that were executed. + # TYPE cortex_ingester_tsdb_compactions_total counter + cortex_ingester_tsdb_compactions_total 0 + `), "cortex_ingester_tsdb_compactions_total")) + + // No requests in flight after this. + db.releaseAppendLock() + + // Let's wait until all head series have been flushed. + test.Poll(t, 5*time.Second, uint64(0), func() interface{} { + db := i.getTSDB(userID) + if db == nil { + return false + } + return db.Head().NumSeries() + }) + + require.NoError(t, testutil.GatherAndCompare(registry, strings.NewReader(` + # HELP cortex_ingester_tsdb_compactions_total Total number of TSDB compactions that were executed. + # TYPE cortex_ingester_tsdb_compactions_total counter + cortex_ingester_tsdb_compactions_total 1 + `), "cortex_ingester_tsdb_compactions_total")) +}