Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow applying retention at different interval than compaction with a config #4736

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Release notes for 2.4.1 can be found on the [release notes page](https://grafana
* [4687](https://github.com/grafana/loki/pull/4687) **owen-d**: overrides checks for nil tenant limits on AllByUserID
* [4683](https://github.com/grafana/loki/pull/4683) **owen-d**: Adds replication_factor doc to common config
* [4681](https://github.com/grafana/loki/pull/4681) **slim-bean**: Loki: check new Read target when initializing boltdb-shipper store
* [4736](https://github.com/grafana/loki/pull/4736) **sandeepsukhani**: allow applying retention at different interval than compaction

# 2.4.0 (2021/11/05)

Expand Down
31 changes: 26 additions & 5 deletions pkg/storage/stores/shipper/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type Config struct {
SharedStoreType string `yaml:"shared_store"`
SharedStoreKeyPrefix string `yaml:"shared_store_key_prefix"`
CompactionInterval time.Duration `yaml:"compaction_interval"`
ApplyRetentionInterval time.Duration `yaml:"apply_retention_interval"`
RetentionEnabled bool `yaml:"retention_enabled"`
RetentionDeleteDelay time.Duration `yaml:"retention_delete_delay"`
RetentionDeleteWorkCount int `yaml:"retention_delete_worker_count"`
Expand All @@ -71,6 +72,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.SharedStoreType, "boltdb.shipper.compactor.shared-store", "", "Shared store used for storing boltdb files. Supported types: gcs, s3, azure, swift, filesystem")
f.StringVar(&cfg.SharedStoreKeyPrefix, "boltdb.shipper.compactor.shared-store.key-prefix", "index/", "Prefix to add to Object Keys in Shared store. Path separator(if any) should always be a '/'. Prefix should never start with a separator but should always end with it.")
f.DurationVar(&cfg.CompactionInterval, "boltdb.shipper.compactor.compaction-interval", 10*time.Minute, "Interval at which to re-run the compaction operation.")
f.DurationVar(&cfg.ApplyRetentionInterval, "boltdb.shipper.compactor.apply-retention-interval", 0, "Interval at which to apply/enforce retention. 0 means run at same interval as compaction. If non-zero, it should always be a multiple of compaction interval.")
f.DurationVar(&cfg.RetentionDeleteDelay, "boltdb.shipper.compactor.retention-delete-delay", 2*time.Hour, "Delay after which chunks will be fully deleted during retention.")
f.BoolVar(&cfg.RetentionEnabled, "boltdb.shipper.compactor.retention-enabled", false, "(Experimental) Activate custom (per-stream,per-tenant) retention.")
f.IntVar(&cfg.RetentionDeleteWorkCount, "boltdb.shipper.compactor.retention-delete-worker-count", 150, "The total amount of worker to use to delete chunks.")
Expand All @@ -84,6 +86,10 @@ func (cfg *Config) Validate() error {
if cfg.MaxCompactionParallelism < 1 {
return errors.New("max compaction parallelism must be >= 1")
}
if cfg.RetentionEnabled && cfg.ApplyRetentionInterval != 0 && cfg.ApplyRetentionInterval%cfg.CompactionInterval != 0 {
return errors.New("interval for applying retention should either be set to a 0 or a multiple of compaction interval")
}

return shipper_util.ValidateSharedStoreKeyPrefix(cfg.SharedStoreKeyPrefix)
}

Expand Down Expand Up @@ -338,12 +344,24 @@ func (c *Compactor) runCompactions(ctx context.Context) {
break
}

lastRetentionRunAt := time.Unix(0, 0)
runCompaction := func() {
err := c.RunCompaction(ctx)
applyRetention := false
if c.cfg.RetentionEnabled && time.Since(lastRetentionRunAt) >= c.cfg.ApplyRetentionInterval {
level.Info(util_log.Logger).Log("msg", "applying retention with compaction")
applyRetention = true
}

err := c.RunCompaction(ctx, applyRetention)
if err != nil {
level.Error(util_log.Logger).Log("msg", "failed to run compaction", "err", err)
}

if applyRetention {
lastRetentionRunAt = time.Now()
}
}

c.wg.Add(1)
go func() {
defer c.wg.Done()
Expand Down Expand Up @@ -380,7 +398,7 @@ func (c *Compactor) stopping(_ error) error {
return services.StopManagerAndAwaitStopped(context.Background(), c.subservices)
}

func (c *Compactor) CompactTable(ctx context.Context, tableName string) error {
func (c *Compactor) CompactTable(ctx context.Context, tableName string, applyRetention bool) error {
table, err := newTable(ctx, filepath.Join(c.cfg.WorkingDirectory, tableName), c.indexStorageClient, c.cfg.RetentionEnabled, c.tableMarker)
if err != nil {
level.Error(util_log.Logger).Log("msg", "failed to initialize table for compaction", "table", tableName, "err", err)
Expand All @@ -389,7 +407,7 @@ func (c *Compactor) CompactTable(ctx context.Context, tableName string) error {

interval := retention.ExtractIntervalFromTableName(tableName)
intervalMayHaveExpiredChunks := false
if c.cfg.RetentionEnabled {
if c.cfg.RetentionEnabled && applyRetention {
intervalMayHaveExpiredChunks = c.expirationChecker.IntervalMayHaveExpiredChunks(interval)
}

Expand All @@ -401,7 +419,7 @@ func (c *Compactor) CompactTable(ctx context.Context, tableName string) error {
return nil
}

func (c *Compactor) RunCompaction(ctx context.Context) error {
func (c *Compactor) RunCompaction(ctx context.Context, applyRetention bool) error {
status := statusSuccess
start := time.Now()

Expand All @@ -415,6 +433,9 @@ func (c *Compactor) RunCompaction(ctx context.Context) error {
if status == statusSuccess {
c.metrics.compactTablesOperationDurationSeconds.Set(runtime.Seconds())
c.metrics.compactTablesOperationLastSuccess.SetToCurrentTime()
if applyRetention {
c.metrics.applyRetentionLastSuccess.SetToCurrentTime()
}
}

if c.cfg.RetentionEnabled {
Expand Down Expand Up @@ -453,7 +474,7 @@ func (c *Compactor) RunCompaction(ctx context.Context) error {
}

level.Info(util_log.Logger).Log("msg", "compacting table", "table-name", tableName)
err = c.CompactTable(ctx, tableName)
err = c.CompactTable(ctx, tableName, applyRetention)
if err != nil {
return
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/stores/shipper/compactor/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func TestCompactor_RunCompaction(t *testing.T) {
}

compactor := setupTestCompactor(t, tempDir)
err = compactor.RunCompaction(context.Background())
err = compactor.RunCompaction(context.Background(), true)
require.NoError(t, err)

for name := range tables {
Expand Down
6 changes: 6 additions & 0 deletions pkg/storage/stores/shipper/compactor/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type metrics struct {
compactTablesOperationTotal *prometheus.CounterVec
compactTablesOperationDurationSeconds prometheus.Gauge
compactTablesOperationLastSuccess prometheus.Gauge
applyRetentionLastSuccess prometheus.Gauge
compactorRunning prometheus.Gauge
}

Expand All @@ -34,6 +35,11 @@ func newMetrics(r prometheus.Registerer) *metrics {
Name: "compact_tables_operation_last_successful_run_timestamp_seconds",
Help: "Unix timestamp of the last successful compaction run",
}),
applyRetentionLastSuccess: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Namespace: "loki_boltdb_shipper",
Name: "apply_retention_last_successful_run_timestamp_seconds",
Help: "Unix timestamp of the last successful retention run",
}),
compactorRunning: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Namespace: "loki_boltdb_shipper",
Name: "compactor_running",
Expand Down