Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ignore download of missing boltdb files possibly removed during compaction #3563

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions pkg/storage/stores/shipper/downloads/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package downloads

import (
"context"
"errors"
"fmt"
"io"
"io/ioutil"
Expand Down Expand Up @@ -221,13 +222,16 @@ func (t *Table) init(ctx context.Context, spanLogger log.Logger) (err error) {

// open all the downloaded dbs
for _, object := range objects {

dbName, err := getDBNameFromObjectKey(object.Key)
if err != nil {
return err
}

filePath := path.Join(folderPath, dbName)
if _, err := os.Stat(filePath); os.IsNotExist(err) {
level.Info(util_log.Logger).Log("msg", fmt.Sprintf("skipping opening of non-existent file %s, possibly not downloaded due to it being removed during compaction.", filePath))
continue
}
boltdb, err := shipper_util.SafeOpenBoltdbFile(filePath)
if err != nil {
return err
Expand Down Expand Up @@ -454,6 +458,10 @@ func (t *Table) downloadFile(ctx context.Context, storageObject chunk.StorageObj

err = shipper_util.GetFileFromStorage(ctx, t.storageClient, storageObject.Key, filePath, true)
if err != nil {
if errors.Is(err, chunk.ErrStorageObjectNotFound) {
level.Info(util_log.Logger).Log("msg", fmt.Sprintf("ignoring missing object %s, possibly removed during compaction", storageObject.Key))
return nil
}
return err
}

Expand Down Expand Up @@ -529,7 +537,12 @@ func (t *Table) doParallelDownload(ctx context.Context, objects []chunk.StorageO
filePath := path.Join(folderPathForTable, dbName)
err = shipper_util.GetFileFromStorage(ctx, t.storageClient, object.Key, filePath, true)
if err != nil {
break
if errors.Is(err, chunk.ErrStorageObjectNotFound) {
level.Info(util_log.Logger).Log("msg", fmt.Sprintf("ignoring missing object %s, possibly removed during compaction", object.Key))
err = nil
} else {
break
}
}
}

Expand Down
41 changes: 35 additions & 6 deletions pkg/storage/stores/shipper/downloads/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,33 @@ const (
objectsStorageDirName = "objects"
)

// storageClientWithFakeObjectsInList adds a fake object in the list call response which
// helps with testing the case where objects gets deleted in the middle of a Sync/Download operation due to compaction.
type storageClientWithFakeObjectsInList struct {
StorageClient
}

func newStorageClientWithFakeObjectsInList(storageClient StorageClient) StorageClient {
return storageClientWithFakeObjectsInList{storageClient}
}

func (o storageClientWithFakeObjectsInList) List(ctx context.Context, prefix string, delimiter string) ([]chunk.StorageObject, []chunk.StorageCommonPrefix, error) {
objects, commonPrefixes, err := o.StorageClient.List(ctx, prefix, delimiter)
if err != nil {
return nil, nil, err
}

objects = append(objects, chunk.StorageObject{
Key: fmt.Sprintf(prefix, "fake-object"),
ModifiedAt: time.Now(),
})

return objects, commonPrefixes, nil
}

type stopFunc func()

func buildTestClients(t *testing.T, path string) (*local.BoltIndexClient, *local.FSObjectClient) {
func buildTestClients(t *testing.T, path string) (*local.BoltIndexClient, StorageClient) {
cachePath := filepath.Join(path, cacheDirName)

boltDBIndexClient, err := local.NewBoltDBIndexClient(local.BoltDBConfig{Directory: cachePath})
Expand All @@ -38,10 +62,10 @@ func buildTestClients(t *testing.T, path string) (*local.BoltIndexClient, *local
}

func buildTestTable(t *testing.T, tableName, path string) (*Table, *local.BoltIndexClient, stopFunc) {
boltDBIndexClient, fsObjectClient := buildTestClients(t, path)
boltDBIndexClient, storageClient := buildTestClients(t, path)
cachePath := filepath.Join(path, cacheDirName)

table := NewTable(context.Background(), tableName, cachePath, fsObjectClient, boltDBIndexClient, newMetrics(nil))
table := NewTable(context.Background(), tableName, cachePath, storageClient, boltDBIndexClient, newMetrics(nil))

// wait for either table to get ready or a timeout hits
select {
Expand Down Expand Up @@ -138,6 +162,9 @@ func TestTable_Sync(t *testing.T) {
stopFunc()
}()

// replace the storage client with the one that adds fake objects in the list call
table.storageClient = newStorageClientWithFakeObjectsInList(table.storageClient)

// query table to see it has expected records setup
testutil.TestSingleTableQuery(t, []chunk.IndexQuery{{}}, table, 0, 20)

Expand Down Expand Up @@ -304,11 +331,13 @@ func TestLoadTable(t *testing.T) {
// setup the table in storage with some records
testutil.SetupDBTablesAtPath(t, tableName, objectStoragePath, dbs, false)

boltDBIndexClient, fsObjectClient := buildTestClients(t, tempDir)
boltDBIndexClient, storageClient := buildTestClients(t, tempDir)
cachePath := filepath.Join(tempDir, cacheDirName)

storageClient = newStorageClientWithFakeObjectsInList(storageClient)

// try loading the table.
table, err := LoadTable(context.Background(), tableName, cachePath, fsObjectClient, boltDBIndexClient, newMetrics(nil))
table, err := LoadTable(context.Background(), tableName, cachePath, storageClient, boltDBIndexClient, newMetrics(nil))
require.NoError(t, err)
require.NotNil(t, table)

Expand Down Expand Up @@ -338,7 +367,7 @@ func TestLoadTable(t *testing.T) {
testutil.SetupDBTablesAtPath(t, tableName, objectStoragePath, dbs, false)

// try loading the table, it should skip loading corrupt file and reload it from storage.
table, err = LoadTable(context.Background(), tableName, cachePath, fsObjectClient, boltDBIndexClient, newMetrics(nil))
table, err = LoadTable(context.Background(), tableName, cachePath, storageClient, boltDBIndexClient, newMetrics(nil))
require.NoError(t, err)
require.NotNil(t, table)

Expand Down