diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3dc3f19d7cbd3..f2bcf495be4d8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,7 @@ Release notes for 2.4.1 can be found on the [release notes page](https://grafana
 * [4687](https://github.com/grafana/loki/pull/4687) **owen-d**: overrides checks for nil tenant limits on AllByUserID
 * [4683](https://github.com/grafana/loki/pull/4683) **owen-d**: Adds replication_factor doc to common config
 * [4681](https://github.com/grafana/loki/pull/4681) **slim-bean**: Loki: check new Read target when initializing boltdb-shipper store
+* [4736](https://github.com/grafana/loki/pull/4736) **sandeepsukhani**: allow applying retention at different interval than compaction
 
 # 2.4.0 (2021/11/05)
 
diff --git a/pkg/storage/stores/shipper/compactor/compactor.go b/pkg/storage/stores/shipper/compactor/compactor.go
index 11b70be9de4e5..a9f90cf7d76dd 100644
--- a/pkg/storage/stores/shipper/compactor/compactor.go
+++ b/pkg/storage/stores/shipper/compactor/compactor.go
@@ -57,6 +57,7 @@ type Config struct {
 	SharedStoreType           string        `yaml:"shared_store"`
 	SharedStoreKeyPrefix      string        `yaml:"shared_store_key_prefix"`
 	CompactionInterval        time.Duration `yaml:"compaction_interval"`
+	ApplyRetentionInterval    time.Duration `yaml:"apply_retention_interval"`
 	RetentionEnabled          bool          `yaml:"retention_enabled"`
 	RetentionDeleteDelay      time.Duration `yaml:"retention_delete_delay"`
 	RetentionDeleteWorkCount  int           `yaml:"retention_delete_worker_count"`
@@ -71,6 +72,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.StringVar(&cfg.SharedStoreType, "boltdb.shipper.compactor.shared-store", "", "Shared store used for storing boltdb files. Supported types: gcs, s3, azure, swift, filesystem")
 	f.StringVar(&cfg.SharedStoreKeyPrefix, "boltdb.shipper.compactor.shared-store.key-prefix", "index/", "Prefix to add to Object Keys in Shared store. Path separator(if any) should always be a '/'. Prefix should never start with a separator but should always end with it.")
 	f.DurationVar(&cfg.CompactionInterval, "boltdb.shipper.compactor.compaction-interval", 10*time.Minute, "Interval at which to re-run the compaction operation.")
+	f.DurationVar(&cfg.ApplyRetentionInterval, "boltdb.shipper.compactor.apply-retention-interval", 0, "Interval at which to apply/enforce retention. 0 means run at same interval as compaction. If non-zero, it should always be a multiple of compaction interval.")
 	f.DurationVar(&cfg.RetentionDeleteDelay, "boltdb.shipper.compactor.retention-delete-delay", 2*time.Hour, "Delay after which chunks will be fully deleted during retention.")
 	f.BoolVar(&cfg.RetentionEnabled, "boltdb.shipper.compactor.retention-enabled", false, "(Experimental) Activate custom (per-stream,per-tenant) retention.")
 	f.IntVar(&cfg.RetentionDeleteWorkCount, "boltdb.shipper.compactor.retention-delete-worker-count", 150, "The total amount of worker to use to delete chunks.")
@@ -84,6 +86,10 @@ func (cfg *Config) Validate() error {
 	if cfg.MaxCompactionParallelism < 1 {
 		return errors.New("max compaction parallelism must be >= 1")
 	}
+	if cfg.RetentionEnabled && cfg.ApplyRetentionInterval != 0 && cfg.ApplyRetentionInterval%cfg.CompactionInterval != 0 {
+		return errors.New("interval for applying retention should either be set to a 0 or a multiple of compaction interval")
+	}
+
 	return shipper_util.ValidateSharedStoreKeyPrefix(cfg.SharedStoreKeyPrefix)
 }
 
@@ -338,12 +344,24 @@ func (c *Compactor) runCompactions(ctx context.Context) {
 		break
 	}
 
+	lastRetentionRunAt := time.Unix(0, 0)
 	runCompaction := func() {
-		err := c.RunCompaction(ctx)
+		applyRetention := false
+		if c.cfg.RetentionEnabled && time.Since(lastRetentionRunAt) >= c.cfg.ApplyRetentionInterval {
+			level.Info(util_log.Logger).Log("msg", "applying retention with compaction")
+			applyRetention = true
+		}
+
+		err := c.RunCompaction(ctx, applyRetention)
 		if err != nil {
 			level.Error(util_log.Logger).Log("msg", "failed to run compaction", "err", err)
 		}
+
+		if applyRetention {
+			lastRetentionRunAt = time.Now()
+		}
 	}
+
 	c.wg.Add(1)
 	go func() {
 		defer c.wg.Done()
@@ -380,7 +398,7 @@ func (c *Compactor) stopping(_ error) error {
 	return services.StopManagerAndAwaitStopped(context.Background(), c.subservices)
 }
 
-func (c *Compactor) CompactTable(ctx context.Context, tableName string) error {
+func (c *Compactor) CompactTable(ctx context.Context, tableName string, applyRetention bool) error {
 	table, err := newTable(ctx, filepath.Join(c.cfg.WorkingDirectory, tableName), c.indexStorageClient, c.cfg.RetentionEnabled, c.tableMarker)
 	if err != nil {
 		level.Error(util_log.Logger).Log("msg", "failed to initialize table for compaction", "table", tableName, "err", err)
@@ -389,7 +407,7 @@ func (c *Compactor) CompactTable(ctx context.Context, tableName string) error {
 
 	interval := retention.ExtractIntervalFromTableName(tableName)
 	intervalMayHaveExpiredChunks := false
-	if c.cfg.RetentionEnabled {
+	if c.cfg.RetentionEnabled && applyRetention {
 		intervalMayHaveExpiredChunks = c.expirationChecker.IntervalMayHaveExpiredChunks(interval)
 	}
 
@@ -401,7 +419,7 @@ func (c *Compactor) CompactTable(ctx context.Context, tableName string) error {
 	return nil
 }
 
-func (c *Compactor) RunCompaction(ctx context.Context) error {
+func (c *Compactor) RunCompaction(ctx context.Context, applyRetention bool) error {
 	status := statusSuccess
 	start := time.Now()
 
@@ -415,6 +433,9 @@ func (c *Compactor) RunCompaction(ctx context.Context) error {
 		if status == statusSuccess {
 			c.metrics.compactTablesOperationDurationSeconds.Set(runtime.Seconds())
 			c.metrics.compactTablesOperationLastSuccess.SetToCurrentTime()
+			if applyRetention {
+				c.metrics.applyRetentionLastSuccess.SetToCurrentTime()
+			}
 		}
 
 		if c.cfg.RetentionEnabled {
@@ -453,7 +474,7 @@ func (c *Compactor) RunCompaction(ctx context.Context) error {
 		}
 
 		level.Info(util_log.Logger).Log("msg", "compacting table", "table-name", tableName)
-		err = c.CompactTable(ctx, tableName)
+		err = c.CompactTable(ctx, tableName, applyRetention)
 		if err != nil {
 			return
 		}
diff --git a/pkg/storage/stores/shipper/compactor/compactor_test.go b/pkg/storage/stores/shipper/compactor/compactor_test.go
index b10bd78fbb741..725d7a6b615d0 100644
--- a/pkg/storage/stores/shipper/compactor/compactor_test.go
+++ b/pkg/storage/stores/shipper/compactor/compactor_test.go
@@ -95,7 +95,7 @@ func TestCompactor_RunCompaction(t *testing.T) {
 	}
 
 	compactor := setupTestCompactor(t, tempDir)
-	err = compactor.RunCompaction(context.Background())
+	err = compactor.RunCompaction(context.Background(), true)
 	require.NoError(t, err)
 
 	for name := range tables {
diff --git a/pkg/storage/stores/shipper/compactor/metrics.go b/pkg/storage/stores/shipper/compactor/metrics.go
index 94d2f1ca2822b..b81ae2ab51da4 100644
--- a/pkg/storage/stores/shipper/compactor/metrics.go
+++ b/pkg/storage/stores/shipper/compactor/metrics.go
@@ -14,6 +14,7 @@ type metrics struct {
 	compactTablesOperationTotal           *prometheus.CounterVec
 	compactTablesOperationDurationSeconds prometheus.Gauge
 	compactTablesOperationLastSuccess     prometheus.Gauge
+	applyRetentionLastSuccess             prometheus.Gauge
 	compactorRunning                      prometheus.Gauge
 }
 
@@ -34,6 +35,11 @@ func newMetrics(r prometheus.Registerer) *metrics {
 			Name:      "compact_tables_operation_last_successful_run_timestamp_seconds",
 			Help:      "Unix timestamp of the last successful compaction run",
 		}),
+		applyRetentionLastSuccess: promauto.With(r).NewGauge(prometheus.GaugeOpts{
+			Namespace: "loki_boltdb_shipper",
+			Name:      "apply_retention_last_successful_run_timestamp_seconds",
+			Help:      "Unix timestamp of the last successful retention run",
+		}),
 		compactorRunning: promauto.With(r).NewGauge(prometheus.GaugeOpts{
 			Namespace: "loki_boltdb_shipper",
 			Name:      "compactor_running",
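
For illustration only, here is a small standalone Go sketch (not part of the PR) of the scheduling behaviour introduced above: retention is applied on the first compaction run (since `lastRetentionRunAt` starts at the Unix epoch) and then again whenever at least `apply_retention_interval` has elapsed since the last retention run, so with `compaction_interval: 10m` and `apply_retention_interval: 30m` every third compaction also enforces retention. The simulated clock and loop are assumptions for the demo; only the comparison against `lastRetentionRunAt` mirrors the compactor code.

```go
package main

import (
	"fmt"
	"time"
)

// Standalone sketch (not Loki code): simulate six compaction ticks and show
// which ones would also apply retention under the logic added in this PR.
func main() {
	const (
		compactionInterval     = 10 * time.Minute
		applyRetentionInterval = 30 * time.Minute
	)

	lastRetentionRunAt := time.Unix(0, 0) // same initial value the compactor uses
	now := time.Now()                     // simulated clock; the real code uses time.Since

	for run := 1; run <= 6; run++ {
		applyRetention := now.Sub(lastRetentionRunAt) >= applyRetentionInterval
		fmt.Printf("compaction run %d: applyRetention=%v\n", run, applyRetention)
		if applyRetention {
			lastRetentionRunAt = now
		}
		now = now.Add(compactionInterval) // next ticker fire
	}
}
```

This prints `applyRetention=true` for runs 1 and 4 and `false` otherwise. The new `Validate` check guarantees that a non-zero `apply_retention_interval` is a multiple of `compaction_interval`, so retention runs always line up with compaction ticks rather than drifting between them.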