diff --git a/pkg/objstore/cos/cos.go b/pkg/objstore/cos/cos.go index 37bfef6b03..91529db2a5 100644 --- a/pkg/objstore/cos/cos.go +++ b/pkg/objstore/cos/cos.go @@ -333,8 +333,19 @@ func (b *Bucket) getRange(ctx context.Context, name string, off, length int64) ( runutil.ExhaustCloseWithLogOnErr(b.logger, resp.Body, "cos get range obj close") return nil, err } + // Add size info into reader to pass it to Upload function. + r := objectSizerReadCloser{ReadCloser: resp.Body, size: resp.ContentLength} + return r, nil +} + +type objectSizerReadCloser struct { + io.ReadCloser + size int64 +} - return resp.Body, nil +// ObjectSize implement objstore.ObjectSizer. +func (o objectSizerReadCloser) ObjectSize() (int64, error) { + return o.size, nil } // Get returns a reader for the given object name. diff --git a/pkg/objstore/objstore.go b/pkg/objstore/objstore.go index 500b86a5ea..5089e492e0 100644 --- a/pkg/objstore/objstore.go +++ b/pkg/objstore/objstore.go @@ -492,6 +492,8 @@ func (b *metricBucket) Name() string { type timingReadCloser struct { io.ReadCloser + objSize int64 + objSizeErr error alreadyGotErr bool @@ -506,8 +508,11 @@ func newTimingReadCloser(rc io.ReadCloser, op string, dur *prometheus.HistogramV // Initialize the metrics with 0. dur.WithLabelValues(op) failed.WithLabelValues(op) + objSize, objSizeErr := TryToGetSize(rc) return &timingReadCloser{ ReadCloser: rc, + objSize: objSize, + objSizeErr: objSizeErr, start: time.Now(), op: op, duration: dur, @@ -516,6 +521,10 @@ func newTimingReadCloser(rc io.ReadCloser, op string, dur *prometheus.HistogramV } } +func (t *timingReadCloser) ObjectSize() (int64, error) { + return t.objSize, t.objSizeErr +} + func (rc *timingReadCloser) Close() error { err := rc.ReadCloser.Close() if !rc.alreadyGotErr && err != nil { diff --git a/pkg/objstore/objstore_test.go b/pkg/objstore/objstore_test.go index acd46a93f5..e57e23324e 100644 --- a/pkg/objstore/objstore_test.go +++ b/pkg/objstore/objstore_test.go @@ -86,3 +86,30 @@ func TestTracingReader(t *testing.T) { testutil.Ok(t, err) testutil.Equals(t, int64(11), size) } + +func TestTimingTracingReader(t *testing.T) { + m := BucketWithMetrics("", NewInMemBucket(), nil) + r := bytes.NewReader([]byte("hello world")) + + tr := NopCloserWithSize(r) + tr = newTimingReadCloser(tr, "", m.opsDuration, m.opsFailures, func(err error) bool { + return false + }) + tr = newTracingReadCloser(tr, nil) + + size, err := TryToGetSize(tr) + + testutil.Ok(t, err) + testutil.Equals(t, int64(11), size) + + smallBuf := make([]byte, 4) + n, err := io.ReadFull(tr, smallBuf) + testutil.Ok(t, err) + testutil.Equals(t, 4, n) + + // Verify that size is still the same, after reading 4 bytes. + size, err = TryToGetSize(tr) + + testutil.Ok(t, err) + testutil.Equals(t, int64(11), size) +}