Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-20.1: kv: improve Raft scheduler behavior under CPU starvation #64568

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions pkg/kv/kvserver/client_replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3286,3 +3286,23 @@ func makeReplicationTargets(ids ...int) (targets []roachpb.ReplicationTarget) {
}
return targets
}

func TestRaftSchedulerPrioritizesNodeLiveness(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
s, _, _ := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop(ctx)

store, err := s.GetStores().(*kvserver.Stores).GetStore(s.GetFirstStoreID())
require.NoError(t, err)

// Determine the node liveness range ID.
livenessRepl := store.LookupReplica(roachpb.RKey(keys.NodeLivenessPrefix))
livenessRangeID := livenessRepl.RangeID

// Assert that the node liveness range is prioritized.
priorityID := store.RaftSchedulerPriorityID()
require.Equal(t, livenessRangeID, priorityID)
}
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,11 @@ func (s *Store) ReservationCount() int {
return len(s.snapshotApplySem)
}

// RaftSchedulerPriorityID returns the Raft scheduler's prioritized range.
func (s *Store) RaftSchedulerPriorityID() roachpb.RangeID {
return s.scheduler.PriorityID()
}

// ClearClosedTimestampStorage clears the closed timestamp storage of all
// knowledge about closed timestamps.
func (s *Store) ClearClosedTimestampStorage() {
Expand Down
8 changes: 8 additions & 0 deletions pkg/kv/kvserver/replica_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@
package kvserver

import (
"bytes"
"context"
"math/rand"
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/abortspan"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/split"
Expand Down Expand Up @@ -312,4 +314,10 @@ func (r *Replica) setDescLockedRaftMuLocked(ctx context.Context, desc *roachpb.R
r.connectionClass.set(rpc.ConnectionClassForKey(desc.StartKey))
r.concMgr.OnRangeDescUpdated(desc)
r.mu.state.Desc = desc

// Prioritize the NodeLiveness Range in the Raft scheduler above all other
// Ranges to ensure that liveness never sees high Raft scheduler latency.
if bytes.HasPrefix(desc.StartKey, keys.NodeLivenessPrefix) {
r.store.scheduler.SetPriorityID(desc.RangeID)
}
}
54 changes: 48 additions & 6 deletions pkg/kv/kvserver/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,31 @@ func (c *rangeIDChunk) Len() int {
// amortizing the allocation/GC cost. Using a chunk queue avoids any copying
// that would occur if a slice were used (the copying would occur on slice
// reallocation).
//
// The queue has a naive understanding of priority and fairness. For the most
// part, it implements a FIFO queueing policy with no prioritization of some
// ranges over others. However, the queue can be configured with up to one
// high-priority range, which will always be placed at the front when added.
type rangeIDQueue struct {
len int

// Default priority.
chunks list.List
len int

// High priority.
priorityID roachpb.RangeID
priorityQueued bool
}

func (q *rangeIDQueue) PushBack(id roachpb.RangeID) {
func (q *rangeIDQueue) Push(id roachpb.RangeID) {
q.len++
if q.priorityID == id {
q.priorityQueued = true
return
}
if q.chunks.Len() == 0 || q.back().WriteCap() == 0 {
q.chunks.PushBack(&rangeIDChunk{})
}
q.len++
if !q.back().PushBack(id) {
panic(fmt.Sprintf(
"unable to push rangeID to chunk: len=%d, cap=%d",
Expand All @@ -81,13 +96,17 @@ func (q *rangeIDQueue) PopFront() (roachpb.RangeID, bool) {
if q.len == 0 {
return 0, false
}
q.len--
if q.priorityQueued {
q.priorityQueued = false
return q.priorityID, true
}
frontElem := q.chunks.Front()
front := frontElem.Value.(*rangeIDChunk)
id, ok := front.PopFront()
if !ok {
panic("encountered empty chunk")
}
q.len--
if front.Len() == 0 && front.WriteCap() == 0 {
q.chunks.Remove(frontElem)
}
Expand All @@ -98,6 +117,15 @@ func (q *rangeIDQueue) Len() int {
return q.len
}

func (q *rangeIDQueue) SetPriorityID(id roachpb.RangeID) {
if q.priorityID != 0 && q.priorityID != id {
panic(fmt.Sprintf(
"priority range ID already set: old=%d, new=%d",
q.priorityID, id))
}
q.priorityID = id
}

func (q *rangeIDQueue) back() *rangeIDChunk {
return q.chunks.Back().Value.(*rangeIDChunk)
}
Expand Down Expand Up @@ -172,6 +200,20 @@ func (s *raftScheduler) Wait(context.Context) {
s.done.Wait()
}

// SetPriorityID configures the single range that the scheduler will prioritize
// above others. Once set, callers are not permitted to change this value.
func (s *raftScheduler) SetPriorityID(id roachpb.RangeID) {
s.mu.Lock()
s.mu.queue.SetPriorityID(id)
s.mu.Unlock()
}

func (s *raftScheduler) PriorityID() roachpb.RangeID {
s.mu.Lock()
defer s.mu.Unlock()
return s.mu.queue.priorityID
}

func (s *raftScheduler) worker(ctx context.Context) {
defer s.done.Done()

Expand Down Expand Up @@ -235,7 +277,7 @@ func (s *raftScheduler) worker(ctx context.Context) {
} else {
// There was a concurrent call to one of the Enqueue* methods. Queue the
// range ID for further processing.
s.mu.queue.PushBack(id)
s.mu.queue.Push(id)
s.mu.cond.Signal()
}
}
Expand All @@ -251,7 +293,7 @@ func (s *raftScheduler) enqueue1Locked(addState raftScheduleState, id roachpb.Ra
if newState&stateQueued == 0 {
newState |= stateQueued
queued++
s.mu.queue.PushBack(id)
s.mu.queue.Push(id)
}
s.mu.state[id] = newState
return queued
Expand Down
39 changes: 38 additions & 1 deletion pkg/kv/kvserver/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
)

func TestRangeIDChunk(t *testing.T) {
Expand Down Expand Up @@ -93,7 +95,7 @@ func TestRangeIDQueue(t *testing.T) {

const count = 3 * rangeIDChunkSize
for i := 1; i <= count; i++ {
q.PushBack(roachpb.RangeID(i))
q.Push(roachpb.RangeID(i))
if e := i; e != q.Len() {
t.Fatalf("expected %d, but found %d", e, q.Len())
}
Expand All @@ -119,6 +121,41 @@ func TestRangeIDQueue(t *testing.T) {
}
}

func TestRangeIDQueuePrioritization(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

var q rangeIDQueue
for _, withPriority := range []bool{false, true} {
if withPriority {
q.SetPriorityID(3)
}

// Push 5 ranges in order, then pop them off.
for i := 1; i <= 5; i++ {
q.Push(roachpb.RangeID(i))
require.Equal(t, i, q.Len())
}
var popped []int
for i := 5; ; i-- {
require.Equal(t, i, q.Len())
id, ok := q.PopFront()
if !ok {
require.Equal(t, i, 0)
break
}
popped = append(popped, int(id))
}

// Assert pop order.
if withPriority {
require.Equal(t, []int{3, 1, 2, 4, 5}, popped)
} else {
require.Equal(t, []int{1, 2, 3, 4, 5}, popped)
}
}
}

type testProcessor struct {
mu struct {
syncutil.Mutex
Expand Down
20 changes: 17 additions & 3 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,18 @@ const (
)

var storeSchedulerConcurrency = envutil.EnvOrDefaultInt(
"COCKROACH_SCHEDULER_CONCURRENCY", 8*runtime.NumCPU())
// For small machines, we scale the scheduler concurrency by the number of
// CPUs. 8*runtime.NumCPU() was determined in 9a68241 (April 2017) as the
// optimal concurrency level on 8 CPU machines. For larger machines, we've
// seen (#56851) that this scaling curve can be too aggressive and lead to
// too much contention in the Raft scheduler, so we cap the concurrency
// level at 96.
//
// As of November 2020, this default value could be re-tuned.
"COCKROACH_SCHEDULER_CONCURRENCY", min(8*runtime.NumCPU(), 96))

var logSSTInfoTicks = envutil.EnvOrDefaultInt(
"COCKROACH_LOG_SST_INFO_TICKS_INTERVAL", 60,
)
"COCKROACH_LOG_SST_INFO_TICKS_INTERVAL", 60)

// bulkIOWriteLimit is defined here because it is used by BulkIOWriteLimiter.
var bulkIOWriteLimit = settings.RegisterPublicByteSizeSetting(
Expand Down Expand Up @@ -2641,3 +2648,10 @@ func ReadClusterVersion(
func init() {
tracing.RegisterTagRemapping("s", "store")
}

func min(a, b int) int {
if a < b {
return a
}
return b
}