-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
boltdb shipper download failure handling and some refactorings #2156
Changes from all commits
e74f62d
a35c4cc
5b0ba1f
a764871
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,26 +17,19 @@ import ( | |
) | ||
|
||
// checkStorageForUpdates compares files from cache with storage and builds the list of files to be downloaded from storage and to be deleted from cache | ||
func (s *Shipper) checkStorageForUpdates(ctx context.Context, period string, fc *filesCollection) (toDownload []chunk.StorageObject, toDelete []string, err error) { | ||
if s.cfg.Mode == ShipperModeWriteOnly { | ||
return | ||
} | ||
|
||
func (fc *FilesCollection) checkStorageForUpdates(ctx context.Context) (toDownload []chunk.StorageObject, toDelete []string, err error) { | ||
// listing tables from store | ||
var objects []chunk.StorageObject | ||
objects, _, err = s.storageClient.List(ctx, period+"/") | ||
objects, _, err = fc.storageClient.List(ctx, fc.period+"/") | ||
if err != nil { | ||
return | ||
} | ||
|
||
listedUploaders := make(map[string]struct{}, len(objects)) | ||
|
||
fc.mtx.RLock() | ||
for _, object := range objects { | ||
uploader := strings.Split(object.Key, "/")[1] | ||
// don't include the file which was uploaded by same ingester | ||
if uploader == s.uploader { | ||
continue | ||
} | ||
listedUploaders[uploader] = struct{}{} | ||
|
||
// Checking whether file was updated in the store after we downloaded it, if not, no need to include it in updates | ||
|
@@ -45,37 +38,39 @@ func (s *Shipper) checkStorageForUpdates(ctx context.Context, period string, fc | |
toDownload = append(toDownload, object) | ||
} | ||
} | ||
fc.mtx.RUnlock() | ||
|
||
for uploader := range fc.files { | ||
err = fc.ForEach(func(uploader string, df *downloadedFile) error { | ||
if _, isOK := listedUploaders[uploader]; !isOK { | ||
toDelete = append(toDelete, uploader) | ||
} | ||
} | ||
return nil | ||
}) | ||
|
||
return | ||
} | ||
|
||
// syncFilesForPeriod downloads updated and new files from for given period from all the uploaders and removes deleted ones | ||
func (s *Shipper) syncFilesForPeriod(ctx context.Context, period string, fc *filesCollection) error { | ||
level.Debug(util.Logger).Log("msg", fmt.Sprintf("syncing files for period %s", period)) | ||
|
||
fc.RLock() | ||
toDownload, toDelete, err := s.checkStorageForUpdates(ctx, period, fc) | ||
fc.RUnlock() | ||
// Sync downloads updated and new files from for given period from all the uploaders and removes deleted ones | ||
func (fc *FilesCollection) Sync(ctx context.Context) error { | ||
level.Debug(util.Logger).Log("msg", fmt.Sprintf("syncing files for period %s", fc.period)) | ||
|
||
toDownload, toDelete, err := fc.checkStorageForUpdates(ctx) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
for _, storageObject := range toDownload { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think there should be some bounded concurrency mechanism for downloads. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is in my todo and I think it deserves a separate PR. What do you think? |
||
err = s.downloadFile(ctx, period, storageObject, fc) | ||
err = fc.downloadFile(ctx, storageObject) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
|
||
fc.mtx.Lock() | ||
defer fc.mtx.Unlock() | ||
|
||
for _, uploader := range toDelete { | ||
err := s.deleteFileFromCache(period, uploader, fc) | ||
err := fc.cleanupFile(uploader) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this would be better expressed as
This cleanup function already takes the lock in the exact same way, so we'd be able to increase code reuse and be more idiomatic. |
||
if err != nil { | ||
return err | ||
} | ||
|
@@ -85,29 +80,29 @@ func (s *Shipper) syncFilesForPeriod(ctx context.Context, period string, fc *fil | |
} | ||
|
||
// It first downloads file to a temp location so that we close the existing file(if already exists), replace it with new one and then reopen it. | ||
func (s *Shipper) downloadFile(ctx context.Context, period string, storageObject chunk.StorageObject, fc *filesCollection) error { | ||
func (fc *FilesCollection) downloadFile(ctx context.Context, storageObject chunk.StorageObject) error { | ||
uploader := strings.Split(storageObject.Key, "/")[1] | ||
folderPath, _ := s.getFolderPathForPeriod(period, false) | ||
folderPath, _ := fc.getFolderPathForPeriod(false) | ||
filePath := path.Join(folderPath, uploader) | ||
|
||
// download the file temporarily with some other name to allow boltdb client to close the existing file first if it exists | ||
tempFilePath := path.Join(folderPath, fmt.Sprintf("%s.%s", uploader, "temp")) | ||
|
||
err := s.getFileFromStorage(ctx, storageObject.Key, tempFilePath) | ||
err := fc.getFileFromStorage(ctx, storageObject.Key, tempFilePath) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
fc.Lock() | ||
defer fc.Unlock() | ||
fc.mtx.Lock() | ||
defer fc.mtx.Unlock() | ||
|
||
df, ok := fc.files[uploader] | ||
if ok { | ||
if err := df.boltdb.Close(); err != nil { | ||
return err | ||
} | ||
} else { | ||
df = downloadedFiles{} | ||
df = &downloadedFile{} | ||
} | ||
|
||
// move the file from temp location to actual location | ||
|
@@ -128,8 +123,8 @@ func (s *Shipper) downloadFile(ctx context.Context, period string, storageObject | |
} | ||
|
||
// getFileFromStorage downloads a file from storage to given location. | ||
func (s *Shipper) getFileFromStorage(ctx context.Context, objectKey, destination string) error { | ||
readCloser, err := s.storageClient.GetObject(ctx, objectKey) | ||
func (fc *FilesCollection) getFileFromStorage(ctx context.Context, objectKey, destination string) error { | ||
readCloser, err := fc.storageClient.GetObject(ctx, objectKey) | ||
if err != nil { | ||
return err | ||
} | ||
|
@@ -155,45 +150,47 @@ func (s *Shipper) getFileFromStorage(ctx context.Context, objectKey, destination | |
return f.Sync() | ||
} | ||
|
||
// downloadFilesForPeriod should be called when files for a period does not exist i.e they were never downloaded or got cleaned up later on by TTL | ||
// While files are being downloaded it will block all reads/writes on filesCollection by taking an exclusive lock | ||
func (s *Shipper) downloadFilesForPeriod(ctx context.Context, period string, fc *filesCollection) (err error) { | ||
fc.Lock() | ||
defer fc.Unlock() | ||
|
||
// downloadAllFilesForPeriod should be called when files for a period does not exist i.e they were never downloaded or got cleaned up later on by TTL | ||
// While files are being downloaded it will block all reads/writes on FilesCollection by taking an exclusive lock | ||
func (fc *FilesCollection) downloadAllFilesForPeriod(ctx context.Context) (err error) { | ||
defer func() { | ||
status := statusSuccess | ||
if err != nil { | ||
status = statusFailure | ||
fc.setErr(err) | ||
|
||
// cleaning up files due to error to avoid returning invalid results. | ||
for fileName := range fc.files { | ||
if err := fc.cleanupFile(fileName); err != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Locks aren't obtained for cleanup (they technically are when this is called via There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, as mentioned earlier, all the public methods take care of locking. I did it this way to do all the operations of initializing the |
||
level.Error(util.Logger).Log("msg", "failed to cleanup partially downloaded file", "filename", fileName, "err", err) | ||
} | ||
} | ||
} | ||
s.metrics.filesDownloadOperationTotal.WithLabelValues(status).Inc() | ||
fc.metrics.filesDownloadOperationTotal.WithLabelValues(status).Inc() | ||
}() | ||
|
||
startTime := time.Now() | ||
totalFilesSize := int64(0) | ||
|
||
objects, _, err := s.storageClient.List(ctx, period+"/") | ||
objects, _, err := fc.storageClient.List(ctx, fc.period+"/") | ||
if err != nil { | ||
return | ||
} | ||
|
||
level.Debug(util.Logger).Log("msg", fmt.Sprintf("list of files to download for period %s: %s", period, objects)) | ||
level.Debug(util.Logger).Log("msg", fmt.Sprintf("list of files to download for period %s: %s", fc.period, objects)) | ||
|
||
folderPath, err := s.getFolderPathForPeriod(period, true) | ||
folderPath, err := fc.getFolderPathForPeriod(true) | ||
if err != nil { | ||
return | ||
} | ||
|
||
for _, object := range objects { | ||
uploader := getUploaderFromObjectKey(object.Key) | ||
if uploader == s.uploader { | ||
continue | ||
} | ||
|
||
filePath := path.Join(folderPath, uploader) | ||
df := downloadedFiles{} | ||
df := downloadedFile{} | ||
|
||
err = s.getFileFromStorage(ctx, object.Key, filePath) | ||
err = fc.getFileFromStorage(ctx, object.Key, filePath) | ||
if err != nil { | ||
return | ||
} | ||
|
@@ -212,18 +209,18 @@ func (s *Shipper) downloadFilesForPeriod(ctx context.Context, period string, fc | |
|
||
totalFilesSize += stat.Size() | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the lock can be held here rather than implicitly require the caller to do so. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am taking the lock from the caller because this function is called in a goroutine. The goroutine could be waiting to be scheduled and some other goroutine could take the lock. While I am checking the |
||
fc.files[uploader] = df | ||
fc.files[uploader] = &df | ||
} | ||
|
||
duration := time.Since(startTime).Seconds() | ||
s.metrics.filesDownloadDurationSeconds.add(period, duration) | ||
s.metrics.filesDownloadSizeBytes.add(period, totalFilesSize) | ||
fc.metrics.filesDownloadDurationSeconds.add(fc.period, duration) | ||
fc.metrics.filesDownloadSizeBytes.add(fc.period, totalFilesSize) | ||
|
||
return | ||
} | ||
|
||
func (s *Shipper) getFolderPathForPeriod(period string, ensureExists bool) (string, error) { | ||
folderPath := path.Join(s.cfg.CacheLocation, period) | ||
func (fc *FilesCollection) getFolderPathForPeriod(ensureExists bool) (string, error) { | ||
folderPath := path.Join(fc.cacheLocation, fc.period) | ||
|
||
if ensureExists { | ||
err := chunk_util.EnsureDirectory(folderPath) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,137 @@ | ||
package local | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"os" | ||
"sync" | ||
"time" | ||
|
||
"github.com/cortexproject/cortex/pkg/chunk" | ||
"github.com/cortexproject/cortex/pkg/util" | ||
"github.com/go-kit/kit/log/level" | ||
"go.etcd.io/bbolt" | ||
) | ||
|
||
// timeout for downloading initial files for a table to avoid leaking resources by allowing it to take all the time. | ||
const downloadTimeout = 5 * time.Minute | ||
|
||
type downloadedFile struct { | ||
mtime time.Time | ||
boltdb *bbolt.DB | ||
} | ||
|
||
// FilesCollection holds info about shipped boltdb index files by other uploaders(ingesters). | ||
// It is used to hold boltdb files created by all the ingesters for same period i.e with same name. | ||
// In the object store files are uploaded as <boltdb-filename>/<uploader-id> to manage files with same name from different ingesters. | ||
// Note: FilesCollection takes care of locking with all the exported/public methods. | ||
// Note2: It has an err variable which is set when FilesCollection is in invalid state due to some issue. | ||
// All operations which try to access/update the files except cleanup returns an error if set. | ||
type FilesCollection struct { | ||
mtx sync.RWMutex | ||
|
||
period string | ||
cacheLocation string | ||
metrics *downloaderMetrics | ||
storageClient chunk.ObjectClient | ||
|
||
lastUsedAt time.Time | ||
files map[string]*downloadedFile | ||
err error | ||
ready chan struct{} | ||
} | ||
|
||
func NewFilesCollection(period, cacheLocation string, metrics *downloaderMetrics, storageClient chunk.ObjectClient) *FilesCollection { | ||
fc := FilesCollection{ | ||
period: period, | ||
cacheLocation: cacheLocation, | ||
metrics: metrics, | ||
storageClient: storageClient, | ||
files: map[string]*downloadedFile{}, | ||
ready: make(chan struct{}), | ||
} | ||
|
||
// keep the files collection locked until all the files are downloaded. | ||
fc.mtx.Lock() | ||
go func() { | ||
defer fc.mtx.Unlock() | ||
defer close(fc.ready) | ||
slim-bean marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
ctx, cancel := context.WithTimeout(context.Background(), downloadTimeout) | ||
defer cancel() | ||
|
||
// Using background context to avoid cancellation of download when request times out. | ||
// We would anyways need the files for serving next requests. | ||
if err := fc.downloadAllFilesForPeriod(ctx); err != nil { | ||
level.Error(util.Logger).Log("msg", "failed to download files", "period", fc.period) | ||
} | ||
}() | ||
|
||
return &fc | ||
} | ||
|
||
func (fc *FilesCollection) cleanupFile(fileName string) error { | ||
df, ok := fc.files[fileName] | ||
if !ok { | ||
return fmt.Errorf("file %s not found in files collection for cleaning up", fileName) | ||
} | ||
|
||
filePath := df.boltdb.Path() | ||
|
||
if err := df.boltdb.Close(); err != nil { | ||
return err | ||
} | ||
|
||
delete(fc.files, fileName) | ||
|
||
return os.Remove(filePath) | ||
} | ||
|
||
func (fc *FilesCollection) ForEach(callback func(fileName string, df *downloadedFile) error) error { | ||
if fc.err != nil { | ||
return fc.err | ||
} | ||
|
||
fc.mtx.RLock() | ||
defer fc.mtx.RUnlock() | ||
|
||
for fileName, df := range fc.files { | ||
if err := callback(fileName, df); err != nil { | ||
return err | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (fc *FilesCollection) CleanupAllFiles() error { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As expressed earlier, I think this can be changed to |
||
fc.mtx.Lock() | ||
defer fc.mtx.Unlock() | ||
|
||
for fileName := range fc.files { | ||
if err := fc.cleanupFile(fileName); err != nil { | ||
return err | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
func (fc *FilesCollection) UpdateLastUsedAt() { | ||
fc.lastUsedAt = time.Now() | ||
} | ||
|
||
func (fc *FilesCollection) LastUsedAt() time.Time { | ||
return fc.lastUsedAt | ||
} | ||
|
||
func (fc *FilesCollection) setErr(err error) { | ||
fc.err = err | ||
} | ||
|
||
func (fc *FilesCollection) Err() error { | ||
return fc.err | ||
} | ||
|
||
func (fc *FilesCollection) IsReady() chan struct{} { | ||
return fc.ready | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this means we'd delete when
if uploader == s.uploader
, is this correct?