ignore download of missing boltdb files possibly removed during compaction
sandeepsukhani committed Jun 10, 2021
1 parent 7d666cc commit 6e70fa4
Showing 2 changed files with 50 additions and 8 deletions.
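The change addresses a race between compaction and index sync: the compactor can delete source boltdb files after a downloader has already listed them, so a download that trusts the listing fails on objects that no longer exist. Below is a minimal sketch of the tolerant download loop this commit moves towards, using a map as a stand-in object store; all names in the sketch are hypothetical, not Loki's shipper code.

package main

import (
	"errors"
	"fmt"
)

// errNotFound stands in for the storage client's "object not found" sentinel.
var errNotFound = errors.New("object not found in storage")

// store is a toy object store keyed by object name.
var store = map[string][]byte{
	"index_123/db1.gz": []byte("a"),
	"index_123/db2.gz": []byte("b"),
}

func getObject(key string) ([]byte, error) {
	data, ok := store[key]
	if !ok {
		return nil, fmt.Errorf("get %s: %w", key, errNotFound)
	}
	return data, nil
}

func main() {
	// keys were listed before compaction ran...
	listed := []string{"index_123/db1.gz", "index_123/db2.gz"}

	// ...and compaction then removed one of the listed objects.
	delete(store, "index_123/db2.gz")

	for _, key := range listed {
		if _, err := getObject(key); err != nil {
			if errors.Is(err, errNotFound) {
				// tolerate objects removed between listing and download
				fmt.Printf("ignoring missing object %s, possibly removed during compaction\n", key)
				continue
			}
			panic(err)
		}
		fmt.Printf("downloaded %s\n", key)
	}
}

The point of the sketch is only the control flow: a not-found error on a single object is logged and skipped instead of failing the whole table download.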
17 changes: 15 additions & 2 deletions pkg/storage/stores/shipper/downloads/table.go
@@ -2,6 +2,7 @@ package downloads
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"io"
 	"io/ioutil"
@@ -221,13 +222,16 @@ func (t *Table) init(ctx context.Context, spanLogger log.Logger) (err error) {
 
 	// open all the downloaded dbs
 	for _, object := range objects {
-
 		dbName, err := getDBNameFromObjectKey(object.Key)
 		if err != nil {
 			return err
 		}
 
 		filePath := path.Join(folderPath, dbName)
+		if _, err := os.Stat(filePath); os.IsNotExist(err) {
+			level.Info(util_log.Logger).Log("msg", fmt.Sprintf("skipping opening of non-existent file %s, possibly not downloaded due to it being removed during compaction.", filePath))
+			continue
+		}
 		boltdb, err := shipper_util.SafeOpenBoltdbFile(filePath)
 		if err != nil {
 			return err
@@ -454,6 +458,10 @@ func (t *Table) downloadFile(ctx context.Context, storageObject chunk.StorageObj
 
 	err = shipper_util.GetFileFromStorage(ctx, t.storageClient, storageObject.Key, filePath, true)
 	if err != nil {
+		if errors.Is(err, chunk.ErrStorageObjectNotFound) {
+			level.Info(util_log.Logger).Log("msg", fmt.Sprintf("ignoring missing object %s, possibly removed during compaction", storageObject.Key))
+			return nil
+		}
 		return err
 	}
 
@@ -529,7 +537,12 @@ func (t *Table) doParallelDownload(ctx context.Context, objects []chunk.StorageO
 			filePath := path.Join(folderPathForTable, dbName)
 			err = shipper_util.GetFileFromStorage(ctx, t.storageClient, object.Key, filePath, true)
 			if err != nil {
-				break
+				if errors.Is(err, chunk.ErrStorageObjectNotFound) {
+					level.Info(util_log.Logger).Log("msg", fmt.Sprintf("ignoring missing object %s, possibly removed during compaction", object.Key))
+					err = nil
+				} else {
+					break
+				}
 			}
 		}
 
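Both new branches assume the storage client surfaces a not-found condition that errors.Is can unwrap to chunk.ErrStorageObjectNotFound. The following is a sketch of that provider-side contract, assuming a filesystem-backed store; errStorageObjectNotFound and getFileFromStorage here are illustrative stand-ins, not the real chunk package or shipper_util helpers.

package main

import (
	"errors"
	"fmt"
	"os"
)

// errStorageObjectNotFound mirrors the role of chunk.ErrStorageObjectNotFound;
// the real sentinel lives in Loki's chunk package, this copy is only for the sketch.
var errStorageObjectNotFound = errors.New("object not found in storage")

// getFileFromStorage is a hypothetical download helper: it maps the backend's
// own "missing file" error onto the shared sentinel so callers can use errors.Is.
func getFileFromStorage(objectKey, destPath string) error {
	src, err := os.Open(objectKey) // a filesystem-backed store for the sketch
	if err != nil {
		if os.IsNotExist(err) {
			return fmt.Errorf("downloading %s: %w", objectKey, errStorageObjectNotFound)
		}
		return err
	}
	defer src.Close()
	// ...copy src to destPath...
	return nil
}

func main() {
	err := getFileFromStorage("objects/index_123/compacted-away.gz", "/tmp/db")
	if errors.Is(err, errStorageObjectNotFound) {
		fmt.Println("ignoring missing object, possibly removed during compaction")
		return
	}
	if err != nil {
		panic(err)
	}
	fmt.Println("download ok")
}

Wrapping with %w rather than returning a new error string is what lets the callers above keep a single errors.Is check.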
41 changes: 35 additions & 6 deletions pkg/storage/stores/shipper/downloads/table_test.go
@@ -22,9 +22,33 @@ const (
 	objectsStorageDirName = "objects"
 )
 
+// storageClientWithFakeObjectsInList adds a fake object in the list call response which
+// helps with testing the case where objects get deleted in the middle of a Sync/Download operation due to compaction.
+type storageClientWithFakeObjectsInList struct {
+	StorageClient
+}
+
+func newStorageClientWithFakeObjectsInList(storageClient StorageClient) StorageClient {
+	return storageClientWithFakeObjectsInList{storageClient}
+}
+
+func (o storageClientWithFakeObjectsInList) List(ctx context.Context, prefix string, delimiter string) ([]chunk.StorageObject, []chunk.StorageCommonPrefix, error) {
+	objects, commonPrefixes, err := o.StorageClient.List(ctx, prefix, delimiter)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	objects = append(objects, chunk.StorageObject{
+		Key:        fmt.Sprintf("%s%s", prefix, "fake-object"),
+		ModifiedAt: time.Now(),
+	})
+
+	return objects, commonPrefixes, nil
+}
+
 type stopFunc func()
 
-func buildTestClients(t *testing.T, path string) (*local.BoltIndexClient, *local.FSObjectClient) {
+func buildTestClients(t *testing.T, path string) (*local.BoltIndexClient, StorageClient) {
 	cachePath := filepath.Join(path, cacheDirName)
 
 	boltDBIndexClient, err := local.NewBoltDBIndexClient(local.BoltDBConfig{Directory: cachePath})
@@ -38,10 +62,10 @@ func buildTestClients(t *testing.T, path string) (*local.BoltIndexClient, *local
 }
 
 func buildTestTable(t *testing.T, tableName, path string) (*Table, *local.BoltIndexClient, stopFunc) {
-	boltDBIndexClient, fsObjectClient := buildTestClients(t, path)
+	boltDBIndexClient, storageClient := buildTestClients(t, path)
 	cachePath := filepath.Join(path, cacheDirName)
 
-	table := NewTable(context.Background(), tableName, cachePath, fsObjectClient, boltDBIndexClient, newMetrics(nil))
+	table := NewTable(context.Background(), tableName, cachePath, storageClient, boltDBIndexClient, newMetrics(nil))
 
 	// wait for either table to get ready or a timeout hits
 	select {
@@ -138,6 +162,9 @@ func TestTable_Sync(t *testing.T) {
 		stopFunc()
 	}()
 
+	// replace the storage client with the one that adds fake objects in the list call
+	table.storageClient = newStorageClientWithFakeObjectsInList(table.storageClient)
+
 	// query table to see it has expected records setup
 	testutil.TestSingleTableQuery(t, []chunk.IndexQuery{{}}, table, 0, 20)
 
@@ -304,11 +331,13 @@ func TestLoadTable(t *testing.T) {
 	// setup the table in storage with some records
 	testutil.SetupDBTablesAtPath(t, tableName, objectStoragePath, dbs, false)
 
-	boltDBIndexClient, fsObjectClient := buildTestClients(t, tempDir)
+	boltDBIndexClient, storageClient := buildTestClients(t, tempDir)
 	cachePath := filepath.Join(tempDir, cacheDirName)
 
+	storageClient = newStorageClientWithFakeObjectsInList(storageClient)
+
 	// try loading the table.
-	table, err := LoadTable(context.Background(), tableName, cachePath, fsObjectClient, boltDBIndexClient, newMetrics(nil))
+	table, err := LoadTable(context.Background(), tableName, cachePath, storageClient, boltDBIndexClient, newMetrics(nil))
 	require.NoError(t, err)
 	require.NotNil(t, table)
 
@@ -338,7 +367,7 @@ func TestLoadTable(t *testing.T) {
 	testutil.SetupDBTablesAtPath(t, tableName, objectStoragePath, dbs, false)
 
 	// try loading the table, it should skip loading corrupt file and reload it from storage.
-	table, err = LoadTable(context.Background(), tableName, cachePath, fsObjectClient, boltDBIndexClient, newMetrics(nil))
+	table, err = LoadTable(context.Background(), tableName, cachePath, storageClient, boltDBIndexClient, newMetrics(nil))
 	require.NoError(t, err)
 	require.NotNil(t, table)
 
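The test fake relies on Go interface embedding: embedding StorageClient gives the wrapper every method of the real client, and only List is overridden to append a key that was never uploaded, which drives the download code into its new not-found path. A standalone sketch of the same pattern follows, with hypothetical Lister/listerWithFakeObject names rather than the real shipper interfaces.

package main

import "fmt"

// Lister is a stand-in for the storage client interface used in the tests.
type Lister interface {
	List(prefix string) ([]string, error)
}

// realLister lists a fixed set of objects.
type realLister struct{}

func (realLister) List(prefix string) ([]string, error) {
	return []string{prefix + "db1.gz", prefix + "db2.gz"}, nil
}

// listerWithFakeObject embeds Lister, so it satisfies the interface and
// forwards every method it does not override to the wrapped value.
type listerWithFakeObject struct {
	Lister
}

// List is the only override: it appends an object that does not exist in
// storage, forcing download code to hit the "not found" path.
func (l listerWithFakeObject) List(prefix string) ([]string, error) {
	objects, err := l.Lister.List(prefix)
	if err != nil {
		return nil, err
	}
	return append(objects, prefix+"fake-object"), nil
}

func main() {
	var client Lister = listerWithFakeObject{realLister{}}
	objects, _ := client.List("index_123/")
	fmt.Println(objects) // [index_123/db1.gz index_123/db2.gz index_123/fake-object]
}

Because only List is overridden, every other call still goes straight to the embedded client, which keeps the fake minimal.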
