Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use a histogram to record transferred bytes for get get_range #69

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 27 additions & 3 deletions objstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,13 @@ func WrapWithMetrics(b Bucket, reg prometheus.Registerer, name string) *metricBu
ConstLabels: prometheus.Labels{"bucket": name},
}, []string{"operation"}),

opsTransferredBytes: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Name: "objstore_bucket_operation_transferred_bytes",
Help: "Number of bytes transferred from/to bucket per operation.",
ConstLabels: prometheus.Labels{"bucket": name},
Buckets: prometheus.ExponentialBuckets(2<<14, 2, 16), // 32KiB, 64KiB, ... 1GiB
}, []string{"operation"}),

opsDuration: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Name: "objstore_bucket_operation_duration_seconds",
Help: "Duration of successful operations against the bucket",
Expand All @@ -450,6 +457,14 @@ func WrapWithMetrics(b Bucket, reg prometheus.Registerer, name string) *metricBu
bkt.opsDuration.WithLabelValues(op)
bkt.opsFetchedBytes.WithLabelValues(op)
}
// fetched bytes only relevant for get and getrange
for _, op := range []string{
OpGet,
OpGetRange,
// TODO: Add uploads
} {
bkt.opsTransferredBytes.WithLabelValues(op)
}
bkt.lastSuccessfulUploadTime.WithLabelValues(b.Name())
return bkt
}
Expand All @@ -461,8 +476,8 @@ type metricBucket struct {
opsFailures *prometheus.CounterVec
isOpFailureExpected IsOpFailureExpectedFunc

opsFetchedBytes *prometheus.CounterVec

opsFetchedBytes *prometheus.CounterVec
opsTransferredBytes *prometheus.HistogramVec
opsDuration *prometheus.HistogramVec
lastSuccessfulUploadTime *prometheus.GaugeVec
}
Expand All @@ -473,6 +488,7 @@ func (b *metricBucket) WithExpectedErrs(fn IsOpFailureExpectedFunc) Bucket {
ops: b.ops,
opsFailures: b.opsFailures,
opsFetchedBytes: b.opsFetchedBytes,
opsTransferredBytes: b.opsTransferredBytes,
isOpFailureExpected: fn,
opsDuration: b.opsDuration,
lastSuccessfulUploadTime: b.lastSuccessfulUploadTime,
Expand Down Expand Up @@ -530,6 +546,7 @@ func (b *metricBucket) Get(ctx context.Context, name string) (io.ReadCloser, err
b.opsFailures,
b.isOpFailureExpected,
b.opsFetchedBytes,
b.opsTransferredBytes,
), nil
}

Expand All @@ -551,6 +568,7 @@ func (b *metricBucket) GetRange(ctx context.Context, name string, off, length in
b.opsFailures,
b.isOpFailureExpected,
b.opsFetchedBytes,
b.opsTransferredBytes,
), nil
}

Expand Down Expand Up @@ -627,13 +645,15 @@ type timingReadCloser struct {

start time.Time
op string
readBytes int64
duration *prometheus.HistogramVec
failed *prometheus.CounterVec
isFailureExpected IsOpFailureExpectedFunc
fetchedBytes *prometheus.CounterVec
transferredBytes *prometheus.HistogramVec
}

func newTimingReadCloser(rc io.ReadCloser, op string, dur *prometheus.HistogramVec, failed *prometheus.CounterVec, isFailureExpected IsOpFailureExpectedFunc, fetchedBytes *prometheus.CounterVec) *timingReadCloser {
func newTimingReadCloser(rc io.ReadCloser, op string, dur *prometheus.HistogramVec, failed *prometheus.CounterVec, isFailureExpected IsOpFailureExpectedFunc, fetchedBytes *prometheus.CounterVec, transferredBytes *prometheus.HistogramVec) *timingReadCloser {
// Initialize the metrics with 0.
dur.WithLabelValues(op)
failed.WithLabelValues(op)
Expand All @@ -648,6 +668,8 @@ func newTimingReadCloser(rc io.ReadCloser, op string, dur *prometheus.HistogramV
failed: failed,
isFailureExpected: isFailureExpected,
fetchedBytes: fetchedBytes,
transferredBytes: transferredBytes,
readBytes: 0,
}
}

Expand All @@ -662,6 +684,7 @@ func (rc *timingReadCloser) Close() error {
}
if !rc.alreadyGotErr && err == nil {
rc.duration.WithLabelValues(rc.op).Observe(time.Since(rc.start).Seconds())
rc.transferredBytes.WithLabelValues(rc.op).Observe(float64(rc.readBytes))
rc.alreadyGotErr = true
}
return err
Expand All @@ -670,6 +693,7 @@ func (rc *timingReadCloser) Close() error {
func (rc *timingReadCloser) Read(b []byte) (n int, err error) {
n, err = rc.ReadCloser.Read(b)
rc.fetchedBytes.WithLabelValues(rc.op).Add(float64(n))
rc.readBytes += int64(n)
// Report metric just once.
if !rc.alreadyGotErr && err != nil && err != io.EOF {
if !rc.isFailureExpected(err) {
Expand Down
51 changes: 47 additions & 4 deletions objstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestDownloadUploadDirConcurrency(t *testing.T) {

testutil.Ok(t, m.Upload(context.Background(), "dir/obj1", bytes.NewReader([]byte("1"))))
testutil.Ok(t, m.Upload(context.Background(), "dir/obj2", bytes.NewReader([]byte("2"))))
testutil.Ok(t, m.Upload(context.Background(), "dir/obj3", bytes.NewReader([]byte("3"))))
testutil.Ok(t, m.Upload(context.Background(), "dir/obj3", bytes.NewReader(bytes.Repeat([]byte("3"), 1024*1024))))

testutil.Ok(t, promtest.GatherAndCompare(r, strings.NewReader(`
# HELP objstore_bucket_operations_total Total number of all attempted operations against a bucket.
Expand Down Expand Up @@ -110,17 +110,60 @@ func TestDownloadUploadDirConcurrency(t *testing.T) {
`), `objstore_bucket_operations_total`))

testutil.Ok(t, promtest.GatherAndCompare(r, strings.NewReader(`
# HELP objstore_bucket_operation_fetched_bytes_total Total number of bytes fetched from bucket, per operation.
# HELP objstore_bucket_operation_fetched_bytes_total Total number of bytes fetched from bucket, per operation.
# TYPE objstore_bucket_operation_fetched_bytes_total counter
objstore_bucket_operation_fetched_bytes_total{bucket="",operation="attributes"} 0
objstore_bucket_operation_fetched_bytes_total{bucket="",operation="delete"} 0
objstore_bucket_operation_fetched_bytes_total{bucket="",operation="exists"} 0
objstore_bucket_operation_fetched_bytes_total{bucket="",operation="get"} 3
objstore_bucket_operation_fetched_bytes_total{bucket="",operation="get"} 1.048578e+06
objstore_bucket_operation_fetched_bytes_total{bucket="",operation="get_range"} 0
objstore_bucket_operation_fetched_bytes_total{bucket="",operation="iter"} 0
objstore_bucket_operation_fetched_bytes_total{bucket="",operation="upload"} 0
`), `objstore_bucket_operation_fetched_bytes_total`))

testutil.Ok(t, promtest.GatherAndCompare(r, strings.NewReader(`
# HELP objstore_bucket_operation_transferred_bytes Number of bytes transferred from/to bucket per operation.
# TYPE objstore_bucket_operation_transferred_bytes histogram
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get",le="32768"} 2
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get",le="65536"} 2
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get",le="131072"} 2
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get",le="262144"} 2
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get",le="524288"} 2
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get",le="1.048576e+06"} 3
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get",le="2.097152e+06"} 3
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get",le="4.194304e+06"} 3
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get",le="8.388608e+06"} 3
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get",le="1.6777216e+07"} 3
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get",le="3.3554432e+07"} 3
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get",le="6.7108864e+07"} 3
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get",le="1.34217728e+08"} 3
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get",le="2.68435456e+08"} 3
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get",le="5.36870912e+08"} 3
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get",le="1.073741824e+09"} 3
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get",le="+Inf"} 3
objstore_bucket_operation_transferred_bytes_sum{bucket="",operation="get"} 1.048578e+06
objstore_bucket_operation_transferred_bytes_count{bucket="",operation="get"} 3
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get_range",le="32768"} 0
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get_range",le="65536"} 0
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get_range",le="131072"} 0
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get_range",le="262144"} 0
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get_range",le="524288"} 0
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get_range",le="1.048576e+06"} 0
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get_range",le="2.097152e+06"} 0
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get_range",le="4.194304e+06"} 0
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get_range",le="8.388608e+06"} 0
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get_range",le="1.6777216e+07"} 0
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get_range",le="3.3554432e+07"} 0
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get_range",le="6.7108864e+07"} 0
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get_range",le="1.34217728e+08"} 0
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get_range",le="2.68435456e+08"} 0
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get_range",le="5.36870912e+08"} 0
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get_range",le="1.073741824e+09"} 0
objstore_bucket_operation_transferred_bytes_bucket{bucket="",operation="get_range",le="+Inf"} 0
objstore_bucket_operation_transferred_bytes_sum{bucket="",operation="get_range"} 0
objstore_bucket_operation_transferred_bytes_count{bucket="",operation="get_range"} 0
`), `objstore_bucket_operation_transferred_bytes`))

testutil.Ok(t, UploadDir(context.Background(), log.NewNopLogger(), m, tempDir, "/dir-copy", WithUploadConcurrency(10)))

testutil.Ok(t, promtest.GatherAndCompare(r, strings.NewReader(`
Expand All @@ -143,7 +186,7 @@ func TestTimingTracingReader(t *testing.T) {
tr := NopCloserWithSize(r)
tr = newTimingReadCloser(tr, "", m.opsDuration, m.opsFailures, func(err error) bool {
return false
}, m.opsFetchedBytes)
}, m.opsFetchedBytes, m.opsTransferredBytes)

size, err := TryToGetSize(tr)

Expand Down
Loading