Skip to content

Commit

Permalink
Merge pull request #64568 from nvanbenschoten/backport20.1-56860
Browse files Browse the repository at this point in the history
release-20.1: kv: improve Raft scheduler behavior under CPU starvation
  • Loading branch information
nvanbenschoten authored May 3, 2021
2 parents ae2ef96 + 145b011 commit 1a0b6fa
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 10 deletions.
20 changes: 20 additions & 0 deletions pkg/kv/kvserver/client_replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3286,3 +3286,23 @@ func makeReplicationTargets(ids ...int) (targets []roachpb.ReplicationTarget) {
}
return targets
}

func TestRaftSchedulerPrioritizesNodeLiveness(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
s, _, _ := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop(ctx)

store, err := s.GetStores().(*kvserver.Stores).GetStore(s.GetFirstStoreID())
require.NoError(t, err)

// Determine the node liveness range ID.
livenessRepl := store.LookupReplica(roachpb.RKey(keys.NodeLivenessPrefix))
livenessRangeID := livenessRepl.RangeID

// Assert that the node liveness range is prioritized.
priorityID := store.RaftSchedulerPriorityID()
require.Equal(t, livenessRangeID, priorityID)
}
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,11 @@ func (s *Store) ReservationCount() int {
return len(s.snapshotApplySem)
}

// RaftSchedulerPriorityID returns the Raft scheduler's prioritized range.
func (s *Store) RaftSchedulerPriorityID() roachpb.RangeID {
return s.scheduler.PriorityID()
}

// ClearClosedTimestampStorage clears the closed timestamp storage of all
// knowledge about closed timestamps.
func (s *Store) ClearClosedTimestampStorage() {
Expand Down
8 changes: 8 additions & 0 deletions pkg/kv/kvserver/replica_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@
package kvserver

import (
"bytes"
"context"
"math/rand"
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/abortspan"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/split"
Expand Down Expand Up @@ -312,4 +314,10 @@ func (r *Replica) setDescLockedRaftMuLocked(ctx context.Context, desc *roachpb.R
r.connectionClass.set(rpc.ConnectionClassForKey(desc.StartKey))
r.concMgr.OnRangeDescUpdated(desc)
r.mu.state.Desc = desc

// Prioritize the NodeLiveness Range in the Raft scheduler above all other
// Ranges to ensure that liveness never sees high Raft scheduler latency.
if bytes.HasPrefix(desc.StartKey, keys.NodeLivenessPrefix) {
r.store.scheduler.SetPriorityID(desc.RangeID)
}
}
54 changes: 48 additions & 6 deletions pkg/kv/kvserver/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,31 @@ func (c *rangeIDChunk) Len() int {
// amortizing the allocation/GC cost. Using a chunk queue avoids any copying
// that would occur if a slice were used (the copying would occur on slice
// reallocation).
//
// The queue has a naive understanding of priority and fairness. For the most
// part, it implements a FIFO queueing policy with no prioritization of some
// ranges over others. However, the queue can be configured with up to one
// high-priority range, which will always be placed at the front when added.
type rangeIDQueue struct {
len int

// Default priority.
chunks list.List
len int

// High priority.
priorityID roachpb.RangeID
priorityQueued bool
}

func (q *rangeIDQueue) PushBack(id roachpb.RangeID) {
func (q *rangeIDQueue) Push(id roachpb.RangeID) {
q.len++
if q.priorityID == id {
q.priorityQueued = true
return
}
if q.chunks.Len() == 0 || q.back().WriteCap() == 0 {
q.chunks.PushBack(&rangeIDChunk{})
}
q.len++
if !q.back().PushBack(id) {
panic(fmt.Sprintf(
"unable to push rangeID to chunk: len=%d, cap=%d",
Expand All @@ -81,13 +96,17 @@ func (q *rangeIDQueue) PopFront() (roachpb.RangeID, bool) {
if q.len == 0 {
return 0, false
}
q.len--
if q.priorityQueued {
q.priorityQueued = false
return q.priorityID, true
}
frontElem := q.chunks.Front()
front := frontElem.Value.(*rangeIDChunk)
id, ok := front.PopFront()
if !ok {
panic("encountered empty chunk")
}
q.len--
if front.Len() == 0 && front.WriteCap() == 0 {
q.chunks.Remove(frontElem)
}
Expand All @@ -98,6 +117,15 @@ func (q *rangeIDQueue) Len() int {
return q.len
}

func (q *rangeIDQueue) SetPriorityID(id roachpb.RangeID) {
if q.priorityID != 0 && q.priorityID != id {
panic(fmt.Sprintf(
"priority range ID already set: old=%d, new=%d",
q.priorityID, id))
}
q.priorityID = id
}

func (q *rangeIDQueue) back() *rangeIDChunk {
return q.chunks.Back().Value.(*rangeIDChunk)
}
Expand Down Expand Up @@ -172,6 +200,20 @@ func (s *raftScheduler) Wait(context.Context) {
s.done.Wait()
}

// SetPriorityID configures the single range that the scheduler will prioritize
// above others. Once set, callers are not permitted to change this value.
func (s *raftScheduler) SetPriorityID(id roachpb.RangeID) {
s.mu.Lock()
s.mu.queue.SetPriorityID(id)
s.mu.Unlock()
}

func (s *raftScheduler) PriorityID() roachpb.RangeID {
s.mu.Lock()
defer s.mu.Unlock()
return s.mu.queue.priorityID
}

func (s *raftScheduler) worker(ctx context.Context) {
defer s.done.Done()

Expand Down Expand Up @@ -235,7 +277,7 @@ func (s *raftScheduler) worker(ctx context.Context) {
} else {
// There was a concurrent call to one of the Enqueue* methods. Queue the
// range ID for further processing.
s.mu.queue.PushBack(id)
s.mu.queue.Push(id)
s.mu.cond.Signal()
}
}
Expand All @@ -251,7 +293,7 @@ func (s *raftScheduler) enqueue1Locked(addState raftScheduleState, id roachpb.Ra
if newState&stateQueued == 0 {
newState |= stateQueued
queued++
s.mu.queue.PushBack(id)
s.mu.queue.Push(id)
}
s.mu.state[id] = newState
return queued
Expand Down
39 changes: 38 additions & 1 deletion pkg/kv/kvserver/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
)

func TestRangeIDChunk(t *testing.T) {
Expand Down Expand Up @@ -93,7 +95,7 @@ func TestRangeIDQueue(t *testing.T) {

const count = 3 * rangeIDChunkSize
for i := 1; i <= count; i++ {
q.PushBack(roachpb.RangeID(i))
q.Push(roachpb.RangeID(i))
if e := i; e != q.Len() {
t.Fatalf("expected %d, but found %d", e, q.Len())
}
Expand All @@ -119,6 +121,41 @@ func TestRangeIDQueue(t *testing.T) {
}
}

func TestRangeIDQueuePrioritization(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

var q rangeIDQueue
for _, withPriority := range []bool{false, true} {
if withPriority {
q.SetPriorityID(3)
}

// Push 5 ranges in order, then pop them off.
for i := 1; i <= 5; i++ {
q.Push(roachpb.RangeID(i))
require.Equal(t, i, q.Len())
}
var popped []int
for i := 5; ; i-- {
require.Equal(t, i, q.Len())
id, ok := q.PopFront()
if !ok {
require.Equal(t, i, 0)
break
}
popped = append(popped, int(id))
}

// Assert pop order.
if withPriority {
require.Equal(t, []int{3, 1, 2, 4, 5}, popped)
} else {
require.Equal(t, []int{1, 2, 3, 4, 5}, popped)
}
}
}

type testProcessor struct {
mu struct {
syncutil.Mutex
Expand Down
20 changes: 17 additions & 3 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,18 @@ const (
)

var storeSchedulerConcurrency = envutil.EnvOrDefaultInt(
"COCKROACH_SCHEDULER_CONCURRENCY", 8*runtime.NumCPU())
// For small machines, we scale the scheduler concurrency by the number of
// CPUs. 8*runtime.NumCPU() was determined in 9a68241 (April 2017) as the
// optimal concurrency level on 8 CPU machines. For larger machines, we've
// seen (#56851) that this scaling curve can be too aggressive and lead to
// too much contention in the Raft scheduler, so we cap the concurrency
// level at 96.
//
// As of November 2020, this default value could be re-tuned.
"COCKROACH_SCHEDULER_CONCURRENCY", min(8*runtime.NumCPU(), 96))

var logSSTInfoTicks = envutil.EnvOrDefaultInt(
"COCKROACH_LOG_SST_INFO_TICKS_INTERVAL", 60,
)
"COCKROACH_LOG_SST_INFO_TICKS_INTERVAL", 60)

// bulkIOWriteLimit is defined here because it is used by BulkIOWriteLimiter.
var bulkIOWriteLimit = settings.RegisterPublicByteSizeSetting(
Expand Down Expand Up @@ -2641,3 +2648,10 @@ func ReadClusterVersion(
func init() {
tracing.RegisterTagRemapping("s", "store")
}

func min(a, b int) int {
if a < b {
return a
}
return b
}

0 comments on commit 1a0b6fa

Please sign in to comment.