diff --git a/CHANGELOG.md b/CHANGELOG.md index 2085470f507..1ef983c07fb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -91,6 +91,7 @@ * [BUGFIX] Fix bug on objstore when configured to use S3 fips endpoints. #5540 * [BUGFIX] Ruler: Fix bug on ruler where a failure to load a single RuleGroup would prevent rulers to sync all RuleGroup. #5563 * [BUGFIX] Store-Gateway and AlertManager: Add a `wait_instance_time_out` to WaitInstanceState context to avoid waiting forever. #5581 +* [BUGFIX] Ingester: Allow shipper to hot reload upload compacted blocks when out of order samples is enabled. #5416 ## 1.15.1 2023-04-26 diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 96d8df52f7b..7c96aa683ee 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -1967,7 +1967,6 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { if maxExemplarsForUser > 0 { enableExemplars = true } - oooTimeWindow := i.limits.OutOfOrderTimeWindow(userID) walCompressType := wlog.CompressionNone // TODO(yeya24): expose zstd compression for WAL. if i.cfg.BlocksStorageConfig.TSDB.WALCompressionEnabled { @@ -1990,7 +1989,7 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { MaxExemplars: maxExemplarsForUser, HeadChunksWriteQueueSize: i.cfg.BlocksStorageConfig.TSDB.HeadChunksWriteQueueSize, EnableMemorySnapshotOnShutdown: i.cfg.BlocksStorageConfig.TSDB.MemorySnapshotOnShutdown, - OutOfOrderTimeWindow: time.Duration(oooTimeWindow).Milliseconds(), + OutOfOrderTimeWindow: time.Duration(i.limits.OutOfOrderTimeWindow(userID)).Milliseconds(), OutOfOrderCapMax: i.cfg.BlocksStorageConfig.TSDB.OutOfOrderCapMax, }, nil) if err != nil { @@ -2044,8 +2043,8 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { func() labels.Labels { return l }, metadata.ReceiveSource, func() bool { - return oooTimeWindow > 0 // Upload compacted blocks when OOO is enabled. - }, + return i.limits.OutOfOrderTimeWindow(userID) > 0 + }, // No need to upload compacted blocks unless out of order samples is enabled. true, // Allow out of order uploads. It's fine in Cortex's context. metadata.NoneFunc, ) diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 132003f4ff9..78e43c9c8be 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -4247,3 +4247,55 @@ func generateSamplesForLabel(l labels.Labels, count int) *cortexpb.WriteRequest return cortexpb.ToWriteRequest(lbls, samples, nil, nil, cortexpb.API) } + +func TestIngesterShipperUploadCompactedHotReload(t *testing.T) { + // Create ingester. + cfg := defaultIngesterTestConfig(t) + dir := t.TempDir() + blocksDir := filepath.Join(dir, "blocks") + limits := defaultLimitsTestConfig() + reg := prometheus.NewRegistry() + i, err := prepareIngesterWithBlocksStorageAndLimits(t, cfg, limits, blocksDir, reg) + require.NoError(t, err) + + require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) + defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + + // Wait until the ingester is ACTIVE + test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() interface{} { + return i.lifecycler.GetState() + }) + + uid := "user-1" + req := generateSamplesForLabel(labels.FromStrings(labels.MetricName, fmt.Sprintf("real-%d", 1)), 1) + ctx := user.InjectOrgID(context.Background(), uid) + _, err = i.Push(ctx, req) + require.NoError(t, err) + db := i.getTSDB(uid) + _, err = db.shipper.Sync(ctx) + require.NoError(t, err) + + promReg := i.TSDBState.tsdbMetrics.regs.GetPromRegistryByUser(uid) + require.NotNil(t, promReg) + // Out of order samples disabled. Expected metric value to be 0. + expectedMetrics := ` + # HELP thanos_shipper_upload_compacted_done If 1 it means shipper uploaded all compacted blocks from the filesystem. + # TYPE thanos_shipper_upload_compacted_done gauge + thanos_shipper_upload_compacted_done 0 +` + assert.NoError(t, testutil.GatherAndCompare(promReg, strings.NewReader(expectedMetrics), "thanos_shipper_upload_compacted_done")) + + // set ooo time window in limits, and re-initialize the ingester + limits.OutOfOrderTimeWindow = model.Duration(time.Minute) + i.limits, _ = validation.NewOverrides(limits, nil) + require.NoError(t, err) + _, err = db.shipper.Sync(ctx) + require.NoError(t, err) + // Expected metric value to be 1 now as OOO time window has been set. + expectedMetrics = ` + # HELP thanos_shipper_upload_compacted_done If 1 it means shipper uploaded all compacted blocks from the filesystem. + # TYPE thanos_shipper_upload_compacted_done gauge + thanos_shipper_upload_compacted_done 1 +` + assert.NoError(t, testutil.GatherAndCompare(promReg, strings.NewReader(expectedMetrics), "thanos_shipper_upload_compacted_done")) +} diff --git a/pkg/util/metrics_helper.go b/pkg/util/metrics_helper.go index 532912dd945..5479b492502 100644 --- a/pkg/util/metrics_helper.go +++ b/pkg/util/metrics_helper.go @@ -678,6 +678,20 @@ func (r *UserRegistries) Registries() []UserRegistry { return out } +// GetPromRegistryByUser returns the Prometheus metrics registry by userID. +func (r *UserRegistries) GetPromRegistryByUser(user string) *prometheus.Registry { + r.regsMu.Lock() + defer r.regsMu.Unlock() + + for _, reg := range r.regs { + if reg.user == user { + return reg.reg + } + } + + return nil +} + func (r *UserRegistries) BuildMetricFamiliesPerUser() MetricFamiliesPerUser { data := MetricFamiliesPerUser{} for _, entry := range r.Registries() {