diff --git a/pkg/storage/stores/shipper/compactor/retention/marker.go b/pkg/storage/stores/shipper/compactor/retention/marker.go index 2eb708b5e3715..4593e44d14207 100644 --- a/pkg/storage/stores/shipper/compactor/retention/marker.go +++ b/pkg/storage/stores/shipper/compactor/retention/marker.go @@ -203,6 +203,8 @@ func (r *markerProcessor) processPath(path string, deleteFunc func(ctx context.C if err != nil { return err } + // we don't need to force sync to file, we just view the file. + dbView.NoSync = true defer func() { if err := dbView.Close(); err != nil { level.Warn(util_log.Logger).Log("msg", "failed to close db view", "err", err) @@ -212,6 +214,7 @@ func (r *markerProcessor) processPath(path string, deleteFunc func(ctx context.C if err != nil { return err } + dbUpdate.MaxBatchDelay = 1 * time.Second // 1 s is way enough for saving changes, worst case this operation is idempotent. defer func() { close(queue) wg.Wait() diff --git a/pkg/storage/stores/shipper/compactor/table.go b/pkg/storage/stores/shipper/compactor/table.go index de57c502cac52..a6c79e745a3b1 100644 --- a/pkg/storage/stores/shipper/compactor/table.go +++ b/pkg/storage/stores/shipper/compactor/table.go @@ -114,7 +114,7 @@ func (t *table) compact() error { if len(objects) == 1 { // download the db downloadAt := filepath.Join(t.workingDirectory, fmt.Sprint(time.Now().Unix())) - err = shipper_util.GetFileFromStorage(t.ctx, t.storageClient, objects[0].Key, downloadAt) + err = shipper_util.GetFileFromStorage(t.ctx, t.storageClient, objects[0].Key, downloadAt, false) if err != nil { return err } @@ -122,6 +122,8 @@ func (t *table) compact() error { if err != nil { return err } + // no need to enforce write to disk, we'll upload and delete the file anyway. + t.compactedDB.NoSync = true } if t.compactedDB == nil { @@ -158,7 +160,9 @@ func (t *table) compactFiles(objects []chunk.StorageObject) error { if err != nil { return err } - + // no need to enforce write to disk, we'll upload and delete the file anyway. + // in case of failure we'll restart the whole process anyway. + t.compactedDB.NoSync = true level.Info(util_log.Logger).Log("msg", "starting compaction of dbs") errChan := make(chan error) @@ -193,7 +197,7 @@ func (t *table) compactFiles(objects []chunk.StorageObject) error { downloadAt := filepath.Join(t.workingDirectory, dbName) - err = shipper_util.GetFileFromStorage(t.ctx, t.storageClient, objectKey, downloadAt) + err = shipper_util.GetFileFromStorage(t.ctx, t.storageClient, objectKey, downloadAt, false) if err != nil { return } @@ -293,7 +297,7 @@ func (t *table) readFile(path string) error { if err != nil { return err } - + db.NoSync = true defer func() { if err := db.Close(); err != nil { level.Error(util_log.Logger).Log("msg", "failed to close db", "path", path, "err", err) @@ -360,7 +364,7 @@ func (t *table) upload() error { // compress the compactedDB. compressedDBPath := fmt.Sprintf("%s.gz", compactedDBPath) - err = shipper_util.CompressFile(compactedDBPath, compressedDBPath) + err = shipper_util.CompressFile(compactedDBPath, compressedDBPath, false) if err != nil { return err } diff --git a/pkg/storage/stores/shipper/downloads/table.go b/pkg/storage/stores/shipper/downloads/table.go index 363ecc946b4b3..3574d6d71ba5b 100644 --- a/pkg/storage/stores/shipper/downloads/table.go +++ b/pkg/storage/stores/shipper/downloads/table.go @@ -307,7 +307,6 @@ func (t *Table) MultiQueries(ctx context.Context, queries []chunk.IndexQuery, ca return nil }) - if err != nil { return err } @@ -453,7 +452,7 @@ func (t *Table) downloadFile(ctx context.Context, storageObject chunk.StorageObj folderPath, _ := t.folderPathForTable(false) filePath := path.Join(folderPath, dbName) - err = shipper_util.GetFileFromStorage(ctx, t.storageClient, storageObject.Key, filePath) + err = shipper_util.GetFileFromStorage(ctx, t.storageClient, storageObject.Key, filePath, true) if err != nil { return err } @@ -528,7 +527,7 @@ func (t *Table) doParallelDownload(ctx context.Context, objects []chunk.StorageO } filePath := path.Join(folderPathForTable, dbName) - err = shipper_util.GetFileFromStorage(ctx, t.storageClient, object.Key, filePath) + err = shipper_util.GetFileFromStorage(ctx, t.storageClient, object.Key, filePath, true) if err != nil { break } @@ -546,7 +545,6 @@ func (t *Table) doParallelDownload(ctx context.Context, objects []chunk.StorageO case <-ctx.Done(): break } - } close(queue) }() diff --git a/pkg/storage/stores/shipper/util/util.go b/pkg/storage/stores/shipper/util/util.go index 39b111b71a9ed..bed0b4e87bd8b 100644 --- a/pkg/storage/stores/shipper/util/util.go +++ b/pkg/storage/stores/shipper/util/util.go @@ -67,7 +67,7 @@ type StorageClient interface { } // GetFileFromStorage downloads a file from storage to given location. -func GetFileFromStorage(ctx context.Context, storageClient StorageClient, objectKey, destination string) error { +func GetFileFromStorage(ctx context.Context, storageClient StorageClient, objectKey, destination string, sync bool) error { readCloser, err := storageClient.GetObject(ctx, objectKey) if err != nil { return err @@ -84,6 +84,11 @@ func GetFileFromStorage(ctx context.Context, storageClient StorageClient, object return err } + defer func() { + if err := f.Close(); err != nil { + level.Warn(util_log.Logger).Log("msg", "failed to close file", "file", destination) + } + }() var objectReader io.Reader = readCloser if strings.HasSuffix(objectKey, ".gz") { decompressedReader := getGzipReader(readCloser) @@ -98,8 +103,10 @@ func GetFileFromStorage(ctx context.Context, storageClient StorageClient, object } level.Info(util_log.Logger).Log("msg", fmt.Sprintf("downloaded file %s", objectKey)) - - return f.Sync() + if sync { + return f.Sync() + } + return nil } func GetDBNameFromObjectKey(objectKey string) (string, error) { @@ -126,7 +133,7 @@ func BuildObjectKey(tableName, uploader, dbName string) string { return objectKey } -func CompressFile(src, dest string) error { +func CompressFile(src, dest string, sync bool) error { level.Info(util_log.Logger).Log("msg", "compressing the file", "src", src, "dest", dest) uncompressedFile, err := os.Open(src) if err != nil { @@ -162,8 +169,10 @@ func CompressFile(src, dest string) error { if err == nil { return err } - - return compressedFile.Sync() + if sync { + return compressedFile.Sync() + } + return nil } type result struct { diff --git a/pkg/storage/stores/shipper/util/util_test.go b/pkg/storage/stores/shipper/util/util_test.go index 03d39f07025a1..6275020a79807 100644 --- a/pkg/storage/stores/shipper/util/util_test.go +++ b/pkg/storage/stores/shipper/util/util_test.go @@ -31,7 +31,7 @@ func Test_GetFileFromStorage(t *testing.T) { objectClient, err := local.NewFSObjectClient(local.FSConfig{Directory: tempDir}) require.NoError(t, err) - require.NoError(t, GetFileFromStorage(context.Background(), objectClient, "src", filepath.Join(tempDir, "dest"))) + require.NoError(t, GetFileFromStorage(context.Background(), objectClient, "src", filepath.Join(tempDir, "dest"), false)) // verify the contents of the downloaded file. b, err := ioutil.ReadFile(filepath.Join(tempDir, "dest")) @@ -40,11 +40,11 @@ func Test_GetFileFromStorage(t *testing.T) { require.Equal(t, testData, b) // compress the file in storage - err = CompressFile(filepath.Join(tempDir, "src"), filepath.Join(tempDir, "src.gz")) + err = CompressFile(filepath.Join(tempDir, "src"), filepath.Join(tempDir, "src.gz"), true) require.NoError(t, err) // get the compressed file from storage - require.NoError(t, GetFileFromStorage(context.Background(), objectClient, "src.gz", filepath.Join(tempDir, "dest.gz"))) + require.NoError(t, GetFileFromStorage(context.Background(), objectClient, "src.gz", filepath.Join(tempDir, "dest.gz"), false)) // verify the contents of the downloaded gz file. b, err = ioutil.ReadFile(filepath.Join(tempDir, "dest.gz")) @@ -69,7 +69,7 @@ func Test_CompressFile(t *testing.T) { require.NoError(t, ioutil.WriteFile(uncompressedFilePath, testData, 0666)) - require.NoError(t, CompressFile(uncompressedFilePath, compressedFilePath)) + require.NoError(t, CompressFile(uncompressedFilePath, compressedFilePath, true)) require.FileExists(t, compressedFilePath) testutil.DecompressFile(t, compressedFilePath, decompressedFilePath)