diff --git a/pkg/storage/stores/shipper/indexshipper/downloads/util.go b/pkg/storage/stores/shipper/indexshipper/downloads/util.go index d9eda440b3ca..457f76b3433d 100644 --- a/pkg/storage/stores/shipper/indexshipper/downloads/util.go +++ b/pkg/storage/stores/shipper/indexshipper/downloads/util.go @@ -2,7 +2,9 @@ package downloads import ( "context" + "errors" "sync" + "time" ) // mtxWithReadiness combines a mutex with readiness channel. It would acquire lock only when the channel is closed to mark it ready. @@ -22,6 +24,9 @@ func (m *mtxWithReadiness) markReady() { } func (m *mtxWithReadiness) awaitReady(ctx context.Context) error { + ctx, cancel := context.WithTimeoutCause(ctx, 30*time.Second, errors.New("exceeded 30 seconds in awaitReady")) + defer cancel() + select { case <-ctx.Done(): return ctx.Err() diff --git a/pkg/storage/stores/shipper/indexshipper/storage/util.go b/pkg/storage/stores/shipper/indexshipper/storage/util.go index 3e0bcc9324b8..7da0123ecbdd 100644 --- a/pkg/storage/stores/shipper/indexshipper/storage/util.go +++ b/pkg/storage/stores/shipper/indexshipper/storage/util.go @@ -55,6 +55,38 @@ func DownloadFileFromStorage(destination string, decompressFile bool, sync bool, } }() + tmpName := destination + "-tmp" + + ftmp, err := os.Create(tmpName) + if err != nil { + return err + } + defer func() { + err := os.Remove(tmpName) + if err != nil { + level.Warn(logger).Log("msg", "failed to delete temp file from index download", "err", err) + } + }() + + _, err = io.Copy(ftmp, readCloser) + if err != nil { + return err + } + + dlTime := time.Since(start) + level.Info(logger).Log("msg", "downloaded file", "total_time", dlTime) + start = time.Now() + + tmpReader, err := os.Open(tmpName) + if err != nil { + return err + } + defer func() { + if err := tmpReader.Close(); err != nil { + level.Warn(logger).Log("msg", "failed to close file", "file", destination+"-tmp") + } + }() + f, err := os.Create(destination) if err != nil { return err @@ -65,9 +97,9 @@ func DownloadFileFromStorage(destination string, decompressFile bool, sync bool, level.Warn(logger).Log("msg", "failed to close file", "file", destination) } }() - var objectReader io.Reader = readCloser + var objectReader io.Reader = tmpReader if decompressFile { - decompressedReader, err := getGzipReader(readCloser) + decompressedReader, err := getGzipReader(tmpReader) if err != nil { return err } @@ -89,7 +121,7 @@ func DownloadFileFromStorage(destination string, decompressFile bool, sync bool, if err == nil { logger = log.With(logger, "size", humanize.Bytes(uint64(fStat.Size()))) } - level.Info(logger).Log("msg", "downloaded file", "total_time", time.Since(start)) + level.Info(logger).Log("msg", "downloaded and extracted file", "download time", dlTime, "extract time", time.Since(start)) if sync { return f.Sync()