Skip to content

Commit

Permalink
Use a histogram to record transferred bytes for get get_range
Browse files Browse the repository at this point in the history
This will allow to understand the distribution of object store fetches.

The metric has been purposely named so it can in the future also cover
uploads.
  • Loading branch information
simonswine committed Jul 27, 2023
1 parent 89475d4 commit 8a2dd18
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 7 deletions.
30 changes: 27 additions & 3 deletions objstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,13 @@ func WrapWithMetrics(b Bucket, reg prometheus.Registerer, name string) *metricBu
ConstLabels: prometheus.Labels{"bucket": name},
}, []string{"operation"}),

opsTransferredBytes: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Name: "objstore_bucket_operation_transferred_bytes",
Help: "Number of bytes transferred from/to bucket per operation.",
ConstLabels: prometheus.Labels{"bucket": name},
Buckets: prometheus.ExponentialBuckets(2<<14, 2, 16), // 32KiB, 64KiB, ... 1GiB
}, []string{"operation"}),

opsDuration: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Name: "objstore_bucket_operation_duration_seconds",
Help: "Duration of successful operations against the bucket",
Expand All @@ -450,6 +457,14 @@ func WrapWithMetrics(b Bucket, reg prometheus.Registerer, name string) *metricBu
bkt.opsDuration.WithLabelValues(op)
bkt.opsFetchedBytes.WithLabelValues(op)
}
// fetched bytes only relevant for get and getrange
for _, op := range []string{
OpGet,
OpGetRange,
// TODO: Add uploads
} {
bkt.opsTransferredBytes.WithLabelValues(op)
}
bkt.lastSuccessfulUploadTime.WithLabelValues(b.Name())
return bkt
}
Expand All @@ -461,8 +476,8 @@ type metricBucket struct {
opsFailures *prometheus.CounterVec
isOpFailureExpected IsOpFailureExpectedFunc

opsFetchedBytes *prometheus.CounterVec

opsFetchedBytes *prometheus.CounterVec
opsTransferredBytes *prometheus.HistogramVec
opsDuration *prometheus.HistogramVec
lastSuccessfulUploadTime *prometheus.GaugeVec
}
Expand All @@ -473,6 +488,7 @@ func (b *metricBucket) WithExpectedErrs(fn IsOpFailureExpectedFunc) Bucket {
ops: b.ops,
opsFailures: b.opsFailures,
opsFetchedBytes: b.opsFetchedBytes,
opsTransferredBytes: b.opsTransferredBytes,
isOpFailureExpected: fn,
opsDuration: b.opsDuration,
lastSuccessfulUploadTime: b.lastSuccessfulUploadTime,
Expand Down Expand Up @@ -530,6 +546,7 @@ func (b *metricBucket) Get(ctx context.Context, name string) (io.ReadCloser, err
b.opsFailures,
b.isOpFailureExpected,
b.opsFetchedBytes,
b.opsTransferredBytes,
), nil
}

Expand All @@ -551,6 +568,7 @@ func (b *metricBucket) GetRange(ctx context.Context, name string, off, length in
b.opsFailures,
b.isOpFailureExpected,
b.opsFetchedBytes,
b.opsTransferredBytes,
), nil
}

Expand Down Expand Up @@ -627,13 +645,15 @@ type timingReadCloser struct {

start time.Time
op string
readBytes int64
duration *prometheus.HistogramVec
failed *prometheus.CounterVec
isFailureExpected IsOpFailureExpectedFunc
fetchedBytes *prometheus.CounterVec
transferredBytes *prometheus.HistogramVec
}

func newTimingReadCloser(rc io.ReadCloser, op string, dur *prometheus.HistogramVec, failed *prometheus.CounterVec, isFailureExpected IsOpFailureExpectedFunc, fetchedBytes *prometheus.CounterVec) *timingReadCloser {
func newTimingReadCloser(rc io.ReadCloser, op string, dur *prometheus.HistogramVec, failed *prometheus.CounterVec, isFailureExpected IsOpFailureExpectedFunc, fetchedBytes *prometheus.CounterVec, transferredBytes *prometheus.HistogramVec) *timingReadCloser {
// Initialize the metrics with 0.
dur.WithLabelValues(op)
failed.WithLabelValues(op)
Expand All @@ -648,6 +668,8 @@ func newTimingReadCloser(rc io.ReadCloser, op string, dur *prometheus.HistogramV
failed: failed,
isFailureExpected: isFailureExpected,
fetchedBytes: fetchedBytes,
transferredBytes: transferredBytes,
readBytes: 0,
}
}

Expand All @@ -662,6 +684,7 @@ func (rc *timingReadCloser) Close() error {
}
if !rc.alreadyGotErr && err == nil {
rc.duration.WithLabelValues(rc.op).Observe(time.Since(rc.start).Seconds())
rc.transferredBytes.WithLabelValues(rc.op).Observe(float64(rc.readBytes))
rc.alreadyGotErr = true
}
return err
Expand All @@ -670,6 +693,7 @@ func (rc *timingReadCloser) Close() error {
func (rc *timingReadCloser) Read(b []byte) (n int, err error) {
n, err = rc.ReadCloser.Read(b)
rc.fetchedBytes.WithLabelValues(rc.op).Add(float64(n))
rc.readBytes += int64(n)
// Report metric just once.
if !rc.alreadyGotErr && err != nil && err != io.EOF {
if !rc.isFailureExpected(err) {
Expand Down
51 changes: 47 additions & 4 deletions objstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestDownloadUploadDirConcurrency(t *testing.T) {

testutil.Ok(t, m.Upload(context.Background(), "dir/obj1", bytes.NewReader([]byte("1"))))
testutil.Ok(t, m.Upload(context.Background(), "dir/obj2", bytes.NewReader([]byte("2"))))
testutil.Ok(t, m.Upload(context.Background(), "dir/obj3", bytes.NewReader([]byte("3"))))
testutil.Ok(t, m.Upload(context.Background(), "dir/obj3", bytes.NewReader(bytes.Repeat([]byte("3"), 1024*1024))))

testutil.Ok(t, promtest.GatherAndCompare(r, strings.NewReader(`
# HELP objstore_bucket_operations_total Total number of all attempted operations against a bucket.
Expand Down Expand Up @@ -110,17 +110,60 @@ func TestDownloadUploadDirConcurrency(t *testing.T) {
`), `objstore_bucket_operations_total`))

testutil.Ok(t, promtest.GatherAndCompare(r, strings.NewReader(`
# HELP objstore_bucket_operation_fetched_bytes_total Total number of bytes fetched from bucket, per operation.
# HELP objstore_bucket_operation_fetched_bytes_total Total number of bytes fetched from bucket, per operation.
# TYPE objstore_bucket_operation_fetched_bytes_total counter
objstore_bucket_operation_fetched_bytes_total{bucket="",operation="attributes"} 0
objstore_bucket_operation_fetched_bytes_total{bucket="",operation="delete"} 0
objstore_bucket_operation_fetched_bytes_total{bucket="",operation="exists"} 0
objstore_bucket_operation_fetched_bytes_total{bucket="",operation="get"} 3
objstore_bucket_operation_fetched_bytes_total{bucket="",operation="get"} 1.048578e+06
objstore_bucket_operation_fetched_bytes_total{bucket="",operation="get_range"} 0
objstore_bucket_operation_fetched_bytes_total{bucket="",operation="iter"} 0
objstore_bucket_operation_fetched_bytes_total{bucket="",operation="upload"} 0
`), `objstore_bucket_operation_fetched_bytes_total`))

testutil.Ok(t, promtest.GatherAndCompare(r, strings.NewReader(`
# HELP objstore_bucket_operation_transferred_bytes Number of bytes transferred from/to bucket per operation.
# TYPE objstore_bucket_operation_transferred_bytes histogram
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get",le="32768"} 2
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get",le="65536"} 2
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get",le="131072"} 2
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get",le="262144"} 2
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get",le="524288"} 2
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get",le="1.048576e+06"} 3
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get",le="2.097152e+06"} 3
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get",le="4.194304e+06"} 3
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get",le="8.388608e+06"} 3
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get",le="1.6777216e+07"} 3
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get",le="3.3554432e+07"} 3
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get",le="6.7108864e+07"} 3
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get",le="1.34217728e+08"} 3
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get",le="2.68435456e+08"} 3
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get",le="5.36870912e+08"} 3
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get",le="1.073741824e+09"} 3
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get",le="+Inf"} 3
objstore_bucket_operation_transferred_bytes_sum{bucket="",operation="get"} 1.048578e+06
objstore_bucket_operation_transferred_bytes_count{bucket="",operation="get"} 3
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get_range",le="32768"} 0
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get_range",le="65536"} 0
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get_range",le="131072"} 0
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get_range",le="262144"} 0
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get_range",le="524288"} 0
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get_range",le="1.048576e+06"} 0
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get_range",le="2.097152e+06"} 0
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get_range",le="4.194304e+06"} 0
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get_range",le="8.388608e+06"} 0
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get_range",le="1.6777216e+07"} 0
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get_range",le="3.3554432e+07"} 0
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get_range",le="6.7108864e+07"} 0
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get_range",le="1.34217728e+08"} 0
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get_range",le="2.68435456e+08"} 0
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get_range",le="5.36870912e+08"} 0
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get_range",le="1.073741824e+09"} 0
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get_range",le="+Inf"} 0
objstore_bucket_operation_transferred_bytes_sum{bucket="",operation="get_range"} 0
objstore_bucket_operation_transferred_bytes_count{bucket="",operation="get_range"} 0
`), `objstore_bucket_operation_transferred_bytes`))

testutil.Ok(t, UploadDir(context.Background(), log.NewNopLogger(), m, tempDir, "/dir-copy", WithUploadConcurrency(10)))

testutil.Ok(t, promtest.GatherAndCompare(r, strings.NewReader(`
Expand All @@ -143,7 +186,7 @@ func TestTimingTracingReader(t *testing.T) {
tr := NopCloserWithSize(r)
tr = newTimingReadCloser(tr, "", m.opsDuration, m.opsFailures, func(err error) bool {
return false
}, m.opsFetchedBytes)
}, m.opsFetchedBytes, m.opsTransferredBytes)

size, err := TryToGetSize(tr)

Expand Down

0 comments on commit 8a2dd18

Please sign in to comment.