Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return back improved key_ttl #175

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions pkg/blobstore/configuration/new_blob_access.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,14 @@ func (nc *simpleNestedBlobAccessCreator) newNestedBlobAccessBare(configuration *
return BlobAccessInfo{}, "", util.StatusWrap(err, "Failed to obtain TLS configuration")
}

var keyTTL time.Duration
if backend.Redis.KeyTtl != nil {
if err := backend.Redis.DialTimeout.CheckValid(); err != nil {
return BlobAccessInfo{}, "", util.StatusWrap(err, "Failed to obtain key TTL configuration")
}
keyTTL = backend.Redis.KeyTtl.AsDuration()
}

var replicationTimeout time.Duration
if backend.Redis.ReplicationTimeout != nil {
if err := backend.Redis.ReplicationTimeout.CheckValid(); err != nil {
Expand Down Expand Up @@ -191,6 +199,7 @@ func (nc *simpleNestedBlobAccessCreator) newNestedBlobAccessBare(configuration *
redisClient,
readBufferFactory,
digestKeyFormat,
keyTTL,
backend.Redis.ReplicationCount,
replicationTimeout,
creator.GetDefaultCapabilitiesProvider()),
Expand Down
49 changes: 42 additions & 7 deletions pkg/blobstore/redis_blob_access.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,21 @@ type redisBlobAccess struct {
redisClient RedisClient
readBufferFactory ReadBufferFactory
digestKeyFormat digest.KeyFormat
keyTTL time.Duration
replicationCount int64
replicationTimeout int
}

// NewRedisBlobAccess creates a BlobAccess that uses Redis as its
// backing store.
func NewRedisBlobAccess(redisClient RedisClient, readBufferFactory ReadBufferFactory, digestKeyFormat digest.KeyFormat, replicationCount int64, replicationTimeout time.Duration, capabilitiesProvider capabilities.Provider) BlobAccess {
func NewRedisBlobAccess(redisClient RedisClient, readBufferFactory ReadBufferFactory, digestKeyFormat digest.KeyFormat, keyTTL time.Duration, replicationCount int64, replicationTimeout time.Duration, capabilitiesProvider capabilities.Provider) BlobAccess {
return &redisBlobAccess{
Provider: capabilitiesProvider,

redisClient: redisClient,
readBufferFactory: readBufferFactory,
digestKeyFormat: digestKeyFormat,
keyTTL: keyTTL,
replicationCount: int64(replicationCount),
replicationTimeout: int(replicationTimeout.Milliseconds()),
}
Expand All @@ -53,7 +55,13 @@ func (ba *redisBlobAccess) Get(ctx context.Context, digest digest.Digest) buffer
return buffer.NewBufferFromError(err)
}
key := digest.GetKey(ba.digestKeyFormat)
value, err := ba.redisClient.Get(ctx, key).Bytes()
var value []byte
var err error
if ba.keyTTL > 0 {
value, err = ba.redisClient.GetEx(ctx, key, ba.keyTTL).Bytes()
} else {
value, err = ba.redisClient.Get(ctx, key).Bytes()
}
if err == redis.Nil {
return buffer.NewBufferFromError(util.StatusWrapWithCode(err, codes.NotFound, "Blob not found"))
} else if err != nil {
Expand Down Expand Up @@ -88,7 +96,7 @@ func (ba *redisBlobAccess) Put(ctx context.Context, digest digest.Digest, b buff
if err != nil {
return util.StatusWrapWithCode(err, codes.Unavailable, "Failed to put blob")
}
if err := ba.redisClient.Set(ctx, digest.GetKey(ba.digestKeyFormat), value, 0).Err(); err != nil {
if err := ba.redisClient.Set(ctx, digest.GetKey(ba.digestKeyFormat), value, ba.keyTTL).Err(); err != nil {
return util.StatusWrapWithCode(err, codes.Unavailable, "Failed to put blob")
}
return ba.waitIfReplicationEnabled(ctx)
Expand Down Expand Up @@ -123,11 +131,11 @@ func (ba *redisBlobAccess) FindMissing(ctx context.Context, digests digest.Set)
return digest.EmptySet, nil
}

// Execute "EXISTS" requests all in a single pipeline.
// Execute "TOUCH/EXPIRE" requests all in a single pipeline.
pipeline := ba.redisClient.Pipeline()
cmds := make([]*redis.IntCmd, 0, digests.Length())
cmds := make([]RedisCmd, 0, digests.Length())
for _, digest := range digests.Items() {
cmds = append(cmds, pipeline.Exists(ctx, digest.GetKey(ba.digestKeyFormat)))
cmds = append(cmds, touchOrExpire(ctx, pipeline, digest.GetKey(ba.digestKeyFormat), ba.keyTTL))
}
if _, err := pipeline.Exec(ctx); err != nil {
return digest.EmptySet, util.StatusWrapWithCode(err, codes.Unavailable, "Failed to find missing blobs")
Expand All @@ -136,10 +144,37 @@ func (ba *redisBlobAccess) FindMissing(ctx context.Context, digests digest.Set)
missing := digest.NewSetBuilder()
i := 0
for _, digest := range digests.Items() {
if cmds[i].Val() == 0 {
if !cmds[i].Val() {
missing.Add(digest)
}
i++
}
return missing.Build(), nil
}

type RedisCmd interface {
Val() bool
}

type intCmdWrapper struct {
cmd *redis.IntCmd
}

func (w *intCmdWrapper) Val() bool {
return w.cmd.Val() != 0
}

type boolCmdWrapper struct {
cmd *redis.BoolCmd
}

func (w *boolCmdWrapper) Val() bool {
return w.cmd.Val()
}

func touchOrExpire(ctx context.Context, cmdable redis.Cmdable, key string, ttl time.Duration) RedisCmd {
if ttl > 0 {
return &boolCmdWrapper{cmd: cmdable.Expire(ctx, key, ttl)}
}
return &intCmdWrapper{cmd: cmdable.Touch(ctx, key)}
}
2 changes: 1 addition & 1 deletion pkg/blobstore/redis_blob_access_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestRedisBlobAccessContextCanceled(t *testing.T) {

redisClient := mock.NewMockRedisClient(ctrl)
capabilitiesProvider := mock.NewMockCapabilitiesProvider(ctrl)
blobAccess := blobstore.NewRedisBlobAccess(redisClient, blobstore.CASReadBufferFactory, digest.KeyWithoutInstance, 0, 0, capabilitiesProvider)
blobAccess := blobstore.NewRedisBlobAccess(redisClient, blobstore.CASReadBufferFactory, digest.KeyWithoutInstance, 0, 0, 0, capabilitiesProvider)

canceledCtx, cancel := context.WithCancel(ctx)
cancel()
Expand Down
11 changes: 8 additions & 3 deletions pkg/proto/configuration/blobstore/blobstore.proto
Original file line number Diff line number Diff line change
Expand Up @@ -292,9 +292,14 @@ message RedisBlobAccessConfiguration {
// when not set.
buildbarn.configuration.tls.ClientConfiguration tls = 4;

// key_ttl was removed because it gives the wrong eviction
// behaviour. Use redis's own eviction policies instead.
reserved 7;
// How long to keep a cache key in Redis, specified as a duration.
// When unset, this means keys do not expire and and rely on Redis
// eviction policy to efficiently remove keys when storage gets full.
// A reasonable number for this would allow keys to live long enough
// objects to be found in the CAS once the client uploads them.
// TTL is set on Put() method and updated every time object
// is accessed by Get() and FindMissing() methods.
google.protobuf.Duration key_ttl = 7;

// The minimum number of replicas to successfully replicate put calls to
// before considering it successful.
Expand Down