Skip to content

Commit

Permalink
sharedcache: refactor block math
Browse files Browse the repository at this point in the history
This change moves all offset/block-index math to a special `blockMath`
type and switches to bit operations instead of division and
multiplication.
  • Loading branch information
RaduBerinde committed Jun 14, 2023
1 parent 32834aa commit 2fed2e7
Showing 1 changed file with 79 additions and 36 deletions.
115 changes: 79 additions & 36 deletions objstorage/objstorageprovider/sharedcache/shared_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"fmt"
"io"
"math/bits"
"sync"
"sync/atomic"

Expand All @@ -29,7 +30,7 @@ type Cache struct {
shards []shard
logger base.Logger

blockSize int
bm blockMath

writeBackWaitGroup sync.WaitGroup
// TODO(josh): Have a dedicated metrics struct. Right now, this
Expand All @@ -48,8 +49,8 @@ func Open(
}

sc := &Cache{
logger: logger,
blockSize: blockSize,
logger: logger,
bm: makeBlockMath(blockSize),
}
sc.shards = make([]shard, numShards)
blocksPerShard := sizeBytes / int64(numShards) / int64(blockSize)
Expand Down Expand Up @@ -118,7 +119,7 @@ func (c *Cache) ReadAt(
p = p[n:]

if invariants.Enabled {
if n != 0 && ofs%int64(c.blockSize) != 0 {
if n != 0 && c.bm.Remainder(ofs) != 0 {
panic(fmt.Sprintf("after non-zero read from cache, ofs is not block-aligned: %v %v", ofs, n))
}
}
Expand All @@ -132,15 +133,14 @@ func (c *Cache) ReadAt(

// We must do reads with offset & size that are multiples of the block size. Else
// later cache hits may return incorrect zeroed results from the cache.
firstBlockInd := ofs / int64(c.blockSize)
adjustedOfs := firstBlockInd * int64(c.blockSize)
firstBlockInd := c.bm.Block(ofs)
adjustedOfs := c.bm.BlockOffset(firstBlockInd)

// Take the length of what is left to read plus the length of the adjustment of
// the offset plus the size of a block minus one and divide by the size of a block
// to get the number of blocks to read from the object.
sizeOfOffAdjustment := int(ofs - adjustedOfs)
numBlocksToRead := ((len(p) + sizeOfOffAdjustment) + (c.blockSize - 1)) / c.blockSize
adjustedLen := numBlocksToRead * c.blockSize
adjustedLen := int(c.bm.RoundUp(int64(len(p) + sizeOfOffAdjustment)))
adjustedP := make([]byte, adjustedLen)

// Read the rest from the object. We may need to cap the length to avoid past EOF reads.
Expand Down Expand Up @@ -210,7 +210,7 @@ func (c *Cache) get(fileNum base.DiskFileNum, p []byte, ofs int64) (n int, _ err
// If all of p is not written to the shard, set returns a non-nil error.
func (c *Cache) set(fileNum base.DiskFileNum, p []byte, ofs int64) error {
if invariants.Enabled {
if ofs%int64(c.blockSize) != 0 || len(p)%c.blockSize != 0 {
if c.bm.Remainder(ofs) != 0 || c.bm.Remainder(int64(len(p))) != 0 {
panic(fmt.Sprintf("set with ofs & len not multiples of block size: %v %v", ofs, len(p)))
}
}
Expand Down Expand Up @@ -252,7 +252,7 @@ func (c *Cache) getShard(fileNum base.DiskFileNum, ofs int64) *shard {
type shard struct {
file vfs.File
sizeInBlocks int64
blockSize int
bm blockMath
mu struct {
sync.Mutex
// TODO(josh): None of these datastructures are space-efficient.
Expand All @@ -268,13 +268,10 @@ type shard struct {
type whereMap map[logicalBlockID]cacheBlockIndex

type logicalBlockID struct {
filenum base.DiskFileNum
// Offset into the file, in units of shard.BlockSize.
offsetInUnitsOfBlocks int64
filenum base.DiskFileNum
cacheBlockIdx cacheBlockIndex
}

type cacheBlockIndex int64

type lockState int64

const (
Expand All @@ -298,7 +295,7 @@ func (s *shard) init(
if blockSize < 1024 || shardingBlockSize%blockSize != 0 {
return errors.Newf("invalid block size %d (must divide %d)", blockSize, shardingBlockSize)
}
s.blockSize = blockSize
s.bm = makeBlockMath(blockSize)
file, err := fs.OpenReadWrite(fs.PathJoin(fsDir, fmt.Sprintf("SHARED-CACHE-%03d", shardIdx)))
if err != nil {
return err
Expand Down Expand Up @@ -356,11 +353,11 @@ func (s *shard) get(fileNum base.DiskFileNum, p []byte, ofs int64) (n int, _ err
// in units of sstable block size.
for {
k := logicalBlockID{
filenum: fileNum,
offsetInUnitsOfBlocks: (ofs + int64(n)) / int64(s.blockSize),
filenum: fileNum,
cacheBlockIdx: s.bm.Block(ofs + int64(n)),
}
s.mu.Lock()
cacheBlockInd, ok := s.mu.where[k]
cacheBlockIdx, ok := s.mu.where[k]
// TODO(josh): Multiple reads within the same few milliseconds (anything that is smaller
// than blob storage read latency) that miss on the same logical block ID will not necessarily
// be rare. We may want to do only one read, with the later readers blocking on the first read
Expand All @@ -370,33 +367,31 @@ func (s *shard) get(fileNum base.DiskFileNum, p []byte, ofs int64) (n int, _ err
s.mu.Unlock()
return n, nil
}
if s.mu.locks[cacheBlockInd] == writeLockTaken {
if s.mu.locks[cacheBlockIdx] == writeLockTaken {
// In practice, if we have two reads of the same SST block in close succession, we
// would expect the second to hit in the in-memory block cache. So it's not worth
// optimizing this case here.
s.mu.Unlock()
return n, nil
}
s.mu.locks[cacheBlockInd] += readLockTakenInc
s.mu.locks[cacheBlockIdx] += readLockTakenInc
s.mu.Unlock()

readAt := int64(cacheBlockInd) * int64(s.blockSize)
if n == 0 { // if first read
readAt += ofs % int64(s.blockSize)
}
readSize := s.blockSize
readAt := s.bm.BlockOffset(cacheBlockIdx)
readSize := s.bm.BlockSize()
if n == 0 { // if first read
// Cast to int safe since ofs is modded by block size.
readSize -= int(ofs % int64(s.blockSize))
rem := s.bm.Remainder(ofs)
readAt += rem
readSize -= int(rem)
}

if len(p[n:]) <= readSize {
numRead, err := s.file.ReadAt(p[n:], readAt)
s.dropReadLock(cacheBlockInd)
s.dropReadLock(cacheBlockIdx)
return n + numRead, err
}
numRead, err := s.file.ReadAt(p[n:n+readSize], readAt)
s.dropReadLock(cacheBlockInd)
s.dropReadLock(cacheBlockIdx)
if err != nil {
return 0, err
}
Expand All @@ -416,7 +411,7 @@ func (s *shard) set(fileNum base.DiskFileNum, p []byte, ofs int64) error {
if ofs/shardingBlockSize != (ofs+int64(len(p))-1)/shardingBlockSize {
panic(fmt.Sprintf("set crosses shard boundary: %v %v", ofs, len(p)))
}
if ofs%int64(s.blockSize) != 0 || len(p)%s.blockSize != 0 {
if s.bm.Remainder(ofs) != 0 || s.bm.Remainder(int64(len(p))) != 0 {
panic(fmt.Sprintf("set with ofs & len not multiples of block size: %v %v", ofs, len(p)))
}
s.assertShardStateIsConsistent()
Expand All @@ -438,13 +433,13 @@ func (s *shard) set(fileNum base.DiskFileNum, p []byte, ofs int64) error {

// If the logical block is already in the cache, we should skip doing a set.
k := logicalBlockID{
filenum: fileNum,
offsetInUnitsOfBlocks: (ofs + int64(n)) / int64(s.blockSize),
filenum: fileNum,
cacheBlockIdx: s.bm.Block(ofs + int64(n)),
}
s.mu.Lock()
if _, ok := s.mu.where[k]; ok {
s.mu.Unlock()
n += s.blockSize
n += s.bm.BlockSize()
continue
}

Expand Down Expand Up @@ -486,9 +481,9 @@ func (s *shard) set(fileNum base.DiskFileNum, p []byte, ofs int64) error {
s.mu.locks[cacheBlockInd] = writeLockTaken
s.mu.Unlock()

writeAt := int64(cacheBlockInd) * int64(s.blockSize)
writeAt := s.bm.BlockOffset(cacheBlockInd)

writeSize := s.blockSize
writeSize := s.bm.BlockSize()
if len(p[n:]) <= writeSize {
writeSize = len(p[n:])
}
Expand Down Expand Up @@ -568,3 +563,51 @@ func (s *shard) assertShardStateIsConsistent() {
}
}
}

// cacheBlockIndex is the index of a blockSize-aligned cache block. Block i
// starts at byte offset i << blockSizeBits (see blockMath.BlockOffset).
type cacheBlockIndex int64

// blockMath is a helper type for performing conversions between offsets and
// block indexes. It stores only the base-2 logarithm of the block size, so
// every conversion is a shift or a mask rather than a division or
// multiplication. Values are created with makeBlockMath.
type blockMath struct {
// blockSizeBits is log2 of the block size, i.e. the block size is
// 1 << blockSizeBits.
blockSizeBits int8
}

// makeBlockMath returns the blockMath for the given block size.
//
// blockSize must be a positive power of two. Any other value (including zero
// and negative values) is a programming error and panics with a message that
// names the offending size.
func makeBlockMath(blockSize int) blockMath {
	// Validate before computing the shift amount. A zero size would otherwise
	// yield blockSizeBits == -1 and fail later with an unrelated "negative
	// shift amount" runtime panic, and the most negative int would be
	// mistaken for 1 << 63 and accepted.
	if blockSize <= 0 || blockSize&(blockSize-1) != 0 {
		panic(fmt.Sprintf("blockSize %d is not a power of 2", blockSize))
	}
	return blockMath{
		// For a power of two, the bit length minus one is its log2.
		blockSizeBits: int8(bits.Len64(uint64(blockSize)) - 1),
	}
}

// mask returns a value whose low blockSizeBits bits are set and all other
// bits are clear; AND-ing an offset with it keeps the within-block part.
func (bm blockMath) mask() int64 {
	blockSize := int64(1) << bm.blockSizeBits
	return blockSize - 1
}

// BlockSize returns the size of a cache block in bytes, reconstructed from
// the stored log2 value.
func (bm blockMath) BlockSize() int {
	const one int = 1
	return one << bm.blockSizeBits
}

// Block maps a byte offset to the index of the cache block that contains it,
// i.e. the offset divided by the block size, rounded down.
func (bm blockMath) Block(offset int64) cacheBlockIndex {
	idx := offset >> bm.blockSizeBits
	return cacheBlockIndex(idx)
}

// Remainder returns how far offset lies past the start of the cache block
// that contains it. The result is always in [0, block size).
func (bm blockMath) Remainder(offset int64) int64 {
	// Clearing the low bits gives the offset at which the containing block
	// starts; the distance from there is the remainder.
	blockStart := offset &^ bm.mask()
	return offset - blockStart
}

// BlockOffset returns the byte offset at which the given block starts, i.e.
// the block index multiplied by the block size.
func (bm blockMath) BlockOffset(block cacheBlockIndex) int64 {
	start := int64(block)
	start <<= bm.blockSizeBits
	return start
}

// RoundUp returns the smallest multiple of the block size that is greater
// than or equal to x. Values that are already multiples are returned as-is.
func (bm blockMath) RoundUp(x int64) int64 {
	m := bm.mask()
	// Adding the mask carries x into the next block unless it is already
	// aligned; clearing the low bits then lands on the block boundary.
	bumped := x + m
	return bumped &^ m
}

0 comments on commit 2fed2e7

Please sign in to comment.