Skip to content

Commit

Permalink
Emit additional metrics on replicate blobs and refresh blobs (#226)
Browse files Browse the repository at this point in the history
This change extends LocalBlobAccess FlatBlobAccess to provide more detailed metrics on blobs refreshing. Instead of only exposing the object count, it now also has metrics for duration and total amount of data refreshed.

Furthermore, it adds a MetricsBlobReplicator, allowing us to determine for backends like MirroredBlobAccess how many requests actually lead to refreshes.
  • Loading branch information
kevinye202 authored Nov 21, 2024
1 parent 7c63b92 commit f5a181e
Show file tree
Hide file tree
Showing 12 changed files with 260 additions and 32 deletions.
3 changes: 2 additions & 1 deletion cmd/bb_copy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ func main() {
configuration.Replicator,
source.BlobAccess,
sink,
blobstore_configuration.NewCASBlobReplicatorCreator(grpcClientFactory))
blobstore_configuration.NewCASBlobReplicatorCreator(grpcClientFactory),
)
if err != nil {
return util.StatusWrap(err, "Failed to create replicator")
}
Expand Down
3 changes: 2 additions & 1 deletion cmd/bb_replicator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ func main() {
configuration.Replicator,
source.BlobAccess,
sink,
blobstore_configuration.NewCASBlobReplicatorCreator(grpcClientFactory))
blobstore_configuration.NewCASBlobReplicatorCreator(grpcClientFactory),
)
if err != nil {
return util.StatusWrap(err, "Failed to create replicator")
}
Expand Down
3 changes: 0 additions & 3 deletions pkg/blobstore/configuration/blob_access_creator.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@ type BlobAccessCreator interface {
// GetReadBufferFactory() returns operations that can be used by
// BlobAccess to create Buffer objects to return data.
GetReadBufferFactory() blobstore.ReadBufferFactory
// GetStorageTypeName() returns a short string that identifies
// the purpose of this storage (e.g., "ac", "cas").
GetStorageTypeName() string
// GetCapabilitiesProvider() returns a provider of REv2
// ServerCapabilities messages that should be returned for
// backends that can't report their own capabilities. This
Expand Down
4 changes: 4 additions & 0 deletions pkg/blobstore/configuration/blob_replicator_creator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,8 @@ type BlobReplicatorCreator interface {
// type. For example, sending replication requests over gRPC is
// only supported for the Content Addressable Storage.
NewCustomBlobReplicator(configuration *pb.BlobReplicatorConfiguration, source blobstore.BlobAccess, sink BlobAccessInfo) (replication.BlobReplicator, error)

// GetStorageTypeName returns the name of the storage type that
// this BlobReplicatorCreator is able to create BlobReplicators for.
GetStorageTypeName() string
}
4 changes: 0 additions & 4 deletions pkg/blobstore/configuration/cas_blob_access_creator.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,6 @@ func (bac *casBlobAccessCreator) GetReadBufferFactory() blobstore.ReadBufferFact
return blobstore.CASReadBufferFactory
}

func (bac *casBlobAccessCreator) GetStorageTypeName() string {
return "cas"
}

func (bac *casBlobAccessCreator) GetDefaultCapabilitiesProvider() capabilities.Provider {
return casCapabilitiesProvider
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/blobstore/configuration/cas_blob_replicator_creator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ func NewCASBlobReplicatorCreator(grpcClientFactory grpc.ClientFactory) BlobRepli
}
}

func (brc *casBlobReplicatorCreator) GetStorageTypeName() string {
return "cas"
}

func (brc *casBlobReplicatorCreator) NewCustomBlobReplicator(configuration *pb.BlobReplicatorConfiguration, source blobstore.BlobAccess, sink BlobAccessInfo) (replication.BlobReplicator, error) {
switch mode := configuration.Mode.(type) {
case *pb.BlobReplicatorConfiguration_Deduplicating:
Expand Down
4 changes: 0 additions & 4 deletions pkg/blobstore/configuration/icas_blob_access_creator.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,6 @@ func (bac *icasBlobAccessCreator) GetReadBufferFactory() blobstore.ReadBufferFac
return blobstore.ICASReadBufferFactory
}

func (bac *icasBlobAccessCreator) GetStorageTypeName() string {
return "icas"
}

func (bac *icasBlobAccessCreator) GetDefaultCapabilitiesProvider() capabilities.Provider {
return nil
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/blobstore/configuration/icas_blob_replicator_creator.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,7 @@ func (brc icasBlobReplicatorCreator) NewCustomBlobReplicator(configuration *pb.B
return nil, status.Error(codes.InvalidArgument, "Configuration did not contain a supported replicator")
}
}

func (brc icasBlobReplicatorCreator) GetStorageTypeName() string {
return "icas"
}
20 changes: 14 additions & 6 deletions pkg/blobstore/configuration/new_blob_replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package configuration
import (
"github.com/buildbarn/bb-storage/pkg/blobstore"
"github.com/buildbarn/bb-storage/pkg/blobstore/replication"
"github.com/buildbarn/bb-storage/pkg/clock"
"github.com/buildbarn/bb-storage/pkg/digest"
pb "github.com/buildbarn/bb-storage/pkg/proto/configuration/blobstore"

Expand All @@ -17,20 +18,22 @@ func NewBlobReplicatorFromConfiguration(configuration *pb.BlobReplicatorConfigur
if configuration == nil {
return nil, status.Error(codes.InvalidArgument, "Replicator configuration not specified")
}
storageTypeName := creator.GetStorageTypeName()
var configuredBlobReplicator replication.BlobReplicator
switch mode := configuration.Mode.(type) {
case *pb.BlobReplicatorConfiguration_ConcurrencyLimiting:
base, err := NewBlobReplicatorFromConfiguration(mode.ConcurrencyLimiting.Base, source, sink, creator)
if err != nil {
return nil, err
}
return replication.NewConcurrencyLimitingBlobReplicator(
configuredBlobReplicator = replication.NewConcurrencyLimitingBlobReplicator(
base,
sink.BlobAccess,
semaphore.NewWeighted(mode.ConcurrencyLimiting.MaximumConcurrency)), nil
semaphore.NewWeighted(mode.ConcurrencyLimiting.MaximumConcurrency))
case *pb.BlobReplicatorConfiguration_Local:
return replication.NewLocalBlobReplicator(source, sink.BlobAccess), nil
configuredBlobReplicator = replication.NewLocalBlobReplicator(source, sink.BlobAccess)
case *pb.BlobReplicatorConfiguration_Noop:
return replication.NewNoopBlobReplicator(source), nil
configuredBlobReplicator = replication.NewNoopBlobReplicator(source)
case *pb.BlobReplicatorConfiguration_Queued:
base, err := NewBlobReplicatorFromConfiguration(mode.Queued.Base, source, sink, creator)
if err != nil {
Expand All @@ -40,8 +43,13 @@ func NewBlobReplicatorFromConfiguration(configuration *pb.BlobReplicatorConfigur
if err != nil {
return nil, err
}
return replication.NewQueuedBlobReplicator(source, base, existenceCache), nil
configuredBlobReplicator = replication.NewQueuedBlobReplicator(source, base, existenceCache)
default:
return creator.NewCustomBlobReplicator(configuration, source, sink)
var err error
configuredBlobReplicator, err = creator.NewCustomBlobReplicator(configuration, source, sink)
if err != nil {
return nil, err
}
}
return replication.NewMetricsBlobReplicator(configuredBlobReplicator, clock.SystemClock, storageTypeName), nil
}
77 changes: 64 additions & 13 deletions pkg/blobstore/local/flat_blob_access.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package local
import (
"context"
"sync"
"time"

"github.com/buildbarn/bb-storage/pkg/blobstore"
"github.com/buildbarn/bb-storage/pkg/blobstore/buffer"
Expand All @@ -19,15 +20,35 @@ import (
var (
flatBlobAccessPrometheusMetrics sync.Once

flatBlobAccessRefreshes = prometheus.NewHistogramVec(
flatBlobAccessRefreshesBlobs = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "buildbarn",
Subsystem: "blobstore",
Name: "flat_blob_access_refreshes",
Name: "flat_blob_access_refreshes_blobs",
Help: "The number of blobs that were refreshed when requested",
Buckets: append([]float64{0}, prometheus.ExponentialBuckets(1.0, 2.0, 16)...),
},
[]string{"storage_type", "operation"})

flatBlobAccessRefreshesDurationSeconds = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "buildbarn",
Subsystem: "blobstore",
Name: "flat_blob_access_refreshes_duration_seconds",
Help: "Time spent refreshing blobs in seconds",
Buckets: util.DecimalExponentialBuckets(-3, 6, 2),
},
[]string{"storage_type", "operation"})

flatBlobAccessRefreshesSizeBytes = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "buildbarn",
Subsystem: "blobstore",
Name: "flat_blob_access_refreshes_size_bytes",
Help: "Size of blobs being refreshed in bytes",
Buckets: prometheus.ExponentialBuckets(1.0, 2.0, 33),
},
[]string{"storage_type", "operation"})
)

type flatBlobAccess struct {
Expand All @@ -40,9 +61,16 @@ type flatBlobAccess struct {
lock *sync.RWMutex
refreshLock sync.Mutex

refreshesGet prometheus.Observer
refreshesGetFromComposite prometheus.Observer
refreshesFindMissing prometheus.Observer
refreshesBlobsGet prometheus.Observer
refreshesBlobsGetFromComposite prometheus.Observer
refreshesBlobsFindMissing prometheus.Observer

refreshesBlobsDurationGet prometheus.Observer
refreshesBlobsDurationGetFromComposite prometheus.Observer
refreshesBlobsDurationFindMissing prometheus.Observer
refreshesBlobsSizeGet prometheus.Observer
refreshesBlbosSizeGetFromComposite prometheus.Observer
refreshesBlobsSizeFindMissing prometheus.Observer
}

// NewFlatBlobAccess creates a BlobAccess that forwards all calls to
Expand All @@ -53,7 +81,9 @@ type flatBlobAccess struct {
// any hierarchy.
func NewFlatBlobAccess(keyLocationMap KeyLocationMap, locationBlobMap LocationBlobMap, digestKeyFormat digest.KeyFormat, lock *sync.RWMutex, storageType string, capabilitiesProvider capabilities.Provider) blobstore.BlobAccess {
flatBlobAccessPrometheusMetrics.Do(func() {
prometheus.MustRegister(flatBlobAccessRefreshes)
prometheus.MustRegister(flatBlobAccessRefreshesBlobs)
prometheus.MustRegister(flatBlobAccessRefreshesDurationSeconds)
prometheus.MustRegister(flatBlobAccessRefreshesSizeBytes)
})

return &flatBlobAccess{
Expand All @@ -64,9 +94,16 @@ func NewFlatBlobAccess(keyLocationMap KeyLocationMap, locationBlobMap LocationBl
digestKeyFormat: digestKeyFormat,
lock: lock,

refreshesGet: flatBlobAccessRefreshes.WithLabelValues(storageType, "Get"),
refreshesGetFromComposite: flatBlobAccessRefreshes.WithLabelValues(storageType, "GetFromComposite"),
refreshesFindMissing: flatBlobAccessRefreshes.WithLabelValues(storageType, "FindMissing"),
refreshesBlobsGet: flatBlobAccessRefreshesBlobs.WithLabelValues(storageType, "Get"),
refreshesBlobsGetFromComposite: flatBlobAccessRefreshesBlobs.WithLabelValues(storageType, "GetFromComposite"),
refreshesBlobsFindMissing: flatBlobAccessRefreshesBlobs.WithLabelValues(storageType, "FindMissing"),

refreshesBlobsDurationGet: flatBlobAccessRefreshesDurationSeconds.WithLabelValues(storageType, "Get"),
refreshesBlobsDurationGetFromComposite: flatBlobAccessRefreshesDurationSeconds.WithLabelValues(storageType, "GetFromComposite"),
refreshesBlobsDurationFindMissing: flatBlobAccessRefreshesDurationSeconds.WithLabelValues(storageType, "FindMissing"),
refreshesBlobsSizeGet: flatBlobAccessRefreshesSizeBytes.WithLabelValues(storageType, "Get"),
refreshesBlbosSizeGetFromComposite: flatBlobAccessRefreshesSizeBytes.WithLabelValues(storageType, "GetFromComposite"),
refreshesBlobsSizeFindMissing: flatBlobAccessRefreshesSizeBytes.WithLabelValues(storageType, "FindMissing"),
}
}

Expand Down Expand Up @@ -113,6 +150,8 @@ func (ba *flatBlobAccess) Get(ctx context.Context, blobDigest digest.Digest) buf
// TODO: Instead of copying data on the fly, should this be done
// immediately, so that we can prevent potential duplication by
// picking up the refresh lock?
refreshStart := time.Now()

ba.lock.Lock()
location, err = ba.keyLocationMap.Get(key)
if err != nil {
Expand Down Expand Up @@ -144,7 +183,9 @@ func (ba *flatBlobAccess) Get(ctx context.Context, blobDigest digest.Digest) buf
ba.lock.Lock()
_, err := ba.finalizePut(putFinalizer, key)
if err == nil {
ba.refreshesGet.Observe(1)
ba.refreshesBlobsGet.Observe(1)
ba.refreshesBlobsSizeGet.Observe(float64(location.SizeBytes))
ba.refreshesBlobsDurationGet.Observe(time.Since(refreshStart).Seconds())
}
ba.lock.Unlock()
if err != nil {
Expand Down Expand Up @@ -201,6 +242,8 @@ func (ba *flatBlobAccess) GetFromComposite(ctx context.Context, parentDigest, ch
var bParentSlicing buffer.Buffer
var putFinalizer LocationBlobPutFinalizer
parentGetter, needsRefresh := ba.locationBlobMap.Get(parentLocation)
// Add refresh start time
refreshStart := time.Now()
if needsRefresh {
// The parent object needs to be refreshed and sliced.
bParent := parentGetter(parentDigest)
Expand Down Expand Up @@ -246,12 +289,15 @@ func (ba *flatBlobAccess) GetFromComposite(ctx context.Context, parentDigest, ch
ba.lock.Lock()
if needsRefresh {
parentLocation, err = ba.finalizePut(putFinalizer, parentKey)
// Add size metric before refresh
ba.refreshesBlbosSizeGetFromComposite.Observe(float64(parentLocation.SizeBytes))
if err != nil {
ba.lock.Unlock()
bChild.Discard()
return buffer.NewBufferFromError(util.StatusWrap(err, "Failed to refresh blob"))
}
ba.refreshesGetFromComposite.Observe(1)
ba.refreshesBlobsDurationGetFromComposite.Observe(time.Since(refreshStart).Seconds())
ba.refreshesBlobsGetFromComposite.Observe(1)
}

// Create key-location map entries for each of the slices. This
Expand Down Expand Up @@ -351,8 +397,10 @@ func (ba *flatBlobAccess) FindMissing(ctx context.Context, digests digest.Set) (
// one thread.
ba.refreshLock.Lock()
defer ba.refreshLock.Unlock()

// Add refresh start time before the refresh loop
refreshStart := time.Now()
blobsRefreshedSuccessfully := 0
var blobRefreshSizeBytes int64
ba.lock.Lock()
for _, blobToRefresh := range blobsToRefresh {
if location, err := ba.keyLocationMap.Get(blobToRefresh.key); err == nil {
Expand All @@ -361,6 +409,7 @@ func (ba *flatBlobAccess) FindMissing(ctx context.Context, digests digest.Set) (
// Blob is present and still needs to be
// refreshed. Allocate space for a copy.
b := getter(blobToRefresh.digest)
blobRefreshSizeBytes += location.SizeBytes
putWriter, err := ba.locationBlobMap.Put(location.SizeBytes)
ba.lock.Unlock()
if err != nil {
Expand Down Expand Up @@ -390,6 +439,8 @@ func (ba *flatBlobAccess) FindMissing(ctx context.Context, digests digest.Set) (
}
}
ba.lock.Unlock()
ba.refreshesFindMissing.Observe(float64(blobsRefreshedSuccessfully))
ba.refreshesBlobsFindMissing.Observe(float64(blobsRefreshedSuccessfully))
ba.refreshesBlobsDurationFindMissing.Observe(time.Since(refreshStart).Seconds())
ba.refreshesBlobsSizeFindMissing.Observe(float64(blobRefreshSizeBytes))
return missing.Build(), nil
}
3 changes: 3 additions & 0 deletions pkg/blobstore/replication/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ go_library(
"concurrency_limiting_blob_replicator.go",
"deduplicating_blob_replicator.go",
"local_blob_replicator.go",
"metrics_blob_replicator.go",
"nested_blob_replicator.go",
"noop_blob_replicator.go",
"queued_blob_replicator.go",
Expand All @@ -20,10 +21,12 @@ go_library(
"//pkg/blobstore",
"//pkg/blobstore/buffer",
"//pkg/blobstore/slicing",
"//pkg/clock",
"//pkg/digest",
"//pkg/proto/replicator",
"//pkg/util",
"@bazel_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto",
"@com_github_prometheus_client_golang//prometheus",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
Expand Down
Loading

0 comments on commit f5a181e

Please sign in to comment.