Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: [k207] chore: download then extract in separate steps #13279

Merged
merged 1 commit into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/storage/stores/shipper/indexshipper/downloads/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package downloads

import (
"context"
"errors"
"sync"
"time"
)

// mtxWithReadiness combines a mutex with readiness channel. It would acquire lock only when the channel is closed to mark it ready.
Expand All @@ -22,6 +24,9 @@ func (m *mtxWithReadiness) markReady() {
}

func (m *mtxWithReadiness) awaitReady(ctx context.Context) error {
ctx, cancel := context.WithTimeoutCause(ctx, 30*time.Second, errors.New("exceeded 30 seconds in awaitReady"))
defer cancel()

select {
case <-ctx.Done():
return ctx.Err()
Expand Down
38 changes: 35 additions & 3 deletions pkg/storage/stores/shipper/indexshipper/storage/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,38 @@ func DownloadFileFromStorage(destination string, decompressFile bool, sync bool,
}
}()

tmpName := destination + "-tmp"

ftmp, err := os.Create(tmpName)
if err != nil {
return err
}
defer func() {
err := os.Remove(tmpName)
if err != nil {
level.Warn(logger).Log("msg", "failed to delete temp file from index download", "err", err)
}
}()

_, err = io.Copy(ftmp, readCloser)
if err != nil {
return err
}

dlTime := time.Since(start)
level.Info(logger).Log("msg", "downloaded file", "total_time", dlTime)
start = time.Now()

tmpReader, err := os.Open(tmpName)
if err != nil {
return err
}
defer func() {
if err := tmpReader.Close(); err != nil {
level.Warn(logger).Log("msg", "failed to close file", "file", destination+"-tmp")
}
}()

f, err := os.Create(destination)
if err != nil {
return err
Expand All @@ -65,9 +97,9 @@ func DownloadFileFromStorage(destination string, decompressFile bool, sync bool,
level.Warn(logger).Log("msg", "failed to close file", "file", destination)
}
}()
var objectReader io.Reader = readCloser
var objectReader io.Reader = tmpReader
if decompressFile {
decompressedReader, err := getGzipReader(readCloser)
decompressedReader, err := getGzipReader(tmpReader)
if err != nil {
return err
}
Expand All @@ -89,7 +121,7 @@ func DownloadFileFromStorage(destination string, decompressFile bool, sync bool,
if err == nil {
logger = log.With(logger, "size", humanize.Bytes(uint64(fStat.Size())))
}
level.Info(logger).Log("msg", "downloaded file", "total_time", time.Since(start))
level.Info(logger).Log("msg", "downloaded and extracted file", "download time", dlTime, "extract time", time.Since(start))

if sync {
return f.Sync()
Expand Down
Loading