diff --git a/objstore.go b/objstore.go
index d7d04888..2a0fe4dc 100644
--- a/objstore.go
+++ b/objstore.go
@@ -424,6 +424,13 @@ func WrapWithMetrics(b Bucket, reg prometheus.Registerer, name string) *metricBu
 			ConstLabels: prometheus.Labels{"bucket": name},
 		}, []string{"operation"}),
 
+		opsTransferredBytes: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
+			Name:        "objstore_bucket_operation_transferred_bytes",
+			Help:        "Number of bytes transferred from/to bucket per operation.",
+			ConstLabels: prometheus.Labels{"bucket": name},
+			Buckets:     prometheus.ExponentialBuckets(2<<14, 2, 16), // 32KiB, 64KiB, ... 1GiB
+		}, []string{"operation"}),
+
 		opsDuration: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
 			Name:        "objstore_bucket_operation_duration_seconds",
 			Help:        "Duration of successful operations against the bucket",
@@ -450,6 +457,14 @@ func WrapWithMetrics(b Bucket, reg prometheus.Registerer, name string) *metricBu
 		bkt.opsDuration.WithLabelValues(op)
 		bkt.opsFetchedBytes.WithLabelValues(op)
 	}
+	// transferred bytes only relevant for get and get_range
+	for _, op := range []string{
+		OpGet,
+		OpGetRange,
+		// TODO: Add uploads
+	} {
+		bkt.opsTransferredBytes.WithLabelValues(op)
+	}
 	bkt.lastSuccessfulUploadTime.WithLabelValues(b.Name())
 	return bkt
 }
@@ -461,8 +476,8 @@ type metricBucket struct {
 	opsFailures         *prometheus.CounterVec
 	isOpFailureExpected IsOpFailureExpectedFunc
 
-	opsFetchedBytes *prometheus.CounterVec
-
+	opsFetchedBytes     *prometheus.CounterVec
+	opsTransferredBytes *prometheus.HistogramVec
 	opsDuration              *prometheus.HistogramVec
 	lastSuccessfulUploadTime *prometheus.GaugeVec
 }
@@ -473,6 +488,7 @@ func (b *metricBucket) WithExpectedErrs(fn IsOpFailureExpectedFunc) Bucket {
 		ops:                      b.ops,
 		opsFailures:              b.opsFailures,
 		opsFetchedBytes:          b.opsFetchedBytes,
+		opsTransferredBytes:      b.opsTransferredBytes,
 		isOpFailureExpected:      fn,
 		opsDuration:              b.opsDuration,
 		lastSuccessfulUploadTime: b.lastSuccessfulUploadTime,
@@ -530,6 +546,7 @@ func (b *metricBucket) Get(ctx context.Context, name string) (io.ReadCloser, err
 		b.opsFailures,
 		b.isOpFailureExpected,
 		b.opsFetchedBytes,
+		b.opsTransferredBytes,
 	), nil
 }
 
@@ -551,6 +568,7 @@ func (b *metricBucket) GetRange(ctx context.Context, name string, off, length in
 		b.opsFailures,
 		b.isOpFailureExpected,
 		b.opsFetchedBytes,
+		b.opsTransferredBytes,
 	), nil
 }
 
@@ -627,13 +645,15 @@ type timingReadCloser struct {
 	start time.Time
 	op    string
 
+	readBytes int64
 	duration          *prometheus.HistogramVec
 	failed            *prometheus.CounterVec
 	isFailureExpected IsOpFailureExpectedFunc
 	fetchedBytes      *prometheus.CounterVec
+	transferredBytes  *prometheus.HistogramVec
 }
 
-func newTimingReadCloser(rc io.ReadCloser, op string, dur *prometheus.HistogramVec, failed *prometheus.CounterVec, isFailureExpected IsOpFailureExpectedFunc, fetchedBytes *prometheus.CounterVec) *timingReadCloser {
+func newTimingReadCloser(rc io.ReadCloser, op string, dur *prometheus.HistogramVec, failed *prometheus.CounterVec, isFailureExpected IsOpFailureExpectedFunc, fetchedBytes *prometheus.CounterVec, transferredBytes *prometheus.HistogramVec) *timingReadCloser {
 	// Initialize the metrics with 0.
 	dur.WithLabelValues(op)
 	failed.WithLabelValues(op)
@@ -648,6 +668,8 @@ func newTimingReadCloser(rc io.ReadCloser, op string, dur *prometheus.HistogramV
 		failed:            failed,
 		isFailureExpected: isFailureExpected,
 		fetchedBytes:      fetchedBytes,
+		transferredBytes:  transferredBytes,
+		readBytes:         0,
 	}
 }
 
@@ -662,6 +684,7 @@ func (rc *timingReadCloser) Close() error {
 	}
 	if !rc.alreadyGotErr && err == nil {
 		rc.duration.WithLabelValues(rc.op).Observe(time.Since(rc.start).Seconds())
+		rc.transferredBytes.WithLabelValues(rc.op).Observe(float64(rc.readBytes))
 		rc.alreadyGotErr = true
 	}
 	return err
@@ -670,6 +693,7 @@ func (rc *timingReadCloser) Close() error {
 func (rc *timingReadCloser) Read(b []byte) (n int, err error) {
 	n, err = rc.ReadCloser.Read(b)
 	rc.fetchedBytes.WithLabelValues(rc.op).Add(float64(n))
+	rc.readBytes += int64(n)
 	// Report metric just once.
 	if !rc.alreadyGotErr && err != nil && err != io.EOF {
 		if !rc.isFailureExpected(err) {
diff --git a/objstore_test.go b/objstore_test.go
index 92b781a8..ab5c5e5b 100644
--- a/objstore_test.go
+++ b/objstore_test.go
@@ -79,7 +79,7 @@ func TestDownloadUploadDirConcurrency(t *testing.T) {
 	testutil.Ok(t, m.Upload(context.Background(), "dir/obj1", bytes.NewReader([]byte("1"))))
 	testutil.Ok(t, m.Upload(context.Background(), "dir/obj2", bytes.NewReader([]byte("2"))))
-	testutil.Ok(t, m.Upload(context.Background(), "dir/obj3", bytes.NewReader([]byte("3"))))
+	testutil.Ok(t, m.Upload(context.Background(), "dir/obj3", bytes.NewReader(bytes.Repeat([]byte("3"), 1024*1024))))
 
 	testutil.Ok(t, promtest.GatherAndCompare(r, strings.NewReader(`
 		# HELP objstore_bucket_operations_total Total number of all attempted operations against a bucket.
@@ -110,17 +110,60 @@ func TestDownloadUploadDirConcurrency(t *testing.T) {
 	`), `objstore_bucket_operations_total`))
 
 	testutil.Ok(t, promtest.GatherAndCompare(r, strings.NewReader(`
-		# HELP objstore_bucket_operation_fetched_bytes_total Total number of bytes fetched from bucket, per operation. 
+		# HELP objstore_bucket_operation_fetched_bytes_total Total number of bytes fetched from bucket, per operation.
 		# TYPE objstore_bucket_operation_fetched_bytes_total counter
 		objstore_bucket_operation_fetched_bytes_total{bucket="",operation="attributes"} 0
 		objstore_bucket_operation_fetched_bytes_total{bucket="",operation="delete"} 0
 		objstore_bucket_operation_fetched_bytes_total{bucket="",operation="exists"} 0
-		objstore_bucket_operation_fetched_bytes_total{bucket="",operation="get"} 3
+		objstore_bucket_operation_fetched_bytes_total{bucket="",operation="get"} 1.048578e+06
 		objstore_bucket_operation_fetched_bytes_total{bucket="",operation="get_range"} 0
 		objstore_bucket_operation_fetched_bytes_total{bucket="",operation="iter"} 0
 		objstore_bucket_operation_fetched_bytes_total{bucket="",operation="upload"} 0
 	`), `objstore_bucket_operation_fetched_bytes_total`))
 
+	testutil.Ok(t, promtest.GatherAndCompare(r, strings.NewReader(`
+		# HELP objstore_bucket_operation_transferred_bytes Number of bytes transferred from/to bucket per operation.
+		# TYPE objstore_bucket_operation_transferred_bytes histogram
+		objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get",le="32768"} 2
+		objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get",le="65536"} 2
+		objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get",le="131072"} 2
+		objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get",le="262144"} 2
+		objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get",le="524288"} 2
+		objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get",le="1.048576e+06"} 3
+		objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get",le="2.097152e+06"} 3
+		objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get",le="4.194304e+06"} 3
+		objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get",le="8.388608e+06"} 3
+		objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get",le="1.6777216e+07"} 3
+		objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get",le="3.3554432e+07"} 3
+		objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get",le="6.7108864e+07"} 3
+		objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get",le="1.34217728e+08"} 3
+		objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get",le="2.68435456e+08"} 3
+		objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get",le="5.36870912e+08"} 3
+		objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get",le="1.073741824e+09"} 3
+		objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get",le="+Inf"} 3
+		objstore_bucket_operation_transferred_bytes_sum{bucket="",operation="get"} 1.048578e+06
+		objstore_bucket_operation_transferred_bytes_count{bucket="",operation="get"} 3
+		objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get_range",le="32768"} 0
+		objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get_range",le="65536"} 0
+		objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get_range",le="131072"} 0
+		objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get_range",le="262144"} 0
+		objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get_range",le="524288"} 0
+		objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get_range",le="1.048576e+06"} 0
+		objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get_range",le="2.097152e+06"} 0
+		objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get_range",le="4.194304e+06"} 0
+		objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get_range",le="8.388608e+06"} 0
+		objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get_range",le="1.6777216e+07"} 0
+		objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get_range",le="3.3554432e+07"} 0
+		objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get_range",le="6.7108864e+07"} 0
+		objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get_range",le="1.34217728e+08"} 0
+		objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get_range",le="2.68435456e+08"} 0
+		objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get_range",le="5.36870912e+08"} 0
+		objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get_range",le="1.073741824e+09"} 0
+		objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get_range",le="+Inf"} 0
+		objstore_bucket_operation_transferred_bytes_sum{bucket="",operation="get_range"} 0
+		objstore_bucket_operation_transferred_bytes_count{bucket="",operation="get_range"} 0
+	`), `objstore_bucket_operation_transferred_bytes`))
+
 	testutil.Ok(t, UploadDir(context.Background(), log.NewNopLogger(), m, tempDir, "/dir-copy", WithUploadConcurrency(10)))
 
 	testutil.Ok(t, promtest.GatherAndCompare(r, strings.NewReader(`
@@ -143,7 +186,7 @@ func TestTimingTracingReader(t *testing.T) {
 	tr := NopCloserWithSize(r)
 	tr = newTimingReadCloser(tr, "", m.opsDuration, m.opsFailures, func(err error) bool {
 		return false
-	}, m.opsFetchedBytes)
+	}, m.opsFetchedBytes, m.opsTransferredBytes)
 
 	size, err := TryToGetSize(tr)
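For reviewers, a minimal sketch (not part of the change set) of how the new histogram surfaces to callers: wrap any Bucket with WrapWithMetrics, read an object through Get, and the byte total accumulated by Read is observed into objstore_bucket_operation_transferred_bytes when the reader is closed. The standalone module path github.com/thanos-io/objstore and the package's in-memory bucket helper are assumed here to keep the example self-contained; uploads are not yet observed, per the TODO above.

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"io"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/thanos-io/objstore"
)

func main() {
	reg := prometheus.NewRegistry()

	// Wrap any Bucket implementation; the in-memory bucket keeps this runnable without credentials.
	bkt := objstore.WrapWithMetrics(objstore.NewInMemBucket(), reg, "example")

	ctx := context.Background()
	payload := bytes.Repeat([]byte("x"), 1024*1024) // 1 MiB, lands in the le="1.048576e+06" bucket
	if err := bkt.Upload(ctx, "obj", bytes.NewReader(payload)); err != nil {
		panic(err)
	}

	rc, err := bkt.Get(ctx, "obj")
	if err != nil {
		panic(err)
	}
	// Each Read adds to readBytes; Close observes the total into the histogram.
	if _, err := io.Copy(io.Discard, rc); err != nil {
		panic(err)
	}
	if err := rc.Close(); err != nil {
		panic(err)
	}

	mfs, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range mfs {
		if mf.GetName() == "objstore_bucket_operation_transferred_bytes" {
			fmt.Println(mf.String()) // one observation, for operation="get"
		}
	}
}
```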