From f5a181ec81d804e9f7f2a3f0d268c1d4724b8c59 Mon Sep 17 00:00:00 2001
From: kevinye202
Date: Thu, 21 Nov 2024 10:40:59 -0500
Subject: [PATCH] Emit additional metrics on replicate blobs and refresh blobs (#226)

This change extends LocalBlobAccess's FlatBlobAccess to provide more
detailed metrics on blob refreshing. Instead of only exposing the
number of refreshed objects, it now also reports the duration of
refreshes and the total amount of data refreshed. Furthermore, it adds
a MetricsBlobReplicator, which makes it possible to determine, for
backends such as MirroredBlobAccess, how many requests actually lead to
refreshes.
---
 cmd/bb_copy/main.go                           |   3 +-
 cmd/bb_replicator/main.go                     |   3 +-
 .../configuration/blob_access_creator.go      |   3 -
 .../configuration/blob_replicator_creator.go  |   4 +
 .../configuration/cas_blob_access_creator.go  |   4 -
 .../cas_blob_replicator_creator.go            |   4 +
 .../configuration/icas_blob_access_creator.go |   4 -
 .../icas_blob_replicator_creator.go           |   4 +
 .../configuration/new_blob_replicator.go      |  20 ++-
 pkg/blobstore/local/flat_blob_access.go       |  77 +++++++--
 pkg/blobstore/replication/BUILD.bazel         |   3 +
 .../replication/metrics_blob_replicator.go    | 163 ++++++++++++++++++
 12 files changed, 260 insertions(+), 32 deletions(-)
 create mode 100644 pkg/blobstore/replication/metrics_blob_replicator.go

diff --git a/cmd/bb_copy/main.go b/cmd/bb_copy/main.go
index 71a01b11..9c0f7ce8 100644
--- a/cmd/bb_copy/main.go
+++ b/cmd/bb_copy/main.go
@@ -62,7 +62,8 @@ func main() {
 			configuration.Replicator,
 			source.BlobAccess,
 			sink,
-			blobstore_configuration.NewCASBlobReplicatorCreator(grpcClientFactory))
+			blobstore_configuration.NewCASBlobReplicatorCreator(grpcClientFactory),
+		)
 		if err != nil {
 			return util.StatusWrap(err, "Failed to create replicator")
 		}
diff --git a/cmd/bb_replicator/main.go b/cmd/bb_replicator/main.go
index c43cbf8e..bd53eac9 100644
--- a/cmd/bb_replicator/main.go
+++ b/cmd/bb_replicator/main.go
@@ -53,7 +53,8 @@ func main() {
 			configuration.Replicator,
 			source.BlobAccess,
 			sink,
-			blobstore_configuration.NewCASBlobReplicatorCreator(grpcClientFactory))
+			blobstore_configuration.NewCASBlobReplicatorCreator(grpcClientFactory),
+		)
 		if err != nil {
 			return util.StatusWrap(err, "Failed to create replicator")
 		}
diff --git a/pkg/blobstore/configuration/blob_access_creator.go b/pkg/blobstore/configuration/blob_access_creator.go
index a806cd08..50c83ba9 100644
--- a/pkg/blobstore/configuration/blob_access_creator.go
+++ b/pkg/blobstore/configuration/blob_access_creator.go
@@ -36,9 +36,6 @@ type BlobAccessCreator interface {
 	// GetReadBufferFactory() returns operations that can be used by
 	// BlobAccess to create Buffer objects to return data.
 	GetReadBufferFactory() blobstore.ReadBufferFactory
-	// GetStorageTypeName() returns a short string that identifies
-	// the purpose of this storage (e.g., "ac", "cas").
-	GetStorageTypeName() string
 	// GetCapabilitiesProvider() returns a provider of REv2
 	// ServerCapabilities messages that should be returned for
 	// backends that can't report their own capabilities. This
diff --git a/pkg/blobstore/configuration/blob_replicator_creator.go b/pkg/blobstore/configuration/blob_replicator_creator.go
index 3c62c6de..c0ca54d5 100644
--- a/pkg/blobstore/configuration/blob_replicator_creator.go
+++ b/pkg/blobstore/configuration/blob_replicator_creator.go
@@ -16,4 +16,8 @@ type BlobReplicatorCreator interface {
 	// type. For example, sending replication requests over gRPC is
 	// only supported for the Content Addressable Storage.
NewCustomBlobReplicator(configuration *pb.BlobReplicatorConfiguration, source blobstore.BlobAccess, sink BlobAccessInfo) (replication.BlobReplicator, error) + + // GetStorageTypeName returns the name of the storage type that + // this BlobReplicatorCreator is able to create BlobReplicators for. + GetStorageTypeName() string } diff --git a/pkg/blobstore/configuration/cas_blob_access_creator.go b/pkg/blobstore/configuration/cas_blob_access_creator.go index bd9814cc..948d0b41 100644 --- a/pkg/blobstore/configuration/cas_blob_access_creator.go +++ b/pkg/blobstore/configuration/cas_blob_access_creator.go @@ -60,10 +60,6 @@ func (bac *casBlobAccessCreator) GetReadBufferFactory() blobstore.ReadBufferFact return blobstore.CASReadBufferFactory } -func (bac *casBlobAccessCreator) GetStorageTypeName() string { - return "cas" -} - func (bac *casBlobAccessCreator) GetDefaultCapabilitiesProvider() capabilities.Provider { return casCapabilitiesProvider } diff --git a/pkg/blobstore/configuration/cas_blob_replicator_creator.go b/pkg/blobstore/configuration/cas_blob_replicator_creator.go index 892a5441..a10fae8e 100644 --- a/pkg/blobstore/configuration/cas_blob_replicator_creator.go +++ b/pkg/blobstore/configuration/cas_blob_replicator_creator.go @@ -24,6 +24,10 @@ func NewCASBlobReplicatorCreator(grpcClientFactory grpc.ClientFactory) BlobRepli } } +func (brc *casBlobReplicatorCreator) GetStorageTypeName() string { + return "cas" +} + func (brc *casBlobReplicatorCreator) NewCustomBlobReplicator(configuration *pb.BlobReplicatorConfiguration, source blobstore.BlobAccess, sink BlobAccessInfo) (replication.BlobReplicator, error) { switch mode := configuration.Mode.(type) { case *pb.BlobReplicatorConfiguration_Deduplicating: diff --git a/pkg/blobstore/configuration/icas_blob_access_creator.go b/pkg/blobstore/configuration/icas_blob_access_creator.go index be9bb9df..932d78d5 100644 --- a/pkg/blobstore/configuration/icas_blob_access_creator.go +++ b/pkg/blobstore/configuration/icas_blob_access_creator.go @@ -32,10 +32,6 @@ func (bac *icasBlobAccessCreator) GetReadBufferFactory() blobstore.ReadBufferFac return blobstore.ICASReadBufferFactory } -func (bac *icasBlobAccessCreator) GetStorageTypeName() string { - return "icas" -} - func (bac *icasBlobAccessCreator) GetDefaultCapabilitiesProvider() capabilities.Provider { return nil } diff --git a/pkg/blobstore/configuration/icas_blob_replicator_creator.go b/pkg/blobstore/configuration/icas_blob_replicator_creator.go index 7f52f542..42dd97f4 100644 --- a/pkg/blobstore/configuration/icas_blob_replicator_creator.go +++ b/pkg/blobstore/configuration/icas_blob_replicator_creator.go @@ -23,3 +23,7 @@ func (brc icasBlobReplicatorCreator) NewCustomBlobReplicator(configuration *pb.B return nil, status.Error(codes.InvalidArgument, "Configuration did not contain a supported replicator") } } + +func (brc icasBlobReplicatorCreator) GetStorageTypeName() string { + return "icas" +} diff --git a/pkg/blobstore/configuration/new_blob_replicator.go b/pkg/blobstore/configuration/new_blob_replicator.go index 245796b3..af6adbb4 100644 --- a/pkg/blobstore/configuration/new_blob_replicator.go +++ b/pkg/blobstore/configuration/new_blob_replicator.go @@ -3,6 +3,7 @@ package configuration import ( "github.com/buildbarn/bb-storage/pkg/blobstore" "github.com/buildbarn/bb-storage/pkg/blobstore/replication" + "github.com/buildbarn/bb-storage/pkg/clock" "github.com/buildbarn/bb-storage/pkg/digest" pb "github.com/buildbarn/bb-storage/pkg/proto/configuration/blobstore" @@ -17,20 +18,22 @@ func 
NewBlobReplicatorFromConfiguration(configuration *pb.BlobReplicatorConfigur if configuration == nil { return nil, status.Error(codes.InvalidArgument, "Replicator configuration not specified") } + storageTypeName := creator.GetStorageTypeName() + var configuredBlobReplicator replication.BlobReplicator switch mode := configuration.Mode.(type) { case *pb.BlobReplicatorConfiguration_ConcurrencyLimiting: base, err := NewBlobReplicatorFromConfiguration(mode.ConcurrencyLimiting.Base, source, sink, creator) if err != nil { return nil, err } - return replication.NewConcurrencyLimitingBlobReplicator( + configuredBlobReplicator = replication.NewConcurrencyLimitingBlobReplicator( base, sink.BlobAccess, - semaphore.NewWeighted(mode.ConcurrencyLimiting.MaximumConcurrency)), nil + semaphore.NewWeighted(mode.ConcurrencyLimiting.MaximumConcurrency)) case *pb.BlobReplicatorConfiguration_Local: - return replication.NewLocalBlobReplicator(source, sink.BlobAccess), nil + configuredBlobReplicator = replication.NewLocalBlobReplicator(source, sink.BlobAccess) case *pb.BlobReplicatorConfiguration_Noop: - return replication.NewNoopBlobReplicator(source), nil + configuredBlobReplicator = replication.NewNoopBlobReplicator(source) case *pb.BlobReplicatorConfiguration_Queued: base, err := NewBlobReplicatorFromConfiguration(mode.Queued.Base, source, sink, creator) if err != nil { @@ -40,8 +43,13 @@ func NewBlobReplicatorFromConfiguration(configuration *pb.BlobReplicatorConfigur if err != nil { return nil, err } - return replication.NewQueuedBlobReplicator(source, base, existenceCache), nil + configuredBlobReplicator = replication.NewQueuedBlobReplicator(source, base, existenceCache) default: - return creator.NewCustomBlobReplicator(configuration, source, sink) + var err error + configuredBlobReplicator, err = creator.NewCustomBlobReplicator(configuration, source, sink) + if err != nil { + return nil, err + } } + return replication.NewMetricsBlobReplicator(configuredBlobReplicator, clock.SystemClock, storageTypeName), nil } diff --git a/pkg/blobstore/local/flat_blob_access.go b/pkg/blobstore/local/flat_blob_access.go index 3648ea88..7c3d7257 100644 --- a/pkg/blobstore/local/flat_blob_access.go +++ b/pkg/blobstore/local/flat_blob_access.go @@ -3,6 +3,7 @@ package local import ( "context" "sync" + "time" "github.com/buildbarn/bb-storage/pkg/blobstore" "github.com/buildbarn/bb-storage/pkg/blobstore/buffer" @@ -19,15 +20,35 @@ import ( var ( flatBlobAccessPrometheusMetrics sync.Once - flatBlobAccessRefreshes = prometheus.NewHistogramVec( + flatBlobAccessRefreshesBlobs = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "buildbarn", Subsystem: "blobstore", - Name: "flat_blob_access_refreshes", + Name: "flat_blob_access_refreshes_blobs", Help: "The number of blobs that were refreshed when requested", Buckets: append([]float64{0}, prometheus.ExponentialBuckets(1.0, 2.0, 16)...), }, []string{"storage_type", "operation"}) + + flatBlobAccessRefreshesDurationSeconds = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "buildbarn", + Subsystem: "blobstore", + Name: "flat_blob_access_refreshes_duration_seconds", + Help: "Time spent refreshing blobs in seconds", + Buckets: util.DecimalExponentialBuckets(-3, 6, 2), + }, + []string{"storage_type", "operation"}) + + flatBlobAccessRefreshesSizeBytes = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "buildbarn", + Subsystem: "blobstore", + Name: "flat_blob_access_refreshes_size_bytes", + Help: "Size of blobs being refreshed in bytes", + 
			Buckets:   prometheus.ExponentialBuckets(1.0, 2.0, 33),
+		},
+		[]string{"storage_type", "operation"})
 )
 
 type flatBlobAccess struct {
@@ -40,9 +61,16 @@ type flatBlobAccess struct {
 	lock        *sync.RWMutex
 	refreshLock sync.Mutex
 
-	refreshesGet              prometheus.Observer
-	refreshesGetFromComposite prometheus.Observer
-	refreshesFindMissing      prometheus.Observer
+	refreshesBlobsGet              prometheus.Observer
+	refreshesBlobsGetFromComposite prometheus.Observer
+	refreshesBlobsFindMissing      prometheus.Observer
+
+	refreshesBlobsDurationGet              prometheus.Observer
+	refreshesBlobsDurationGetFromComposite prometheus.Observer
+	refreshesBlobsDurationFindMissing      prometheus.Observer
+	refreshesBlobsSizeGet                  prometheus.Observer
+	refreshesBlobsSizeGetFromComposite     prometheus.Observer
+	refreshesBlobsSizeFindMissing          prometheus.Observer
 }
 
 // NewFlatBlobAccess creates a BlobAccess that forwards all calls to
@@ -53,7 +81,9 @@ type flatBlobAccess struct {
 // any hierarchy.
 func NewFlatBlobAccess(keyLocationMap KeyLocationMap, locationBlobMap LocationBlobMap, digestKeyFormat digest.KeyFormat, lock *sync.RWMutex, storageType string, capabilitiesProvider capabilities.Provider) blobstore.BlobAccess {
 	flatBlobAccessPrometheusMetrics.Do(func() {
-		prometheus.MustRegister(flatBlobAccessRefreshes)
+		prometheus.MustRegister(flatBlobAccessRefreshesBlobs)
+		prometheus.MustRegister(flatBlobAccessRefreshesDurationSeconds)
+		prometheus.MustRegister(flatBlobAccessRefreshesSizeBytes)
 	})
 
 	return &flatBlobAccess{
@@ -64,9 +94,16 @@ func NewFlatBlobAccess(keyLocationMap KeyLocationMap, locationBlobMap LocationBl
 		digestKeyFormat: digestKeyFormat,
 		lock:            lock,
 
-		refreshesGet:              flatBlobAccessRefreshes.WithLabelValues(storageType, "Get"),
-		refreshesGetFromComposite: flatBlobAccessRefreshes.WithLabelValues(storageType, "GetFromComposite"),
-		refreshesFindMissing:      flatBlobAccessRefreshes.WithLabelValues(storageType, "FindMissing"),
+		refreshesBlobsGet:              flatBlobAccessRefreshesBlobs.WithLabelValues(storageType, "Get"),
+		refreshesBlobsGetFromComposite: flatBlobAccessRefreshesBlobs.WithLabelValues(storageType, "GetFromComposite"),
+		refreshesBlobsFindMissing:      flatBlobAccessRefreshesBlobs.WithLabelValues(storageType, "FindMissing"),
+
+		refreshesBlobsDurationGet:              flatBlobAccessRefreshesDurationSeconds.WithLabelValues(storageType, "Get"),
+		refreshesBlobsDurationGetFromComposite: flatBlobAccessRefreshesDurationSeconds.WithLabelValues(storageType, "GetFromComposite"),
+		refreshesBlobsDurationFindMissing:      flatBlobAccessRefreshesDurationSeconds.WithLabelValues(storageType, "FindMissing"),
+		refreshesBlobsSizeGet:                  flatBlobAccessRefreshesSizeBytes.WithLabelValues(storageType, "Get"),
+		refreshesBlobsSizeGetFromComposite:     flatBlobAccessRefreshesSizeBytes.WithLabelValues(storageType, "GetFromComposite"),
+		refreshesBlobsSizeFindMissing:          flatBlobAccessRefreshesSizeBytes.WithLabelValues(storageType, "FindMissing"),
 	}
 }
 
@@ -113,6 +150,8 @@ func (ba *flatBlobAccess) Get(ctx context.Context, blobDigest digest.Digest) buf
 	// TODO: Instead of copying data on the fly, should this be done
 	// immediately, so that we can prevent potential duplication by
 	// picking up the refresh lock?
+	refreshStart := time.Now()
+
 	ba.lock.Lock()
 	location, err = ba.keyLocationMap.Get(key)
 	if err != nil {
@@ -144,7 +183,9 @@
 		ba.lock.Lock()
 		_, err := ba.finalizePut(putFinalizer, key)
 		if err == nil {
-			ba.refreshesGet.Observe(1)
+			ba.refreshesBlobsGet.Observe(1)
+			ba.refreshesBlobsSizeGet.Observe(float64(location.SizeBytes))
+			ba.refreshesBlobsDurationGet.Observe(time.Since(refreshStart).Seconds())
 		}
 		ba.lock.Unlock()
 		if err != nil {
@@ -201,6 +242,8 @@ func (ba *flatBlobAccess) GetFromComposite(ctx context.Context, parentDigest, ch
 	var bParentSlicing buffer.Buffer
 	var putFinalizer LocationBlobPutFinalizer
 	parentGetter, needsRefresh := ba.locationBlobMap.Get(parentLocation)
+	// Record the start time, so that the duration of a refresh can be reported.
+	refreshStart := time.Now()
 	if needsRefresh {
 		// The parent object needs to be refreshed and sliced.
 		bParent := parentGetter(parentDigest)
@@ -246,12 +289,15 @@
 	ba.lock.Lock()
 	if needsRefresh {
 		parentLocation, err = ba.finalizePut(putFinalizer, parentKey)
 		if err != nil {
 			ba.lock.Unlock()
 			bChild.Discard()
 			return buffer.NewBufferFromError(util.StatusWrap(err, "Failed to refresh blob"))
 		}
-		ba.refreshesGetFromComposite.Observe(1)
+		// Report the size of the refreshed blob and the time spent refreshing it.
+		ba.refreshesBlobsSizeGetFromComposite.Observe(float64(parentLocation.SizeBytes))
+		ba.refreshesBlobsDurationGetFromComposite.Observe(time.Since(refreshStart).Seconds())
+		ba.refreshesBlobsGetFromComposite.Observe(1)
 	}
 
 	// Create key-location map entries for each of the slices. This
@@ -351,8 +397,10 @@ func (ba *flatBlobAccess) FindMissing(ctx context.Context, digests digest.Set) (
 	// one thread.
 	ba.refreshLock.Lock()
 	defer ba.refreshLock.Unlock()
-
+	// Record the start time, so that the total refresh duration can be reported.
+	refreshStart := time.Now()
 	blobsRefreshedSuccessfully := 0
+	var blobRefreshSizeBytes int64
 	ba.lock.Lock()
 	for _, blobToRefresh := range blobsToRefresh {
 		if location, err := ba.keyLocationMap.Get(blobToRefresh.key); err == nil {
@@ -361,6 +409,7 @@ func (ba *flatBlobAccess) FindMissing(ctx context.Context, digests digest.Set) (
 				// Blob is present and still needs to be
 				// refreshed. Allocate space for a copy.
 				b := getter(blobToRefresh.digest)
+				blobRefreshSizeBytes += location.SizeBytes
 				putWriter, err := ba.locationBlobMap.Put(location.SizeBytes)
 				ba.lock.Unlock()
 				if err != nil {
@@ -390,6 +439,8 @@
 		}
 	}
 	ba.lock.Unlock()
-	ba.refreshesFindMissing.Observe(float64(blobsRefreshedSuccessfully))
+	ba.refreshesBlobsFindMissing.Observe(float64(blobsRefreshedSuccessfully))
+	ba.refreshesBlobsDurationFindMissing.Observe(time.Since(refreshStart).Seconds())
+	ba.refreshesBlobsSizeFindMissing.Observe(float64(blobRefreshSizeBytes))
 	return missing.Build(), nil
 }
diff --git a/pkg/blobstore/replication/BUILD.bazel b/pkg/blobstore/replication/BUILD.bazel
index 0c72710a..71c09025 100644
--- a/pkg/blobstore/replication/BUILD.bazel
+++ b/pkg/blobstore/replication/BUILD.bazel
@@ -7,6 +7,7 @@ go_library(
         "concurrency_limiting_blob_replicator.go",
         "deduplicating_blob_replicator.go",
         "local_blob_replicator.go",
+        "metrics_blob_replicator.go",
         "nested_blob_replicator.go",
         "noop_blob_replicator.go",
         "queued_blob_replicator.go",
@@ -20,10 +21,12 @@ go_library(
         "//pkg/blobstore",
         "//pkg/blobstore/buffer",
         "//pkg/blobstore/slicing",
+        "//pkg/clock",
         "//pkg/digest",
         "//pkg/proto/replicator",
         "//pkg/util",
         "@bazel_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto",
+        "@com_github_prometheus_client_golang//prometheus",
         "@org_golang_google_grpc//:grpc",
         "@org_golang_google_grpc//codes",
         "@org_golang_google_grpc//status",
diff --git a/pkg/blobstore/replication/metrics_blob_replicator.go b/pkg/blobstore/replication/metrics_blob_replicator.go
new file mode 100644
index 00000000..d54c5030
--- /dev/null
+++ b/pkg/blobstore/replication/metrics_blob_replicator.go
@@ -0,0 +1,163 @@
+package replication
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	"github.com/buildbarn/bb-storage/pkg/blobstore/buffer"
+	"github.com/buildbarn/bb-storage/pkg/blobstore/slicing"
+	"github.com/buildbarn/bb-storage/pkg/clock"
+	"github.com/buildbarn/bb-storage/pkg/digest"
+	"github.com/buildbarn/bb-storage/pkg/util"
+	"github.com/prometheus/client_golang/prometheus"
+
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
+var (
+	replicatorOperationsPrometheusMetrics sync.Once
+
+	blobReplicatorOperationsDurationSeconds = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: "buildbarn",
+			Subsystem: "blobstore",
+			Name:      "blob_replicator_operations_duration_seconds",
+			Help:      "Amount of time spent per blob replicator operation, in seconds.",
+			Buckets:   util.DecimalExponentialBuckets(-3, 6, 2),
+		},
+		[]string{"storage_type", "operation", "grpc_code"})
+
+	blobReplicatorOperationsBlobSizeBytes = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: "buildbarn",
+			Subsystem: "blobstore",
+			Name:      "blob_replicator_operations_blob_size_bytes",
+			Help:      "Size of blobs being replicated, in bytes.",
+			Buckets:   prometheus.ExponentialBuckets(1.0, 2.0, 33),
+		},
+		[]string{"storage_type", "operation"})
+
+	blobReplicatorOperationsBatchSize = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: "buildbarn",
+			Subsystem: "blobstore",
+			Name:      "blob_replicator_operations_batch_size",
+			Help:      "Number of blobs in batch replication requests.",
+			Buckets:   prometheus.ExponentialBuckets(1.0, 2.0, 17),
+		},
+		[]string{"storage_type", "operation"})
+)
+
+type metricsBlobReplicator struct {
+	replicator  BlobReplicator
+	clock       clock.Clock
+	source      string
+	destination string
+
+	singleDurationSeconds   prometheus.ObserverVec
+	singleBlobSizeBytes     prometheus.Observer
+	compositeDurationSeconds prometheus.ObserverVec
+	compositeBlobSizeBytes   prometheus.Observer
+	multipleDurationSeconds  prometheus.ObserverVec
+	multipleBatchSize        prometheus.Observer
+	multipleBlobSizeBytes    prometheus.Observer
+}
+
+// NewMetricsBlobReplicator creates a wrapper around BlobReplicator that adds
+// Prometheus metrics for monitoring replication operations.
+func NewMetricsBlobReplicator(replicator BlobReplicator, clock clock.Clock, storageTypeName string) BlobReplicator {
+	replicatorOperationsPrometheusMetrics.Do(func() {
+		prometheus.MustRegister(blobReplicatorOperationsDurationSeconds)
+		prometheus.MustRegister(blobReplicatorOperationsBlobSizeBytes)
+		prometheus.MustRegister(blobReplicatorOperationsBatchSize)
+	})
+
+	return &metricsBlobReplicator{
+		replicator: replicator,
+		clock:      clock,
+		singleDurationSeconds: blobReplicatorOperationsDurationSeconds.MustCurryWith(map[string]string{
+			"storage_type": storageTypeName,
+			"operation":    "ReplicateSingle",
+		}),
+		singleBlobSizeBytes: blobReplicatorOperationsBlobSizeBytes.WithLabelValues(storageTypeName, "ReplicateSingle"),
+		compositeDurationSeconds: blobReplicatorOperationsDurationSeconds.MustCurryWith(map[string]string{
+			"storage_type": storageTypeName,
+			"operation":    "ReplicateComposite",
+		}),
+		compositeBlobSizeBytes: blobReplicatorOperationsBlobSizeBytes.WithLabelValues(storageTypeName, "ReplicateComposite"),
+		multipleDurationSeconds: blobReplicatorOperationsDurationSeconds.MustCurryWith(map[string]string{
+			"storage_type": storageTypeName,
+			"operation":    "ReplicateMultiple",
+		}),
+		multipleBlobSizeBytes: blobReplicatorOperationsBlobSizeBytes.WithLabelValues(storageTypeName, "ReplicateMultiple"),
+		multipleBatchSize:     blobReplicatorOperationsBatchSize.WithLabelValues(storageTypeName, "ReplicateMultiple"),
+	}
+}
+
+func (r *metricsBlobReplicator) updateDurationSeconds(vec prometheus.ObserverVec, code codes.Code, timeStart time.Time) {
+	vec.WithLabelValues(code.String()).Observe(r.clock.Now().Sub(timeStart).Seconds())
+}
+
+func (r *metricsBlobReplicator) ReplicateSingle(ctx context.Context, blobDigest digest.Digest) buffer.Buffer {
+	timeStart := r.clock.Now()
+	b := buffer.WithErrorHandler(
+		r.replicator.ReplicateSingle(ctx, blobDigest),
+		&metricsErrorHandler{
+			replicator:      r,
+			timeStart:       timeStart,
+			errorCode:       codes.OK,
+			durationSeconds: r.singleDurationSeconds,
+		})
+	if sizeBytes, err := b.GetSizeBytes(); err == nil {
+		r.singleBlobSizeBytes.Observe(float64(sizeBytes))
+	}
+	return b
+}
+
+func (r *metricsBlobReplicator) ReplicateComposite(ctx context.Context, parentDigest, childDigest digest.Digest, slicer slicing.BlobSlicer) buffer.Buffer {
+	timeStart := r.clock.Now()
+	b := buffer.WithErrorHandler(
+		r.replicator.ReplicateComposite(ctx, parentDigest, childDigest, slicer),
+		&metricsErrorHandler{
+			replicator:      r,
+			timeStart:       timeStart,
+			errorCode:       codes.OK,
+			durationSeconds: r.compositeDurationSeconds,
+		})
+	if sizeBytes, err := b.GetSizeBytes(); err == nil {
+		r.compositeBlobSizeBytes.Observe(float64(sizeBytes))
+	}
+	return b
+}
+
+func (r *metricsBlobReplicator) ReplicateMultiple(ctx context.Context, digests digest.Set) error {
+	if digests.Empty() {
+		return nil
+	}
+
+	timeStart := r.clock.Now()
+	r.multipleBatchSize.Observe(float64(digests.Length()))
+	// TODO: Add blob size metrics.
+ + err := r.replicator.ReplicateMultiple(ctx, digests) + r.updateDurationSeconds(r.multipleDurationSeconds, status.Code(err), timeStart) + return err +} + +type metricsErrorHandler struct { + replicator *metricsBlobReplicator + timeStart time.Time + errorCode codes.Code + durationSeconds prometheus.ObserverVec +} + +func (eh *metricsErrorHandler) OnError(err error) (buffer.Buffer, error) { + eh.errorCode = status.Code(err) + return nil, err +} + +func (eh *metricsErrorHandler) Done() { + eh.replicator.updateDurationSeconds(eh.durationSeconds, eh.errorCode, eh.timeStart) +}