From dd582e6a8770bf88e74fbfd7bad3b4da7dc48182 Mon Sep 17 00:00:00 2001
From: Cyril Tovena
Date: Thu, 29 Apr 2021 11:04:53 +0200
Subject: [PATCH 1/9] Trigger compaction prior to retention.

This should allow us to process tables where the number of index files is
below 4. Currently such tables are skipped, but they should be processed
since they may otherwise never be compacted.

Signed-off-by: Cyril Tovena
---
 .../stores/shipper/compactor/compactor.go     | 65 +++++++++++++------
 .../shipper/compactor/retention/retention.go  | 62 +++++++++++-------
 .../compactor/retention/retention_test.go     |  2 +-
 pkg/storage/stores/shipper/compactor/table.go | 51 ++++++++-------
 4 files changed, 110 insertions(+), 70 deletions(-)

diff --git a/pkg/storage/stores/shipper/compactor/compactor.go b/pkg/storage/stores/shipper/compactor/compactor.go
index f7a7b5dd94515..a8a97bcdac1ab 100644
--- a/pkg/storage/stores/shipper/compactor/compactor.go
+++ b/pkg/storage/stores/shipper/compactor/compactor.go
@@ -67,8 +67,8 @@ type Compactor struct {
 	objectClient chunk.ObjectClient
 	tableMarker  *retention.Marker
 	sweeper      *retention.Sweeper
-
-	metrics *metrics
+	mutex        sync.Mutex
+	metrics      *metrics
 }
 
 func NewCompactor(cfg Config, storageConfig storage.Config, schemaConfig loki_storage.SchemaConfig, limits retention.Limits, r prometheus.Registerer) (*Compactor, error) {
@@ -93,20 +93,20 @@ func NewCompactor(cfg Config, storageConfig storage.Config, schemaConfig loki_st
 	if err != nil {
 		return nil, err
 	}
-	marker, err := retention.NewMarker(retentionWorkDir, schemaConfig, prefixedClient, retention.NewExpirationChecker(limits), r)
-	if err != nil {
-		return nil, err
-	}
-	compactor := Compactor{
+
+	compactor := &Compactor{
 		cfg:          cfg,
 		objectClient: prefixedClient,
 		metrics:      newMetrics(r),
-		tableMarker:  marker,
 		sweeper:      sweeper,
 	}
-
+	marker, err := retention.NewMarker(retentionWorkDir, schemaConfig, prefixedClient, retention.NewExpirationChecker(limits), compactor, r)
+	if err != nil {
+		return nil, err
+	}
+	compactor.tableMarker = marker
 	compactor.Service = services.NewBasicService(nil, compactor.loop, nil)
-	return &compactor, nil
+	return compactor, nil
 }
 
 func (c *Compactor) loop(ctx context.Context) error {
@@ -172,6 +172,39 @@ func (c *Compactor) loop(ctx context.Context) error {
 	return nil
 }
 
+// Compact implements retention.TableCompactor.
+func (c *Compactor) Compact(ctx context.Context, tableName string, destinationPath string) (string, error) { + c.mutex.Lock() + defer c.mutex.Unlock() + table, err := newTable(ctx, filepath.Join(c.cfg.WorkingDirectory, tableName), c.objectClient) + if err != nil { + return "", err + } + + objectKey, err := table.compactTo(destinationPath, true) + if err != nil { + return "", err + } + return objectKey, nil +} + +func (c *Compactor) CompactTable(ctx context.Context, tableName string) error { + c.mutex.Lock() + defer c.mutex.Unlock() + table, err := newTable(ctx, filepath.Join(c.cfg.WorkingDirectory, tableName), c.objectClient) + if err != nil { + level.Error(util_log.Logger).Log("msg", "failed to initialize table for compaction", "table", tableName, "err", err) + return err + } + + err = table.compact() + if err != nil { + level.Error(util_log.Logger).Log("msg", "failed to compact files", "table", tableName, "err", err) + return err + } + return nil +} + func (c *Compactor) RunCompaction(ctx context.Context) error { status := statusSuccess start := time.Now() @@ -196,19 +229,9 @@ func (c *Compactor) RunCompaction(ctx context.Context) error { } for _, tableName := range tables { - table, err := newTable(ctx, filepath.Join(c.cfg.WorkingDirectory, tableName), c.objectClient) - if err != nil { - status = statusFailure - level.Error(util_log.Logger).Log("msg", "failed to initialize table for compaction", "table", tableName, "err", err) - continue - } - - err = table.compact() - if err != nil { + if err := c.CompactTable(ctx, tableName); err != nil { status = statusFailure - level.Error(util_log.Logger).Log("msg", "failed to compact files", "table", tableName, "err", err) } - // check if context was cancelled before going for next table. select { case <-ctx.Done(): diff --git a/pkg/storage/stores/shipper/compactor/retention/retention.go b/pkg/storage/stores/shipper/compactor/retention/retention.go index 47d42e38edd34..cd3e2479dbf49 100644 --- a/pkg/storage/stores/shipper/compactor/retention/retention.go +++ b/pkg/storage/stores/shipper/compactor/retention/retention.go @@ -34,15 +34,23 @@ const ( markersFolder = "markers" ) +// TableCompactor can compact tables. +type TableCompactor interface { + // Compact compacts the given tableName and output the result at the destinationPath then returns the object key. + // The object key is expected to be already uploaded and the local path is expected to be a copy of it. 
+ Compact(ctx context.Context, tableName string, destinationPath string) (string, error) +} + type Marker struct { workingDirectory string config storage.SchemaConfig objectClient chunk.ObjectClient expiration ExpirationChecker markerMetrics *markerMetrics + compactor TableCompactor } -func NewMarker(workingDirectory string, config storage.SchemaConfig, objectClient chunk.ObjectClient, expiration ExpirationChecker, r prometheus.Registerer) (*Marker, error) { +func NewMarker(workingDirectory string, config storage.SchemaConfig, objectClient chunk.ObjectClient, expiration ExpirationChecker, compactor TableCompactor, r prometheus.Registerer) (*Marker, error) { if err := validatePeriods(config); err != nil { return nil, err } @@ -53,6 +61,7 @@ func NewMarker(workingDirectory string, config storage.SchemaConfig, objectClien objectClient: objectClient, expiration: expiration, markerMetrics: metrics, + compactor: compactor, }, nil } @@ -74,21 +83,34 @@ func (t *Marker) MarkForDelete(ctx context.Context, tableName string) error { } func (t *Marker) markTable(ctx context.Context, tableName string) error { - objects, err := util.ListDirectory(ctx, tableName, t.objectClient) + tableDirectory := path.Join(t.workingDirectory, tableName) + err := chunk_util.EnsureDirectory(tableDirectory) if err != nil { return err } + defer func() { + if err := os.RemoveAll(tableDirectory); err != nil { + level.Warn(util_log.Logger).Log("msg", "failed to remove temporary table directory", "err", err, "path", tableDirectory) + } + }() - if len(objects) != 1 { - // todo(1): in the future we would want to support more tables so that we can apply retention below 1d. - // for simplicity and to avoid conflict with compactor we'll support only compacted db file. - // Possibly we should apply retention right before the compactor upload compacted db. + downloadAt := filepath.Join(tableDirectory, fmt.Sprintf("retention-%d", time.Now().UnixNano())) - // todo(2): Depending on the retention rules we should be able to skip tables. - // For instance if there isn't a retention rules below 1 week, then we can skip the first 7 tables. - level.Debug(util_log.Logger).Log("msg", "skipping retention for non-compacted table", "name", tableName) - return nil + objects, err := util.ListDirectory(ctx, tableName, t.objectClient) + if err != nil { + return err + } + if len(objects) != 1 { + level.Info(util_log.Logger).Log("msg", "compacting table before applying retention", "table", tableName) + // if there are more than one table file let's compact first. 
+ objectKey, err := t.compactor.Compact(ctx, tableName, downloadAt) + if err != nil { + level.Error(util_log.Logger).Log("msg", "failed to compact files before retention", "table", tableName, "err", err) + return err + } + return t.markTableFromPath(ctx, tableName, objectKey, downloadAt) } + objectKey := objects[0].Key if shipper_util.IsDirectory(objectKey) { @@ -96,28 +118,18 @@ func (t *Marker) markTable(ctx context.Context, tableName string) error { return nil } - tableDirectory := path.Join(t.workingDirectory, tableName) - err = chunk_util.EnsureDirectory(tableDirectory) - if err != nil { - return err - } - defer func() { - if err := os.RemoveAll(tableDirectory); err != nil { - level.Warn(util_log.Logger).Log("msg", "failed to remove temporary table directory", "err", err, "path", tableDirectory) - } - }() - - downloadAt := filepath.Join(tableDirectory, fmt.Sprintf("retention-%d", time.Now().UnixNano())) - err = shipper_util.GetFileFromStorage(ctx, t.objectClient, objectKey, downloadAt) if err != nil { level.Warn(util_log.Logger).Log("msg", "failed to download table", "err", err, "path", downloadAt, "objectKey", objectKey) return err } + return t.markTableFromPath(ctx, tableName, objectKey, downloadAt) +} - db, err := shipper_util.SafeOpenBoltdbFile(downloadAt) +func (t *Marker) markTableFromPath(ctx context.Context, tableName, objectKey, filepath string) error { + db, err := shipper_util.SafeOpenBoltdbFile(filepath) if err != nil { - level.Warn(util_log.Logger).Log("msg", "failed to open db", "err", err, "path", downloadAt) + level.Warn(util_log.Logger).Log("msg", "failed to open db", "err", err, "path", filepath) return err } diff --git a/pkg/storage/stores/shipper/compactor/retention/retention_test.go b/pkg/storage/stores/shipper/compactor/retention/retention_test.go index f13f4f9d639e9..7b72e850b826e 100644 --- a/pkg/storage/stores/shipper/compactor/retention/retention_test.go +++ b/pkg/storage/stores/shipper/compactor/retention/retention_test.go @@ -125,7 +125,7 @@ func Test_Retention(t *testing.T) { sweep.Start() defer sweep.Stop() - marker, err := NewMarker(workDir, store.schemaCfg, util.NewPrefixedObjectClient(store.objectClient, "index/"), expiration, prometheus.NewRegistry()) + marker, err := NewMarker(workDir, store.schemaCfg, util.NewPrefixedObjectClient(store.objectClient, "index/"), expiration, nil, prometheus.NewRegistry()) require.NoError(t, err) for _, table := range store.indexTables() { table.Close() diff --git a/pkg/storage/stores/shipper/compactor/table.go b/pkg/storage/stores/shipper/compactor/table.go index a1752c12b7dce..02040c330a7ec 100644 --- a/pkg/storage/stores/shipper/compactor/table.go +++ b/pkg/storage/stores/shipper/compactor/table.go @@ -61,30 +61,26 @@ func newTable(ctx context.Context, workingDirectory string, objectClient chunk.O return &table, nil } -func (t *table) compact() error { +func (t *table) compactTo(outputPath string, force bool) (string, error) { objects, err := util.ListDirectory(t.ctx, t.name, t.storageClient) if err != nil { - return err + return "", err } level.Info(util_log.Logger).Log("msg", "listed files", "count", len(objects)) - if len(objects) < compactMinDBs { + if len(objects) < compactMinDBs && !force { level.Info(util_log.Logger).Log("msg", fmt.Sprintf("skipping compaction since we have just %d files in storage", len(objects))) - return nil + return "", nil } - defer func() { - err := t.cleanup() - if err != nil { - level.Error(util_log.Logger).Log("msg", "failed to cleanup table", "name", t.name) - } - }() - + if 
outputPath == "" { + outputPath = filepath.Join(t.workingDirectory, fmt.Sprint(time.Now().Unix())) + } // create a new compacted db - t.compactedDB, err = shipper_util.SafeOpenBoltdbFile(filepath.Join(t.workingDirectory, fmt.Sprint(time.Now().Unix()))) + t.compactedDB, err = shipper_util.SafeOpenBoltdbFile(outputPath) if err != nil { - return err + return "", err } level.Info(util_log.Logger).Log("msg", "starting compaction of dbs") @@ -169,26 +165,35 @@ func (t *table) compact() error { } if firstErr != nil { - return firstErr + return "", firstErr } // check whether we stopped compaction due to context being cancelled. select { case <-t.ctx.Done(): - return nil + return "", nil default: } level.Info(util_log.Logger).Log("msg", "finished compacting the dbs") // upload the compacted db - err = t.upload() + objectKey, err := t.upload() if err != nil { - return err + return "", err } // remove source files from storage which were compacted - return t.removeObjectsFromStorage(objects) + err = t.removeObjectsFromStorage(objects) + if err != nil { + return "", err + } + return objectKey, nil +} + +func (t *table) compact() error { + _, err := t.compactTo("", false) + return err } func (t *table) cleanup() error { @@ -283,13 +288,13 @@ func (t *table) readFile(path string) error { } // upload uploads the compacted db in compressed format. -func (t *table) upload() error { +func (t *table) upload() (string, error) { compactedDBPath := t.compactedDB.Path() // close the compactedDB to make sure all the writes are processed. err := t.compactedDB.Close() if err != nil { - return err + return "", err } t.compactedDB = nil @@ -298,13 +303,13 @@ func (t *table) upload() error { compressedDBPath := fmt.Sprintf("%s.gz", compactedDBPath) err = shipper_util.CompressFile(compactedDBPath, compressedDBPath) if err != nil { - return err + return "", err } // open the file for reading. compressedDB, err := os.Open(compressedDBPath) if err != nil { - return err + return "", err } defer func() { @@ -320,7 +325,7 @@ func (t *table) upload() error { objectKey := fmt.Sprintf("%s.gz", shipper_util.BuildObjectKey(t.name, uploaderName, fmt.Sprint(time.Now().Unix()))) level.Info(util_log.Logger).Log("msg", "uploading the compacted file", "objectKey", objectKey) - return t.storageClient.PutObject(t.ctx, objectKey, compressedDB) + return objectKey, t.storageClient.PutObject(t.ctx, objectKey, compressedDB) } // removeObjectsFromStorage deletes objects from storage. From 6687145bc63686320e864612b0cd73c1b34f9b80 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Thu, 29 Apr 2021 17:02:58 +0200 Subject: [PATCH 2/9] Hook retention into compaction. 
Signed-off-by: Cyril Tovena --- .../stores/shipper/compactor/compactor.go | 86 +----------- .../stores/shipper/compactor/metrics.go | 20 --- .../shipper/compactor/retention/retention.go | 127 +++-------------- .../compactor/retention/retention_test.go | 7 +- pkg/storage/stores/shipper/compactor/table.go | 129 +++++++++++++----- .../stores/shipper/compactor/table_test.go | 7 +- 6 files changed, 121 insertions(+), 255 deletions(-) diff --git a/pkg/storage/stores/shipper/compactor/compactor.go b/pkg/storage/stores/shipper/compactor/compactor.go index a8a97bcdac1ab..747a0f7f9e3fb 100644 --- a/pkg/storage/stores/shipper/compactor/compactor.go +++ b/pkg/storage/stores/shipper/compactor/compactor.go @@ -22,7 +22,6 @@ import ( "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention" shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util" "github.com/grafana/loki/pkg/storage/stores/util" - errUtil "github.com/grafana/loki/pkg/util" ) const delimiter = "/" @@ -65,9 +64,8 @@ type Compactor struct { cfg Config objectClient chunk.ObjectClient - tableMarker *retention.Marker + tableMarker retention.TableMarker sweeper *retention.Sweeper - mutex sync.Mutex metrics *metrics } @@ -100,7 +98,7 @@ func NewCompactor(cfg Config, storageConfig storage.Config, schemaConfig loki_st metrics: newMetrics(r), sweeper: sweeper, } - marker, err := retention.NewMarker(retentionWorkDir, schemaConfig, prefixedClient, retention.NewExpirationChecker(limits), compactor, r) + marker, err := retention.NewMarker(retentionWorkDir, schemaConfig, retention.NewExpirationChecker(limits), r) if err != nil { return nil, err } @@ -116,12 +114,6 @@ func (c *Compactor) loop(ctx context.Context) error { level.Error(util_log.Logger).Log("msg", "failed to run compaction", "err", err) } } - runRetention := func() { - err := c.RunRetention(ctx) - if err != nil { - level.Error(util_log.Logger).Log("msg", "failed to run retention", "err", err) - } - } var wg sync.WaitGroup wg.Add(1) go func() { @@ -141,7 +133,7 @@ func (c *Compactor) loop(ctx context.Context) error { } }() if c.cfg.RetentionEnabled { - wg.Add(2) + wg.Add(1) go func() { // starts the chunk sweeper defer func() { @@ -151,47 +143,14 @@ func (c *Compactor) loop(ctx context.Context) error { c.sweeper.Start() <-ctx.Done() }() - go func() { - // start the index marker - defer wg.Done() - ticker := time.NewTicker(c.cfg.RetentionInterval) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - runRetention() - case <-ctx.Done(): - return - } - } - }() } wg.Wait() return nil } -// Compact implements retention.TableCompactor. 
-func (c *Compactor) Compact(ctx context.Context, tableName string, destinationPath string) (string, error) { - c.mutex.Lock() - defer c.mutex.Unlock() - table, err := newTable(ctx, filepath.Join(c.cfg.WorkingDirectory, tableName), c.objectClient) - if err != nil { - return "", err - } - - objectKey, err := table.compactTo(destinationPath, true) - if err != nil { - return "", err - } - return objectKey, nil -} - func (c *Compactor) CompactTable(ctx context.Context, tableName string) error { - c.mutex.Lock() - defer c.mutex.Unlock() - table, err := newTable(ctx, filepath.Join(c.cfg.WorkingDirectory, tableName), c.objectClient) + table, err := newTable(ctx, filepath.Join(c.cfg.WorkingDirectory, tableName), c.objectClient, c.cfg.RetentionEnabled, c.tableMarker) if err != nil { level.Error(util_log.Logger).Log("msg", "failed to initialize table for compaction", "table", tableName, "err", err) return err @@ -242,40 +201,3 @@ func (c *Compactor) RunCompaction(ctx context.Context) error { return nil } - -func (c *Compactor) RunRetention(ctx context.Context) error { - status := statusSuccess - start := time.Now() - - defer func() { - level.Debug(util_log.Logger).Log("msg", "finished to processing retention on all tables", "status", status, "duration", time.Since(start)) - c.metrics.retentionOperationTotal.WithLabelValues(status).Inc() - if status == statusSuccess { - c.metrics.retentionOperationDurationSeconds.Set(time.Since(start).Seconds()) - c.metrics.retentionOperationLastSuccess.SetToCurrentTime() - } - }() - level.Debug(util_log.Logger).Log("msg", "starting to processing retention on all all tables") - - _, dirs, err := c.objectClient.List(ctx, "", delimiter) - if err != nil { - status = statusFailure - return err - } - - tables := make([]string, len(dirs)) - for i, dir := range dirs { - tables[i] = strings.TrimSuffix(string(dir), delimiter) - } - - var errs errUtil.MultiError - - for _, tableName := range tables { - if err := c.tableMarker.MarkForDelete(ctx, tableName); err != nil { - level.Error(util_log.Logger).Log("msg", "failed to mark table for deletes", "table", tableName, "err", err) - errs.Add(err) - status = statusFailure - } - } - return errs.Err() -} diff --git a/pkg/storage/stores/shipper/compactor/metrics.go b/pkg/storage/stores/shipper/compactor/metrics.go index 96b4a2aff7cc3..fdb304b7897b9 100644 --- a/pkg/storage/stores/shipper/compactor/metrics.go +++ b/pkg/storage/stores/shipper/compactor/metrics.go @@ -14,10 +14,6 @@ type metrics struct { compactTablesOperationTotal *prometheus.CounterVec compactTablesOperationDurationSeconds prometheus.Gauge compactTablesOperationLastSuccess prometheus.Gauge - - retentionOperationTotal *prometheus.CounterVec - retentionOperationDurationSeconds prometheus.Gauge - retentionOperationLastSuccess prometheus.Gauge } func newMetrics(r prometheus.Registerer) *metrics { @@ -37,22 +33,6 @@ func newMetrics(r prometheus.Registerer) *metrics { Name: "compact_tables_operation_last_successful_run_timestamp_seconds", Help: "Unix timestamp of the last successful compaction run", }), - - retentionOperationTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ - Namespace: "loki_boltdb_shipper", - Name: "retention_operation_total", - Help: "Total number of retention applied by status", - }, []string{"status"}), - retentionOperationDurationSeconds: promauto.With(r).NewGauge(prometheus.GaugeOpts{ - Namespace: "loki_boltdb_shipper", - Name: "retention_operation_duration_seconds", - Help: "Time (in seconds) spent in applying retention for all the 
tables", - }), - retentionOperationLastSuccess: promauto.With(r).NewGauge(prometheus.GaugeOpts{ - Namespace: "loki_boltdb_shipper", - Name: "retention_operation_last_successful_run_timestamp_seconds", - Help: "Unix timestamp of the last successful retention run", - }), } return &m diff --git a/pkg/storage/stores/shipper/compactor/retention/retention.go b/pkg/storage/stores/shipper/compactor/retention/retention.go index cd3e2479dbf49..f6eebadce0927 100644 --- a/pkg/storage/stores/shipper/compactor/retention/retention.go +++ b/pkg/storage/stores/shipper/compactor/retention/retention.go @@ -4,23 +4,16 @@ import ( "context" "encoding/base64" "fmt" - "os" - "path" - "path/filepath" - "strings" "time" "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/chunk/local" - chunk_util "github.com/cortexproject/cortex/pkg/chunk/util" util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/go-kit/kit/log/level" "github.com/prometheus/client_golang/prometheus" "go.etcd.io/bbolt" "github.com/grafana/loki/pkg/storage" - "github.com/grafana/loki/pkg/storage/stores/shipper/util" - shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util" ) var ( @@ -34,23 +27,19 @@ const ( markersFolder = "markers" ) -// TableCompactor can compact tables. -type TableCompactor interface { - // Compact compacts the given tableName and output the result at the destinationPath then returns the object key. - // The object key is expected to be already uploaded and the local path is expected to be a copy of it. - Compact(ctx context.Context, tableName string, destinationPath string) (string, error) +type TableMarker interface { + // MarkForDelete marks a given table and returns if it's empty and how many marks were created. + MarkForDelete(ctx context.Context, tableName string, db *bbolt.DB) (bool, int64, error) } type Marker struct { workingDirectory string config storage.SchemaConfig - objectClient chunk.ObjectClient expiration ExpirationChecker markerMetrics *markerMetrics - compactor TableCompactor } -func NewMarker(workingDirectory string, config storage.SchemaConfig, objectClient chunk.ObjectClient, expiration ExpirationChecker, compactor TableCompactor, r prometheus.Registerer) (*Marker, error) { +func NewMarker(workingDirectory string, config storage.SchemaConfig, expiration ExpirationChecker, r prometheus.Registerer) (*Marker, error) { if err := validatePeriods(config); err != nil { return nil, err } @@ -58,15 +47,13 @@ func NewMarker(workingDirectory string, config storage.SchemaConfig, objectClien return &Marker{ workingDirectory: workingDirectory, config: config, - objectClient: objectClient, expiration: expiration, markerMetrics: metrics, - compactor: compactor, }, nil } // MarkForDelete marks all chunks expired for a given table. 
-func (t *Marker) MarkForDelete(ctx context.Context, tableName string) error { +func (t *Marker) MarkForDelete(ctx context.Context, tableName string, db *bbolt.DB) (bool, int64, error) { start := time.Now() status := statusSuccess defer func() { @@ -75,78 +62,23 @@ func (t *Marker) MarkForDelete(ctx context.Context, tableName string) error { }() level.Debug(util_log.Logger).Log("msg", "starting to process table", "table", tableName) - if err := t.markTable(ctx, tableName); err != nil { - status = statusFailure - return err - } - return nil -} - -func (t *Marker) markTable(ctx context.Context, tableName string) error { - tableDirectory := path.Join(t.workingDirectory, tableName) - err := chunk_util.EnsureDirectory(tableDirectory) - if err != nil { - return err - } - defer func() { - if err := os.RemoveAll(tableDirectory); err != nil { - level.Warn(util_log.Logger).Log("msg", "failed to remove temporary table directory", "err", err, "path", tableDirectory) - } - }() - - downloadAt := filepath.Join(tableDirectory, fmt.Sprintf("retention-%d", time.Now().UnixNano())) - - objects, err := util.ListDirectory(ctx, tableName, t.objectClient) + empty, markCount, err := t.markTable(ctx, tableName, db) if err != nil { - return err - } - if len(objects) != 1 { - level.Info(util_log.Logger).Log("msg", "compacting table before applying retention", "table", tableName) - // if there are more than one table file let's compact first. - objectKey, err := t.compactor.Compact(ctx, tableName, downloadAt) - if err != nil { - level.Error(util_log.Logger).Log("msg", "failed to compact files before retention", "table", tableName, "err", err) - return err - } - return t.markTableFromPath(ctx, tableName, objectKey, downloadAt) - } - - objectKey := objects[0].Key - - if shipper_util.IsDirectory(objectKey) { - level.Debug(util_log.Logger).Log("msg", "skipping retention no table file found", "objectKey", objectKey) - return nil - } - - err = shipper_util.GetFileFromStorage(ctx, t.objectClient, objectKey, downloadAt) - if err != nil { - level.Warn(util_log.Logger).Log("msg", "failed to download table", "err", err, "path", downloadAt, "objectKey", objectKey) - return err + status = statusFailure + return false, 0, err } - return t.markTableFromPath(ctx, tableName, objectKey, downloadAt) + return empty, markCount, nil } -func (t *Marker) markTableFromPath(ctx context.Context, tableName, objectKey, filepath string) error { - db, err := shipper_util.SafeOpenBoltdbFile(filepath) - if err != nil { - level.Warn(util_log.Logger).Log("msg", "failed to open db", "err", err, "path", filepath) - return err - } - - defer func() { - if err := db.Close(); err != nil { - level.Warn(util_log.Logger).Log("msg", "failed to close local db", "err", err) - } - }() - +func (t *Marker) markTable(ctx context.Context, tableName string, db *bbolt.DB) (bool, int64, error) { schemaCfg, ok := schemaPeriodForTable(t.config, tableName) if !ok { - return fmt.Errorf("could not find schema for table: %s", tableName) + return false, 0, fmt.Errorf("could not find schema for table: %s", tableName) } markerWriter, err := NewMarkerStorageWriter(t.workingDirectory) if err != nil { - return fmt.Errorf("failed to create marker writer: %w", err) + return false, 0, fmt.Errorf("failed to create marker writer: %w", err) } var empty bool @@ -172,45 +104,18 @@ func (t *Marker) markTableFromPath(ctx context.Context, tableName, objectKey, fi return nil }) if err != nil { - return err + return false, 0, err } - // if the index is empty we can delete the index table. 
if empty { t.markerMetrics.tableProcessedTotal.WithLabelValues(tableName, tableActionDeleted).Inc() - return t.objectClient.DeleteObject(ctx, objectKey) + return empty, markerWriter.Count(), nil } - // No chunks to delete means no changes to the remote index, we don't need to upload it. if markerWriter.Count() == 0 { t.markerMetrics.tableProcessedTotal.WithLabelValues(tableName, tableActionNone).Inc() - return nil + return empty, markerWriter.Count(), nil } t.markerMetrics.tableProcessedTotal.WithLabelValues(tableName, tableActionModified).Inc() - return t.uploadDB(ctx, db, objectKey) -} - -func (t *Marker) uploadDB(ctx context.Context, db *bbolt.DB, objectKey string) error { - sourcePath := db.Path() - if strings.HasSuffix(objectKey, ".gz") { - compressedPath := fmt.Sprintf("%s.gz", sourcePath) - err := shipper_util.CompressFile(sourcePath, compressedPath) - if err != nil { - return err - } - defer func() { - os.Remove(compressedPath) - }() - sourcePath = compressedPath - } - sourceFile, err := os.Open(sourcePath) - if err != nil { - return err - } - defer func() { - if err := sourceFile.Close(); err != nil { - level.Error(util_log.Logger).Log("msg", "failed to close file", "path", sourceFile, "err", err) - } - }() - return t.objectClient.PutObject(ctx, objectKey, sourceFile) + return empty, markerWriter.Count(), nil } func markforDelete(marker MarkerStorageWriter, chunkIt ChunkEntryIterator, seriesCleaner SeriesCleaner, expiration ExpirationChecker) (bool, error) { diff --git a/pkg/storage/stores/shipper/compactor/retention/retention_test.go b/pkg/storage/stores/shipper/compactor/retention/retention_test.go index 7b72e850b826e..9ae82009cd98a 100644 --- a/pkg/storage/stores/shipper/compactor/retention/retention_test.go +++ b/pkg/storage/stores/shipper/compactor/retention/retention_test.go @@ -22,7 +22,6 @@ import ( "github.com/grafana/loki/pkg/chunkenc" "github.com/grafana/loki/pkg/logproto" - "github.com/grafana/loki/pkg/storage/stores/util" "github.com/grafana/loki/pkg/validation" ) @@ -125,11 +124,13 @@ func Test_Retention(t *testing.T) { sweep.Start() defer sweep.Stop() - marker, err := NewMarker(workDir, store.schemaCfg, util.NewPrefixedObjectClient(store.objectClient, "index/"), expiration, nil, prometheus.NewRegistry()) + marker, err := NewMarker(workDir, store.schemaCfg, expiration, prometheus.NewRegistry()) require.NoError(t, err) for _, table := range store.indexTables() { + _, _, err := marker.MarkForDelete(context.Background(), table.name, table.DB) + require.Nil(t, err) table.Close() - require.NoError(t, marker.MarkForDelete(context.Background(), table.name)) + } // assert using the store again. 
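With this change, marking operates on an already-open *bbolt.DB and reports (empty, markCount, error) rather than downloading and uploading indexes itself, which makes a fake TableMarker cheap to construct. Below is a minimal sketch of such a stub, written as a function adapter in the spirit of http.HandlerFunc; patch 8 further down adds an equivalent TableMarkerFunc to the compactor tests, and the names here are purely illustrative.

package sketch

import (
	"context"

	"go.etcd.io/bbolt"
)

// TableMarker mirrors the interface introduced above.
type TableMarker interface {
	MarkForDelete(ctx context.Context, tableName string, db *bbolt.DB) (bool, int64, error)
}

// TableMarkerFunc lets a plain function satisfy TableMarker.
type TableMarkerFunc func(ctx context.Context, tableName string, db *bbolt.DB) (bool, int64, error)

func (f TableMarkerFunc) MarkForDelete(ctx context.Context, tableName string, db *bbolt.DB) (bool, int64, error) {
	return f(ctx, tableName, db)
}

// noopMarker reports "table not empty, nothing marked": retention found
// nothing to expire, so the caller should leave the index untouched.
var noopMarker TableMarker = TableMarkerFunc(func(context.Context, string, *bbolt.DB) (bool, int64, error) {
	return false, 0, nil
})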
diff --git a/pkg/storage/stores/shipper/compactor/table.go b/pkg/storage/stores/shipper/compactor/table.go index 02040c330a7ec..de57c502cac52 100644 --- a/pkg/storage/stores/shipper/compactor/table.go +++ b/pkg/storage/stores/shipper/compactor/table.go @@ -15,6 +15,7 @@ import ( "github.com/go-kit/kit/log/level" "go.etcd.io/bbolt" + "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention" "github.com/grafana/loki/pkg/storage/stores/shipper/util" shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util" ) @@ -37,6 +38,8 @@ type table struct { name string workingDirectory string storageClient chunk.ObjectClient + applyRetention bool + tableMarker retention.TableMarker compactedDB *bbolt.DB @@ -44,7 +47,7 @@ type table struct { quit chan struct{} } -func newTable(ctx context.Context, workingDirectory string, objectClient chunk.ObjectClient) (*table, error) { +func newTable(ctx context.Context, workingDirectory string, objectClient chunk.ObjectClient, applyRetention bool, tableMarker retention.TableMarker) (*table, error) { err := chunk_util.EnsureDirectory(workingDirectory) if err != nil { return nil, err @@ -56,31 +59,104 @@ func newTable(ctx context.Context, workingDirectory string, objectClient chunk.O workingDirectory: workingDirectory, storageClient: objectClient, quit: make(chan struct{}), + applyRetention: applyRetention, + tableMarker: tableMarker, } return &table, nil } -func (t *table) compactTo(outputPath string, force bool) (string, error) { +func (t *table) compact() error { objects, err := util.ListDirectory(t.ctx, t.name, t.storageClient) if err != nil { - return "", err + return err } level.Info(util_log.Logger).Log("msg", "listed files", "count", len(objects)) - if len(objects) < compactMinDBs && !force { - level.Info(util_log.Logger).Log("msg", fmt.Sprintf("skipping compaction since we have just %d files in storage", len(objects))) - return "", nil + defer func() { + err := t.cleanup() + if err != nil { + level.Error(util_log.Logger).Log("msg", "failed to cleanup table", "name", t.name) + } + }() + + if !t.applyRetention { + if len(objects) < compactMinDBs { + level.Info(util_log.Logger).Log("msg", fmt.Sprintf("skipping compaction since we have just %d files in storage", len(objects))) + return nil + } + if err := t.compactFiles(objects); err != nil { + return err + } + // upload the compacted db + err = t.upload() + if err != nil { + return err + } + + // remove source files from storage which were compacted + err = t.removeObjectsFromStorage(objects) + if err != nil { + return err + } + return nil + } + + var compacted bool + if len(objects) > 1 { + if err := t.compactFiles(objects); err != nil { + return err + } + compacted = true + } + + if len(objects) == 1 { + // download the db + downloadAt := filepath.Join(t.workingDirectory, fmt.Sprint(time.Now().Unix())) + err = shipper_util.GetFileFromStorage(t.ctx, t.storageClient, objects[0].Key, downloadAt) + if err != nil { + return err + } + t.compactedDB, err = shipper_util.SafeOpenBoltdbFile(downloadAt) + if err != nil { + return err + } } - if outputPath == "" { - outputPath = filepath.Join(t.workingDirectory, fmt.Sprint(time.Now().Unix())) + if t.compactedDB == nil { + level.Info(util_log.Logger).Log("msg", "skipping compaction no files found.") + return nil } + + empty, markCount, err := t.tableMarker.MarkForDelete(t.ctx, t.name, t.compactedDB) + if err != nil { + return err + } + + if empty { + return t.removeObjectsFromStorage(objects) + } + + if markCount == 0 && !compacted { + // we 
didn't make a modification so let's just return + return nil + } + + err = t.upload() + if err != nil { + return err + } + + return t.removeObjectsFromStorage(objects) +} + +func (t *table) compactFiles(objects []chunk.StorageObject) error { + var err error // create a new compacted db - t.compactedDB, err = shipper_util.SafeOpenBoltdbFile(outputPath) + t.compactedDB, err = shipper_util.SafeOpenBoltdbFile(filepath.Join(t.workingDirectory, fmt.Sprint(time.Now().Unix()))) if err != nil { - return "", err + return err } level.Info(util_log.Logger).Log("msg", "starting compaction of dbs") @@ -165,35 +241,18 @@ func (t *table) compactTo(outputPath string, force bool) (string, error) { } if firstErr != nil { - return "", firstErr + return firstErr } // check whether we stopped compaction due to context being cancelled. select { case <-t.ctx.Done(): - return "", nil + return nil default: } level.Info(util_log.Logger).Log("msg", "finished compacting the dbs") - - // upload the compacted db - objectKey, err := t.upload() - if err != nil { - return "", err - } - - // remove source files from storage which were compacted - err = t.removeObjectsFromStorage(objects) - if err != nil { - return "", err - } - return objectKey, nil -} - -func (t *table) compact() error { - _, err := t.compactTo("", false) - return err + return nil } func (t *table) cleanup() error { @@ -288,13 +347,13 @@ func (t *table) readFile(path string) error { } // upload uploads the compacted db in compressed format. -func (t *table) upload() (string, error) { +func (t *table) upload() error { compactedDBPath := t.compactedDB.Path() // close the compactedDB to make sure all the writes are processed. err := t.compactedDB.Close() if err != nil { - return "", err + return err } t.compactedDB = nil @@ -303,13 +362,13 @@ func (t *table) upload() (string, error) { compressedDBPath := fmt.Sprintf("%s.gz", compactedDBPath) err = shipper_util.CompressFile(compactedDBPath, compressedDBPath) if err != nil { - return "", err + return err } // open the file for reading. compressedDB, err := os.Open(compressedDBPath) if err != nil { - return "", err + return err } defer func() { @@ -325,7 +384,7 @@ func (t *table) upload() (string, error) { objectKey := fmt.Sprintf("%s.gz", shipper_util.BuildObjectKey(t.name, uploaderName, fmt.Sprint(time.Now().Unix()))) level.Info(util_log.Logger).Log("msg", "uploading the compacted file", "objectKey", objectKey) - return objectKey, t.storageClient.PutObject(t.ctx, objectKey, compressedDB) + return t.storageClient.PutObject(t.ctx, objectKey, compressedDB) } // removeObjectsFromStorage deletes objects from storage. 
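The retention-aware compact() path above reduces to a three-way decision over what MarkForDelete reported and whether a multi-file compaction ran first. A hedged restatement as a pure function, with invented action names, only to make the branching explicit:

package sketch

// action enumerates what table.compact() does with storage after marking.
type action int

const (
	actionNone          action = iota // nothing marked and no compaction ran: leave storage as-is
	actionDeleteObjects               // index emptied: remove every source object for the table
	actionUploadAndSwap               // index changed: upload the compacted db, then remove sources
)

// decideAction restates the branching at the end of compact().
func decideAction(empty bool, markCount int64, compacted bool) action {
	if empty {
		return actionDeleteObjects
	}
	if markCount == 0 && !compacted {
		return actionNone
	}
	return actionUploadAndSwap
}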
diff --git a/pkg/storage/stores/shipper/compactor/table_test.go b/pkg/storage/stores/shipper/compactor/table_test.go index 19f037e26d727..159ac7de0bd53 100644 --- a/pkg/storage/stores/shipper/compactor/table_test.go +++ b/pkg/storage/stores/shipper/compactor/table_test.go @@ -55,7 +55,7 @@ func TestTable_Compaction(t *testing.T) { objectClient, err := local.NewFSObjectClient(local.FSConfig{Directory: objectStoragePath}) require.NoError(t, err) - table, err := newTable(context.Background(), tableWorkingDirectory, objectClient) + table, err := newTable(context.Background(), tableWorkingDirectory, objectClient, false, nil) require.NoError(t, err) require.NoError(t, table.compact()) @@ -104,7 +104,7 @@ func TestTable_CompactionFailure(t *testing.T) { objectClient, err := local.NewFSObjectClient(local.FSConfig{Directory: objectStoragePath}) require.NoError(t, err) - table, err := newTable(context.Background(), tableWorkingDirectory, objectClient) + table, err := newTable(context.Background(), tableWorkingDirectory, objectClient, false, nil) require.NoError(t, err) // compaction should fail due to a non-boltdb file. @@ -121,7 +121,7 @@ func TestTable_CompactionFailure(t *testing.T) { // remove the non-boltdb file and ensure that compaction succeeds now. require.NoError(t, os.Remove(filepath.Join(tablePathInStorage, "fail.txt"))) - table, err = newTable(context.Background(), tableWorkingDirectory, objectClient) + table, err = newTable(context.Background(), tableWorkingDirectory, objectClient, false, nil) require.NoError(t, err) require.NoError(t, table.compact()) @@ -168,7 +168,6 @@ func compareCompactedDB(t *testing.T, compactedDBPath string, sourceDBsPath stri require.Equal(t, v, val) return nil }) - }) require.NoError(t, err) From 0d6cdcb318a8f8633fb966e59e63f2e54fefed4d Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Thu, 29 Apr 2021 17:04:08 +0200 Subject: [PATCH 3/9] Remove unused flags. Signed-off-by: Cyril Tovena --- pkg/storage/stores/shipper/compactor/compactor.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/storage/stores/shipper/compactor/compactor.go b/pkg/storage/stores/shipper/compactor/compactor.go index 747a0f7f9e3fb..2dd17f9bdd398 100644 --- a/pkg/storage/stores/shipper/compactor/compactor.go +++ b/pkg/storage/stores/shipper/compactor/compactor.go @@ -42,8 +42,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.WorkingDirectory, "boltdb.shipper.compactor.working-directory", "", "Directory where files can be downloaded for compaction.") f.StringVar(&cfg.SharedStoreType, "boltdb.shipper.compactor.shared-store", "", "Shared store used for storing boltdb files. Supported types: gcs, s3, azure, swift, filesystem") f.StringVar(&cfg.SharedStoreKeyPrefix, "boltdb.shipper.compactor.shared-store.key-prefix", "index/", "Prefix to add to Object Keys in Shared store. Path separator(if any) should always be a '/'. 
Prefix should never start with a separator but should always end with it.") - f.DurationVar(&cfg.CompactionInterval, "boltdb.shipper.compactor.compaction-interval", 2*time.Hour, "Interval at which to re-run the compaction operation.") - f.DurationVar(&cfg.RetentionInterval, "boltdb.shipper.compactor.retention-interval", 10*time.Minute, "Interval at which to re-run the retention operation.") + f.DurationVar(&cfg.CompactionInterval, "boltdb.shipper.compactor.compaction-interval", 10*time.Minute, "Interval at which to re-run the compaction operation.") f.DurationVar(&cfg.RetentionDeleteDelay, "boltdb.shipper.compactor.retention-delete-delay", 2*time.Hour, "Delay after which chunks will be fully deleted during retention.") f.BoolVar(&cfg.RetentionEnabled, "boltdb.shipper.compactor.retention-enabled", false, "(Experimental) Activate custom (per-stream,per-tenant) retention.") f.IntVar(&cfg.RetentionDeleteWorkCount, "boltdb.shipper.compactor.retention-delete-worker-count", 150, "The total amount of worker to use to delete chunks.") From cbe8aff703c9462fdff216fb83b45a4dad0da172 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Thu, 29 Apr 2021 17:05:16 +0200 Subject: [PATCH 4/9] Better comment. Signed-off-by: Cyril Tovena --- pkg/storage/stores/shipper/compactor/retention/retention.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storage/stores/shipper/compactor/retention/retention.go b/pkg/storage/stores/shipper/compactor/retention/retention.go index f6eebadce0927..61e90a2a4d797 100644 --- a/pkg/storage/stores/shipper/compactor/retention/retention.go +++ b/pkg/storage/stores/shipper/compactor/retention/retention.go @@ -28,7 +28,7 @@ const ( ) type TableMarker interface { - // MarkForDelete marks a given table and returns if it's empty and how many marks were created. + // MarkForDelete marks chunks to delete for a given table and returns if it's empty and how many marks were created. MarkForDelete(ctx context.Context, tableName string, db *bbolt.DB) (bool, int64, error) } From b664b90da2dff313b4d99850fbdd27e5aa2813dc Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Thu, 29 Apr 2021 17:10:15 +0200 Subject: [PATCH 5/9] Better cancellation. 
Signed-off-by: Cyril Tovena --- .../stores/shipper/compactor/retention/retention.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/pkg/storage/stores/shipper/compactor/retention/retention.go b/pkg/storage/stores/shipper/compactor/retention/retention.go index 61e90a2a4d797..b3cdec911551b 100644 --- a/pkg/storage/stores/shipper/compactor/retention/retention.go +++ b/pkg/storage/stores/shipper/compactor/retention/retention.go @@ -92,8 +92,10 @@ func (t *Marker) markTable(ctx context.Context, tableName string, db *bbolt.DB) if err != nil { return fmt.Errorf("failed to create chunk index iterator: %w", err) } - - empty, err = markforDelete(markerWriter, chunkIt, newSeriesCleaner(bucket, schemaCfg), t.expiration) + if ctx.Err() != nil { + return ctx.Err() + } + empty, err = markforDelete(ctx, markerWriter, chunkIt, newSeriesCleaner(bucket, schemaCfg), t.expiration) if err != nil { return err } @@ -118,7 +120,7 @@ func (t *Marker) markTable(ctx context.Context, tableName string, db *bbolt.DB) return empty, markerWriter.Count(), nil } -func markforDelete(marker MarkerStorageWriter, chunkIt ChunkEntryIterator, seriesCleaner SeriesCleaner, expiration ExpirationChecker) (bool, error) { +func markforDelete(ctx context.Context, marker MarkerStorageWriter, chunkIt ChunkEntryIterator, seriesCleaner SeriesCleaner, expiration ExpirationChecker) (bool, error) { seriesMap := newUserSeriesMap() empty := true for chunkIt.Next() { @@ -141,6 +143,9 @@ func markforDelete(marker MarkerStorageWriter, chunkIt ChunkEntryIterator, serie if empty { return true, nil } + if ctx.Err() != nil { + return false, ctx.Err() + } return false, seriesMap.ForEach(func(seriesID, userID []byte) error { return seriesCleaner.Cleanup(seriesID, userID) }) From dcf018e75a67caa241d1191980ee90b41a493203 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Thu, 29 Apr 2021 17:10:38 +0200 Subject: [PATCH 6/9] Missing ctx in test. Signed-off-by: Cyril Tovena --- .../stores/shipper/compactor/retention/retention_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storage/stores/shipper/compactor/retention/retention_test.go b/pkg/storage/stores/shipper/compactor/retention/retention_test.go index 9ae82009cd98a..d3bba96f23a70 100644 --- a/pkg/storage/stores/shipper/compactor/retention/retention_test.go +++ b/pkg/storage/stores/shipper/compactor/retention/retention_test.go @@ -182,7 +182,7 @@ func Test_EmptyTable(t *testing.T) { err := tables[0].DB.Update(func(tx *bbolt.Tx) error { it, err := newChunkIndexIterator(tx.Bucket(bucketName), schema.config) require.NoError(t, err) - empty, err := markforDelete(noopWriter{}, it, noopCleaner{}, NewExpirationChecker(&fakeLimits{perTenant: map[string]time.Duration{"1": 0, "2": 0}})) + empty, err := markforDelete(context.Background(), noopWriter{}, it, noopCleaner{}, NewExpirationChecker(&fakeLimits{perTenant: map[string]time.Duration{"1": 0, "2": 0}})) require.NoError(t, err) require.True(t, empty) return nil From 14df4697e10012244b0f2d5c98c9dc3024f425cb Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Thu, 29 Apr 2021 17:13:09 +0200 Subject: [PATCH 7/9] Fixes flags that are not used. 
Signed-off-by: Cyril Tovena --- pkg/storage/stores/shipper/compactor/compactor.go | 1 - pkg/storage/stores/shipper/compactor/compactor_test.go | 1 - 2 files changed, 2 deletions(-) diff --git a/pkg/storage/stores/shipper/compactor/compactor.go b/pkg/storage/stores/shipper/compactor/compactor.go index 2dd17f9bdd398..00d1076c5fc7b 100644 --- a/pkg/storage/stores/shipper/compactor/compactor.go +++ b/pkg/storage/stores/shipper/compactor/compactor.go @@ -32,7 +32,6 @@ type Config struct { SharedStoreKeyPrefix string `yaml:"shared_store_key_prefix"` CompactionInterval time.Duration `yaml:"compaction_interval"` RetentionEnabled bool `yaml:"retention_enabled"` - RetentionInterval time.Duration `yaml:"retention_interval"` RetentionDeleteDelay time.Duration `yaml:"retention_delete_delay"` RetentionDeleteWorkCount int `yaml:"retention_delete_worker_count"` } diff --git a/pkg/storage/stores/shipper/compactor/compactor_test.go b/pkg/storage/stores/shipper/compactor/compactor_test.go index 77d79fdc2232a..5d7913f2d8845 100644 --- a/pkg/storage/stores/shipper/compactor/compactor_test.go +++ b/pkg/storage/stores/shipper/compactor/compactor_test.go @@ -20,7 +20,6 @@ func TestIsDefaults(t *testing.T) { {&Config{ SharedStoreKeyPrefix: "index/", CompactionInterval: 2 * time.Hour, - RetentionInterval: 10 * time.Minute, RetentionDeleteDelay: 2 * time.Hour, RetentionDeleteWorkCount: 150, }, true}, From 1d31dd3d30552a84d249fc6c3adbb899ba1a2850 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 30 Apr 2021 10:42:01 +0200 Subject: [PATCH 8/9] Add compactor retention tests. Signed-off-by: Cyril Tovena --- .../shipper/compactor/compactor_test.go | 2 +- .../stores/shipper/compactor/table_test.go | 98 +++++++++++++++++++ 2 files changed, 99 insertions(+), 1 deletion(-) diff --git a/pkg/storage/stores/shipper/compactor/compactor_test.go b/pkg/storage/stores/shipper/compactor/compactor_test.go index 5d7913f2d8845..a78955dce5c32 100644 --- a/pkg/storage/stores/shipper/compactor/compactor_test.go +++ b/pkg/storage/stores/shipper/compactor/compactor_test.go @@ -19,7 +19,7 @@ func TestIsDefaults(t *testing.T) { {&Config{}, false}, {&Config{ SharedStoreKeyPrefix: "index/", - CompactionInterval: 2 * time.Hour, + CompactionInterval: 10 * time.Minute, RetentionDeleteDelay: 2 * time.Hour, RetentionDeleteWorkCount: 150, }, true}, diff --git a/pkg/storage/stores/shipper/compactor/table_test.go b/pkg/storage/stores/shipper/compactor/table_test.go index 159ac7de0bd53..88280b926c68b 100644 --- a/pkg/storage/stores/shipper/compactor/table_test.go +++ b/pkg/storage/stores/shipper/compactor/table_test.go @@ -13,6 +13,7 @@ import ( "github.com/stretchr/testify/require" "go.etcd.io/bbolt" + "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention" "github.com/grafana/loki/pkg/storage/stores/shipper/testutil" ) @@ -70,6 +71,103 @@ func TestTable_Compaction(t *testing.T) { compareCompactedDB(t, filepath.Join(tablePathInStorage, files[0].Name()), filepath.Join(objectStoragePath, "test-copy")) } +type TableMarkerFunc func(ctx context.Context, tableName string, db *bbolt.DB) (bool, int64, error) + +func (t TableMarkerFunc) MarkForDelete(ctx context.Context, tableName string, db *bbolt.DB) (bool, int64, error) { + return t(ctx, tableName, db) +} + +func TestTable_CompactionRetention(t *testing.T) { + for name, tt := range map[string]struct { + dbCount int + assert func(t *testing.T, storagePath string) + tableMarker retention.TableMarker + }{ + "emptied table": { + dbCount: 2, + assert: func(t *testing.T, storagePath string) { 
+ _, err := os.Stat(storagePath) + require.True(t, os.IsNotExist(err)) + }, + tableMarker: TableMarkerFunc(func(ctx context.Context, tableName string, db *bbolt.DB) (bool, int64, error) { + return true, 100, nil + }), + }, + "marked table": { + dbCount: 2, + assert: func(t *testing.T, storagePath string) { + files, err := ioutil.ReadDir(storagePath) + require.NoError(t, err) + require.Len(t, files, 1) + require.True(t, strings.HasSuffix(files[0].Name(), ".gz")) + }, + tableMarker: TableMarkerFunc(func(ctx context.Context, tableName string, db *bbolt.DB) (bool, int64, error) { + return false, 100, nil + }), + }, + "already compacted table": { + dbCount: 1, + assert: func(t *testing.T, storagePath string) { + files, err := ioutil.ReadDir(storagePath) + require.NoError(t, err) + require.Len(t, files, 1) + require.True(t, strings.HasSuffix(files[0].Name(), ".gz")) + }, + tableMarker: TableMarkerFunc(func(ctx context.Context, tableName string, db *bbolt.DB) (bool, int64, error) { + return false, 100, nil + }), + }, + "not modified": { + dbCount: 1, + assert: func(t *testing.T, storagePath string) { + files, err := ioutil.ReadDir(storagePath) + require.NoError(t, err) + require.Len(t, files, 1) + require.True(t, strings.HasSuffix(files[0].Name(), ".gz")) + }, + tableMarker: TableMarkerFunc(func(ctx context.Context, tableName string, db *bbolt.DB) (bool, int64, error) { + return false, 0, nil + }), + }, + } { + tt := tt + t.Run(name, func(t *testing.T) { + tempDir := t.TempDir() + + tableName := "test" + objectStoragePath := filepath.Join(tempDir, objectsStorageDirName) + tableWorkingDirectory := filepath.Join(tempDir, workingDirName, tableName) + + // setup some dbs + numDBs := tt.dbCount + numRecordsPerDB := 100 + + dbsToSetup := make(map[string]testutil.DBRecords) + for i := 0; i < numDBs; i++ { + dbsToSetup[fmt.Sprint(i)] = testutil.DBRecords{ + Start: i * numRecordsPerDB, + NumRecords: (i + 1) * numRecordsPerDB, + } + } + + testutil.SetupDBTablesAtPath(t, tableName, objectStoragePath, dbsToSetup, true) + + // setup exact same copy of dbs for comparison. + testutil.SetupDBTablesAtPath(t, "test-copy", objectStoragePath, dbsToSetup, false) + + // do the compaction + objectClient, err := local.NewFSObjectClient(local.FSConfig{Directory: objectStoragePath}) + require.NoError(t, err) + + table, err := newTable(context.Background(), tableWorkingDirectory, objectClient, true, tt.tableMarker) + require.NoError(t, err) + + require.NoError(t, table.compact()) + tt.assert(t, filepath.Join(objectStoragePath, tableName)) + }) + } +} + func TestTable_CompactionFailure(t *testing.T) { tempDir, err := ioutil.TempDir("", "table-compaction-failure") require.NoError(t, err) From 3ea0c034d4e789fa6fa932eba2cc910c3ede162f Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 30 Apr 2021 12:01:23 +0200 Subject: [PATCH 9/9] Review feedback. 
Signed-off-by: Cyril Tovena --- .../stores/shipper/compactor/table_test.go | 26 ++++++++++--------- .../stores/shipper/testutil/testutil.go | 10 +++++++ 2 files changed, 24 insertions(+), 12 deletions(-) diff --git a/pkg/storage/stores/shipper/compactor/table_test.go b/pkg/storage/stores/shipper/compactor/table_test.go index 88280b926c68b..c18d3a9b547ad 100644 --- a/pkg/storage/stores/shipper/compactor/table_test.go +++ b/pkg/storage/stores/shipper/compactor/table_test.go @@ -20,6 +20,7 @@ import ( const ( objectsStorageDirName = "objects" workingDirName = "working-dir" + tableName = "test" ) func TestTable_Compaction(t *testing.T) { @@ -30,7 +31,6 @@ func TestTable_Compaction(t *testing.T) { require.NoError(t, os.RemoveAll(tempDir)) }() - tableName := "test" objectStoragePath := filepath.Join(tempDir, objectsStorageDirName) tablePathInStorage := filepath.Join(objectStoragePath, tableName) tableWorkingDirectory := filepath.Join(tempDir, workingDirName, tableName) @@ -80,13 +80,13 @@ func (t TableMarkerFunc) MarkForDelete(ctx context.Context, tableName string, db func TestTable_CompactionRetention(t *testing.T) { for name, tt := range map[string]struct { dbCount int - assert func(t *testing.T, storagePath string) + assert func(t *testing.T, storagePath, tableName string) tableMarker retention.TableMarker }{ "emptied table": { dbCount: 2, - assert: func(t *testing.T, storagePath string) { - _, err := os.Stat(storagePath) + assert: func(t *testing.T, storagePath, tableName string) { + _, err := ioutil.ReadDir(filepath.Join(storagePath, tableName)) require.True(t, os.IsNotExist(err)) }, tableMarker: TableMarkerFunc(func(ctx context.Context, tableName string, db *bbolt.DB) (bool, int64, error) { @@ -95,11 +95,12 @@ func TestTable_CompactionRetention(t *testing.T) { }, "marked table": { dbCount: 2, - assert: func(t *testing.T, storagePath string) { - files, err := ioutil.ReadDir(storagePath) + assert: func(t *testing.T, storagePath, tableName string) { + files, err := ioutil.ReadDir(filepath.Join(storagePath, tableName)) require.NoError(t, err) require.Len(t, files, 1) require.True(t, strings.HasSuffix(files[0].Name(), ".gz")) + compareCompactedDB(t, filepath.Join(storagePath, tableName, files[0].Name()), filepath.Join(storagePath, "test-copy")) }, tableMarker: TableMarkerFunc(func(ctx context.Context, tableName string, db *bbolt.DB) (bool, int64, error) { return false, 100, nil @@ -107,11 +108,12 @@ func TestTable_CompactionRetention(t *testing.T) { }, "already compacted table": { dbCount: 1, - assert: func(t *testing.T, storagePath string) { - files, err := ioutil.ReadDir(storagePath) + assert: func(t *testing.T, storagePath, tableName string) { + files, err := ioutil.ReadDir(filepath.Join(storagePath, tableName)) require.NoError(t, err) require.Len(t, files, 1) require.True(t, strings.HasSuffix(files[0].Name(), ".gz")) + compareCompactedDB(t, filepath.Join(storagePath, tableName, files[0].Name()), filepath.Join(storagePath, "test-copy")) }, tableMarker: TableMarkerFunc(func(ctx context.Context, tableName string, db *bbolt.DB) (bool, int64, error) { return false, 100, nil @@ -119,11 +121,12 @@ func TestTable_CompactionRetention(t *testing.T) { }, "not modified": { dbCount: 1, - assert: func(t *testing.T, storagePath string) { - files, err := ioutil.ReadDir(storagePath) + assert: func(t *testing.T, storagePath, tableName string) { + files, err := ioutil.ReadDir(filepath.Join(storagePath, tableName)) require.NoError(t, err) require.Len(t, files, 1) require.True(t, 
strings.HasSuffix(files[0].Name(), ".gz")) + compareCompactedDB(t, filepath.Join(storagePath, tableName, files[0].Name()), filepath.Join(storagePath, "test-copy")) }, tableMarker: TableMarkerFunc(func(ctx context.Context, tableName string, db *bbolt.DB) (bool, int64, error) { return false, 0, nil @@ -134,7 +137,6 @@ func TestTable_CompactionRetention(t *testing.T) { t.Run(name, func(t *testing.T) { tempDir := t.TempDir() - tableName := "test" objectStoragePath := filepath.Join(tempDir, objectsStorageDirName) tableWorkingDirectory := filepath.Join(tempDir, workingDirName, tableName) @@ -163,7 +165,7 @@ func TestTable_CompactionRetention(t *testing.T) { require.NoError(t, err) require.NoError(t, table.compact()) - tt.assert(t, filepath.Join(objectStoragePath, tableName)) + tt.assert(t, objectStoragePath, tableName) }) } } diff --git a/pkg/storage/stores/shipper/testutil/testutil.go b/pkg/storage/stores/shipper/testutil/testutil.go index c17d5fe3e5346..7b3c3ff449bdc 100644 --- a/pkg/storage/stores/shipper/testutil/testutil.go +++ b/pkg/storage/stores/shipper/testutil/testutil.go @@ -21,6 +21,7 @@ import ( var boltBucketName = []byte("index") func AddRecordsToDB(t *testing.T, path string, dbClient *local.BoltIndexClient, start, numRecords int) { + t.Helper() db, err := local.OpenBoltdbFile(path) require.NoError(t, err) @@ -45,6 +46,7 @@ type SingleTableQuerier interface { } func TestSingleTableQuery(t *testing.T, queries []chunk.IndexQuery, querier SingleTableQuerier, start, numRecords int) { + t.Helper() minValue := start maxValue := start + numRecords fetchedRecords := make(map[string]string) @@ -60,6 +62,7 @@ type SingleDBQuerier interface { } func TestSingleDBQuery(t *testing.T, query chunk.IndexQuery, db *bbolt.DB, querier SingleDBQuerier, start, numRecords int) { + t.Helper() minValue := start maxValue := start + numRecords fetchedRecords := make(map[string]string) @@ -75,6 +78,7 @@ type MultiTableQuerier interface { } func TestMultiTableQuery(t *testing.T, queries []chunk.IndexQuery, querier MultiTableQuerier, start, numRecords int) { + t.Helper() minValue := start maxValue := start + numRecords fetchedRecords := make(map[string]string) @@ -86,6 +90,7 @@ func TestMultiTableQuery(t *testing.T, queries []chunk.IndexQuery, querier Multi } func makeTestCallback(t *testing.T, minValue, maxValue int, records map[string]string) func(query chunk.IndexQuery, batch chunk.ReadBatch) (shouldContinue bool) { + t.Helper() recordsMtx := sync.Mutex{} return func(query chunk.IndexQuery, batch chunk.ReadBatch) (shouldContinue bool) { itr := batch.Iterator() @@ -106,6 +111,7 @@ func makeTestCallback(t *testing.T, minValue, maxValue int, records map[string]s } func CompareDBs(t *testing.T, db1, db2 *bbolt.DB) { + t.Helper() db1Records := readDB(t, db1) db2Records := readDB(t, db2) @@ -113,6 +119,7 @@ func CompareDBs(t *testing.T, db1, db2 *bbolt.DB) { } func readDB(t *testing.T, db *bbolt.DB) map[string]string { + t.Helper() dbRecords := map[string]string{} err := db.View(func(tx *bbolt.Tx) error { @@ -134,6 +141,7 @@ type DBRecords struct { } func SetupDBTablesAtPath(t *testing.T, tableName, path string, dbs map[string]DBRecords, compressRandomFiles bool) string { + t.Helper() boltIndexClient, err := local.NewBoltDBIndexClient(local.BoltDBConfig{Directory: path}) require.NoError(t, err) @@ -155,6 +163,7 @@ func SetupDBTablesAtPath(t *testing.T, tableName, path string, dbs map[string]DB } func compressFile(t *testing.T, filepath string) { + t.Helper() uncompressedFile, err := os.Open(filepath) 
require.NoError(t, err) @@ -173,6 +182,7 @@ func compressFile(t *testing.T, filepath string) { } func DecompressFile(t *testing.T, src, dest string) { + t.Helper() // open compressed file from storage compressedFile, err := os.Open(src) require.NoError(t, err)
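Taken together, the series makes newTable the single entry point for both compaction and retention. A minimal usage sketch, written as if it lived in the compactor package next to the tests above; compactWithRetention, the directory parameters, and the always-mark-one-chunk marker are all hypothetical.

import (
	"context"

	"github.com/cortexproject/cortex/pkg/chunk/local"
	"go.etcd.io/bbolt"
)

// compactWithRetention compacts a single table with retention enabled,
// using a local filesystem object client as the table tests do.
func compactWithRetention(ctx context.Context, objectStoragePath, tableWorkingDirectory string) error {
	objectClient, err := local.NewFSObjectClient(local.FSConfig{Directory: objectStoragePath})
	if err != nil {
		return err
	}
	// Fake marker: pretend one chunk was marked so compact() re-uploads the db.
	marker := TableMarkerFunc(func(context.Context, string, *bbolt.DB) (bool, int64, error) {
		return false, 1, nil
	})
	table, err := newTable(ctx, tableWorkingDirectory, objectClient, true, marker)
	if err != nil {
		return err
	}
	return table.compact()
}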