Skip to content

Commit

Permalink
improve distribution
Browse files Browse the repository at this point in the history
  • Loading branch information
kolesnikovae committed Sep 25, 2024
1 parent d8ed814 commit e443e65
Show file tree
Hide file tree
Showing 5 changed files with 226 additions and 80 deletions.
2 changes: 1 addition & 1 deletion pkg/experiment/distributor/distribution_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

// NewTenantServiceDatasetKey builds a distribution key, where the dataset
// is the value of the service name label.
func NewTenantServiceDatasetKey(tenant string, labels []*v1.LabelPair) placement.Key {
func NewTenantServiceDatasetKey(tenant string, labels ...*v1.LabelPair) placement.Key {
dataset := phlaremodel.Labels(labels).Get(phlaremodel.LabelNameServiceName)
return placement.Key{
TenantID: tenant,
Expand Down
175 changes: 104 additions & 71 deletions pkg/experiment/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package distributor

import (
"fmt"
"math/rand"
"slices"
"strings"
"sync"
Expand Down Expand Up @@ -35,6 +36,9 @@ func NewDistributor(placementStrategy placement.Strategy) *Distributor {
}

func (d *Distributor) Distribute(k placement.Key, r ring.ReadRing) (*placement.Placement, error) {
if p := d.placement.Place(k); p != nil {
return p, nil
}
if err := d.updateDistribution(r, d.RingUpdateInterval); err != nil {
return nil, err
}
Expand All @@ -61,17 +65,12 @@ func (d *Distributor) updateDistribution(r ring.ReadRing, maxAge time.Duration)
if x != nil && !x.isExpired(maxAge) {
return nil
}

// Initial capacity.
const defaultShards = 64
if x == nil {
x = newDistribution(defaultShards)
x = newDistribution()
}

if err := x.readRing(r); err != nil {
return fmt.Errorf("failed to read ring: %w", err)
}

d.distribution = x
return nil
}
Expand All @@ -91,44 +90,42 @@ func (d *Distributor) distribute(k placement.Key) *placement.Placement {
// When we create subrings, we need to ensure that each of them has at
// least p shards. However, the data distribution must be restricted
// according to the limits.
allShards := newSubring(s)
tenantShards := allShards.subring(k.Tenant, max(p, tenantSize), s)
datasetShards := tenantShards.subring(k.Dataset, max(p, datasetSize), tenantSize)
all := newSubring(s)
tenant := all.subring(k.Tenant, tenantSize)
dataset := tenant.subring(k.Dataset, datasetSize)
// We pick a shard from the dataset subring: its index is relative
// to the dataset subring.
offset := d.placement.PickShard(k, datasetSize)
// Next we want to find p instances eligible to host the key.
// The choice must be limited to the dataset / tenant subring.
// The iterator is used to iterate over the tenant instances that
// can be used to host the key. In case if the instance is
// unavailable, the caller should try the next one.
loc := &location{
d: d.distribution,
ring: datasetShards,
off: offset,
n: p,
}
// The choice must be limited to the dataset / tenant subring,
// but extended if needed. Note that the instances are not unique.
// We could collect instances lazily and pull them from the iterator,
// however that would complicate the code due to concurrent updates.
instances := make([]ring.InstanceDesc, 0, p)
// Collect instances from the dataset subring.
instances = d.distribution.collect(instances, dataset, offset, p)
// Collect remaining instances from the tenant subring.
instances = d.distribution.collect(instances, tenant, dataset.offset()+dataset.size(), p-len(instances))
// Collect remaining instances from the top level ring.
instances = d.distribution.collect(instances, all, tenant.offset()+tenant.size(), p-len(instances))
return &placement.Placement{
Shard: loc.shard().id,
Instances: loc.instances(),
Shard: uint32(dataset.at(offset)) + 1, // 0 shard ID is a sentinel.
Instances: iter.NewSliceIterator(instances),
}
}

// [0 0 0 1 1 1 2 2 0 1 2 2]
type distribution struct {
timestamp time.Time
shards []shard
shards []uint32 // Shard ID -> Instance ID.
desc []ring.InstanceDesc
perm *perm
}

type shard struct {
id uint32 // 0 shard ID is used as a sentinel (zero value is invalid).
instance uint32 // references the instance in shards.desc.
}

func newDistribution(shards int) *distribution {
func newDistribution() *distribution {
return &distribution{
shards: make([]shard, 0, shards),
timestamp: time.Now(),
perm: new(perm),
}
}

Expand All @@ -151,25 +148,53 @@ func (d *distribution) readRing(r ring.ReadRing) error {
if len(all.Instances) == 0 {
return ring.ErrEmptyRing
}
d.timestamp = time.Now()
d.desc = all.Instances
d.shards = d.shards[:0]
// Jump consistent hashing requires a deterministic order of instances.
// Moreover, instances can be only added to the end, otherwise this may
// cause massive relocations.
slices.SortFunc(d.desc, func(a, b ring.InstanceDesc) int {
return strings.Compare(a.Id, b.Id)
})
i := uint32(0)
// Now we create a mapping of shards to instances.
var tmp [256]uint32 // Stack alloc.
instances := tmp[:0]
for j := range d.desc {
for range all.Instances[j].Tokens {
i++
d.shards = append(d.shards, shard{
id: i,
instance: uint32(j),
})
instances = append(instances, uint32(j))
}
}
d.timestamp = time.Now()
// We use shuffling to avoid hotspots: a contiguous range of shards
// is distributed over instances in a pseudo-random fashion.
// Given that the number of shards and instances is known in advance,
// we maintain a deterministic permutation that perturbs as little as
// possible, when the number of shards or instances changes: only the
// delta moves.
size := len(instances)
d.perm.resize(size)
d.shards = slices.Grow(d.shards, max(0, size-len(d.shards)))[:size]
for j := range d.shards {
d.shards[j] = instances[d.perm.v[j]]
}
return nil
}

// collect appends instances referenced by the shards of the subring r
// to the given slice and returns the extended slice. Shards are visited
// sequentially, starting from the relative offset off. At most n
// instances are added, and never more than r.size(). If n is not
// positive, the slice is returned as is.
func (d *distribution) collect(instances []ring.InstanceDesc, r subring, off, n int) []ring.InstanceDesc {
	// A non-positive limit (n <= 0 or an empty subring) skips the loop.
	limit := min(n, r.size())
	for k := 0; k < limit; k++ {
		// Relative position -> shard -> instance.
		shard := r.at(off + k)
		instances = append(instances, d.desc[d.shards[shard]])
	}
	return instances
}

// The inputs are a key and the number of buckets.
// It outputs a bucket number in the range [0, buckets).
//
Expand All @@ -193,67 +218,75 @@ func jump(key uint64, buckets int) int {
// Subring is a utility to calculate the subring
// for a given key within the available space:
//
// |<---------n----------->| Available space.
// | . a---|---------b . . | Ring.
// | . . . c-----d . . . . | Subring.
//
// [ . a-------|-----b . . ]
// [ . . . . . c-----|---d ]
//
// [ . a-------|-----b . . ]
// [ . . . . . c-----|-x-d ]
//
// [ . a-------|-----b . . ]
// [ . |-x-d . c-----| . . ]
//
// Note that this is not a recursive implementation,
// but a more straightforward one, optimized for the
// case where there can be up to two nested rings.
type subring struct {
// |<---------n----------->| Available space.
// | . a---|---------b . . | Ring.
// | . . . c-----d . . . . | Subring.
n, a, b, c, d int
// For testing purposes jump function can be replaced.
jump func(k uint64, n int) int
}

func newSubring(n int) subring { return subring{n: n, d: n, jump: jump} }
func newSubring(n int) subring { return subring{n: n, b: n, d: n} }

// The function creates a subring of the specified size for the given key.
// The subring offset is calculated with the jump function and is limited
// to m options (sequentially, from the ring beginning).
func (s subring) subring(k uint64, size, m int) subring {
func (s subring) subring(k uint64, size int) subring {
n := s
n.a, n.b = n.c, n.d
n.c = n.a + s.jump(k, min(m, n.b-n.a))
n.c = n.a + jump(k, n.b-n.a)
n.d = n.c + size
return n
}

// at converts the position n, which is relative to the subring start,
// to the absolute position in the ring of s.n elements.
func (s subring) at(n int) int {
	// Wrap the relative position around the subring [c, d).
	rel := n % (s.d - s.c)
	// The subring may cross the boundary of its parent [a, b):
	// the remainder continues from the parent start.
	//
	//	[ . a-------|-----b . . ]
	//	[ . |-x-d . c-----| . . ]
	inParent := (s.c + rel - s.a) % (s.b - s.a)
	// Wrap around the whole available space.
	return (inParent + s.a) % s.n
}

type location struct {
d *distribution
ring subring
off int
n int
}
// offset returns the position of the subring start (c)
// relative to the start of its parent ring (a).
func (s subring) offset() int {
	parentStart, start := s.a, s.c
	return start - parentStart
}

// size returns the number of positions in the subring,
// which spans the half-open range [c, d).
func (s subring) size() int {
	start, end := s.c, s.d
	return end - start
}

// Fisher–Yates shuffle with predefined steps.
// Rand source with a seed is not enough as we
// can't guarantee the same sequence of calls
// with identical arguments, which would make
// the state of two instances incoherent.
type perm struct{ v []uint32 }

func (l *location) shard() shard {
a := l.ring.at(l.off)
return l.d.shards[a]
// resize rebuilds the permutation so that it has exactly n elements.
// The backing slice is reused and only grown when n exceeds its current
// length. The element positions are fully determined by the predefined
// steps, therefore the result depends on n only. n must not exceed
// len(steps), otherwise indexing steps panics.
func (p *perm) resize(n int) {
	if grow := n - len(p.v); grow > 0 {
		p.v = slices.Grow(p.v, grow)
	}
	p.v = p.v[:n]
	// Unlike the standard implementation, the loop starts at index 0:
	// this is what resets v[0] to zero in the n == 1 case.
	for i := range p.v {
		j := steps[i]
		p.v[i], p.v[j] = p.v[j], uint32(i)
	}
}

func (l *location) instances() iter.Iterator[ring.InstanceDesc] {
instances := make([]ring.InstanceDesc, l.n)
for i := 0; i < l.n; i++ {
a := l.ring.at(l.off + i)
s := l.d.shards[a]
instances[i] = l.d.desc[s.instance]
// The value is a random number generated with crypto/rand.Read,
// and decoded as a little-endian uint64. No fancy math here.
const randSeed = 4349576827832984783

var steps [4 << 10]uint32

func init() {
r := rand.New(rand.NewSource(randSeed))
for i := range steps {
steps[i] = uint32(r.Intn(i + 1))
}
return iter.NewSliceIterator(instances)
}
Loading

0 comments on commit e443e65

Please sign in to comment.