Skip to content

Commit

Permalink
Merge pull request thanos-io#103 from thanos-io/do-not-return-error-w…
Browse files Browse the repository at this point in the history
…hen-expected

Metric bucket should not return error when error is expected
  • Loading branch information
yeya24 authored Dec 12, 2024
2 parents 8d266b9 + 238812f commit d69df72
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 20 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,6 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#71](https://github.com/thanos-io/objstore/pull/71) Replace method `IsCustomerManagedKeyError` for a more generic `IsAccessDeniedErr` on the bucket interface.
- [#89](https://github.com/thanos-io/objstore/pull/89) GCS: Upgrade cloud.google.com/go/storage version to `v1.35.1`.
- [#123](https://github.com/thanos-io/objstore/pull/123) *: Upgrade minio-go version to `v7.0.71`.
- [#103](https://github.com/thanos-io/objstore/pull/103) *: Don't return error in metric bucket if the error is expected.

### Removed
32 changes: 24 additions & 8 deletions objstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,9 @@ func (b *metricBucket) Iter(ctx context.Context, dir string, f func(string) erro

err := b.bkt.Iter(ctx, dir, f, options...)
if err != nil {
if !b.metrics.isOpFailureExpected(err) && ctx.Err() != context.Canceled {
if b.metrics.isOpFailureExpected(err) {
err = nil
} else if ctx.Err() != context.Canceled {
b.metrics.opsFailures.WithLabelValues(op).Inc()
}
}
Expand All @@ -649,7 +651,9 @@ func (b *metricBucket) IterWithAttributes(ctx context.Context, dir string, f fun

err := b.bkt.IterWithAttributes(ctx, dir, f, options...)
if err != nil {
if !b.metrics.isOpFailureExpected(err) && ctx.Err() != context.Canceled {
if b.metrics.isOpFailureExpected(err) {
err = nil
} else if ctx.Err() != context.Canceled {
b.metrics.opsFailures.WithLabelValues(op).Inc()
}
}
Expand All @@ -668,7 +672,9 @@ func (b *metricBucket) Attributes(ctx context.Context, name string) (ObjectAttri
start := time.Now()
attrs, err := b.bkt.Attributes(ctx, name)
if err != nil {
if !b.metrics.isOpFailureExpected(err) && ctx.Err() != context.Canceled {
if b.metrics.isOpFailureExpected(err) {
err = nil
} else if ctx.Err() != context.Canceled {
b.metrics.opsFailures.WithLabelValues(op).Inc()
}
return attrs, err
Expand All @@ -685,7 +691,9 @@ func (b *metricBucket) Get(ctx context.Context, name string) (io.ReadCloser, err

rc, err := b.bkt.Get(ctx, name)
if err != nil {
if !b.metrics.isOpFailureExpected(err) && ctx.Err() != context.Canceled {
if b.metrics.isOpFailureExpected(err) {
err = nil
} else if ctx.Err() != context.Canceled {
b.metrics.opsFailures.WithLabelValues(op).Inc()
}
b.metrics.opsDuration.WithLabelValues(op).Observe(time.Since(start).Seconds())
Expand All @@ -712,7 +720,9 @@ func (b *metricBucket) GetRange(ctx context.Context, name string, off, length in

rc, err := b.bkt.GetRange(ctx, name, off, length)
if err != nil {
if !b.metrics.isOpFailureExpected(err) && ctx.Err() != context.Canceled {
if b.metrics.isOpFailureExpected(err) {
err = nil
} else if ctx.Err() != context.Canceled {
b.metrics.opsFailures.WithLabelValues(op).Inc()
}
b.metrics.opsDuration.WithLabelValues(op).Observe(time.Since(start).Seconds())
Expand All @@ -738,7 +748,9 @@ func (b *metricBucket) Exists(ctx context.Context, name string) (bool, error) {
start := time.Now()
ok, err := b.bkt.Exists(ctx, name)
if err != nil {
if !b.metrics.isOpFailureExpected(err) && ctx.Err() != context.Canceled {
if b.metrics.isOpFailureExpected(err) {
err = nil
} else if ctx.Err() != context.Canceled {
b.metrics.opsFailures.WithLabelValues(op).Inc()
}
return false, err
Expand Down Expand Up @@ -767,7 +779,9 @@ func (b *metricBucket) Upload(ctx context.Context, name string, r io.Reader) err
defer trc.Close()
err := b.bkt.Upload(ctx, name, trc)
if err != nil {
if !b.metrics.isOpFailureExpected(err) && ctx.Err() != context.Canceled {
if b.metrics.isOpFailureExpected(err) {
err = nil
} else if ctx.Err() != context.Canceled {
b.metrics.opsFailures.WithLabelValues(op).Inc()
}
return err
Expand All @@ -783,7 +797,9 @@ func (b *metricBucket) Delete(ctx context.Context, name string) error {

start := time.Now()
if err := b.bkt.Delete(ctx, name); err != nil {
if !b.metrics.isOpFailureExpected(err) && ctx.Err() != context.Canceled {
if b.metrics.isOpFailureExpected(err) {
err = nil
} else if ctx.Err() != context.Canceled {
b.metrics.opsFailures.WithLabelValues(op).Inc()
}
return err
Expand Down
82 changes: 77 additions & 5 deletions objstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestMetricBucket_Close(t *testing.T) {
testutil.Equals(t, 7, promtest.CollectAndCount(bkt.metrics.opsFailures))
testutil.Equals(t, 7, promtest.CollectAndCount(bkt.metrics.opsDuration))

AcceptanceTest(t, bkt.WithExpectedErrs(bkt.IsObjNotFoundErr))
AcceptanceTest(t, bkt.WithExpectedErrs(bkt.IsObjNotFoundErr), true)
testutil.Equals(t, float64(9), promtest.ToFloat64(bkt.metrics.ops.WithLabelValues(OpIter)))
testutil.Equals(t, float64(2), promtest.ToFloat64(bkt.metrics.ops.WithLabelValues(OpAttributes)))
testutil.Equals(t, float64(3), promtest.ToFloat64(bkt.metrics.ops.WithLabelValues(OpGet)))
Expand All @@ -51,7 +51,7 @@ func TestMetricBucket_Close(t *testing.T) {

// Clear bucket, but don't clear metrics to ensure we use same.
bkt.bkt = NewInMemBucket()
AcceptanceTest(t, bkt)
AcceptanceTestWithoutNotFoundErr(t, bkt)
testutil.Equals(t, float64(18), promtest.ToFloat64(bkt.metrics.ops.WithLabelValues(OpIter)))
testutil.Equals(t, float64(4), promtest.ToFloat64(bkt.metrics.ops.WithLabelValues(OpAttributes)))
testutil.Equals(t, float64(6), promtest.ToFloat64(bkt.metrics.ops.WithLabelValues(OpGet)))
Expand Down Expand Up @@ -537,6 +537,54 @@ func TestDownloadDir_CleanUp(t *testing.T) {
testutil.Assert(t, os.IsNotExist(err))
}

func TestBucketExpectedErrNoReturnError(t *testing.T) {
expectedErr := errors.New("test error")

bucket := WrapWithMetrics(&mockBucket{
get: func(_ context.Context, _ string) (io.ReadCloser, error) {
return nil, expectedErr
},
getRange: func(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
return nil, expectedErr
},
upload: func(ctx context.Context, name string, r io.Reader) error {
return expectedErr
},
iter: func(ctx context.Context, dir string, f func(string) error, options ...IterOption) error {
return expectedErr
},
attributes: func(ctx context.Context, name string) (ObjectAttributes, error) {
return ObjectAttributes{}, expectedErr
},
exists: func(ctx context.Context, name string) (bool, error) {
return false, expectedErr
},
}, nil, "").WithExpectedErrs(func(err error) bool {
return errors.Is(err, expectedErr)
})

// Expect no error to be returned since the error is expected.
_, err := bucket.Get(context.Background(), "")
testutil.Ok(t, err)

_, err = bucket.GetRange(context.Background(), "", 1, 2)
testutil.Ok(t, err)

err = bucket.Upload(context.Background(), "", nil)
testutil.Ok(t, err)

err = bucket.Iter(context.Background(), "", func(s string) error {
return nil
})
testutil.Ok(t, err)

_, err = bucket.Exists(context.Background(), "")
testutil.Ok(t, err)

_, err = bucket.Attributes(context.Background(), "")
testutil.Ok(t, err)
}

// unreliableBucket implements Bucket and returns an error on every n-th Get.
type unreliableBucket struct {
Bucket
Expand Down Expand Up @@ -570,9 +618,12 @@ func (r *mockReader) Close() error {
type mockBucket struct {
Bucket

upload func(ctx context.Context, name string, r io.Reader) error
get func(ctx context.Context, name string) (io.ReadCloser, error)
getRange func(ctx context.Context, name string, off, length int64) (io.ReadCloser, error)
upload func(ctx context.Context, name string, r io.Reader) error
get func(ctx context.Context, name string) (io.ReadCloser, error)
getRange func(ctx context.Context, name string, off, length int64) (io.ReadCloser, error)
iter func(ctx context.Context, dir string, f func(string) error, options ...IterOption) error
attributes func(ctx context.Context, name string) (ObjectAttributes, error)
exists func(ctx context.Context, name string) (bool, error)
}

func (b *mockBucket) Upload(ctx context.Context, name string, r io.Reader) error {
Expand All @@ -596,6 +647,27 @@ func (b *mockBucket) GetRange(ctx context.Context, name string, off, length int6
return nil, errors.New("GetRange has not been mocked")
}

func (b *mockBucket) Iter(ctx context.Context, dir string, f func(string) error, options ...IterOption) error {
if b.iter != nil {
return b.iter(ctx, dir, f, options...)
}
return errors.New("Iter has not been mocked")
}

func (b *mockBucket) Exists(ctx context.Context, name string) (bool, error) {
if b.exists != nil {
return b.exists(ctx, name)
}
return false, errors.New("Exists has not been mocked")
}

func (b *mockBucket) Attributes(ctx context.Context, name string) (ObjectAttributes, error) {
if b.attributes != nil {
return b.attributes(ctx, name)
}
return ObjectAttributes{}, errors.New("Attributes has not been mocked")
}

func Test_TryToGetSizeLimitedReader(t *testing.T) {
b := &bytes.Buffer{}
r := io.LimitReader(b, 1024)
Expand Down
2 changes: 1 addition & 1 deletion objtesting/acceptance_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@ import (
// NOTE: This test assumes strong consistency, but in the same way it does not guarantee that if it passes, the
// used object store is strongly consistent.
func TestObjStore_AcceptanceTest_e2e(t *testing.T) {
ForeachStore(t, objstore.AcceptanceTest)
ForeachStore(t, objstore.AcceptanceTestWithoutNotFoundErr)
}
2 changes: 1 addition & 1 deletion prefixed_bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestPrefixedBucket_Acceptance(t *testing.T) {
"someprefix"}

for _, prefix := range prefixes {
AcceptanceTest(t, NewPrefixedBucket(NewInMemBucket(), prefix))
AcceptanceTestWithoutNotFoundErr(t, NewPrefixedBucket(NewInMemBucket(), prefix))
UsesPrefixTest(t, NewInMemBucket(), prefix)
}
}
Expand Down
18 changes: 13 additions & 5 deletions testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,24 +80,32 @@ func (b noopInstrumentedBucket) ReaderWithExpectedErrs(IsOpFailureExpectedFunc)
return b
}

func AcceptanceTest(t *testing.T, bkt Bucket) {
func AcceptanceTestWithoutNotFoundErr(t *testing.T, bkt Bucket) {
AcceptanceTest(t, bkt, false)
}

func AcceptanceTest(t *testing.T, bkt Bucket, expectedNotFoundErr bool) {
ctx := context.Background()

_, err := bkt.Get(ctx, "")
testutil.NotOk(t, err)
testutil.Assert(t, !bkt.IsObjNotFoundErr(err), "expected user error got not found %s", err)

_, err = bkt.Get(ctx, "id1/obj_1.some")
testutil.NotOk(t, err)
testutil.Assert(t, bkt.IsObjNotFoundErr(err), "expected not found error got %s", err)
if !expectedNotFoundErr {
testutil.NotOk(t, err)
testutil.Assert(t, bkt.IsObjNotFoundErr(err), "expected not found error got %s", err)
}

ok, err := bkt.Exists(ctx, "id1/obj_1.some")
testutil.Ok(t, err)
testutil.Assert(t, !ok, "expected not exits")

_, err = bkt.Attributes(ctx, "id1/obj_1.some")
testutil.NotOk(t, err)
testutil.Assert(t, bkt.IsObjNotFoundErr(err), "expected not found error but got %s", err)
if !expectedNotFoundErr {
testutil.NotOk(t, err)
testutil.Assert(t, bkt.IsObjNotFoundErr(err), "expected not found error got %s", err)
}

// Upload first object.
testutil.Ok(t, bkt.Upload(ctx, "id1/obj_1.some", strings.NewReader("@test-data@")))
Expand Down

0 comments on commit d69df72

Please sign in to comment.