Skip to content

Commit

Permalink
Fixes after rebase
Browse files Browse the repository at this point in the history
Signed-off-by: Marco Pracucci <marco@pracucci.com>
  • Loading branch information
pracucci committed May 18, 2020
1 parent 7cf519d commit 39b249d
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 47 deletions.
65 changes: 36 additions & 29 deletions pkg/store/cache/caching_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package storecache
import (
"bytes"
"context"
"encoding/binary"
"encoding/json"
"fmt"
"io"
Expand Down Expand Up @@ -36,7 +35,7 @@ const (
opGetRange = "getrange"
opIter = "iter"
opExists = "exists"
opObjectSize = "objectsize"
opAttributes = "attributes"
)

var errObjNotFound = errors.Errorf("object not found")
Expand Down Expand Up @@ -291,50 +290,58 @@ func (cb *CachingBucket) GetRange(ctx context.Context, name string, off, length
return r, err
}

func (cb *CachingBucket) ObjectSize(ctx context.Context, name string) (uint64, error) {
cfgName, cfg := cb.cfg.findObjectSizeConfig(name)
func (cb *CachingBucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) {
cfgName, cfg := cb.cfg.findAttributesConfig(name)
if cfg == nil {
return cb.Bucket.ObjectSize(ctx, name)
return cb.Bucket.Attributes(ctx, name)
}

return cb.cachedObjectSize(ctx, name, cfgName, cfg.cache, cfg.ttl)
return cb.cachedAttributes(ctx, name, cfgName, cfg.cache, cfg.ttl)
}

func (cb *CachingBucket) cachedObjectSize(ctx context.Context, name string, cfgName string, cache cache.Cache, ttl time.Duration) (uint64, error) {
key := cachingKeyObjectSize(name)
func (cb *CachingBucket) cachedAttributes(ctx context.Context, name string, cfgName string, cache cache.Cache, ttl time.Duration) (objstore.ObjectAttributes, error) {
key := cachingKeyAttributes(name)

cb.operationRequests.WithLabelValues(opObjectSize, cfgName).Inc()
cb.operationRequests.WithLabelValues(opAttributes, cfgName).Inc()

hits := cache.Fetch(ctx, []string{key})
if s := hits[key]; len(s) == 8 {
cb.operationHits.WithLabelValues(opObjectSize, cfgName).Inc()
return binary.BigEndian.Uint64(s), nil
if raw, ok := hits[key]; ok {
var attrs objstore.ObjectAttributes
err := json.Unmarshal(raw, &attrs)
if err == nil {
cb.operationHits.WithLabelValues(opAttributes, cfgName).Inc()
return attrs, nil
}

level.Warn(cb.logger).Log("msg", "failed to decode cached Attributes result", "key", key, "err", err)
}

size, err := cb.Bucket.ObjectSize(ctx, name)
attrs, err := cb.Bucket.Attributes(ctx, name)
if err != nil {
return 0, err
return objstore.ObjectAttributes{}, err
}

var buf [8]byte
binary.BigEndian.PutUint64(buf[:], size)
cache.Store(ctx, map[string][]byte{key: buf[:]}, ttl)
if raw, err := json.Marshal(attrs); err == nil {
cache.Store(ctx, map[string][]byte{key: raw}, ttl)
} else {
level.Warn(cb.logger).Log("msg", "failed to encode cached Attributes result", "key", key, "err", err)
}

return size, nil
return attrs, nil
}

func (cb *CachingBucket) cachedGetRange(ctx context.Context, name string, offset, length int64, cfgName string, cfg *getRangeConfig) (io.ReadCloser, error) {
cb.operationRequests.WithLabelValues(opGetRange, cfgName).Inc()
cb.requestedGetRangeBytes.WithLabelValues(cfgName).Add(float64(length))

size, err := cb.cachedObjectSize(ctx, name, cfgName, cfg.cache, cfg.objectSizeTTL)
attrs, err := cb.cachedAttributes(ctx, name, cfgName, cfg.cache, cfg.attributesTTL)
if err != nil {
return nil, errors.Wrapf(err, "failed to get size of object: %s", name)
return nil, errors.Wrapf(err, "failed to get object attributes: %s", name)
}

// If length goes over object size, adjust length. We use it later to limit number of read bytes.
if uint64(offset+length) > size {
length = int64(size - uint64(offset))
if offset+length > attrs.Size {
length = attrs.Size - offset
}

// Start and end range are subrange-aligned offsets into object, that we're going to read.
Expand All @@ -347,9 +354,9 @@ func (cb *CachingBucket) cachedGetRange(ctx context.Context, name string, offset
// The very last subrange in the object may have length that is not divisible by subrange size.
lastSubrangeOffset := endRange - cfg.subrangeSize
lastSubrangeLength := int(cfg.subrangeSize)
if uint64(endRange) > size {
lastSubrangeOffset = (int64(size) / cfg.subrangeSize) * cfg.subrangeSize
lastSubrangeLength = int(int64(size) - lastSubrangeOffset)
if endRange > attrs.Size {
lastSubrangeOffset = (attrs.Size / cfg.subrangeSize) * cfg.subrangeSize
lastSubrangeLength = int(attrs.Size - lastSubrangeOffset)
}

numSubranges := (endRange - startRange) / cfg.subrangeSize
Expand All @@ -360,8 +367,8 @@ func (cb *CachingBucket) cachedGetRange(ctx context.Context, name string, offset
totalRequestedBytes := int64(0)
for off := startRange; off < endRange; off += cfg.subrangeSize {
end := off + cfg.subrangeSize
if end > int64(size) {
end = int64(size)
if end > attrs.Size {
end = attrs.Size
}
totalRequestedBytes += (end - off)

Expand Down Expand Up @@ -489,8 +496,8 @@ func mergeRanges(input []rng, limit int64) []rng {
return input[:last+1]
}

func cachingKeyObjectSize(name string) string {
return fmt.Sprintf("size:%s", name)
func cachingKeyAttributes(name string) string {
return fmt.Sprintf("attrs:%s", name)
}

func cachingKeyObjectSubrange(name string, start int64, end int64) string {
Expand Down
26 changes: 13 additions & 13 deletions pkg/store/cache/caching_bucket_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type CachingBucketConfig struct {
iter map[string]*iterConfig
exists map[string]*existsConfig
getRange map[string]*getRangeConfig
objectSize map[string]*objectSizeConfig
attributes map[string]*attributesConfig
}

func NewCachingBucketConfig() *CachingBucketConfig {
Expand All @@ -32,7 +32,7 @@ func NewCachingBucketConfig() *CachingBucketConfig {
iter: map[string]*iterConfig{},
exists: map[string]*existsConfig{},
getRange: map[string]*getRangeConfig{},
objectSize: map[string]*objectSizeConfig{},
attributes: map[string]*attributesConfig{},
}
}

Expand Down Expand Up @@ -65,11 +65,11 @@ type getRangeConfig struct {
operationConfig
subrangeSize int64
maxSubRequests int
objectSizeTTL time.Duration
attributesTTL time.Duration
subrangeTTL time.Duration
}

type objectSizeConfig struct {
type attributesConfig struct {
operationConfig
ttl time.Duration
}
Expand Down Expand Up @@ -124,19 +124,19 @@ func (cfg *CachingBucketConfig) CacheExists(configName string, cache cache.Cache
// Single "GetRange" requests can result in multiple smaller GetRange sub-requests issued on the underlying bucket.
// MaxSubRequests specifies how many such subrequests may be issued. Values <= 0 mean there is no limit (requests
// for adjacent missing subranges are still merged).
func (cfg *CachingBucketConfig) CacheGetRange(configName string, cache cache.Cache, matcher func(string) bool, subrangeSize int64, objectSizeTTL, subrangeTTL time.Duration, maxSubRequests int) {
func (cfg *CachingBucketConfig) CacheGetRange(configName string, cache cache.Cache, matcher func(string) bool, subrangeSize int64, attributesTTL, subrangeTTL time.Duration, maxSubRequests int) {
cfg.getRange[configName] = &getRangeConfig{
operationConfig: newOperationConfig(cache, matcher),
subrangeSize: subrangeSize,
objectSizeTTL: objectSizeTTL,
attributesTTL: attributesTTL,
subrangeTTL: subrangeTTL,
maxSubRequests: maxSubRequests,
}
}

// CacheObjectSize configures caching of "ObjectSize" operation for matching files.
func (cfg *CachingBucketConfig) CacheObjectSize(configName string, cache cache.Cache, matcher func(name string) bool, ttl time.Duration) {
cfg.objectSize[configName] = &objectSizeConfig{
// CacheAttributes configures caching of "Attributes" operation for matching files.
func (cfg *CachingBucketConfig) CacheAttributes(configName string, cache cache.Cache, matcher func(name string) bool, ttl time.Duration) {
cfg.attributes[configName] = &attributesConfig{
operationConfig: newOperationConfig(cache, matcher),
ttl: ttl,
}
Expand All @@ -156,8 +156,8 @@ func (cfg *CachingBucketConfig) allConfigNames() map[string][]string {
for n := range cfg.getRange {
result[opGetRange] = append(result[opGetRange], n)
}
for n := range cfg.objectSize {
result[opObjectSize] = append(result[opObjectSize], n)
for n := range cfg.attributes {
result[opAttributes] = append(result[opAttributes], n)
}
return result
}
Expand Down Expand Up @@ -198,8 +198,8 @@ func (cfg *CachingBucketConfig) findGetRangeConfig(name string) (string, *getRan
return "", nil
}

func (cfg *CachingBucketConfig) findObjectSizeConfig(name string) (string, *objectSizeConfig) {
for n, cfg := range cfg.objectSize {
func (cfg *CachingBucketConfig) findAttributesConfig(name string) (string, *attributesConfig) {
for n, cfg := range cfg.attributes {
if cfg.matcher(name) {
return n, cfg
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/store/cache/caching_bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,7 @@ func TestObjectSize(t *testing.T) {

cfg := NewCachingBucketConfig()
const cfgName = "test"
cfg.CacheObjectSize(cfgName, cache, matchAll, time.Minute)
cfg.CacheAttributes(cfgName, cache, matchAll, time.Minute)

cb, err := NewCachingBucket(inmem, cfg, nil, nil)
testutil.Ok(t, err)
Expand All @@ -637,16 +637,16 @@ func TestObjectSize(t *testing.T) {

func verifyObjectSize(t *testing.T, cb *CachingBucket, file string, expectedLength int, cacheUsed bool, cfgName string) {
t.Helper()
hitsBefore := int(promtest.ToFloat64(cb.operationHits.WithLabelValues(opObjectSize, cfgName)))
hitsBefore := int(promtest.ToFloat64(cb.operationHits.WithLabelValues(opAttributes, cfgName)))

length, err := cb.ObjectSize(context.Background(), file)
attrs, err := cb.Attributes(context.Background(), file)
if expectedLength < 0 {
testutil.Assert(t, cb.IsObjNotFoundErr(err))
} else {
testutil.Ok(t, err)
testutil.Equals(t, uint64(expectedLength), length)
testutil.Equals(t, int64(expectedLength), attrs.Size)

hitsAfter := int(promtest.ToFloat64(cb.operationHits.WithLabelValues(opObjectSize, cfgName)))
hitsAfter := int(promtest.ToFloat64(cb.operationHits.WithLabelValues(opAttributes, cfgName)))
if cacheUsed {
testutil.Equals(t, 1, hitsAfter-hitsBefore)
} else {
Expand Down

0 comments on commit 39b249d

Please sign in to comment.