diff --git a/cmd/bb_copy/main.go b/cmd/bb_copy/main.go index 71a01b11..9c0f7ce8 100644 --- a/cmd/bb_copy/main.go +++ b/cmd/bb_copy/main.go @@ -62,7 +62,8 @@ func main() { configuration.Replicator, source.BlobAccess, sink, - blobstore_configuration.NewCASBlobReplicatorCreator(grpcClientFactory)) + blobstore_configuration.NewCASBlobReplicatorCreator(grpcClientFactory), + ) if err != nil { return util.StatusWrap(err, "Failed to create replicator") } diff --git a/cmd/bb_replicator/main.go b/cmd/bb_replicator/main.go index c43cbf8e..bd53eac9 100644 --- a/cmd/bb_replicator/main.go +++ b/cmd/bb_replicator/main.go @@ -53,7 +53,8 @@ func main() { configuration.Replicator, source.BlobAccess, sink, - blobstore_configuration.NewCASBlobReplicatorCreator(grpcClientFactory)) + blobstore_configuration.NewCASBlobReplicatorCreator(grpcClientFactory), + ) if err != nil { return util.StatusWrap(err, "Failed to create replicator") } diff --git a/pkg/blobstore/configuration/blob_access_creator.go b/pkg/blobstore/configuration/blob_access_creator.go index a806cd08..50c83ba9 100644 --- a/pkg/blobstore/configuration/blob_access_creator.go +++ b/pkg/blobstore/configuration/blob_access_creator.go @@ -36,9 +36,6 @@ type BlobAccessCreator interface { // GetReadBufferFactory() returns operations that can be used by // BlobAccess to create Buffer objects to return data. GetReadBufferFactory() blobstore.ReadBufferFactory - // GetStorageTypeName() returns a short string that identifies - // the purpose of this storage (e.g., "ac", "cas"). - GetStorageTypeName() string // GetCapabilitiesProvider() returns a provider of REv2 // ServerCapabilities messages that should be returned for // backends that can't report their own capabilities. This diff --git a/pkg/blobstore/configuration/blob_replicator_creator.go b/pkg/blobstore/configuration/blob_replicator_creator.go index 3c62c6de..c0ca54d5 100644 --- a/pkg/blobstore/configuration/blob_replicator_creator.go +++ b/pkg/blobstore/configuration/blob_replicator_creator.go @@ -16,4 +16,8 @@ type BlobReplicatorCreator interface { // type. For example, sending replication requests over gRPC is // only supported for the Content Addressable Storage. NewCustomBlobReplicator(configuration *pb.BlobReplicatorConfiguration, source blobstore.BlobAccess, sink BlobAccessInfo) (replication.BlobReplicator, error) + + // GetStorageTypeName returns the name of the storage type that + // this BlobReplicatorCreator is able to create BlobReplicators for. + GetStorageTypeName() string } diff --git a/pkg/blobstore/configuration/cas_blob_access_creator.go b/pkg/blobstore/configuration/cas_blob_access_creator.go index bd9814cc..948d0b41 100644 --- a/pkg/blobstore/configuration/cas_blob_access_creator.go +++ b/pkg/blobstore/configuration/cas_blob_access_creator.go @@ -60,10 +60,6 @@ func (bac *casBlobAccessCreator) GetReadBufferFactory() blobstore.ReadBufferFact return blobstore.CASReadBufferFactory } -func (bac *casBlobAccessCreator) GetStorageTypeName() string { - return "cas" -} - func (bac *casBlobAccessCreator) GetDefaultCapabilitiesProvider() capabilities.Provider { return casCapabilitiesProvider } diff --git a/pkg/blobstore/configuration/cas_blob_replicator_creator.go b/pkg/blobstore/configuration/cas_blob_replicator_creator.go index 892a5441..a10fae8e 100644 --- a/pkg/blobstore/configuration/cas_blob_replicator_creator.go +++ b/pkg/blobstore/configuration/cas_blob_replicator_creator.go @@ -24,6 +24,10 @@ func NewCASBlobReplicatorCreator(grpcClientFactory grpc.ClientFactory) BlobRepli } } +func (brc *casBlobReplicatorCreator) GetStorageTypeName() string { + return "cas" +} + func (brc *casBlobReplicatorCreator) NewCustomBlobReplicator(configuration *pb.BlobReplicatorConfiguration, source blobstore.BlobAccess, sink BlobAccessInfo) (replication.BlobReplicator, error) { switch mode := configuration.Mode.(type) { case *pb.BlobReplicatorConfiguration_Deduplicating: diff --git a/pkg/blobstore/configuration/icas_blob_access_creator.go b/pkg/blobstore/configuration/icas_blob_access_creator.go index be9bb9df..932d78d5 100644 --- a/pkg/blobstore/configuration/icas_blob_access_creator.go +++ b/pkg/blobstore/configuration/icas_blob_access_creator.go @@ -32,10 +32,6 @@ func (bac *icasBlobAccessCreator) GetReadBufferFactory() blobstore.ReadBufferFac return blobstore.ICASReadBufferFactory } -func (bac *icasBlobAccessCreator) GetStorageTypeName() string { - return "icas" -} - func (bac *icasBlobAccessCreator) GetDefaultCapabilitiesProvider() capabilities.Provider { return nil } diff --git a/pkg/blobstore/configuration/icas_blob_replicator_creator.go b/pkg/blobstore/configuration/icas_blob_replicator_creator.go index 7f52f542..42dd97f4 100644 --- a/pkg/blobstore/configuration/icas_blob_replicator_creator.go +++ b/pkg/blobstore/configuration/icas_blob_replicator_creator.go @@ -23,3 +23,7 @@ func (brc icasBlobReplicatorCreator) NewCustomBlobReplicator(configuration *pb.B return nil, status.Error(codes.InvalidArgument, "Configuration did not contain a supported replicator") } } + +func (brc icasBlobReplicatorCreator) GetStorageTypeName() string { + return "icas" +} diff --git a/pkg/blobstore/configuration/new_blob_replicator.go b/pkg/blobstore/configuration/new_blob_replicator.go index 245796b3..af6adbb4 100644 --- a/pkg/blobstore/configuration/new_blob_replicator.go +++ b/pkg/blobstore/configuration/new_blob_replicator.go @@ -3,6 +3,7 @@ package configuration import ( "github.com/buildbarn/bb-storage/pkg/blobstore" "github.com/buildbarn/bb-storage/pkg/blobstore/replication" + "github.com/buildbarn/bb-storage/pkg/clock" "github.com/buildbarn/bb-storage/pkg/digest" pb "github.com/buildbarn/bb-storage/pkg/proto/configuration/blobstore" @@ -17,20 +18,22 @@ func NewBlobReplicatorFromConfiguration(configuration *pb.BlobReplicatorConfigur if configuration == nil { return nil, status.Error(codes.InvalidArgument, "Replicator configuration not specified") } + storageTypeName := creator.GetStorageTypeName() + var configuredBlobReplicator replication.BlobReplicator switch mode := configuration.Mode.(type) { case *pb.BlobReplicatorConfiguration_ConcurrencyLimiting: base, err := NewBlobReplicatorFromConfiguration(mode.ConcurrencyLimiting.Base, source, sink, creator) if err != nil { return nil, err } - return replication.NewConcurrencyLimitingBlobReplicator( + configuredBlobReplicator = replication.NewConcurrencyLimitingBlobReplicator( base, sink.BlobAccess, - semaphore.NewWeighted(mode.ConcurrencyLimiting.MaximumConcurrency)), nil + semaphore.NewWeighted(mode.ConcurrencyLimiting.MaximumConcurrency)) case *pb.BlobReplicatorConfiguration_Local: - return replication.NewLocalBlobReplicator(source, sink.BlobAccess), nil + configuredBlobReplicator = replication.NewLocalBlobReplicator(source, sink.BlobAccess) case *pb.BlobReplicatorConfiguration_Noop: - return replication.NewNoopBlobReplicator(source), nil + configuredBlobReplicator = replication.NewNoopBlobReplicator(source) case *pb.BlobReplicatorConfiguration_Queued: base, err := NewBlobReplicatorFromConfiguration(mode.Queued.Base, source, sink, creator) if err != nil { @@ -40,8 +43,13 @@ func NewBlobReplicatorFromConfiguration(configuration *pb.BlobReplicatorConfigur if err != nil { return nil, err } - return replication.NewQueuedBlobReplicator(source, base, existenceCache), nil + configuredBlobReplicator = replication.NewQueuedBlobReplicator(source, base, existenceCache) default: - return creator.NewCustomBlobReplicator(configuration, source, sink) + var err error + configuredBlobReplicator, err = creator.NewCustomBlobReplicator(configuration, source, sink) + if err != nil { + return nil, err + } } + return replication.NewMetricsBlobReplicator(configuredBlobReplicator, clock.SystemClock, storageTypeName), nil } diff --git a/pkg/blobstore/local/flat_blob_access.go b/pkg/blobstore/local/flat_blob_access.go index 3648ea88..7c3d7257 100644 --- a/pkg/blobstore/local/flat_blob_access.go +++ b/pkg/blobstore/local/flat_blob_access.go @@ -3,6 +3,7 @@ package local import ( "context" "sync" + "time" "github.com/buildbarn/bb-storage/pkg/blobstore" "github.com/buildbarn/bb-storage/pkg/blobstore/buffer" @@ -19,15 +20,35 @@ import ( var ( flatBlobAccessPrometheusMetrics sync.Once - flatBlobAccessRefreshes = prometheus.NewHistogramVec( + flatBlobAccessRefreshesBlobs = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "buildbarn", Subsystem: "blobstore", - Name: "flat_blob_access_refreshes", + Name: "flat_blob_access_refreshes_blobs", Help: "The number of blobs that were refreshed when requested", Buckets: append([]float64{0}, prometheus.ExponentialBuckets(1.0, 2.0, 16)...), }, []string{"storage_type", "operation"}) + + flatBlobAccessRefreshesDurationSeconds = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "buildbarn", + Subsystem: "blobstore", + Name: "flat_blob_access_refreshes_duration_seconds", + Help: "Time spent refreshing blobs in seconds", + Buckets: util.DecimalExponentialBuckets(-3, 6, 2), + }, + []string{"storage_type", "operation"}) + + flatBlobAccessRefreshesSizeBytes = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "buildbarn", + Subsystem: "blobstore", + Name: "flat_blob_access_refreshes_size_bytes", + Help: "Size of blobs being refreshed in bytes", + Buckets: prometheus.ExponentialBuckets(1.0, 2.0, 33), + }, + []string{"storage_type", "operation"}) ) type flatBlobAccess struct { @@ -40,9 +61,16 @@ type flatBlobAccess struct { lock *sync.RWMutex refreshLock sync.Mutex - refreshesGet prometheus.Observer - refreshesGetFromComposite prometheus.Observer - refreshesFindMissing prometheus.Observer + refreshesBlobsGet prometheus.Observer + refreshesBlobsGetFromComposite prometheus.Observer + refreshesBlobsFindMissing prometheus.Observer + + refreshesBlobsDurationGet prometheus.Observer + refreshesBlobsDurationGetFromComposite prometheus.Observer + refreshesBlobsDurationFindMissing prometheus.Observer + refreshesBlobsSizeGet prometheus.Observer + refreshesBlbosSizeGetFromComposite prometheus.Observer + refreshesBlobsSizeFindMissing prometheus.Observer } // NewFlatBlobAccess creates a BlobAccess that forwards all calls to @@ -53,7 +81,9 @@ type flatBlobAccess struct { // any hierarchy. func NewFlatBlobAccess(keyLocationMap KeyLocationMap, locationBlobMap LocationBlobMap, digestKeyFormat digest.KeyFormat, lock *sync.RWMutex, storageType string, capabilitiesProvider capabilities.Provider) blobstore.BlobAccess { flatBlobAccessPrometheusMetrics.Do(func() { - prometheus.MustRegister(flatBlobAccessRefreshes) + prometheus.MustRegister(flatBlobAccessRefreshesBlobs) + prometheus.MustRegister(flatBlobAccessRefreshesDurationSeconds) + prometheus.MustRegister(flatBlobAccessRefreshesSizeBytes) }) return &flatBlobAccess{ @@ -64,9 +94,16 @@ func NewFlatBlobAccess(keyLocationMap KeyLocationMap, locationBlobMap LocationBl digestKeyFormat: digestKeyFormat, lock: lock, - refreshesGet: flatBlobAccessRefreshes.WithLabelValues(storageType, "Get"), - refreshesGetFromComposite: flatBlobAccessRefreshes.WithLabelValues(storageType, "GetFromComposite"), - refreshesFindMissing: flatBlobAccessRefreshes.WithLabelValues(storageType, "FindMissing"), + refreshesBlobsGet: flatBlobAccessRefreshesBlobs.WithLabelValues(storageType, "Get"), + refreshesBlobsGetFromComposite: flatBlobAccessRefreshesBlobs.WithLabelValues(storageType, "GetFromComposite"), + refreshesBlobsFindMissing: flatBlobAccessRefreshesBlobs.WithLabelValues(storageType, "FindMissing"), + + refreshesBlobsDurationGet: flatBlobAccessRefreshesDurationSeconds.WithLabelValues(storageType, "Get"), + refreshesBlobsDurationGetFromComposite: flatBlobAccessRefreshesDurationSeconds.WithLabelValues(storageType, "GetFromComposite"), + refreshesBlobsDurationFindMissing: flatBlobAccessRefreshesDurationSeconds.WithLabelValues(storageType, "FindMissing"), + refreshesBlobsSizeGet: flatBlobAccessRefreshesSizeBytes.WithLabelValues(storageType, "Get"), + refreshesBlbosSizeGetFromComposite: flatBlobAccessRefreshesSizeBytes.WithLabelValues(storageType, "GetFromComposite"), + refreshesBlobsSizeFindMissing: flatBlobAccessRefreshesSizeBytes.WithLabelValues(storageType, "FindMissing"), } } @@ -113,6 +150,8 @@ func (ba *flatBlobAccess) Get(ctx context.Context, blobDigest digest.Digest) buf // TODO: Instead of copying data on the fly, should this be done // immediately, so that we can prevent potential duplication by // picking up the refresh lock? + refreshStart := time.Now() + ba.lock.Lock() location, err = ba.keyLocationMap.Get(key) if err != nil { @@ -144,7 +183,9 @@ func (ba *flatBlobAccess) Get(ctx context.Context, blobDigest digest.Digest) buf ba.lock.Lock() _, err := ba.finalizePut(putFinalizer, key) if err == nil { - ba.refreshesGet.Observe(1) + ba.refreshesBlobsGet.Observe(1) + ba.refreshesBlobsSizeGet.Observe(float64(location.SizeBytes)) + ba.refreshesBlobsDurationGet.Observe(time.Since(refreshStart).Seconds()) } ba.lock.Unlock() if err != nil { @@ -201,6 +242,8 @@ func (ba *flatBlobAccess) GetFromComposite(ctx context.Context, parentDigest, ch var bParentSlicing buffer.Buffer var putFinalizer LocationBlobPutFinalizer parentGetter, needsRefresh := ba.locationBlobMap.Get(parentLocation) + // Add refresh start time + refreshStart := time.Now() if needsRefresh { // The parent object needs to be refreshed and sliced. bParent := parentGetter(parentDigest) @@ -246,12 +289,15 @@ func (ba *flatBlobAccess) GetFromComposite(ctx context.Context, parentDigest, ch ba.lock.Lock() if needsRefresh { parentLocation, err = ba.finalizePut(putFinalizer, parentKey) + // Add size metric before refresh + ba.refreshesBlbosSizeGetFromComposite.Observe(float64(parentLocation.SizeBytes)) if err != nil { ba.lock.Unlock() bChild.Discard() return buffer.NewBufferFromError(util.StatusWrap(err, "Failed to refresh blob")) } - ba.refreshesGetFromComposite.Observe(1) + ba.refreshesBlobsDurationGetFromComposite.Observe(time.Since(refreshStart).Seconds()) + ba.refreshesBlobsGetFromComposite.Observe(1) } // Create key-location map entries for each of the slices. This @@ -351,8 +397,10 @@ func (ba *flatBlobAccess) FindMissing(ctx context.Context, digests digest.Set) ( // one thread. ba.refreshLock.Lock() defer ba.refreshLock.Unlock() - + // Add refresh start time before the refresh loop + refreshStart := time.Now() blobsRefreshedSuccessfully := 0 + var blobRefreshSizeBytes int64 ba.lock.Lock() for _, blobToRefresh := range blobsToRefresh { if location, err := ba.keyLocationMap.Get(blobToRefresh.key); err == nil { @@ -361,6 +409,7 @@ func (ba *flatBlobAccess) FindMissing(ctx context.Context, digests digest.Set) ( // Blob is present and still needs to be // refreshed. Allocate space for a copy. b := getter(blobToRefresh.digest) + blobRefreshSizeBytes += location.SizeBytes putWriter, err := ba.locationBlobMap.Put(location.SizeBytes) ba.lock.Unlock() if err != nil { @@ -390,6 +439,8 @@ func (ba *flatBlobAccess) FindMissing(ctx context.Context, digests digest.Set) ( } } ba.lock.Unlock() - ba.refreshesFindMissing.Observe(float64(blobsRefreshedSuccessfully)) + ba.refreshesBlobsFindMissing.Observe(float64(blobsRefreshedSuccessfully)) + ba.refreshesBlobsDurationFindMissing.Observe(time.Since(refreshStart).Seconds()) + ba.refreshesBlobsSizeFindMissing.Observe(float64(blobRefreshSizeBytes)) return missing.Build(), nil } diff --git a/pkg/blobstore/replication/BUILD.bazel b/pkg/blobstore/replication/BUILD.bazel index 0c72710a..71c09025 100644 --- a/pkg/blobstore/replication/BUILD.bazel +++ b/pkg/blobstore/replication/BUILD.bazel @@ -7,6 +7,7 @@ go_library( "concurrency_limiting_blob_replicator.go", "deduplicating_blob_replicator.go", "local_blob_replicator.go", + "metrics_blob_replicator.go", "nested_blob_replicator.go", "noop_blob_replicator.go", "queued_blob_replicator.go", @@ -20,10 +21,12 @@ go_library( "//pkg/blobstore", "//pkg/blobstore/buffer", "//pkg/blobstore/slicing", + "//pkg/clock", "//pkg/digest", "//pkg/proto/replicator", "//pkg/util", "@bazel_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto", + "@com_github_prometheus_client_golang//prometheus", "@org_golang_google_grpc//:grpc", "@org_golang_google_grpc//codes", "@org_golang_google_grpc//status", diff --git a/pkg/blobstore/replication/metrics_blob_replicator.go b/pkg/blobstore/replication/metrics_blob_replicator.go new file mode 100644 index 00000000..d54c5030 --- /dev/null +++ b/pkg/blobstore/replication/metrics_blob_replicator.go @@ -0,0 +1,163 @@ +package replication + +import ( + "context" + "sync" + "time" + + "github.com/buildbarn/bb-storage/pkg/blobstore/buffer" + "github.com/buildbarn/bb-storage/pkg/blobstore/slicing" + "github.com/buildbarn/bb-storage/pkg/clock" + "github.com/buildbarn/bb-storage/pkg/digest" + "github.com/buildbarn/bb-storage/pkg/util" + "github.com/prometheus/client_golang/prometheus" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +var ( + replicatorOperationsPrometheusMetrics sync.Once + + blobReplicatorOperationsDurationSeconds = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "buildbarn", + Subsystem: "blobstore", + Name: "blob_replicator_operations_duration_seconds", + Help: "Amount of time spent per operation on blob replicator, in seconds.", + Buckets: util.DecimalExponentialBuckets(-3, 6, 2), + }, + []string{"operation"}) + + blobReplicatorOperationsBlobSizeBytes = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "buildbarn", + Subsystem: "blobstore", + Name: "blob_replicator_operations_blob_size_bytes", + Help: "Size of blobs being replicated, in bytes.", + Buckets: prometheus.ExponentialBuckets(1.0, 2.0, 33), + }, + []string{"operation"}) + + blobReplicatorOperationsBatchSize = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "buildbarn", + Subsystem: "blobstore", + Name: "blob_replicator_operations_batch_size", + Help: "Number of blobs in batch replication requests.", + Buckets: prometheus.ExponentialBuckets(1.0, 2.0, 17), + }, + []string{"operation"}) +) + +type metricsBlobReplicator struct { + replicator BlobReplicator + clock clock.Clock + source string + destination string + + singleDurationSeconds prometheus.ObserverVec + singleBlobSizeBytes prometheus.Observer + compositeDurationSeconds prometheus.ObserverVec + compositeBlobSizeBytes prometheus.Observer + multipleDurationSeconds prometheus.ObserverVec + multipleBatchSize prometheus.Observer + multipleBlobSizeBytes prometheus.Observer +} + +// NewMetricsBlobReplicator creates a wrapper around BlobReplicator that adds +// Prometheus metrics for monitoring replication operations. +func NewMetricsBlobReplicator(replicator BlobReplicator, clock clock.Clock, storageTypeName string) BlobReplicator { + replicatorOperationsPrometheusMetrics.Do(func() { + prometheus.MustRegister(blobReplicatorOperationsDurationSeconds) + prometheus.MustRegister(blobReplicatorOperationsBlobSizeBytes) + prometheus.MustRegister(blobReplicatorOperationsBatchSize) + }) + + return &metricsBlobReplicator{ + replicator: replicator, + clock: clock, + singleDurationSeconds: blobReplicatorOperationsDurationSeconds.MustCurryWith(map[string]string{ + "operation": "ReplicateSingle", + "storage": storageTypeName, + }), + singleBlobSizeBytes: blobReplicatorOperationsBlobSizeBytes.WithLabelValues("ReplicateSingle", storageTypeName), + compositeDurationSeconds: blobReplicatorOperationsDurationSeconds.MustCurryWith(map[string]string{ + "operation": "ReplicateComposite", + "storage": storageTypeName, + }), + compositeBlobSizeBytes: blobReplicatorOperationsBlobSizeBytes.WithLabelValues("ReplicateComposite", storageTypeName), + multipleDurationSeconds: blobReplicatorOperationsDurationSeconds.MustCurryWith(map[string]string{ + "operation": "ReplicateMultiple", + "storage": storageTypeName, + }), + multipleBlobSizeBytes: blobReplicatorOperationsBlobSizeBytes.WithLabelValues("ReplicateMultiple", storageTypeName), + multipleBatchSize: blobReplicatorOperationsBatchSize.WithLabelValues("ReplicateMultiple", storageTypeName), + } +} + +func (r *metricsBlobReplicator) updateDurationSeconds(vec prometheus.ObserverVec, code codes.Code, timeStart time.Time) { + vec.WithLabelValues(code.String()).Observe(r.clock.Now().Sub(timeStart).Seconds()) +} + +func (r *metricsBlobReplicator) ReplicateSingle(ctx context.Context, blobDigest digest.Digest) buffer.Buffer { + timeStart := r.clock.Now() + b := buffer.WithErrorHandler( + r.replicator.ReplicateSingle(ctx, blobDigest), + &metricsErrorHandler{ + replicator: r, + timeStart: timeStart, + errorCode: codes.OK, + durationSeconds: r.singleDurationSeconds, + }) + if sizeBytes, err := b.GetSizeBytes(); err == nil { + r.singleBlobSizeBytes.Observe(float64(sizeBytes)) + } + return b +} + +func (r *metricsBlobReplicator) ReplicateComposite(ctx context.Context, parentDigest, childDigest digest.Digest, slicer slicing.BlobSlicer) buffer.Buffer { + timeStart := r.clock.Now() + b := buffer.WithErrorHandler( + r.replicator.ReplicateComposite(ctx, parentDigest, childDigest, slicer), + &metricsErrorHandler{ + replicator: r, + timeStart: timeStart, + errorCode: codes.OK, + durationSeconds: r.compositeDurationSeconds, + }) + if sizeBytes, err := b.GetSizeBytes(); err == nil { + r.compositeBlobSizeBytes.Observe(float64(sizeBytes)) + } + return b +} + +func (r *metricsBlobReplicator) ReplicateMultiple(ctx context.Context, digests digest.Set) error { + if digests.Empty() { + return nil + } + + timeStart := r.clock.Now() + r.multipleBatchSize.Observe(float64(digests.Length())) + // TODO: Add blob size metrics. + + err := r.replicator.ReplicateMultiple(ctx, digests) + r.updateDurationSeconds(r.multipleDurationSeconds, status.Code(err), timeStart) + return err +} + +type metricsErrorHandler struct { + replicator *metricsBlobReplicator + timeStart time.Time + errorCode codes.Code + durationSeconds prometheus.ObserverVec +} + +func (eh *metricsErrorHandler) OnError(err error) (buffer.Buffer, error) { + eh.errorCode = status.Code(err) + return nil, err +} + +func (eh *metricsErrorHandler) Done() { + eh.replicator.updateDurationSeconds(eh.durationSeconds, eh.errorCode, eh.timeStart) +}