Removes file sync syscall for compaction. #3693

Merged
merged 1 commit on May 6, 2021
3 changes: 3 additions & 0 deletions pkg/storage/stores/shipper/compactor/retention/marker.go
@@ -203,6 +203,8 @@ func (r *markerProcessor) processPath(path string, deleteFunc func(ctx context.C
if err != nil {
return err
}
// we don't need to force sync to file, we just view the file.
dbView.NoSync = true
defer func() {
if err := dbView.Close(); err != nil {
level.Warn(util_log.Logger).Log("msg", "failed to close db view", "err", err)
@@ -212,6 +214,7 @@ func (r *markerProcessor) processPath(path string, deleteFunc func(ctx context.C
if err != nil {
return err
}
dbUpdate.MaxBatchDelay = 1 * time.Second // 1 s is way enough for saving changes, worst case this operation is idempotent.
defer func() {
close(queue)
wg.Wait()
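For context on the two settings added in this hunk: in bbolt (go.etcd.io/bbolt), `NoSync` skips the fsync that normally follows every transaction commit, and `MaxBatchDelay` caps how long `db.Batch()` calls wait before being committed together. A minimal, self-contained sketch of those knobs, using hypothetical file paths rather than the Loki code:

```go
package main

import (
	"log"
	"time"

	bolt "go.etcd.io/bbolt"
)

func main() {
	// Read-only view of the marker file: nothing is written back,
	// so skipping fsync on commit cannot lose data we care about.
	dbView, err := bolt.Open("/tmp/marks-view.db", 0o666, nil)
	if err != nil {
		log.Fatal(err)
	}
	dbView.NoSync = true
	defer dbView.Close()

	// Writable DB used with Batch(): MaxBatchDelay bounds how long queued
	// writes wait before they are committed as a single transaction.
	dbUpdate, err := bolt.Open("/tmp/marks-update.db", 0o666, nil)
	if err != nil {
		log.Fatal(err)
	}
	dbUpdate.MaxBatchDelay = 1 * time.Second
	defer dbUpdate.Close()
}
```

The durability trade-off is the one called out in the PR's own comments: a crash may lose recent commits, but the view is never written back and the marker updates are idempotent, so the work is simply redone.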
14 changes: 9 additions & 5 deletions pkg/storage/stores/shipper/compactor/table.go
@@ -114,14 +114,16 @@ func (t *table) compact() error {
if len(objects) == 1 {
// download the db
downloadAt := filepath.Join(t.workingDirectory, fmt.Sprint(time.Now().Unix()))
err = shipper_util.GetFileFromStorage(t.ctx, t.storageClient, objects[0].Key, downloadAt)
err = shipper_util.GetFileFromStorage(t.ctx, t.storageClient, objects[0].Key, downloadAt, false)
if err != nil {
return err
}
t.compactedDB, err = shipper_util.SafeOpenBoltdbFile(downloadAt)
if err != nil {
return err
}
// no need to enforce write to disk, we'll upload and delete the file anyway.
t.compactedDB.NoSync = true
}

if t.compactedDB == nil {
@@ -158,7 +160,9 @@ func (t *table) compactFiles(objects []chunk.StorageObject) error {
if err != nil {
return err
}

// no need to enforce write to disk, we'll upload and delete the file anyway.
// in case of failure we'll restart the whole process anyway.
t.compactedDB.NoSync = true
level.Info(util_log.Logger).Log("msg", "starting compaction of dbs")

errChan := make(chan error)
@@ -193,7 +197,7 @@ func (t *table) compactFiles(objects []chunk.StorageObject) error {

downloadAt := filepath.Join(t.workingDirectory, dbName)

err = shipper_util.GetFileFromStorage(t.ctx, t.storageClient, objectKey, downloadAt)
err = shipper_util.GetFileFromStorage(t.ctx, t.storageClient, objectKey, downloadAt, false)
if err != nil {
return
}
@@ -293,7 +297,7 @@ func (t *table) readFile(path string) error {
if err != nil {
return err
}

db.NoSync = true
defer func() {
if err := db.Close(); err != nil {
level.Error(util_log.Logger).Log("msg", "failed to close db", "path", path, "err", err)
@@ -360,7 +364,7 @@ func (t *table) upload() error {

// compress the compactedDB.
compressedDBPath := fmt.Sprintf("%s.gz", compactedDBPath)
err = shipper_util.CompressFile(compactedDBPath, compressedDBPath)
err = shipper_util.CompressFile(compactedDBPath, compressedDBPath, false)
if err != nil {
return err
}
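All of the compactor hunks above make the same trade: the downloaded and compacted files are uploaded and then deleted, so a crash only restarts compaction, while every avoided fsync removes a blocking syscall from the compaction path. A rough, hedged illustration of that cost (plain file writes, not a Loki benchmark):

```go
package main

import (
	"fmt"
	"os"
	"time"
)

// writeFile writes `blocks` MiB-sized chunks and optionally fsyncs once at
// the end, which is exactly the call the compactor call sites stop making.
func writeFile(path string, blocks int, sync bool) (time.Duration, error) {
	f, err := os.Create(path)
	if err != nil {
		return 0, err
	}
	defer f.Close()

	start := time.Now()
	buf := make([]byte, 1<<20) // 1 MiB per write
	for i := 0; i < blocks; i++ {
		if _, err := f.Write(buf); err != nil {
			return 0, err
		}
	}
	if sync {
		// The single fsync this PR removes for throwaway compaction artifacts.
		if err := f.Sync(); err != nil {
			return 0, err
		}
	}
	return time.Since(start), nil
}

func main() {
	// Errors ignored for brevity in this sketch.
	withSync, _ := writeFile("/tmp/with-sync.tmp", 64, true)
	noSync, _ := writeFile("/tmp/no-sync.tmp", 64, false)
	fmt.Printf("with fsync: %v, without fsync: %v\n", withSync, noSync)
}
```

Without the Sync, the data still reaches disk via the page cache; it is just not guaranteed durable before Close returns, which is acceptable for artifacts that can be regenerated by rerunning compaction.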
6 changes: 2 additions & 4 deletions pkg/storage/stores/shipper/downloads/table.go
@@ -307,7 +307,6 @@ func (t *Table) MultiQueries(ctx context.Context, queries []chunk.IndexQuery, ca

return nil
})

if err != nil {
return err
}
@@ -453,7 +452,7 @@ func (t *Table) downloadFile(ctx context.Context, storageObject chunk.StorageObj
folderPath, _ := t.folderPathForTable(false)
filePath := path.Join(folderPath, dbName)

err = shipper_util.GetFileFromStorage(ctx, t.storageClient, storageObject.Key, filePath)
err = shipper_util.GetFileFromStorage(ctx, t.storageClient, storageObject.Key, filePath, true)
if err != nil {
return err
}
@@ -528,7 +527,7 @@ func (t *Table) doParallelDownload(ctx context.Context, objects []chunk.StorageO
}

filePath := path.Join(folderPathForTable, dbName)
err = shipper_util.GetFileFromStorage(ctx, t.storageClient, object.Key, filePath)
err = shipper_util.GetFileFromStorage(ctx, t.storageClient, object.Key, filePath, true)
if err != nil {
break
}
@@ -546,7 +545,6 @@ func (t *Table) doParallelDownload(ctx context.Context, objects []chunk.StorageO
case <-ctx.Done():
break
}

}
close(queue)
}()
21 changes: 15 additions & 6 deletions pkg/storage/stores/shipper/util/util.go
@@ -67,7 +67,7 @@ type StorageClient interface {
}

// GetFileFromStorage downloads a file from storage to given location.
func GetFileFromStorage(ctx context.Context, storageClient StorageClient, objectKey, destination string) error {
func GetFileFromStorage(ctx context.Context, storageClient StorageClient, objectKey, destination string, sync bool) error {
readCloser, err := storageClient.GetObject(ctx, objectKey)
if err != nil {
return err
@@ -84,6 +84,11 @@ func GetFileFromStorage(ctx context.Context, storageClient StorageClient, object
return err
}

defer func() {
if err := f.Close(); err != nil {
level.Warn(util_log.Logger).Log("msg", "failed to close file", "file", destination)
}
}()
var objectReader io.Reader = readCloser
if strings.HasSuffix(objectKey, ".gz") {
decompressedReader := getGzipReader(readCloser)
@@ -98,8 +103,10 @@ func GetFileFromStorage(ctx context.Context, storageClient StorageClient, object
}

level.Info(util_log.Logger).Log("msg", fmt.Sprintf("downloaded file %s", objectKey))

return f.Sync()
if sync {
return f.Sync()
}
return nil
}

func GetDBNameFromObjectKey(objectKey string) (string, error) {
@@ -126,7 +133,7 @@ func BuildObjectKey(tableName, uploader, dbName string) string {
return objectKey
}

func CompressFile(src, dest string) error {
func CompressFile(src, dest string, sync bool) error {
level.Info(util_log.Logger).Log("msg", "compressing the file", "src", src, "dest", dest)
uncompressedFile, err := os.Open(src)
if err != nil {
@@ -162,8 +169,10 @@ func CompressFile(src, dest string) error {
if err == nil {
return err
}

return compressedFile.Sync()
if sync {
return compressedFile.Sync()
}
return nil
}

type result struct {
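The util.go changes above introduce the shape the rest of the PR relies on: the caller of GetFileFromStorage and CompressFile now decides whether the freshly written file has to survive a crash. A minimal, self-contained sketch of that pattern with illustrative names (not the shipper_util code itself):

```go
package main

import (
	"io"
	"log"
	"os"
	"strings"
)

// writeTo copies src into destination, optionally fsyncs it, and lets a
// deferred Close release the descriptor either way.
func writeTo(destination string, src io.Reader, sync bool) error {
	f, err := os.Create(destination)
	if err != nil {
		return err
	}
	defer func() {
		if err := f.Close(); err != nil {
			log.Printf("failed to close file %s: %v", destination, err)
		}
	}()

	if _, err := io.Copy(f, src); err != nil {
		return err
	}
	if sync {
		// Only pay for the fsync syscall when the file must survive a crash.
		return f.Sync()
	}
	return nil
}

func main() {
	// Throwaway artifact: sync=false, a retry would simply recreate it.
	if err := writeTo("/tmp/example-artifact", strings.NewReader("data"), false); err != nil {
		log.Fatal(err)
	}
}
```

In the diff, compaction call sites pass false because their files are uploaded and discarded, while the query-path downloads in pkg/storage/stores/shipper/downloads pass true since those index files are kept on local disk and reused.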
8 changes: 4 additions & 4 deletions pkg/storage/stores/shipper/util/util_test.go
@@ -31,7 +31,7 @@ func Test_GetFileFromStorage(t *testing.T) {
objectClient, err := local.NewFSObjectClient(local.FSConfig{Directory: tempDir})
require.NoError(t, err)

require.NoError(t, GetFileFromStorage(context.Background(), objectClient, "src", filepath.Join(tempDir, "dest")))
require.NoError(t, GetFileFromStorage(context.Background(), objectClient, "src", filepath.Join(tempDir, "dest"), false))

// verify the contents of the downloaded file.
b, err := ioutil.ReadFile(filepath.Join(tempDir, "dest"))
@@ -40,11 +40,11 @@ func Test_GetFileFromStorage(t *testing.T) {
require.Equal(t, testData, b)

// compress the file in storage
err = CompressFile(filepath.Join(tempDir, "src"), filepath.Join(tempDir, "src.gz"))
err = CompressFile(filepath.Join(tempDir, "src"), filepath.Join(tempDir, "src.gz"), true)
require.NoError(t, err)

// get the compressed file from storage
require.NoError(t, GetFileFromStorage(context.Background(), objectClient, "src.gz", filepath.Join(tempDir, "dest.gz")))
require.NoError(t, GetFileFromStorage(context.Background(), objectClient, "src.gz", filepath.Join(tempDir, "dest.gz"), false))

// verify the contents of the downloaded gz file.
b, err = ioutil.ReadFile(filepath.Join(tempDir, "dest.gz"))
@@ -69,7 +69,7 @@ func Test_CompressFile(t *testing.T) {

require.NoError(t, ioutil.WriteFile(uncompressedFilePath, testData, 0666))

require.NoError(t, CompressFile(uncompressedFilePath, compressedFilePath))
require.NoError(t, CompressFile(uncompressedFilePath, compressedFilePath, true))
require.FileExists(t, compressedFilePath)

testutil.DecompressFile(t, compressedFilePath, decompressedFilePath)