kvserver: shard Raft scheduler

The Raft scheduler mutex can become heavily contended on machines with
many cores and high range counts. This patch shards the scheduler by
allocating ranges and workers to individual scheduler shards.

By default, we create a new shard for every 12 workers and distribute
workers evenly across shards. We spin up 8 workers per CPU core, capped
at 96, so a shard size of 12 corresponds to 1.5 CPUs per shard, or a
maximum of 8 shards. This relieves contention by an order of magnitude
at high core counts, while also avoiding the starvation that excessive
sharding could cause. The shard size can be adjusted via
`COCKROACH_SCHEDULER_SHARD_SIZE`.
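
As a minimal illustration of this arithmetic (the actual logic lives in
`newRaftScheduler` in the diff below; the standalone helper name here is
hypothetical):

```go
package main

import "fmt"

// schedulerShardWorkers is a hypothetical standalone version of the shard
// arithmetic described above: one shard per shardSize workers, rounded up,
// with any remainder workers spread across the first shards.
func schedulerShardWorkers(numWorkers, shardSize int) []int {
	numShards := 1
	if shardSize > 0 && numWorkers > shardSize {
		numShards = (numWorkers-1)/shardSize + 1 // ceiling division
	}
	workers := make([]int, numShards)
	for i := range workers {
		workers[i] = numWorkers / numShards
		if i < numWorkers%numShards { // distribute the remainder evenly
			workers[i]++
		}
	}
	return workers
}

func main() {
	// 96 workers (the cap, reached at 12+ CPUs) with the default shard size
	// of 12 yields 8 shards of 12 workers each.
	fmt.Println(schedulerShardWorkers(96, 12)) // [12 12 12 12 12 12 12 12]
	// 32 workers (a 4-CPU node at 8 workers per CPU) yields 3 shards.
	fmt.Println(schedulerShardWorkers(32, 12)) // [11 11 10]
}
```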

This results in a substantial performance improvement on high-CPU nodes:

```
name                                    old ops/sec  new ops/sec  delta
kv0/enc=false/nodes=3/cpu=96/cloud=aws   40.7k ± 2%   63.4k ± 1%  +55.90%  (p=0.008 n=5+5)
kv0/enc=false/nodes=3/cpu=96/cloud=gce   36.3k ± 2%   44.5k ± 1%  +22.54%  (p=0.008 n=5+5)

name                                    old p50      new p50      delta
kv0/enc=false/nodes=3/cpu=96/cloud=aws    3.00 ± 0%    2.00 ± 0%     ~     (p=0.079 n=4+5)
kv0/enc=false/nodes=3/cpu=96/cloud=gce    4.70 ± 0%    3.98 ± 3%  -15.32%  (p=0.008 n=5+5)

name                                    old p95      new p95      delta
kv0/enc=false/nodes=3/cpu=96/cloud=aws    12.1 ± 0%     6.8 ± 0%  -43.80%  (p=0.008 n=5+5)
kv0/enc=false/nodes=3/cpu=96/cloud=gce    10.0 ± 0%     8.7 ± 3%  -13.00%  (p=0.008 n=5+5)

name                                    old p99      new p99      delta
kv0/enc=false/nodes=3/cpu=96/cloud=aws    14.2 ± 0%    12.1 ± 0%     ~     (p=0.079 n=4+5)
kv0/enc=false/nodes=3/cpu=96/cloud=gce    14.2 ± 0%    13.1 ± 0%   -7.75%  (p=0.008 n=5+5)
```

Furthermore, on an idle 24-core 3-node cluster with 50,000 unquiesced
ranges, this reduced CPU usage from 12% to 10%.

The basic cost of enqueueing ranges in the scheduler (without workers or
contention) only increases slightly in absolute terms, thanks to
`raftSchedulerBatch` pre-sharding the enqueued ranges:

```
SchedulerEnqueueRaftTicks/collect=true/ranges=10/workers=8-24         457ns ± 2%    565ns ± 3%   +23.65%  (p=0.000 n=7+8)
SchedulerEnqueueRaftTicks/collect=true/ranges=10/workers=16-24        461ns ± 3%    561ns ± 2%   +21.71%  (p=0.000 n=8+8)
SchedulerEnqueueRaftTicks/collect=true/ranges=10/workers=32-24        459ns ± 2%    679ns ± 3%   +47.91%  (p=0.000 n=8+8)
SchedulerEnqueueRaftTicks/collect=true/ranges=10/workers=64-24        455ns ± 0%   1086ns ± 2%  +138.97%  (p=0.001 n=6+7)
SchedulerEnqueueRaftTicks/collect=true/ranges=10/workers=128-24       456ns ± 2%   1838ns ± 1%  +303.51%  (p=0.000 n=8+8)
SchedulerEnqueueRaftTicks/collect=true/ranges=100000/workers=8-24    7.15ms ± 1%   8.09ms ± 1%   +13.05%  (p=0.000 n=8+7)
SchedulerEnqueueRaftTicks/collect=true/ranges=100000/workers=16-24   7.13ms ± 1%   8.06ms ± 1%   +13.06%  (p=0.000 n=8+8)
SchedulerEnqueueRaftTicks/collect=true/ranges=100000/workers=32-24   7.12ms ± 2%  10.07ms ± 0%   +41.34%  (p=0.001 n=8+6)
SchedulerEnqueueRaftTicks/collect=true/ranges=100000/workers=64-24   7.20ms ± 1%  12.59ms ± 1%   +74.87%  (p=0.000 n=8+8)
SchedulerEnqueueRaftTicks/collect=true/ranges=100000/workers=128-24  7.12ms ± 2%  17.10ms ± 1%  +140.14%  (p=0.000 n=8+8)
```
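
For context, a hedged sketch of how callers are expected to use the
`raftSchedulerBatch` API: range IDs are bucketed into per-shard slices up
front, so each shard's queue is filled under its own mutex rather than a
single scheduler-wide one. Only the scheduler methods called below
(`NewEnqueueBatch`, `Add`, `Close`, `EnqueueRaftTicks`) come from the diff;
the surrounding function is illustrative.

```go
// enqueueTicks is an illustrative caller (e.g. the store's Raft tick loop);
// only the scheduler methods it calls are part of this commit.
func enqueueTicks(s *raftScheduler, rangeIDs []roachpb.RangeID) {
	batch := s.NewEnqueueBatch()
	defer batch.Close() // resets the batch and returns it to the sync.Pool

	for _, id := range rangeIDs {
		// Add buckets the ID into its shard's slice: int(id) % number of shards.
		batch.Add(id)
	}

	// Each shard's IDs are then enqueued under that shard's own mutex, and
	// that shard's workers are signalled.
	s.EnqueueRaftTicks(batch)
}
```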

Epic: none
Release note (performance improvement): The Raft scheduler is now
sharded to relieve contention during range Raft processing, which can
significantly improve performance at high CPU core counts.
erikgrinaker committed Mar 20, 2023
1 parent 91be4d3 commit b529480
Showing 4 changed files with 253 additions and 100 deletions.
226 changes: 150 additions & 76 deletions pkg/kv/kvserver/scheduler.go
@@ -178,51 +178,101 @@ type raftScheduleState struct {
ticks int
}

var raftSchedulerBatchPool = sync.Pool{
New: func() interface{} {
return new(raftSchedulerBatch)
},
}

// raftSchedulerBatch is a batch of range IDs to enqueue. It enables
// efficient per-shard enqueueing.
type raftSchedulerBatch [][]roachpb.RangeID // by shard

func newRaftSchedulerBatch(numShards int) raftSchedulerBatch {
b := raftSchedulerBatchPool.Get().(*raftSchedulerBatch)
if len(*b) != numShards {
*b = make([][]roachpb.RangeID, numShards)
}
return *b
}

func (b raftSchedulerBatch) Add(id roachpb.RangeID) {
shard := int(id) % len(b)
b[shard] = append(b[shard], id)
}

func (b raftSchedulerBatch) Reset() {
for i := range b {
b[i] = b[i][:0]
}
}

func (b raftSchedulerBatch) Close() {
b.Reset()
raftSchedulerBatchPool.Put(&b)
}

type raftScheduler struct {
ambientContext log.AmbientContext
processor raftProcessor
latency metric.IHistogram
numWorkers int
maxTicks int

mu struct {
syncutil.Mutex
cond *sync.Cond
queue rangeIDQueue
state map[roachpb.RangeID]raftScheduleState
stopped bool
}
shards []*raftSchedulerShard // RangeID % len(shards)
done sync.WaitGroup
}

done sync.WaitGroup
type raftSchedulerShard struct {
syncutil.Mutex
cond *sync.Cond
queue rangeIDQueue
state map[roachpb.RangeID]raftScheduleState
numWorkers int
maxTicks int
stopped bool
}

func newRaftScheduler(
ambient log.AmbientContext,
metrics *StoreMetrics,
processor raftProcessor,
numWorkers int,
shardSize int,
maxTicks int,
) *raftScheduler {
s := &raftScheduler{
ambientContext: ambient,
processor: processor,
latency: metrics.RaftSchedulerLatency,
numWorkers: numWorkers,
maxTicks: maxTicks,
}
s.mu.cond = sync.NewCond(&s.mu.Mutex)
s.mu.state = make(map[roachpb.RangeID]raftScheduleState)
numShards := 1
if shardSize > 0 && numWorkers > shardSize {
numShards = (numWorkers-1)/shardSize + 1
}
for i := 0; i < numShards; i++ {
shardWorkers := numWorkers / numShards
if i < numWorkers%numShards { // distribute remainder
shardWorkers++
}
shard := &raftSchedulerShard{
state: map[roachpb.RangeID]raftScheduleState{},
numWorkers: shardWorkers,
maxTicks: maxTicks,
}
shard.cond = sync.NewCond(&shard.Mutex)
s.shards = append(s.shards, shard)
}
return s
}

func (s *raftScheduler) Start(stopper *stop.Stopper) {
ctx := s.ambientContext.AnnotateCtx(context.Background())
waitQuiesce := func(context.Context) {
<-stopper.ShouldQuiesce()
s.mu.Lock()
s.mu.stopped = true
s.mu.Unlock()
s.mu.cond.Broadcast()
for _, shard := range s.shards {
shard.Lock()
shard.stopped = true
shard.Unlock()
shard.cond.Broadcast()
}
}
if err := stopper.RunAsyncTaskEx(ctx,
stop.TaskOpts{
@@ -235,17 +285,23 @@ func (s *raftScheduler) Start(stopper *stop.Stopper) {
waitQuiesce(ctx)
}

s.done.Add(s.numWorkers)
for i := 0; i < s.numWorkers; i++ {
if err := stopper.RunAsyncTaskEx(ctx,
stop.TaskOpts{
TaskName: "raft-worker",
// This task doesn't reference a parent because it runs for the server's
// lifetime.
SpanOpt: stop.SterileRootSpan,
},
s.worker); err != nil {
s.done.Done()
for _, shard := range s.shards {
s.done.Add(shard.numWorkers)
shard := shard // pin loop variable
for i := 0; i < shard.numWorkers; i++ {
if err := stopper.RunAsyncTaskEx(ctx,
stop.TaskOpts{
TaskName: "raft-worker",
// This task doesn't reference a parent because it runs for the server's
// lifetime.
SpanOpt: stop.SterileRootSpan,
},
func(ctx context.Context) {
s.worker(ctx, shard)
},
); err != nil {
s.done.Done()
}
}
}
}
@@ -257,18 +313,20 @@ func (s *raftScheduler) Wait(context.Context) {
// SetPriorityID configures the single range that the scheduler will prioritize
// above others. Once set, callers are not permitted to change this value.
func (s *raftScheduler) SetPriorityID(id roachpb.RangeID) {
s.mu.Lock()
s.mu.queue.SetPriorityID(id)
s.mu.Unlock()
for _, shard := range s.shards {
shard.Lock()
shard.queue.SetPriorityID(id)
shard.Unlock()
}
}

func (s *raftScheduler) PriorityID() roachpb.RangeID {
s.mu.Lock()
defer s.mu.Unlock()
return s.mu.queue.priorityID
s.shards[0].Lock()
defer s.shards[0].Unlock()
return s.shards[0].queue.priorityID
}

func (s *raftScheduler) worker(ctx context.Context) {
func (s *raftScheduler) worker(ctx context.Context, shard *raftSchedulerShard) {
defer s.done.Done()

// We use a sync.Cond for worker notification instead of a buffered
@@ -278,27 +336,27 @@ func (s *raftScheduler) worker(ctx context.Context) {
// signaling a sync.Cond is significantly faster than selecting and sending
// on a buffered channel.

s.mu.Lock()
shard.Lock()
for {
var id roachpb.RangeID
for {
if s.mu.stopped {
s.mu.Unlock()
if shard.stopped {
shard.Unlock()
return
}
var ok bool
if id, ok = s.mu.queue.PopFront(); ok {
if id, ok = shard.queue.PopFront(); ok {
break
}
s.mu.cond.Wait()
shard.cond.Wait()
}

// Grab and clear the existing state for the range ID. Note that we leave
// the range ID marked as "queued" so that a concurrent Enqueue* will not
// queue the range ID again.
state := s.mu.state[id]
s.mu.state[id] = raftScheduleState{flags: stateQueued}
s.mu.Unlock()
state := shard.state[id]
shard.state[id] = raftScheduleState{flags: stateQueued}
shard.Unlock()

// Record the scheduling latency for the range.
lat := nowNanos() - state.begin
@@ -333,12 +391,12 @@ func (s *raftScheduler) worker(ctx context.Context) {
s.processor.processReady(id)
}

s.mu.Lock()
state = s.mu.state[id]
shard.Lock()
state = shard.state[id]
if state.flags == stateQueued {
// No further processing required by the range ID, clear it from the
// state map.
delete(s.mu.state, id)
delete(shard.state, id)
} else {
// There was a concurrent call to one of the Enqueue* methods. Queue
// the range ID for further processing.
@@ -361,48 +419,57 @@ func (s *raftScheduler) worker(ctx context.Context) {
// and the worker does not go back to sleep between the current
// iteration and the next iteration, so no change to num_signals
// is needed.
s.mu.queue.Push(id)
shard.queue.Push(id)
}
}
}

func (s *raftScheduler) enqueue1Locked(
// NewEnqueueBatch creates a new range ID batch for enqueueing via
// EnqueueRaft(Ticks|Requests). The caller must call Close() on the batch when
// done.
func (s *raftScheduler) NewEnqueueBatch() raftSchedulerBatch {
return newRaftSchedulerBatch(len(s.shards))
}

func (ss *raftSchedulerShard) enqueue1Locked(
addFlags raftScheduleFlags, id roachpb.RangeID, now int64,
) int {
ticks := int((addFlags & stateRaftTick) / stateRaftTick) // 0 or 1

prevState := s.mu.state[id]
prevState := ss.state[id]
if prevState.flags&addFlags == addFlags && ticks == 0 {
return 0
}
var queued int
newState := prevState
newState.flags = newState.flags | addFlags
newState.ticks += ticks
if newState.ticks > s.maxTicks {
newState.ticks = s.maxTicks
if newState.ticks > ss.maxTicks {
newState.ticks = ss.maxTicks
}
if newState.flags&stateQueued == 0 {
newState.flags |= stateQueued
queued++
s.mu.queue.Push(id)
ss.queue.Push(id)
}
if newState.begin == 0 {
newState.begin = now
}
s.mu.state[id] = newState
ss.state[id] = newState
return queued
}

func (s *raftScheduler) enqueue1(addFlags raftScheduleFlags, id roachpb.RangeID) int {
func (s *raftScheduler) enqueue1(addFlags raftScheduleFlags, id roachpb.RangeID) {
now := nowNanos()
s.mu.Lock()
defer s.mu.Unlock()
return s.enqueue1Locked(addFlags, id, now)
shard := s.shards[int(id)%len(s.shards)]
shard.Lock()
n := shard.enqueue1Locked(addFlags, id, now)
shard.Unlock()
shard.signal(n)
}

func (s *raftScheduler) enqueueN(addFlags raftScheduleFlags, ids ...roachpb.RangeID) int {
// Enqueue the ids in chunks to avoid hold raftScheduler.mu for too long.
func (ss *raftSchedulerShard) enqueueN(addFlags raftScheduleFlags, ids ...roachpb.RangeID) int {
// Enqueue the ids in chunks to avoid holding mutex for too long.
const enqueueChunkSize = 128

// Avoid locking for 0 new ranges.
@@ -411,44 +478,51 @@ func (s *raftScheduler) enqueueN(addFlags raftScheduleFlags, ids ...roachpb.Rang
}

now := nowNanos()
s.mu.Lock()
ss.Lock()
var count int
for i, id := range ids {
count += s.enqueue1Locked(addFlags, id, now)
count += ss.enqueue1Locked(addFlags, id, now)
if (i+1)%enqueueChunkSize == 0 {
s.mu.Unlock()
ss.Unlock()
now = nowNanos()
s.mu.Lock()
ss.Lock()
}
}
s.mu.Unlock()
ss.Unlock()
return count
}

func (s *raftScheduler) signal(count int) {
if count >= s.numWorkers {
s.mu.cond.Broadcast()
func (s *raftScheduler) enqueueBatch(addFlags raftScheduleFlags, batch raftSchedulerBatch) {
for i, ids := range batch {
count := s.shards[i].enqueueN(addFlags, ids...)
s.shards[i].signal(count)
}
}

func (ss *raftSchedulerShard) signal(count int) {
if count >= ss.numWorkers {
ss.cond.Broadcast()
} else {
for i := 0; i < count; i++ {
s.mu.cond.Signal()
ss.cond.Signal()
}
}
}

func (s *raftScheduler) EnqueueRaftReady(id roachpb.RangeID) {
s.signal(s.enqueue1(stateRaftReady, id))
s.enqueue1(stateRaftReady, id)
}

func (s *raftScheduler) EnqueueRaftRequest(id roachpb.RangeID) {
s.signal(s.enqueue1(stateRaftRequest, id))
s.enqueue1(stateRaftRequest, id)
}

func (s *raftScheduler) EnqueueRaftRequests(ids ...roachpb.RangeID) {
s.signal(s.enqueueN(stateRaftRequest, ids...))
func (s *raftScheduler) EnqueueRaftRequests(batch raftSchedulerBatch) {
s.enqueueBatch(stateRaftRequest, batch)
}

func (s *raftScheduler) EnqueueRaftTicks(ids ...roachpb.RangeID) {
s.signal(s.enqueueN(stateRaftTick, ids...))
func (s *raftScheduler) EnqueueRaftTicks(batch raftSchedulerBatch) {
s.enqueueBatch(stateRaftTick, batch)
}

func nowNanos() int64 {