boltdb shipper download failure handling and some refactorings #2156

Merged Jun 22, 2020 (4 commits)

Changes from 2 commits
91 changes: 44 additions & 47 deletions pkg/storage/stores/local/downloads.go
@@ -17,26 +17,19 @@ import (
)

// checkStorageForUpdates compares files from cache with storage and builds the list of files to be downloaded from storage and to be deleted from cache
func (s *Shipper) checkStorageForUpdates(ctx context.Context, period string, fc *filesCollection) (toDownload []chunk.StorageObject, toDelete []string, err error) {
if s.cfg.Mode == ShipperModeWriteOnly {
return
}

func (fc *FilesCollection) checkStorageForUpdates(ctx context.Context) (toDownload []chunk.StorageObject, toDelete []string, err error) {
// listing tables from store
var objects []chunk.StorageObject
objects, _, err = s.storageClient.List(ctx, period+"/")
objects, _, err = fc.storageClient.List(ctx, fc.period+"/")
if err != nil {
return
}

listedUploaders := make(map[string]struct{}, len(objects))

fc.RLock()
for _, object := range objects {
uploader := strings.Split(object.Key, "/")[1]
// don't include the file which was uploaded by same ingester
if uploader == s.uploader {
continue
}
listedUploaders[uploader] = struct{}{}

// Checking whether file was updated in the store after we downloaded it, if not, no need to include it in updates
@@ -45,37 +38,39 @@ func (s *Shipper) checkStorageForUpdates(ctx context.Context, period string, fc
toDownload = append(toDownload, object)
}
}
fc.RUnlock()

for uploader := range fc.files {
err = fc.ForEach(func(uploader string, df *downloadedFile) error {
if _, isOK := listedUploaders[uploader]; !isOK {
toDelete = append(toDelete, uploader)
Member: this means we'd delete when uploader == s.uploader, is this correct?

}
}
return nil
})

return
}

// syncFilesForPeriod downloads updated and new files from for given period from all the uploaders and removes deleted ones
func (s *Shipper) syncFilesForPeriod(ctx context.Context, period string, fc *filesCollection) error {
level.Debug(util.Logger).Log("msg", fmt.Sprintf("syncing files for period %s", period))

fc.RLock()
toDownload, toDelete, err := s.checkStorageForUpdates(ctx, period, fc)
fc.RUnlock()
// Sync downloads updated and new files for the given period from all the uploaders and removes deleted ones
func (fc *FilesCollection) Sync(ctx context.Context) error {
level.Debug(util.Logger).Log("msg", fmt.Sprintf("syncing files for period %s", fc.period))

toDownload, toDelete, err := fc.checkStorageForUpdates(ctx)
if err != nil {
return err
}

for _, storageObject := range toDownload {
Member: I think there should be some bounded concurrency mechanism for downloads.

Contributor Author: It is on my todo list and I think it deserves a separate PR. What do you think?
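For illustration, a minimal sketch of what bounded concurrency could look like here, using a buffered-channel semaphore; downloadAll and maxDownloadParallelism are hypothetical names and not part of this PR:

// Sketch only: limit the number of concurrent downloads with a semaphore.
const maxDownloadParallelism = 10

func (fc *FilesCollection) downloadAll(ctx context.Context, toDownload []chunk.StorageObject) error {
	sem := make(chan struct{}, maxDownloadParallelism)
	errs := make(chan error, len(toDownload))

	var wg sync.WaitGroup
	for _, storageObject := range toDownload {
		wg.Add(1)
		go func(object chunk.StorageObject) {
			defer wg.Done()
			sem <- struct{}{}        // acquire a slot
			defer func() { <-sem }() // release the slot
			if err := fc.downloadFile(ctx, object); err != nil {
				errs <- err
			}
		}(storageObject)
	}
	wg.Wait()
	close(errs)

	// report the first download error, if any
	for err := range errs {
		return err
	}
	return nil
}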

err = s.downloadFile(ctx, period, storageObject, fc)
err = fc.downloadFile(ctx, storageObject)
if err != nil {
return err
}
}

fc.Lock()
defer fc.Unlock()

for _, uploader := range toDelete {
err := s.deleteFileFromCache(period, uploader, fc)
err := fc.cleanupFile(uploader)
Member: I think this would be better expressed as fc.CleanupFiles(toDelete...), which could then be refactored like

func (fc *FilesCollection) CleanupAllFiles(files ...string)

This cleanup function already takes the lock in the exact same way, so we'd be able to increase code reuse and be more idiomatic.
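For reference, a sketch of the suggested variadic form, reusing the cleanupFile helper from this PR (calling it with no arguments would mean "clean up everything"):

// Sketch of the suggested refactor: clean up a given set of files under a single lock.
func (fc *FilesCollection) CleanupFiles(files ...string) error {
	fc.Lock()
	defer fc.Unlock()

	// with no arguments, fall back to cleaning up all downloaded files
	if len(files) == 0 {
		for fileName := range fc.files {
			files = append(files, fileName)
		}
	}

	for _, fileName := range files {
		if err := fc.cleanupFile(fileName); err != nil {
			return err
		}
	}
	return nil
}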

if err != nil {
return err
}
@@ -85,15 +80,15 @@ func (s *Shipper) syncFilesForPeriod(ctx context.Context, period string, fc *fil
}

// It first downloads file to a temp location so that we close the existing file(if already exists), replace it with new one and then reopen it.
func (s *Shipper) downloadFile(ctx context.Context, period string, storageObject chunk.StorageObject, fc *filesCollection) error {
func (fc *FilesCollection) downloadFile(ctx context.Context, storageObject chunk.StorageObject) error {
uploader := strings.Split(storageObject.Key, "/")[1]
folderPath, _ := s.getFolderPathForPeriod(period, false)
folderPath, _ := fc.getFolderPathForPeriod(false)
filePath := path.Join(folderPath, uploader)

// download the file temporarily with some other name to allow boltdb client to close the existing file first if it exists
tempFilePath := path.Join(folderPath, fmt.Sprintf("%s.%s", uploader, "temp"))

err := s.getFileFromStorage(ctx, storageObject.Key, tempFilePath)
err := fc.getFileFromStorage(ctx, storageObject.Key, tempFilePath)
if err != nil {
return err
}
@@ -107,7 +102,7 @@ func (s *Shipper) downloadFile(ctx context.Context, period string, storageObject
return err
}
} else {
df = downloadedFiles{}
df = &downloadedFile{}
}

// move the file from temp location to actual location
@@ -128,8 +123,8 @@ func (s *Shipper) downloadFile(ctx context.Context, period string, storageObject
}

// getFileFromStorage downloads a file from storage to given location.
func (s *Shipper) getFileFromStorage(ctx context.Context, objectKey, destination string) error {
readCloser, err := s.storageClient.GetObject(ctx, objectKey)
func (fc *FilesCollection) getFileFromStorage(ctx context.Context, objectKey, destination string) error {
readCloser, err := fc.storageClient.GetObject(ctx, objectKey)
if err != nil {
return err
}
@@ -155,45 +150,47 @@ func (s *Shipper) getFileFromStorage(ctx context.Context, objectKey, destination
return f.Sync()
}

// downloadFilesForPeriod should be called when files for a period does not exist i.e they were never downloaded or got cleaned up later on by TTL
// While files are being downloaded it will block all reads/writes on filesCollection by taking an exclusive lock
func (s *Shipper) downloadFilesForPeriod(ctx context.Context, period string, fc *filesCollection) (err error) {
fc.Lock()
defer fc.Unlock()

// downloadAllFilesForPeriod should be called when files for a period do not exist, i.e. they were never downloaded or got cleaned up later by the TTL
// While files are being downloaded, it will block all reads/writes on FilesCollection by taking an exclusive lock
func (fc *FilesCollection) downloadAllFilesForPeriod(ctx context.Context) (err error) {
defer func() {
status := statusSuccess
if err != nil {
status = statusFailure
fc.setErr(err)

// cleaning up files due to error to avoid returning invalid results.
for fileName := range fc.files {
if err := fc.cleanupFile(fileName); err != nil {
Member: Locks aren't obtained for cleanup (they technically are when this is called via NewFilesCollection, but I think it's better design to use the CleanupFiles(filename) method I proposed earlier).

Contributor Author: Yes, as mentioned earlier, all the public methods take care of locking. I did it this way so that all the operations of initializing the filesCollection, or cleaning up after failures, happen without releasing the lock. This way it is straightforward and less error-prone.

level.Error(util.Logger).Log("msg", "failed to cleanup partially downloaded file", "filename", fileName, "err", err)
}
}
}
s.metrics.filesDownloadOperationTotal.WithLabelValues(status).Inc()
fc.metrics.filesDownloadOperationTotal.WithLabelValues(status).Inc()
}()

startTime := time.Now()
totalFilesSize := int64(0)

objects, _, err := s.storageClient.List(ctx, period+"/")
objects, _, err := fc.storageClient.List(ctx, fc.period+"/")
if err != nil {
return
}

level.Debug(util.Logger).Log("msg", fmt.Sprintf("list of files to download for period %s: %s", period, objects))
level.Debug(util.Logger).Log("msg", fmt.Sprintf("list of files to download for period %s: %s", fc.period, objects))

folderPath, err := s.getFolderPathForPeriod(period, true)
folderPath, err := fc.getFolderPathForPeriod(true)
if err != nil {
return
}

for _, object := range objects {
uploader := getUploaderFromObjectKey(object.Key)
if uploader == s.uploader {
continue
}

filePath := path.Join(folderPath, uploader)
df := downloadedFiles{}
df := downloadedFile{}

err = s.getFileFromStorage(ctx, object.Key, filePath)
err = fc.getFileFromStorage(ctx, object.Key, filePath)
if err != nil {
return
}
@@ -212,18 +209,18 @@ func (s *Shipper) downloadFilesForPeriod(ctx context.Context, period string, fc

totalFilesSize += stat.Size()

Member: I think the lock can be held here rather than implicitly requiring the caller to do so.

Contributor Author: I am taking the lock in the caller because this function is called in a goroutine. The goroutine could be waiting to be scheduled and some other goroutine could take the lock first. I do check the ready channel in reads, but it would be error-prone to rely only on that because someone might forget to check the channel. The contract is simple now: all the public methods take care of locking.
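For illustration, a sketch of what this contract looks like from a reader's side; queryCollection and its callback are hypothetical, and the real read path lives elsewhere in the shipper:

// Sketch only: a reader waits for the initial download before touching the files.
func queryCollection(ctx context.Context, fc *FilesCollection, callback func(fileName string, df *downloadedFile) error) error {
	// block until the initial download kicked off by NewFilesCollection has finished
	select {
	case <-fc.IsReady():
	case <-ctx.Done():
		return ctx.Err()
	}

	fc.UpdateLastUsedAt()

	// ForEach takes the read lock internally and returns fc.err if the download failed
	return fc.ForEach(callback)
}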

fc.files[uploader] = df
fc.files[uploader] = &df
}

duration := time.Since(startTime).Seconds()
s.metrics.filesDownloadDurationSeconds.add(period, duration)
s.metrics.filesDownloadSizeBytes.add(period, totalFilesSize)
fc.metrics.filesDownloadDurationSeconds.add(fc.period, duration)
fc.metrics.filesDownloadSizeBytes.add(fc.period, totalFilesSize)

return
}

func (s *Shipper) getFolderPathForPeriod(period string, ensureExists bool) (string, error) {
folderPath := path.Join(s.cfg.CacheLocation, period)
func (fc *FilesCollection) getFolderPathForPeriod(ensureExists bool) (string, error) {
folderPath := path.Join(fc.cacheLocation, fc.period)

if ensureExists {
err := chunk_util.EnsureDirectory(folderPath)
132 changes: 132 additions & 0 deletions pkg/storage/stores/local/filescollection.go
@@ -0,0 +1,132 @@
package local

import (
"context"
"fmt"
"os"
"sync"
"time"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log/level"
"go.etcd.io/bbolt"
)

type downloadedFile struct {
mtime time.Time
boltdb *bbolt.DB
}

// FilesCollection holds info about boltdb index files shipped by other uploaders (ingesters).
// It is used to hold boltdb files created by all the ingesters for the same period, i.e. with the same name.
// In the object store, files are uploaded as <boltdb-filename>/<uploader-id> to manage files with the same name from different ingesters.
// Note: FilesCollection does not manage the locks on the mutex, to allow users of it to
// do batch operations, single operations, or operations requiring intermittent locking.
// Note2: It has an err variable which is set when FilesCollection is in an invalid state due to some issue.
// All operations which try to access/update the files, except cleanup, return an error if it is set.
type FilesCollection struct {
sync.RWMutex

period string
cacheLocation string
metrics *downloaderMetrics
storageClient chunk.ObjectClient

lastUsedAt time.Time
files map[string]*downloadedFile
err error
ready chan struct{}
}

func NewFilesCollection(period, cacheLocation string, metrics *downloaderMetrics, storageClient chunk.ObjectClient) *FilesCollection {
fc := FilesCollection{
period: period,
cacheLocation: cacheLocation,
metrics: metrics,
storageClient: storageClient,
files: map[string]*downloadedFile{},
ready: make(chan struct{}),
}

// keep the files collection locked until all the files are downloaded.
fc.Lock()
go func() {
defer fc.Unlock()
defer close(fc.ready)

// Using a background context to avoid cancellation of the download when the request times out.
// We would anyway need the files for serving the next requests.
if err := fc.downloadAllFilesForPeriod(context.Background()); err != nil {
level.Error(util.Logger).Log("msg", "failed to download files", "period", fc.period)
}
}()

return &fc
}

func (fc *FilesCollection) cleanupFile(fileName string) error {
df, ok := fc.files[fileName]
if !ok {
return fmt.Errorf("file %s not found in files collection for cleaning up", fileName)
}

filePath := df.boltdb.Path()

if err := df.boltdb.Close(); err != nil {
return err
}

delete(fc.files, fileName)

return os.Remove(filePath)
}

func (fc *FilesCollection) ForEach(callback func(fileName string, df *downloadedFile) error) error {
if fc.err != nil {
return fc.err
}

fc.RLock()
defer fc.RUnlock()

for fileName, df := range fc.files {
if err := callback(fileName, df); err != nil {
return err
}
}

return nil
}

func (fc *FilesCollection) CleanupAllFiles() error {
Member: As expressed earlier, I think this can be changed to
func (fc *FilesCollection) CleanupFiles(files ...string)

fc.Lock()
defer fc.Unlock()

for fileName := range fc.files {
if err := fc.cleanupFile(fileName); err != nil {
return err
}
}
return nil
}

func (fc *FilesCollection) UpdateLastUsedAt() {
fc.lastUsedAt = time.Now()
}

func (fc *FilesCollection) LastUsedAt() time.Time {
return fc.lastUsedAt
}

func (fc *FilesCollection) setErr(err error) {
fc.err = err
}

func (fc *FilesCollection) Err() error {
return fc.err
}

func (fc *FilesCollection) IsReady() chan struct{} {
return fc.ready
}