Skip to content

Commit

Permalink
objstore: Added WithExpectedErrs to Reader which allows to control in…
Browse files Browse the repository at this point in the history
…strumentation (e.g not increment failures for expected not found).

This allows to not wake up oncall in the middle of night, becuase of expeced, properly handled case (:

Also: Has to move inmem to objstore for testing.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
  • Loading branch information
bwplotka committed Apr 3, 2020
1 parent 84495fa commit f777523
Show file tree
Hide file tree
Showing 29 changed files with 494 additions and 345 deletions.
6 changes: 3 additions & 3 deletions cmd/thanos/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/objstore/inmem"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/testutil"
"github.com/thanos-io/thanos/pkg/testutil/e2eutil"
)
Expand All @@ -36,7 +36,7 @@ func TestCleanupIndexCacheFolder(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

bkt := inmem.NewBucket()
bkt := objstore.NoopInstrumentedBucket{objstore.NewInMemBucket()}

// Upload one compaction lvl = 2 block, one compaction lvl = 1.
// We generate index cache files only for lvl > 1 blocks.
Expand Down Expand Up @@ -97,7 +97,7 @@ func TestCleanupDownsampleCacheFolder(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

bkt := inmem.NewBucket()
bkt := objstore.NoopInstrumentedBucket{objstore.NewInMemBucket()}
var id ulid.ULID
{
id, err = e2eutil.CreateBlock(
Expand Down
8 changes: 4 additions & 4 deletions pkg/block/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/objstore/inmem"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/testutil"
"github.com/thanos-io/thanos/pkg/testutil/e2eutil"

Expand Down Expand Up @@ -81,7 +81,7 @@ func TestUpload(t *testing.T) {
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }()

bkt := inmem.NewBucket()
bkt := objstore.NewInMemBucket()
b1, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{
{{Name: "a", Value: "1"}},
{{Name: "a", Value: "2"}},
Expand Down Expand Up @@ -185,7 +185,7 @@ func TestDelete(t *testing.T) {
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }()

bkt := inmem.NewBucket()
bkt := objstore.NewInMemBucket()
{
b1, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{
{{Name: "a", Value: "1"}},
Expand Down Expand Up @@ -232,7 +232,7 @@ func TestMarkForDeletion(t *testing.T) {
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }()

bkt := inmem.NewBucket()
bkt := objstore.NewInMemBucket()
{
blockWithoutDeletionMark, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{
{{Name: "a", Value: "1"}},
Expand Down
12 changes: 6 additions & 6 deletions pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ type MetadataModifier interface {
type BaseFetcher struct {
logger log.Logger
concurrency int
bkt objstore.BucketReader
bkt objstore.InstrumentedBucketReader

// Optional local directory to cache meta.json files.
cacheDir string
Expand All @@ -152,7 +152,7 @@ type BaseFetcher struct {
}

// NewBaseFetcher constructs BaseFetcher.
func NewBaseFetcher(logger log.Logger, concurrency int, bkt objstore.BucketReader, dir string, reg prometheus.Registerer) (*BaseFetcher, error) {
func NewBaseFetcher(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, dir string, reg prometheus.Registerer) (*BaseFetcher, error) {
if logger == nil {
logger = log.NewNopLogger()
}
Expand Down Expand Up @@ -180,7 +180,7 @@ func NewBaseFetcher(logger log.Logger, concurrency int, bkt objstore.BucketReade
}

// NewMetaFetcher returns meta fetcher.
func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.BucketReader, dir string, reg prometheus.Registerer, filters []MetadataFilter, modifiers []MetadataModifier) (*MetaFetcher, error) {
func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, dir string, reg prometheus.Registerer, filters []MetadataFilter, modifiers []MetadataModifier) (*MetaFetcher, error) {
b, err := NewBaseFetcher(logger, concurrency, bkt, dir, reg)
if err != nil {
return nil, err
Expand Down Expand Up @@ -236,7 +236,7 @@ func (f *BaseFetcher) loadMeta(ctx context.Context, id ulid.ULID) (*metadata.Met
}
}

r, err := f.bkt.Get(ctx, metaFile)
r, err := f.bkt.ReaderWithExpectedErrs(f.bkt.IsObjNotFoundErr).Get(ctx, metaFile)
if f.bkt.IsObjNotFoundErr(err) {
// Meta.json was deleted between bkt.Exists and here.
return nil, errors.Wrapf(ErrorSyncMetaNotFound, "%v", err)
Expand Down Expand Up @@ -740,12 +740,12 @@ func (f *ConsistencyDelayMetaFilter) Filter(_ context.Context, metas map[ulid.UL
type IgnoreDeletionMarkFilter struct {
logger log.Logger
delay time.Duration
bkt objstore.BucketReader
bkt objstore.InstrumentedBucketReader
deletionMarkMap map[ulid.ULID]*metadata.DeletionMark
}

// NewIgnoreDeletionMarkFilter creates IgnoreDeletionMarkFilter.
func NewIgnoreDeletionMarkFilter(logger log.Logger, bkt objstore.BucketReader, delay time.Duration) *IgnoreDeletionMarkFilter {
func NewIgnoreDeletionMarkFilter(logger log.Logger, bkt objstore.InstrumentedBucketReader, delay time.Duration) *IgnoreDeletionMarkFilter {
return &IgnoreDeletionMarkFilter{
logger: logger,
bkt: bkt,
Expand Down
4 changes: 2 additions & 2 deletions pkg/block/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestMetaFetcher_Fetch(t *testing.T) {

var ulidToDelete ulid.ULID
r := prometheus.NewRegistry()
baseFetcher, err := NewBaseFetcher(log.NewNopLogger(), 20, bkt, dir, r)
baseFetcher, err := NewBaseFetcher(log.NewNopLogger(), 20, objstore.NoopInstrumentedBucket{bkt}, dir, r)
testutil.Ok(t, err)

fetcher := baseFetcher.NewMetaFetcher(r, []MetadataFilter{
Expand Down Expand Up @@ -1065,7 +1065,7 @@ func TestIgnoreDeletionMarkFilter_Filter(t *testing.T) {
now := time.Now()
f := &IgnoreDeletionMarkFilter{
logger: log.NewNopLogger(),
bkt: bkt,
bkt: objstore.NoopInstrumentedBucket{bkt},
delay: 48 * time.Hour,
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/block/indexheader/json_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ type JSONReader struct {
}

// NewJSONReader loads or builds new index-cache.json if not present on disk or object storage.
func NewJSONReader(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, dir string, id ulid.ULID) (*JSONReader, error) {
func NewJSONReader(ctx context.Context, logger log.Logger, bkt objstore.InstrumentedBucketReader, dir string, id ulid.ULID) (*JSONReader, error) {
cachefn := filepath.Join(dir, id.String(), block.IndexCacheFilename)
jr, err := newFileJSONReader(logger, cachefn)
if err == nil {
Expand All @@ -216,7 +216,7 @@ func NewJSONReader(ctx context.Context, logger log.Logger, bkt objstore.BucketRe
}

// Try to download index cache file from object store.
if err = objstore.DownloadFile(ctx, logger, bkt, filepath.Join(id.String(), block.IndexCacheFilename), cachefn); err == nil {
if err = objstore.DownloadFile(ctx, logger, bkt.ReaderWithExpectedErrs(bkt.IsObjNotFoundErr), filepath.Join(id.String(), block.IndexCacheFilename), cachefn); err == nil {
return newFileJSONReader(logger, cachefn)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/block/metadata/deletionmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ type DeletionMark struct {
}

// ReadDeletionMark reads the given deletion mark file from <dir>/deletion-mark.json in bucket.
func ReadDeletionMark(ctx context.Context, bkt objstore.BucketReader, logger log.Logger, dir string) (*DeletionMark, error) {
func ReadDeletionMark(ctx context.Context, bkt objstore.InstrumentedBucketReader, logger log.Logger, dir string) (*DeletionMark, error) {
deletionMarkFile := path.Join(dir, DeletionMarkFilename)

r, err := bkt.Get(ctx, deletionMarkFile)
r, err := bkt.ReaderWithExpectedErrs(bkt.IsObjNotFoundErr).Get(ctx, deletionMarkFile)
if err != nil {
if bkt.IsObjNotFoundErr(err) {
return nil, ErrorDeletionMarkNotFound
Expand Down
4 changes: 2 additions & 2 deletions pkg/block/metadata/deletionmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"github.com/fortytw2/leaktest"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/thanos-io/thanos/pkg/objstore/inmem"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/testutil"
)

Expand All @@ -29,7 +29,7 @@ func TestReadDeletionMark(t *testing.T) {
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }()

bkt := inmem.NewBucket()
bkt := objstore.NoopInstrumentedBucket{objstore.NewInMemBucket()}
{
blockWithoutDeletionMark := ulid.MustNew(uint64(1), nil)
_, err := ReadDeletionMark(ctx, bkt, nil, path.Join(tmpDir, blockWithoutDeletionMark.String()))
Expand Down
4 changes: 2 additions & 2 deletions pkg/compact/clean_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ import (
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/objstore/inmem"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/testutil"
)

func TestBestEffortCleanAbortedPartialUploads(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

bkt := inmem.NewBucket()
bkt := objstore.NoopInstrumentedBucket{objstore.NewInMemBucket()}
logger := log.NewNopLogger()

metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, nil, nil)
Expand Down
6 changes: 3 additions & 3 deletions pkg/compact/compact_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
}

duplicateBlocksFilter := block.NewDeduplicateFilter()
metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, []block.MetadataFilter{
metaFetcher, err := block.NewMetaFetcher(nil, 32, objstore.NoopInstrumentedBucket{bkt}, "", nil, []block.MetadataFilter{
duplicateBlocksFilter,
}, nil)
testutil.Ok(t, err)
Expand Down Expand Up @@ -176,9 +176,9 @@ func TestGroup_Compact_e2e(t *testing.T) {

reg := prometheus.NewRegistry()

ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, 48*time.Hour)
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, objstore.NoopInstrumentedBucket{bkt}, 48*time.Hour)
duplicateBlocksFilter := block.NewDeduplicateFilter()
metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, []block.MetadataFilter{
metaFetcher, err := block.NewMetaFetcher(nil, 32, objstore.NoopInstrumentedBucket{bkt}, "", nil, []block.MetadataFilter{
ignoreDeletionMarkFilter,
duplicateBlocksFilter,
}, nil)
Expand Down
3 changes: 1 addition & 2 deletions pkg/compact/retention_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/inmem"
"github.com/thanos-io/thanos/pkg/testutil"
)

Expand Down Expand Up @@ -240,7 +239,7 @@ func TestApplyRetentionPolicyByResolution(t *testing.T) {
},
} {
t.Run(tt.name, func(t *testing.T) {
bkt := inmem.NewBucket()
bkt := objstore.NoopInstrumentedBucket{objstore.NewInMemBucket()}
for _, b := range tt.blocks {
uploadMockBlock(t, bkt, b.id, b.minTime, b.maxTime, int64(b.resolution))
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/objstore/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (conf *Config) validate() error {
return nil
}

// NewBucket returns a new Bucket using the provided Azure config.
// NewInMemBucket returns a new Bucket using the provided Azure config.
func NewBucket(logger log.Logger, azureConfig []byte, component string) (*Bucket, error) {
level.Debug(logger).Log("msg", "creating new Azure bucket connection", "component", component)

Expand Down
4 changes: 2 additions & 2 deletions pkg/objstore/client/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ type BucketConfig struct {
Config interface{} `yaml:"config"`
}

// NewBucket initializes and returns new object storage clients.
// NewInMemBucket initializes and returns new object storage clients.
// NOTE: confContentYaml can contain secrets.
func NewBucket(logger log.Logger, confContentYaml []byte, reg prometheus.Registerer, component string) (objstore.Bucket, error) {
func NewBucket(logger log.Logger, confContentYaml []byte, reg prometheus.Registerer, component string) (objstore.InstrumentedBucket, error) {
level.Info(logger).Log("msg", "loading bucket configuration")
bucketConf := &BucketConfig{}
if err := yaml.UnmarshalStrict(confContentYaml, bucketConf); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/objstore/filesystem/filesystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func NewBucketFromConfig(conf []byte) (*Bucket, error) {
return NewBucket(c.Directory)
}

// NewBucket returns a new filesystem.Bucket.
// NewInMemBucket returns a new filesystem.Bucket.
func NewBucket(rootDir string) (*Bucket, error) {
absDir, err := filepath.Abs(rootDir)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/objstore/gcs/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type Bucket struct {
closer io.Closer
}

// NewBucket returns a new Bucket against the given bucket handle.
// NewInMemBucket returns a new Bucket against the given bucket handle.
func NewBucket(ctx context.Context, logger log.Logger, conf []byte, component string) (*Bucket, error) {
var gc Config
if err := yaml.Unmarshal(conf, &gc); err != nil {
Expand Down
Loading

0 comments on commit f777523

Please sign in to comment.