kv: batch enqueue Ranges in Raft scheduler for coalesced heartbeats
Relates to cockroachdb#56851.

In cockroachdb#56851, we saw that all of the Raft transport's receiving goroutines
were stuck in the Raft scheduler, attempting to enqueue Ranges in
response to coalesced heartbeats. We saw this in stacktraces like:
```
goroutine 321096 [semacquire]:
sync.runtime_SemacquireMutex(0xc00007099c, 0xc005822a00, 0x1)
	/usr/local/go/src/runtime/sema.go:71 +0x47
sync.(*Mutex).lockSlow(0xc000070998)
	/usr/local/go/src/sync/mutex.go:138 +0xfc
sync.(*Mutex).Lock(...)
	/usr/local/go/src/sync/mutex.go:81
github.com/cockroachdb/cockroach/pkg/kv/kvserver.(*raftScheduler).enqueue1(0xc000070980, 0x4, 0x19d8cb, 0x1)
	/go/src/github.com/cockroachdb/cockroach/pkg/kv/kvserver/scheduler.go:261 +0xb0
github.com/cockroachdb/cockroach/pkg/kv/kvserver.(*raftScheduler).EnqueueRaftRequest(0xc000070980, 0x19d8cb)
	/go/src/github.com/cockroachdb/cockroach/pkg/kv/kvserver/scheduler.go:299 +0x3e
github.com/cockroachdb/cockroach/pkg/kv/kvserver.(*Store).HandleRaftUncoalescedRequest(0xc001136700, 0x4becc00, 0xc019f31b60, 0xc01288e5c0, 0x4ba44c0, 0xc014ff2b40, 0x0)
	/go/src/github.com/cockroachdb/cockroach/pkg/kv/kvserver/store_raft.go:175 +0x201
github.com/cockroachdb/cockroach/pkg/kv/kvserver.(*Store).uncoalesceBeats(0xc001136700, 0x4becc00, 0xc019f31b60, 0xc035790a80, 0x37, 0x43, 0x100000001, 0x29b00000000, 0x0, 0x400000004, ...)
	/go/src/github.com/cockroachdb/cockroach/pkg/kv/kvserver/store_raft.go:110 +0x33b
github.com/cockroachdb/cockroach/pkg/kv/kvserver.(*Store).HandleRaftRequest(0xc001136700, 0x4becc00, 0xc019f31b60, 0xc02be585f0, 0x4ba44c0, 0xc014ff2b40, 0x0)
	/go/src/github.com/cockroachdb/cockroach/pkg/kv/kvserver/store_raft.go:130 +0x1be
github.com/cockroachdb/cockroach/pkg/kv/kvserver.(*RaftTransport).handleRaftRequest(0xc000188780, 0x4becc00, 0xc019f31b60, 0xc02be585f0, 0x4ba44c0, 0xc014ff2b40, 0x0)
	/go/src/github.com/cockroachdb/cockroach/pkg/kv/kvserver/raft_transport.go:299 +0xab
github.com/cockroachdb/cockroach/pkg/kv/kvserver.(*RaftTransport).RaftMessageBatch.func1.1.1(0x4c3fac0, 0xc00d3ccdf0, 0xc000188780, 0x4becc00, 0xc019f31b60, 0x95fe98, 0x40c5720)
	/go/src/github.com/cockroachdb/cockroach/pkg/kv/kvserver/raft_transport.go:370 +0x199
```

In that issue, we also saw that too much concurrency on the Raft
scheduler's Mutex had caused the mutex to collapse (get stuck in the
slow path, in the OS kernel) and hundreds of goroutines to pile up on
it.

We suspect that part of the problem here was that each of the coalesced
heartbeats was locking the Raft scheduler once per Range. So a coalesced
heartbeat that contained 10k ranges would lock the scheduler 10k times
on the receiver.

The commit attempts to alleviate this issue by batch-enqueuing Ranges in
the Raft scheduler in response to coalesced heartbeats. Batching has a
small fixed overhead (the RangeIDs must be collected into a slice), but
in exchange it reduces the number of times a coalesced heartbeat acquires
the Raft scheduler's mutex by a factor of 128 (`enqueueChunkSize`). This
should reduce the impact that a large number of Ranges has on contention
in the Raft scheduler.
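
To make the intended effect concrete, here is a minimal, self-contained
sketch of the chunked batch-enqueue idea. The `scheduler` type, its
method names, and the lock counter below are hypothetical illustrations,
not the actual CockroachDB code: a per-Range enqueue acquires the shared
mutex once per Range, while a batched enqueue acquires it once per chunk
of 128 IDs.

```go
package main

import (
	"fmt"
	"sync"
)

type rangeID int64

// scheduler is a stand-in for a Raft scheduler guarded by a single mutex.
type scheduler struct {
	mu      sync.Mutex
	pending []rangeID
	locks   int // counts mutex acquisitions, for illustration only
}

// enqueueOne locks the mutex for a single Range. A coalesced heartbeat
// covering 10k ranges would call this 10k times on the receiver.
func (s *scheduler) enqueueOne(id rangeID) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.locks++
	s.pending = append(s.pending, id)
}

// enqueueBatch enqueues IDs in chunks, locking the mutex once per chunk
// so a single large heartbeat neither holds nor hammers the lock for long.
func (s *scheduler) enqueueBatch(ids ...rangeID) {
	const enqueueChunkSize = 128
	if len(ids) == 0 {
		return // avoid locking for zero new ranges
	}
	for start := 0; start < len(ids); start += enqueueChunkSize {
		end := start + enqueueChunkSize
		if end > len(ids) {
			end = len(ids)
		}
		s.mu.Lock()
		s.locks++
		s.pending = append(s.pending, ids[start:end]...)
		s.mu.Unlock()
	}
}

func main() {
	ids := make([]rangeID, 10000)
	for i := range ids {
		ids[i] = rangeID(i)
	}

	var perRange scheduler
	for _, id := range ids {
		perRange.enqueueOne(id)
	}

	var batched scheduler
	batched.enqueueBatch(ids...)

	// 10000 lock acquisitions vs. ceil(10000/128) = 79.
	fmt.Println("per-range locks:", perRange.locks, "batched locks:", batched.locks)
}
```

For a 10k-range heartbeat this drops the receive path from 10,000 mutex
acquisitions to 79, which is the effect the change below aims for.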

Release note (performance improvement): Interactions between Raft heartbeats
and the Raft goroutine pool scheduler are now more efficient and avoid excessive
mutex contention. This was observed to prevent instability on large machines
(32+ vCPU) in clusters with many ranges (50k+ per node).
nvanbenschoten committed Nov 20, 2020
1 parent 63573e0 · commit a355438
Showing 3 changed files with 29 additions and 19 deletions.
pkg/kv/kvserver/scheduler.go (11 additions, 2 deletions):

```diff
@@ -268,8 +268,13 @@ func (s *raftScheduler) enqueueN(addState raftScheduleState, ids ...roachpb.Rang
     // Enqueue the ids in chunks to avoid hold raftScheduler.mu for too long.
     const enqueueChunkSize = 128
 
-    var count int
+    // Avoid locking for 0 new ranges.
+    if len(ids) == 0 {
+        return 0
+    }
+
     s.mu.Lock()
+    var count int
     for i, id := range ids {
         count += s.enqueue1Locked(addState, id)
         if (i+1)%enqueueChunkSize == 0 {
@@ -299,6 +304,10 @@ func (s *raftScheduler) EnqueueRaftRequest(id roachpb.RangeID) {
     s.signal(s.enqueue1(stateRaftRequest, id))
 }
 
-func (s *raftScheduler) EnqueueRaftTick(ids ...roachpb.RangeID) {
+func (s *raftScheduler) EnqueueRaftRequests(ids ...roachpb.RangeID) {
+    s.signal(s.enqueueN(stateRaftRequest, ids...))
+}
+
+func (s *raftScheduler) EnqueueRaftTicks(ids ...roachpb.RangeID) {
     s.signal(s.enqueueN(stateRaftTick, ids...))
 }
```
pkg/kv/kvserver/scheduler_test.go (1 addition, 1 deletion):

```diff
@@ -199,7 +199,7 @@ func TestSchedulerLoop(t *testing.T) {
     ctx := context.Background()
     defer stopper.Stop(ctx)
     s.Start(ctx, stopper)
-    s.EnqueueRaftTick(1, 2, 3)
+    s.EnqueueRaftTicks(1, 2, 3)
 
     testutils.SucceedsSoon(t, func() error {
         const expected = "ready=[] request=[] tick=[1:1,2:1,3:1]"
```
pkg/kv/kvserver/store_raft.go (17 additions, 16 deletions):

```diff
@@ -76,6 +76,7 @@ func (s *Store) uncoalesceBeats(
         log.Infof(ctx, "uncoalescing %d beats of type %v: %+v", len(beats), msgT, beats)
     }
     beatReqs := make([]RaftMessageRequest, len(beats))
+    var toEnqueue []roachpb.RangeID
     for i, beat := range beats {
         msg := raftpb.Message{
             Type: msgT,
@@ -105,10 +106,12 @@
             log.Infof(ctx, "uncoalesced beat: %+v", beatReqs[i])
         }
 
-        if err := s.HandleRaftUncoalescedRequest(ctx, &beatReqs[i], respStream); err != nil {
-            log.Errorf(ctx, "could not handle uncoalesced heartbeat %s", err)
+        enqueue := s.HandleRaftUncoalescedRequest(ctx, &beatReqs[i], respStream)
+        if enqueue {
+            toEnqueue = append(toEnqueue, beat.RangeID)
         }
     }
+    s.scheduler.EnqueueRaftRequests(toEnqueue...)
 }
 
 // HandleRaftRequest dispatches a raft message to the appropriate Replica. It
@@ -128,15 +131,19 @@ func (s *Store) HandleRaftRequest(
         s.uncoalesceBeats(ctx, req.HeartbeatResps, req.FromReplica, req.ToReplica, raftpb.MsgHeartbeatResp, respStream)
         return nil
     }
-    return s.HandleRaftUncoalescedRequest(ctx, req, respStream)
+    enqueue := s.HandleRaftUncoalescedRequest(ctx, req, respStream)
+    if enqueue {
+        s.scheduler.EnqueueRaftRequest(req.RangeID)
+    }
+    return nil
 }
 
 // HandleRaftUncoalescedRequest dispatches a raft message to the appropriate
-// Replica. It requires that s.mu is not held.
+// Replica. The method returns whether the Range needs to be enqueued in the
+// Raft scheduler. It requires that s.mu is not held.
 func (s *Store) HandleRaftUncoalescedRequest(
     ctx context.Context, req *RaftMessageRequest, respStream RaftMessageResponseStream,
-) *roachpb.Error {
-
+) (enqueue bool) {
     if len(req.Heartbeats)+len(req.HeartbeatResps) > 0 {
         log.Fatalf(ctx, "HandleRaftUncoalescedRequest cannot be given coalesced heartbeats or heartbeat responses, received %s", req)
     }
@@ -151,28 +158,22 @@ func (s *Store) HandleRaftUncoalescedRequest(
     }
     q := (*raftRequestQueue)(value)
     q.Lock()
+    defer q.Unlock()
     if len(q.infos) >= replicaRequestQueueSize {
-        q.Unlock()
         // TODO(peter): Return an error indicating the request was dropped. Note
         // that dropping the request is safe. Raft will retry.
         s.metrics.RaftRcvdMsgDropped.Inc(1)
-        return nil
+        return false
     }
     q.infos = append(q.infos, raftRequestInfo{
         req:        req,
         respStream: respStream,
     })
-    first := len(q.infos) == 1
-    q.Unlock()
-
     // processRequestQueue will process all infos in the slice each time it
     // runs, so we only need to schedule a Raft request event if we added the
     // first info in the slice. Everyone else can rely on the request that added
     // the first info already having scheduled a Raft request event.
-    if first {
-        s.scheduler.EnqueueRaftRequest(req.RangeID)
-    }
-    return nil
+    return len(q.infos) == 1 /* enqueue */
 }
 
 // withReplicaForRequest calls the supplied function with the (lazily
@@ -601,7 +602,7 @@ func (s *Store) raftTickLoop(ctx context.Context) {
             }
             s.unquiescedReplicas.Unlock()
 
-            s.scheduler.EnqueueRaftTick(rangeIDs...)
+            s.scheduler.EnqueueRaftTicks(rangeIDs...)
             s.metrics.RaftTicks.Inc(1)
 
         case <-s.stopper.ShouldStop():
```
