Skip to content

Commit

Permalink
Switch sharding struct from array to map
Browse files Browse the repository at this point in the history
  • Loading branch information
meroton-benjamin committed Feb 20, 2025
1 parent 07366c6 commit 4011c29
Show file tree
Hide file tree
Showing 13 changed files with 474 additions and 349 deletions.
2 changes: 1 addition & 1 deletion internal/mock/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ gomock(
gomock(
name = "blobstore_sharding",
out = "blobstore_sharding.go",
interfaces = ["ShardPermuter"],
interfaces = ["ShardSelector"],
library = "//pkg/blobstore/sharding",
mockgen_model_library = "@org_uber_go_mock//mockgen/model",
mockgen_tool = "@org_uber_go_mock//mockgen",
Expand Down
47 changes: 25 additions & 22 deletions pkg/blobstore/configuration/new_blob_access.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,41 +85,44 @@ func (nc *simpleNestedBlobAccessCreator) newNestedBlobAccessBare(configuration *
DigestKeyFormat: slow.DigestKeyFormat,
}, "read_caching", nil
case *pb.BlobAccessConfiguration_Sharding:
backends := make([]blobstore.BlobAccess, 0, len(backend.Sharding.Shards))
weights := make([]uint32, 0, len(backend.Sharding.Shards))
backends := make([]sharding.ShardBackend, 0, len(backend.Sharding.ShardMap))
shards := make([]sharding.Shard, 0, len(backend.Sharding.ShardMap))
keys := make([]string, 0, len(backend.Sharding.ShardMap))
var combinedDigestKeyFormat *digest.KeyFormat
for _, shard := range backend.Sharding.Shards {
if shard.Backend == nil {
// Drained backend.
backends = append(backends, nil)
for key, shard := range backend.Sharding.ShardMap {
backend, err := nc.NewNestedBlobAccess(shard.Backend, creator)
if err != nil {
return BlobAccessInfo{}, "", err
}
backends = append(backends, sharding.ShardBackend{Backend: backend.BlobAccess, Key: key })
if combinedDigestKeyFormat == nil {
combinedDigestKeyFormat = &backend.DigestKeyFormat
} else {
// Undrained backend.
backend, err := nc.NewNestedBlobAccess(shard.Backend, creator)
if err != nil {
return BlobAccessInfo{}, "", err
}
backends = append(backends, backend.BlobAccess)
if combinedDigestKeyFormat == nil {
combinedDigestKeyFormat = &backend.DigestKeyFormat
} else {
newDigestKeyFormat := combinedDigestKeyFormat.Combine(backend.DigestKeyFormat)
combinedDigestKeyFormat = &newDigestKeyFormat
}
newDigestKeyFormat := combinedDigestKeyFormat.Combine(backend.DigestKeyFormat)
combinedDigestKeyFormat = &newDigestKeyFormat
}

if shard.Weight == 0 {
return BlobAccessInfo{}, "", status.Errorf(codes.InvalidArgument, "Shards must have positive weights")
}
weights = append(weights, shard.Weight)
shards = append(shards, sharding.Shard{
Key: key,
Weight: shard.Weight,
})
keys = append(keys, key)
}
if combinedDigestKeyFormat == nil {
return BlobAccessInfo{}, "", status.Errorf(codes.InvalidArgument, "Cannot create sharding blob access without any undrained backends")
return BlobAccessInfo{}, "", status.Errorf(codes.InvalidArgument, "Cannot create sharding blob access without any backends")
}
shardSelector, err := sharding.NewRendezvousShardSelector(shards)
if err != nil {
return BlobAccessInfo{}, "", status.Errorf(codes.InvalidArgument, "Could not create rendezvous shard selector")
}
return BlobAccessInfo{
BlobAccess: sharding.NewShardingBlobAccess(
backends,
sharding.NewWeightedShardPermuter(weights),
backend.Sharding.HashInitialization),
shardSelector,
),
DigestKeyFormat: *combinedDigestKeyFormat,
}, "sharding", nil
case *pb.BlobAccessConfiguration_Mirrored:
Expand Down
7 changes: 3 additions & 4 deletions pkg/blobstore/sharding/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ load("@rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "sharding",
srcs = [
"shard_permuter.go",
"shard_selector.go",
"sharding_blob_access.go",
"weighted_shard_permuter.go",
"rendezvous_shard_selector.go",
],
importpath = "github.com/buildbarn/bb-storage/pkg/blobstore/sharding",
visibility = ["//visibility:public"],
Expand All @@ -16,7 +16,6 @@ go_library(
"//pkg/digest",
"//pkg/util",
"@bazel_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto",
"@com_github_lazybeaver_xorshift//:xorshift",
"@org_golang_x_sync//errgroup",
],
)
Expand All @@ -25,7 +24,7 @@ go_test(
name = "sharding_test",
srcs = [
"sharding_blob_access_test.go",
"weighted_shard_permuter_test.go",
"rendezvous_shard_selector_test.go",
],
deps = [
":sharding",
Expand Down
146 changes: 146 additions & 0 deletions pkg/blobstore/sharding/rendezvous_shard_selector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package sharding

import (
"encoding/binary"
"fmt"
"crypto/sha256"
"math/bits"
"sort"
)

type (
	// rendezvousShard is the per-shard state kept by the rendezvous
	// selector.
	rendezvousShard struct {
		// weight is the shard's weight as handed to score.
		weight uint32
		// index is the shard's position in the slice that was passed to
		// NewRendezvousShardSelector; it is the value GetShard returns.
		index int
		// hash is hashServer applied to the shard's key. GetShard XORs
		// it with the blob hash before mixing.
		hash uint64
	}

	// rendezvousShardSelector picks a shard for a hash by scoring every
	// shard and returning the highest scoring one. The shards slice is
	// sorted by ascending hash by NewRendezvousShardSelector.
	rendezvousShardSelector struct {
		shards []rendezvousShard
	}
)

// hashServer derives a 64 bit identifier for a shard key: the first eight
// bytes of the key's SHA-256 digest, interpreted as a big-endian integer.
func hashServer(key string) uint64 {
	digest := sha256.Sum256([]byte(key))
	prefix := digest[:8]
	return binary.BigEndian.Uint64(prefix)
}

// NewRendezvousShardSelector builds a selector over the given shards.
//
// Every shard key is hashed with hashServer. An error is returned when
// no shards are supplied, or when two keys hash to the same value. The
// position of each shard in the input slice is remembered and is what
// GetShard later returns; internally the shards are kept sorted by their
// key hash so that the iteration order does not depend on input order.
func NewRendezvousShardSelector(shards []Shard) (*rendezvousShardSelector, error) {
	if len(shards) == 0 {
		return nil, fmt.Errorf("RendezvousShardSelector must have shards to be defined")
	}

	seen := make(map[uint64]string, len(shards))
	ordered := make([]rendezvousShard, len(shards))
	for i := range shards {
		key := shards[i].Key
		keyHash := hashServer(key)
		if previous, duplicate := seen[keyHash]; duplicate {
			return nil, fmt.Errorf("Hash collision between shards: %s and %s", key, previous)
		}
		seen[keyHash] = key
		ordered[i] = rendezvousShard{
			weight: shards[i].Weight,
			index:  i,
			hash:   keyHash,
		}
	}

	// Hashes are unique at this point, so the resulting order is fully
	// determined by the set of keys.
	sort.Slice(ordered, func(a, b int) bool {
		return ordered[a].hash < ordered[b].hash
	})
	return &rendezvousShardSelector{shards: ordered}, nil
}

// score computes the weighted rendezvous score of a shard for a mixed
// hash value x. A higher score wins.
//
// The formula being approximated is -weight/log(X), where X is a uniform
// random number in ]0,1[. To stay deterministic across architectures no
// floating point is used; the logarithm comes from Log2Fixed instead.
// Only the relative ordering of scores matters, so using base 2 rather
// than the natural logarithm is fine.
//
// x is treated as the uniform value x/(MaxUint64+1). By the properties of
// the logarithm, -log2(x/(MaxUint64+1)) equals log2(MaxUint64+1)-log2(x),
// which is 64-log2(x).
func score(x uint64, weight uint32) uint64 {
	// 64 expressed with the same 16 fractional bits Log2Fixed uses.
	const sixtyFourFixed = uint64(64) << 16

	// Log2Fixed never exceeds 64<<16 - 1, so the divisor is at least 1.
	negLog := sixtyFourFixed - Log2Fixed(x)

	// Promote the weight to fixed point as well. We want the weight to be
	// as large as possible relative to the logarithm to keep precision in
	// the integer division; a 32 bit weight can safely be shifted by
	// another 32 bits.
	scaledWeight := uint64(weight) << 32

	return scaledWeight / negLog
}

// LUT_ENTRY_BITS is the number of fractional bits used to index the log2
// lookup table.
const LUT_ENTRY_BITS = 6

// lut holds fixed point values of log2(x) for x in [1,2], i.e. values
// between 0 and 1, with 16 bits of precision. It has (1<<LUT_ENTRY_BITS)+1
// entries; the entry is chosen by truncating the fraction to its top
// LUT_ENTRY_BITS bits. The extra final entry exists purely to simplify
// the interpolation logic.
var lut = [1<<LUT_ENTRY_BITS + 1]uint16{
	0x0000, 0x05ba, 0x0b5d, 0x10eb, 0x1664, 0x1bc8, 0x2119, 0x2656,
	0x2b80, 0x3098, 0x359f, 0x3a94, 0x3f78, 0x444c, 0x4910, 0x4dc5,
	0x526a, 0x5700, 0x5b89, 0x6003, 0x646f, 0x68ce, 0x6d20, 0x7165,
	0x759d, 0x79ca, 0x7dea, 0x81ff, 0x8608, 0x8a06, 0x8dfa, 0x91e2,
	0x95c0, 0x9994, 0x9d5e, 0xa11e, 0xa4d4, 0xa881, 0xac24, 0xafbe,
	0xb350, 0xb6d9, 0xba59, 0xbdd1, 0xc140, 0xc4a8, 0xc807, 0xcb5f,
	0xceaf, 0xd1f7, 0xd538, 0xd872, 0xdba5, 0xded0, 0xe1f5, 0xe513,
	0xe82a, 0xeb3b, 0xee45, 0xf149, 0xf446, 0xf73e, 0xfa2f, 0xfd1a,
	0x0000, // 0x10000 truncated to 16 bits; the wrap cancels out when interpolating
}

// Log2Fixed returns a fixed point approximation of log2(x) with 16
// fractional bits, computed with integer math only so that the result is
// deterministic.
//
// Since log2(x) = N + log2(x/2^N), the result is assembled from three
// pieces:
//
//  1. The integer part N, obtained exactly by counting bits.
//  2. A table lookup on the top LUT_ENTRY_BITS bits of the fraction
//     x/2^N, which lies between 1 and 2.
//  3. A linear interpolation between that table entry and the next one,
//     driven by the remaining fraction bits.
//
// Unlike the mathematical logarithm this function is defined for x=0
// (it returns 0), which removes the need for conditionals. The largest
// value it produces is 64<<16 - 1.
func Log2Fixed(x uint64) uint64 {
	// Position of the highest set bit; zero for both x=0 and x=1.
	whole := bits.Len64(x >> 1)

	// Shift the leading one out so only the fraction remains, aligned to
	// the top of the word. A shift by 64 yields 0, covering whole == 0.
	mantissa := x << (64 - whole)

	slot := mantissa >> (64 - LUT_ENTRY_BITS)
	// The fraction bits below the table index, scaled to 48 bits.
	remainder := (mantissa << LUT_ENTRY_BITS) >> 16

	lower, upper := lut[slot], lut[slot+1]
	// 16 bit subtraction on purpose: for the last slot the wrap-around
	// yields the distance to 0x10000.
	step := uint64(upper - lower)

	fraction := (uint64(lower)<<48 + step*remainder) >> 48
	return uint64(whole)<<16 | fraction
}

// splitmix64 scrambles x with three xor-shift steps interleaved with two
// multiplications, giving a very fast mixer with strong mixing
// properties. The same input always yields the same output.
func splitmix64(x uint64) uint64 {
	const (
		firstMultiplier  = 0xbf58476d1ce4e5b9
		secondMultiplier = 0x94d049bb133111eb
	)
	x = (x ^ (x >> 30)) * firstMultiplier
	x = (x ^ (x >> 27)) * secondMultiplier
	return x ^ (x >> 31)
}

// GetShard returns the index of the shard with the highest score for the
// given hash. The index is the position the shard had in the slice
// passed to NewRendezvousShardSelector.
//
// Each shard's key hash is XORed with hash, mixed by splitmix64 and then
// weighted by score. On equal scores the shard visited first wins; if no
// shard scores above zero, index 0 is returned.
func (s *rendezvousShardSelector) GetShard(hash uint64) int {
	winner := 0
	highest := uint64(0)
	for i := range s.shards {
		candidate := &s.shards[i]
		candidateScore := score(splitmix64(candidate.hash^hash), candidate.weight)
		if candidateScore > highest {
			highest, winner = candidateScore, candidate.index
		}
	}
	return winner
}
92 changes: 92 additions & 0 deletions pkg/blobstore/sharding/rendezvous_shard_selector_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package sharding

import (
"fmt"
"math"
"testing"

"github.com/buildbarn/bb-storage/pkg/blobstore/sharding"
"github.com/stretchr/testify/require"
)

// TestLog2Fixed checks the fixed point logarithm against exact values for
// powers of two and against math.Log2 for small integers.
func TestLog2Fixed(t *testing.T) {
	// Number of fractional bits in the value returned by Log2Fixed.
	const fractionBits = 16

	// Every power of two must map exactly onto its exponent.
	for exponent := 0; exponent < 64; exponent++ {
		want := uint64(exponent) << fractionBits
		got := sharding.Log2Fixed(uint64(1) << exponent)
		require.Equal(t, want, got, "Power of two should give exact result")
	}

	// For all numbers below 100_000 the relative error compared to the
	// floating point result must stay within 1e-5 (0.001%).
	scale := math.Pow(2, float64(fractionBits))
	for n := 2; n < 100_000; n++ {
		want := math.Log2(float64(n))
		got := float64(sharding.Log2Fixed(uint64(n))) / scale
		require.InEpsilon(t, want, got, 1e-5, fmt.Sprintf("Error is too high for %d", n))
	}
}

// COUNT is the number of consecutive hash values (0..COUNT-1) that
// TestRendezvousShardSelectorDistribution feeds through the selector.
const COUNT = 10_000_000
// precomputedResults pins the shard index expected for each of the first
// 20 hash values. A change here means the selection algorithm changed.
var precomputedResults = [20]int{3, 2, 0, 3, 3, 3, 0, 0, 1, 3, 0, 3, 1, 2, 2, 2, 3, 3, 1, 3}
// precomputedOccurrences pins how many of the COUNT hash values are
// expected to land on each of the five shards used by the test.
var precomputedOccurrences = [5]int{668687, 1332248, 2666353, 4666342, 666370}

// TestRendezvousShardSelectorDistribution exercises the rendezvous shard
// selector with five weighted shards and verifies three properties:
// hashes are spread proportionally to the weights, the exact assignment
// matches the pinned golden values, and removing a shard only moves the
// hashes that were assigned to it.
func TestRendezvousShardSelectorDistribution(t *testing.T) {
	// Distribution across multiple backends
	weights := []sharding.Shard{
		{Key: "a", Weight: 1},
		{Key: "b", Weight: 2},
		{Key: "c", Weight: 4},
		{Key: "d", Weight: 7},
		{Key: "e", Weight: 1},
	}
	s, err := sharding.NewRendezvousShardSelector(weights)
	require.NoError(t, err, "Selector construction should succeed")
	results := make([]int, len(precomputedResults))
	occurrences := make([]int, len(weights))

	// Request the shard for a very large amount of blobs
	for i := 0; i < COUNT; i++ {
		result := s.GetShard(uint64(i))
		if i < len(results) {
			results[i] = result
		}
		occurrences[result]++
	}

	t.Run("Distribution Error", func(t *testing.T) {
		// Requests should be fanned out with a small error margin.
		weightSum := uint32(0)
		for _, shard := range weights {
			weightSum += shard.Weight
		}
		for index, shard := range weights {
			require.InEpsilon(t, shard.Weight*COUNT/weightSum, occurrences[index], 1e-2)
		}
	})

	t.Run("Distribution Shape", func(t *testing.T) {
		shapeError := "The sharding algorithm has produced unexpected results, changing this distribution is a breaking change to buildbarn"
		require.Equal(t, precomputedResults[:], results, shapeError)
		require.Equal(t, precomputedOccurrences[:], occurrences, shapeError)
	})

	t.Run("Stability Test", func(t *testing.T) {
		// Removing a shard should only affect the shard that is removed.
		//
		// The baseline lives in a subtest-local slice so that this
		// subtest does not clobber the results slice the other subtests
		// assert on; the subtests stay independent of execution order.
		baseline := make([]int, 10000)
		for i := range baseline {
			baseline[i] = s.GetShard(uint64(i))
		}
		// drop the last shard in the slice
		dropped := len(weights) - 1
		sharder, err := sharding.NewRendezvousShardSelector(weights[:dropped])
		require.NoError(t, err, "Selector construction should succeed")
		for i, before := range baseline {
			if before == dropped {
				// This hash belonged to the removed shard and is
				// allowed to move anywhere.
				continue
			}
			// result should be unchanged for all slices which did not
			// resolve to the dropped one
			require.Equal(t, before, sharder.GetShard(uint64(i)), "Dropping a shard should not effect other shards")
		}
	})
}
19 changes: 0 additions & 19 deletions pkg/blobstore/sharding/shard_permuter.go

This file was deleted.

20 changes: 20 additions & 0 deletions pkg/blobstore/sharding/shard_selector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package sharding

// ShardSelector is an algorithm that resolves a hash into an index, where
// the index identifies the backend responsible for that shard.
//
// The algorithm must be stable: removing an unavailable backend must not
// cause the blobs of any other backend to be reshuffled. It must also be
// numerically stable, so that it produces the same result regardless of
// architecture.
type ShardSelector interface {
// GetShard returns the index of the shard selected for the given hash.
GetShard(hash uint64) int
}

// Shard is the description of a single shard as passed to a shard
// selector's constructor. The shard selector will resolve to the same
// shard independent of the order of shards, but the returned index will
// correspond to the shard's index in the slice sent to the
// ShardSelector's constructor.
type Shard struct {
// Key is the string that identifies the shard.
Key string
// Weight is the weight assigned to the shard.
// NOTE(review): presumably a larger weight attracts a proportionally
// larger share of hashes; confirm against the selector implementation.
Weight uint32
}
Loading

0 comments on commit 4011c29

Please sign in to comment.