Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: introduce concurrent Raft proposal buffer #38343

Merged
merged 6 commits into from
Jun 26, 2019

Conversation

nvanbenschoten
Copy link
Member

This change introduces a new multi-producer, single-consumer buffer for Raft proposal ingestion into the Raft replication pipeline. This buffer becomes the new coordination point between "above Raft" goroutines, who have just finished evaluation and want to replicate a command, and a Replica's "below Raft" goroutine, which collects these commands and begins the replication process.

The structure improves upon the current approach to this interaction in three important ways. The first is that the structure supports concurrent insertion of proposals by multiple proposer goroutines. This significantly increases the amount of concurrency for non-conflicting writes within a single Range. The proposal buffer does this without exclusive locking using atomics to index into an array. This is complicated by the strong desire for proposals to be proposed in the same order that their MaxLeaseIndex is assigned. The buffer addresses this by selecting a slot in its array and selecting a MaxLeaseIndex for a proposal in a single atomic operation.

The second improvement is that the new structure allows RaftCommand marshaling to be lifted entirely out of any critical section. Previously, the allocation, marshaling, and encoding processes for a RaftCommand was performed under the exclusive Replica lock. Before 91abab1, there was even a second allocation and a copy under this lock. This locking interacted poorly with both "above Raft" processing (which repeatedly acquires a shared lock) and "below Raft" processing (which occasionally acquires an exclusive lock). The new concurrent Raft proposal buffer structure is able to push this allocation and marshaling completely outside of the exclusive or shared Replica lock. It does so despite the fact that the MaxLeaseIndex of the RaftCommand has not been assigned yet by splitting marshaling into two steps and using a new "footer" proto. The first step is to allocate and marshal the majority of the encoded Raft command outside of any lock. The second step is to marshal just the small "footer" proto with the MaxLeaseIndex field into the same byte slice, which has been pre-sized with a small amount of extra capacity, after the MaxLeaseIndex has been selected. This approach lifts a major expense out of the Replica mutex.

The final improvement is to increase the amount of batching performed between Raft proposals. This reduces the number of messages required to coordinate their replication throughout the entire replication pipeline. To start, batching allows multiple Raft entries to be sent in the same MsgApp from the leader to followers. Doing so then results in only a single MsgAppResp being sent for all of these entries back to the leader, instead of one per entry. Finally, a single MsgAppResp results in only a single empty MsgApp with the new commit index being sent from the leader to followers. All of this is made possible by Stepping the Raft RawNode with a MsgProp containing multiple entries instead of using the Propose API directly, which internally Steps the Raft RawNode with a MsgProp containing only one entry. Doing so demonstrated a very large improvement in rafttoy and is showing a similar win here. The proposal buffer provides a clean place to perform this batching, so this is a natural time to introduce it.

Benchmark Results

name                             old ops/sec  new ops/sec  delta
kv95/seq=false/cores=16/nodes=3   67.5k ± 1%   67.2k ± 1%     ~     (p=0.421 n=5+5)
kv95/seq=false/cores=36/nodes=3    144k ± 1%    143k ± 1%     ~     (p=0.320 n=5+5)
kv0/seq=false/cores=16/nodes=3    41.2k ± 2%   42.3k ± 3%   +2.49%  (p=0.000 n=10+10)
kv0/seq=false/cores=36/nodes=3    66.8k ± 2%   69.1k ± 2%   +3.35%  (p=0.000 n=10+10)
kv95/seq=true/cores=16/nodes=3    59.3k ± 1%   62.1k ± 2%   +4.83%  (p=0.008 n=5+5)
kv95/seq=true/cores=36/nodes=3     100k ± 1%    125k ± 1%  +24.37%  (p=0.008 n=5+5)
kv0/seq=true/cores=16/nodes=3     16.1k ± 2%   21.8k ± 4%  +35.21%  (p=0.000 n=9+10)
kv0/seq=true/cores=36/nodes=3     18.4k ± 3%   24.8k ± 2%  +35.29%  (p=0.000 n=10+10)

name                             old p50(ms)  new p50(ms)  delta
kv95/seq=false/cores=16/nodes=3    0.70 ± 0%    0.70 ± 0%     ~     (all equal)
kv95/seq=false/cores=36/nodes=3    0.70 ± 0%    0.70 ± 0%     ~     (all equal)
kv0/seq=false/cores=16/nodes=3     2.86 ± 2%    2.80 ± 0%   -2.10%  (p=0.011 n=10+10)
kv0/seq=false/cores=36/nodes=3     3.87 ± 2%    3.80 ± 0%   -1.81%  (p=0.003 n=10+10)
kv95/seq=true/cores=16/nodes=3     0.70 ± 0%    0.70 ± 0%     ~     (all equal)
kv95/seq=true/cores=36/nodes=3     0.70 ± 0%    0.70 ± 0%     ~     (all equal)
kv0/seq=true/cores=16/nodes=3      7.97 ± 2%    5.86 ± 2%  -26.44%  (p=0.000 n=9+10)
kv0/seq=true/cores=36/nodes=3      15.7 ± 0%    11.7 ± 4%  -25.61%  (p=0.000 n=8+10)

name                             old p99(ms)  new p99(ms)  delta
kv95/seq=false/cores=16/nodes=3    2.90 ± 0%    2.94 ± 2%     ~     (p=0.444 n=5+5)
kv95/seq=false/cores=36/nodes=3    3.90 ± 0%    3.98 ± 3%     ~     (p=0.444 n=5+5)
kv0/seq=false/cores=16/nodes=3     8.90 ± 0%    8.40 ± 0%   -5.62%  (p=0.000 n=10+8)
kv0/seq=false/cores=36/nodes=3     11.0 ± 0%    10.4 ± 3%   -5.91%  (p=0.000 n=10+10)
kv95/seq=true/cores=16/nodes=3     4.50 ± 0%    3.18 ± 4%  -29.33%  (p=0.000 n=4+5)
kv95/seq=true/cores=36/nodes=3     11.2 ± 3%     4.7 ± 0%  -58.04%  (p=0.008 n=5+5)
kv0/seq=true/cores=16/nodes=3      11.5 ± 0%     9.4 ± 0%  -18.26%  (p=0.000 n=9+9)
kv0/seq=true/cores=36/nodes=3      19.9 ± 0%    15.3 ± 2%  -22.86%  (p=0.000 n=9+10)

As expected, the majority of the improvement from this change comes when writing to a single Range (i.e. a write hotspot). In those cases, this change (and those in the following two commits) improves performance by up to 35%.

NOTE: the Raft proposal buffer hooks into the rest of the Storage package through a fairly small and well-defined interface. The primary reason for doing so was to make the structure easy to move to a storage/replication package if/when we move in that direction.

@nvanbenschoten nvanbenschoten requested review from ajwerner, tbg and a team June 21, 2019 04:51
@cockroach-teamcity
Copy link
Member

This change is Reviewable

@awoods187
Copy link
Contributor

this is awesome!!

Copy link
Member

@tbg tbg left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 2 of 2 files at r1, 1 of 1 files at r2, 15 of 15 files at r3, 7 of 7 files at r4, 1 of 1 files at r5.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner and @nvanbenschoten)


pkg/storage/replica_proposal_buf.go, line 110 at r3 (raw file):

from that in which


pkg/storage/replica_proposal_buf.go, line 117 at r3 (raw file):
Add a paragraph about the life cycle of a proposal.

Proposals enter the buffer via Insert() and are moved into Raft when the buffer fills up, or when XXX, whichever happens earlier.


pkg/storage/replica_proposal_buf.go, line 163 at r3 (raw file):

}

// LastAssignedLeaseIndex returns the last assigned lease index.

mismatched comment


pkg/storage/replica_proposal_buf.go, line 190 at r3 (raw file):

	isLease := p.Request.IsLeaseRequest()
	req := makePropBufCntReq(!isLease)

nit: you only need the rlock after this line.


pkg/storage/replica_proposal_buf.go, line 238 at r3 (raw file):

// ReinsertLocked inserts a command that has already passed through the proposal
// buffer back into the buffer to be reproposed at a new Raft log index. Unlike
// insert, it does not modify the command or assign a new maximum lease index.

Why are we required to hold an exclusive lock here?


pkg/storage/replica_proposal_buf.go, line 269 at r3 (raw file):

// all racing to flush at the same time.
//
// The method expects that either the proposer's read lock or write lock is held.

Maybe I'm reading this wrong, but we use b.full.Wait() below which I think is tied to rlocker(), so if we enter this method with the exclusive lock, won't the call to Wait unlock an unlocked mutex?


pkg/storage/replica_proposal_buf.go, line 278 at r3 (raw file):

		// NB: We need to check whether the proposer is destroyed before each
		// iteration in case the proposer has been destroyed between the initial
		// check and the current acquisition of the read lock. Failure to do so

Has the caller already checked? If so you could move this to after .Wait() returns.


pkg/storage/replica_proposal_buf.go, line 343 at r3 (raw file):

is non-nil


pkg/storage/replica_proposal_buf.go, line 347 at r3 (raw file):

func (b *propBuf) FlushLockedWithRaftGroup(raftGroup *raft.RawNode) error {
	// Before returning, make sure to forward the lease index base to at
	// least the proposer's currently applied lease index.

Can you explain what this is for? Or rather, what would go wrong if we didn't do it? I think it's to make sure that should the LAI advance outside of this proposer's control (i.e. other leaseholder commits some stuff and then we get the lease back), something needs to feed it back to the propBuf, and that's this line.

Is this case tested? I'll probably see it when I looked at the tests but asking the question in case I forget.


pkg/storage/replica_proposal_buf.go, line 362 at r3 (raw file):

At least one writer has tried to allocate on top of the full buffer, so notify them to try again.


pkg/storage/replica_proposal_buf.go, line 365 at r3 (raw file):

		// for it to be flushed.
		used = b.arr.len()
		b.full.Broadcast()

Should we defer this until the mutex is "almost released"?


pkg/storage/replica_proposal_buf.go, line 372 at r3 (raw file):

	b.forwardLeaseIndexBase(b.liBase + res.leaseIndexOffset())

	// Iterate through the proposals in the buffer and propose them to Raft.

This looks like it's doing "heavy lifting", do you think it's worth taking this out of the hot path for adding new proposals? Basically swap out a pair of buffers here and have another goroutine work the full buffer into Raft (blocking here only if the bufferer catches up with the Raft-pusher).


pkg/storage/replica_proposal_buf.go, line 431 at r3 (raw file):

		} else {
			// Add to the batch of entries that will soon be proposed.
			ents = append(ents, raftpb.Entry{

Are we unconcerned with how much we stuff into a single batch due to the quota pool limiting what we can expect here to <1mb? It'd be nice to point that out. I also worry that we might end up in a situation in which somehow we get more than Raft will accept into one batch and then it all gets dropped, and on reproposal everything shows up in the same batch again and gets dropped again. This is pretty unlikely because Raft will not apply a limit when there's only one thing in flight (so the situation would resolve itself probabilistically, and probably really quickly) but it'd be good to have these corner cases sussed out and explained.


pkg/storage/replica_proposal_buf.go, line 467 at r3 (raw file):

// FlushLockedWithoutProposing is like FlushLockedWithRaftGroup but it does not
// attempt to propose any of the commands that it is flushing. Instead, it is
// used exclusively to flush all entries in the buffer into the proposals map.

Add the intended usage. I'm confused by who would call this right now. I looked ahead in the review and saw that tryAbandon calls it, which I don't understand because it seems that it will take from all batched up proposals the chance to actually make it into Raft, where it was only supposed to flush out one specific proposal (the one being tryAbandoned.


pkg/storage/replica_proposal_buf.go, line 479 at r3 (raw file):

repeatedly


pkg/storage/replica_proposal_buf.go, line 507 at r3 (raw file):

too large


pkg/storage/replica_proposal_buf.go, line 508 at r3 (raw file):

	case used <= cur/4:
		// The array is too small. Shrink it if possible.
		if cur == propBufArrayMinSize {

Just use <= which makes it easier for the reader to believe that this just works.


pkg/storage/replica_proposal_buf.go, line 518 at r3 (raw file):

			a.shrink = 0
			next := cur / 2
			if next == propBufArrayMinSize {

<=


pkg/storage/replica_proposal_buf.go, line 538 at r3 (raw file):

// replicaProposer is an implementation of proposer that wraps a Replica.
type replicaProposer struct {

Can't you use type replicaProposer Replica here to avoid an extra pointer chase?


pkg/storage/replica_proposal_buf.go, line 567 at r3 (raw file):

func (rp *replicaProposer) withGroupLocked(fn func(*raft.RawNode) error) error {
	return rp.r.withRaftGroupLocked(true, func(raftGroup *raft.RawNode) (bool, error) {

comment on the true and why it's true.


pkg/storage/replica_proposal_buf_test.go, line 118 at r3 (raw file):

	// Insert another proposal. This causes the buffer to flush. Doing so
	// results in a lease applied index being skipped, which is harmless.

Is that necessary? I agree that it's harmless but it'd be nice if we didn't have frequent gaps built-in. This happens whenever a proposal runs into a full buffer, right? Hmm, doesn't seem trivial to fix so we'll live with it.


pkg/storage/replica_raft.go, line 232 at r3 (raw file):

		r.mu.Lock()
		defer r.mu.Unlock()
		r.mu.proposalBuf.FlushLockedWithoutProposing()

Confused as pointed out in a comment above. Doesn't this wreck other commands that just haoppen to be inflight right now?


pkg/storage/store.go, line 3591 at r3 (raw file):

			}

			repl.raftMu.Lock()

Why is this needed now?

@nvanbenschoten nvanbenschoten force-pushed the nvanbenschoten/propPipeline2 branch 2 times, most recently from dd3cf40 to 20b0c08 Compare June 25, 2019 00:54
nvanbenschoten added a commit to nvanbenschoten/cockroach that referenced this pull request Jun 25, 2019
I saw the following race in https://teamcity.cockroachdb.com/viewLog.html?buildId=1356733:

```
Race detected!

------- Stdout: -------
==================
Write at 0x00c001d9c2a8 by goroutine 63:
  github.com/cockroachdb/cockroach/pkg/storage.(*Replica).handleRaftReadyRaftMuLocked()
      /go/src/github.com/cockroachdb/cockroach/pkg/storage/replica_raft.go:637 +0x77a
  github.com/cockroachdb/cockroach/pkg/storage.(*Replica).handleRaftReady()
      /go/src/github.com/cockroachdb/cockroach/pkg/storage/replica_raft.go:386 +0x16a
  github.com/cockroachdb/cockroach/pkg/storage.(*Store).processReady()
      /go/src/github.com/cockroachdb/cockroach/pkg/storage/store.go:3742 +0x171
  github.com/cockroachdb/cockroach/pkg/storage.(*raftScheduler).worker()
      /go/src/github.com/cockroachdb/cockroach/pkg/storage/scheduler.go:227 +0x33e
  github.com/cockroachdb/cockroach/pkg/storage.(*raftScheduler).Start.func2()
      /go/src/github.com/cockroachdb/cockroach/pkg/storage/scheduler.go:161 +0x55
  github.com/cockroachdb/cockroach/pkg/util/stop.(*Stopper).RunWorker.func1()
      /go/src/github.com/cockroachdb/cockroach/pkg/util/stop/stopper.go:196 +0x15f

Previous read at 0x00c001d9c2a8 by goroutine 154:
  github.com/cockroachdb/cockroach/pkg/storage.TestSnapshotRaftLogLimit.func1()
      /go/src/github.com/cockroachdb/cockroach/pkg/storage/replica_raftstorage.go:299 +0xa1
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:865 +0x163
```

I don't see an obvious reason why cockroachdb#38343 would have caused this, but maybe
it caused a new handleRaftReady iteration that allowed the race detector
to observe the issue. Either way, the test was broken and this fixes it.

Release note: None
craig bot pushed a commit that referenced this pull request Jun 25, 2019
38388: storage: avoid data race in TestSnapshotRaftLogLimit r=nvanbenschoten a=nvanbenschoten

I saw the following race in https://teamcity.cockroachdb.com/viewLog.html?buildId=1356733:

```
Race detected!

------- Stdout: -------
==================
Write at 0x00c001d9c2a8 by goroutine 63:
  github.com/cockroachdb/cockroach/pkg/storage.(*Replica).handleRaftReadyRaftMuLocked()
      /go/src/github.com/cockroachdb/cockroach/pkg/storage/replica_raft.go:637 +0x77a
  github.com/cockroachdb/cockroach/pkg/storage.(*Replica).handleRaftReady()
      /go/src/github.com/cockroachdb/cockroach/pkg/storage/replica_raft.go:386 +0x16a
  github.com/cockroachdb/cockroach/pkg/storage.(*Store).processReady()
      /go/src/github.com/cockroachdb/cockroach/pkg/storage/store.go:3742 +0x171
  github.com/cockroachdb/cockroach/pkg/storage.(*raftScheduler).worker()
      /go/src/github.com/cockroachdb/cockroach/pkg/storage/scheduler.go:227 +0x33e
  github.com/cockroachdb/cockroach/pkg/storage.(*raftScheduler).Start.func2()
      /go/src/github.com/cockroachdb/cockroach/pkg/storage/scheduler.go:161 +0x55
  github.com/cockroachdb/cockroach/pkg/util/stop.(*Stopper).RunWorker.func1()
      /go/src/github.com/cockroachdb/cockroach/pkg/util/stop/stopper.go:196 +0x15f

Previous read at 0x00c001d9c2a8 by goroutine 154:
  github.com/cockroachdb/cockroach/pkg/storage.TestSnapshotRaftLogLimit.func1()
      /go/src/github.com/cockroachdb/cockroach/pkg/storage/replica_raftstorage.go:299 +0xa1
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:865 +0x163
```

I don't see an obvious reason why #38343 would have caused this, but maybe
it caused a new handleRaftReady iteration that allowed the race detector
to observe the issue. Either way, the test was broken and this fixes it.

Release note: None

Co-authored-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
This should reduce lock contention in the Raft scheduler. The impact
isn't noticeable, but this still seems like a good change to make.

This commit also adds two TODOs that I'll be exploring.

Release note: None
This hasn't been testing anything since c063211.

Release note: None
Copy link
Member Author

@nvanbenschoten nvanbenschoten left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TFTR!

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @petermattis, and @tbg)


pkg/storage/replica_proposal_buf.go, line 110 at r3 (raw file):

Previously, tbg (Tobias Grieger) wrote…

from that in which

Done.


pkg/storage/replica_proposal_buf.go, line 117 at r3 (raw file):

Previously, tbg (Tobias Grieger) wrote…

Add a paragraph about the life cycle of a proposal.

Proposals enter the buffer via Insert() and are moved into Raft when the buffer fills up, or when XXX, whichever happens earlier.

Done.


pkg/storage/replica_proposal_buf.go, line 163 at r3 (raw file):

Previously, tbg (Tobias Grieger) wrote…

mismatched comment

Done.


pkg/storage/replica_proposal_buf.go, line 190 at r3 (raw file):

Previously, tbg (Tobias Grieger) wrote…

nit: you only need the rlock after this line.

Done.


pkg/storage/replica_proposal_buf.go, line 238 at r3 (raw file):

Previously, tbg (Tobias Grieger) wrote…

Why are we required to hold an exclusive lock here?

Because we provide flushLocked as the flush function to handleCounterRequestRLocked. It's a little awkward, but we only need this method when we're already holding an exclusive lock and without re-entrant locks, I'm not aware of a way to generalize the interface.


pkg/storage/replica_proposal_buf.go, line 269 at r3 (raw file):

Previously, tbg (Tobias Grieger) wrote…

Maybe I'm reading this wrong, but we use b.full.Wait() below which I think is tied to rlocker(), so if we enter this method with the exclusive lock, won't the call to Wait unlock an unlocked mutex?

That's why we pass alwaysFlush = true to the function when holding the exclusive lock. I added to the comment where we do that.


pkg/storage/replica_proposal_buf.go, line 278 at r3 (raw file):

Previously, tbg (Tobias Grieger) wrote…

Has the caller already checked? If so you could move this to after .Wait() returns.

No, the caller hasn't already checked in all cases. This should be quick anyway.


pkg/storage/replica_proposal_buf.go, line 343 at r3 (raw file):

Previously, tbg (Tobias Grieger) wrote…

is non-nil

Done.


pkg/storage/replica_proposal_buf.go, line 347 at r3 (raw file):

I think it's to make sure that should the LAI advance outside of this proposer's control (i.e. other leaseholder commits some stuff and then we get the lease back), something needs to feed it back to the propBuf, and that's this line.

Yes, exactly. I added to the comment.

Is this case tested

Yes, see "Increase the proposer's applied lease index and flush".


pkg/storage/replica_proposal_buf.go, line 362 at r3 (raw file):

Previously, tbg (Tobias Grieger) wrote…

At least one writer has tried to allocate on top of the full buffer, so notify them to try again.

Done.


pkg/storage/replica_proposal_buf.go, line 365 at r3 (raw file):

Previously, tbg (Tobias Grieger) wrote…

Should we defer this until the mutex is "almost released"?

I've always been unclear about the performance implications of signaling/broadcasting on condition variables before vs. after releasing the corresponding lock. In theory, if we do so before then we'll wake up goroutines who will then need to immediately wait on the mutex. If we do so after then we might avoid them needing to sleep again. However, I've never actually seen a benchmark compare the two and demonstrate an improvement from the latter approach, so I'm not sure where on the scale of "good to do whenever possible" vs. "never worth it if it obscures code" the concern lies. @petermattis do you have a rule of thumb about this?

Still, happy to defer this. Done.


pkg/storage/replica_proposal_buf.go, line 372 at r3 (raw file):

Previously, tbg (Tobias Grieger) wrote…

This looks like it's doing "heavy lifting", do you think it's worth taking this out of the hot path for adding new proposals? Basically swap out a pair of buffers here and have another goroutine work the full buffer into Raft (blocking here only if the bufferer catches up with the Raft-pusher).

That's an interesting idea. In general, the process of writing this up highlighted how beneficial it would be to make a clear split between above Raft and below Raft processing. This includes removing below-Raft state from the protection of the Replica mutex. A storage/replication package will require this kind of split, but we don't necessarily need to wait for it. For instance, I've been playing around with moving Replica.mu.proposals from under the Replica mutex to under the Raft mutex and giving Raft "complete ownership" of the map. Doing so avoids the need to lock the Replica mutex when applying entries and pulling out the corresponding ProposalData, which improved perf for these benchmarks by another ~2%.

Until we get to that point though, Replica.mu.raftGroup is currently protected by the Replica mutex, which makes the kind of change you're proposing tricky to coordinate.


pkg/storage/replica_proposal_buf.go, line 431 at r3 (raw file):

Previously, tbg (Tobias Grieger) wrote…

Are we unconcerned with how much we stuff into a single batch due to the quota pool limiting what we can expect here to <1mb? It'd be nice to point that out. I also worry that we might end up in a situation in which somehow we get more than Raft will accept into one batch and then it all gets dropped, and on reproposal everything shows up in the same batch again and gets dropped again. This is pretty unlikely because Raft will not apply a limit when there's only one thing in flight (so the situation would resolve itself probabilistically, and probably really quickly) but it'd be good to have these corner cases sussed out and explained.

Good point. I should have been clear here about why I'm not concerned. I added to the comment.


pkg/storage/replica_proposal_buf.go, line 467 at r3 (raw file):

Previously, tbg (Tobias Grieger) wrote…

Add the intended usage. I'm confused by who would call this right now. I looked ahead in the review and saw that tryAbandon calls it, which I don't understand because it seems that it will take from all batched up proposals the chance to actually make it into Raft, where it was only supposed to flush out one specific proposal (the one being tryAbandoned.

Done. The main use case for this was in cancelPendingCommandsLocked. It shouldn't have been used in tryAbandon. Does the additional commentary make sense?


pkg/storage/replica_proposal_buf.go, line 479 at r3 (raw file):

Previously, tbg (Tobias Grieger) wrote…

repeatedly

Done.


pkg/storage/replica_proposal_buf.go, line 507 at r3 (raw file):

Previously, tbg (Tobias Grieger) wrote…

too large

Done.


pkg/storage/replica_proposal_buf.go, line 508 at r3 (raw file):

Previously, tbg (Tobias Grieger) wrote…

Just use <= which makes it easier for the reader to believe that this just works.

Done.


pkg/storage/replica_proposal_buf.go, line 518 at r3 (raw file):

Previously, tbg (Tobias Grieger) wrote…

<=

Done.


pkg/storage/replica_proposal_buf.go, line 538 at r3 (raw file):

Previously, tbg (Tobias Grieger) wrote…

Can't you use type replicaProposer Replica here to avoid an extra pointer chase?

Good point, done.


pkg/storage/replica_proposal_buf.go, line 567 at r3 (raw file):

Previously, tbg (Tobias Grieger) wrote…

comment on the true and why it's true.

Done.


pkg/storage/replica_proposal_buf_test.go, line 118 at r3 (raw file):

This happens whenever a proposal runs into a full buffer, right?

Yes. I took a look at trying to remove it, but it's not easy or obviously possible if we avoid assigning new max lease indexes to lease requests. If we removed that special case then it would be possible by subtracting the overflow, but that would be pretty nasty. As is, the gaps seem harmless to me.


pkg/storage/replica_raft.go, line 232 at r3 (raw file):

Previously, tbg (Tobias Grieger) wrote…

Confused as pointed out in a comment above. Doesn't this wreck other commands that just haoppen to be inflight right now?

Removed all of this. See the new commit about simplifying command abandonment.


pkg/storage/store.go, line 3591 at r3 (raw file):

Previously, tbg (Tobias Grieger) wrote…

Why is this needed now?

It's not. I was working on a change to push the ownership of the proposal's map completely under the Raft mutex, but that's not part of this PR. Removed.

Copy link
Member

@tbg tbg left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:lgtm:, very nice!

Reviewed 18 of 18 files at r6, 1 of 1 files at r7, 3 of 3 files at r8, 14 of 14 files at r9, 7 of 7 files at r10, 1 of 1 files at r11.
Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @ajwerner, @nvanbenschoten, and @petermattis)


pkg/storage/replica_proposal_buf.go, line 269 at r3 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

That's why we pass alwaysFlush = true to the function when holding the exclusive lock. I added to the comment where we do that.

Oh, didn't see that we short circuit before calling .Wait(). Nevertheless, the real semantics are that the read lock must be held when alwaysFlush is false, and the write lock otherwise.


pkg/storage/replica_proposal_buf.go, line 365 at r3 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

I've always been unclear about the performance implications of signaling/broadcasting on condition variables before vs. after releasing the corresponding lock. In theory, if we do so before then we'll wake up goroutines who will then need to immediately wait on the mutex. If we do so after then we might avoid them needing to sleep again. However, I've never actually seen a benchmark compare the two and demonstrate an improvement from the latter approach, so I'm not sure where on the scale of "good to do whenever possible" vs. "never worth it if it obscures code" the concern lies. @petermattis do you have a rule of thumb about this?

Still, happy to defer this. Done.

Also curious what the general guidance here is, if it doesn't make a measurable difference not deferring is easier to think about.


pkg/storage/replica_proposal_buf.go, line 467 at r3 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

Done. The main use case for this was in cancelPendingCommandsLocked. It shouldn't have been used in tryAbandon. Does the additional commentary make sense?

Yep, thanks.


pkg/storage/replica_proposal_buf.go, line 113 at r9 (raw file):

are moved


pkg/storage/replica_proposal_buf.go, line 114 at r9 (raw file):

//
// Proposals enter the buffer via Insert() or ReinsertLocked(). They moved into
// Raft when the buffer fills up, or during the next handleRaftReady iteration,

Mention how they are moved into Raft (i.e. which method is called in the process).


pkg/storage/replica_raft.go, line 232 at r3 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

Removed all of this. See the new commit about simplifying command abandonment.

I think this is safe as written, but it makes me nervous. proposal must only be accessed under both mutexes, so it feels wrong to pull it out and pass it to a method later even though that method requires the right locks. If there isn't a downside to inlining this code back in and making the closure do the map lookup I'd prefer that for clarity.


pkg/storage/replica_raft.go, line 438 at r8 (raw file):

}

// abandonProposal attempt to abandon the provided proposal. When called, it

attempts

Copy link
Member Author

@nvanbenschoten nvanbenschoten left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @ajwerner, @petermattis, and @tbg)


pkg/storage/replica_proposal_buf.go, line 269 at r3 (raw file):

the real semantics are that the read lock must be held when alwaysFlush is false, and the write lock otherwise.

Good point. I made this more explicit, which I hope makes it easier to read.


pkg/storage/replica_proposal_buf.go, line 113 at r9 (raw file):

Previously, tbg (Tobias Grieger) wrote…

are moved

Done.


pkg/storage/replica_proposal_buf.go, line 114 at r9 (raw file):

Previously, tbg (Tobias Grieger) wrote…

Mention how they are moved into Raft (i.e. which method is called in the process).

Done.


pkg/storage/replica_raft.go, line 232 at r3 (raw file):

If there isn't a downside to inlining this code back in and making the closure do the map lookup I'd prefer that for clarity.

The proposal won't necessarily be in the map though, which is what was causing issues. I can inline abandonProposal if you'd like, although I don't necessarily think that makes it any cleaner.


pkg/storage/replica_raft.go, line 438 at r8 (raw file):

Previously, tbg (Tobias Grieger) wrote…

attempts

Done.

Copy link
Member

@tbg tbg left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:lgtm:

Reviewed 16 of 16 files at r12, 14 of 14 files at r13, 7 of 7 files at r14, 1 of 1 files at r15.
Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @ajwerner and @nvanbenschoten)


pkg/storage/replica_raft.go, line 232 at r3 (raw file):

The proposal won't necessarily be in the map though

Oh, right. Can you point that out? Personally I'd inline this just because I find that these random one-caller-and-only-valid-in-one-particular-situation methods just add a cognitive burden, but I'll defer to your preference.

4da0e1a fixed a bug by ensuring that Raft proposal abandonment synchronized with
below Raft machinery. That is somewhat unfortunate, but necessary to resolve a
use-after-free bug. Now that abandoning proposals requires holding the Raft mutex,
it is always permitted to simply replace a proposal's context without concern that
the proposal is in the middle of being applied (see 2e97e6a). This simplifies the
code.

Release note: None
This change introduces a new multi-producer, single-consumer buffer
for Raft proposal ingestion into the Raft replication pipeline. This
buffer becomes the new coordination point between "above Raft" goroutines,
who have just finished evaluation and want to replicate a command, and
a Replica's "below Raft" goroutine, which collects these commands and
begins the replication process.

The structure improves upon the current approach to this interaction in
three important ways. The first is that the structure supports concurrent
insertion of proposals by multiple proposer goroutines. This significantly
increases the amount of concurrency for non-conflicting writes within a
single Range. The proposal buffer does this without exclusive locking using
atomics to index into an array. This is complicated by the strong desire for
proposals to be proposed in the same order that their MaxLeaseIndex is assigned.
The buffer addresses this by selecting a slot in its array and selecting a
MaxLeaseIndex for a proposal in a single atomic operation.

The second improvement is that the new structure allows RaftCommand marshaling
to be lifted entirely out of any critical section. Previously, the allocation,
marshaling, and encoding processes for a RaftCommand was performed under the
exclusive Replica lock. Before 91abab1, there was even a second allocation and
a copy under this lock. This locking interacted poorly with both "above Raft"
processing (which repeatedly acquires a shared lock) and "below Raft" processing
(which occasionally acquires an exclusive lock). The new concurrent Raft proposal
buffer structure is able to push this allocation and marshaling completely outside
of the exclusive or shared Replica lock. It does so despite the fact that the
MaxLeaseIndex of the RaftCommand has not been assigned yet by splitting marshaling
into two steps and using a new "footer" proto. The first step is to allocate and
marshal the majority of the encoded Raft command outside of any lock. The second
step is to marshal just the small "footer" proto with the MaxLeaseIndex field into
the same byte slice, which has been pre-sized with a small amount of extra capacity,
after the MaxLeaseIndex has been selected. This approach lifts a major expense out
of the Replica mutex.

The final improvement is to increase the amount of batching performed between
Raft proposals. This reduces the number of messages required to coordinate their
replication throughout the entire replication pipeline. To start, batching allows
multiple Raft entries to be sent in the same MsgApp from the leader to followers.
Doing so then results in only a single MsgAppResp being sent for all of these entries
back to the leader, instead of one per entry. Finally, a single MsgAppResp results
in only a single empty MsgApp with the new commit index being sent from the leader
to followers. All of this is made possible by `Step`ping the Raft `RawNode` with a
`MsgProp` containing multiple entries instead of using the `Propose` API directly,
which internally `Step`s the Raft `RawNode` with a `MsgProp` containing only one
entry. Doing so demonstrated a very large improvement in `rafttoy` and is showing
a similar win here. The proposal buffer provides a clean place to perform this
batching, so this is a natural time to introduce it.

\### Benchmark Results

```
name                             old ops/sec  new ops/sec  delta
kv95/seq=false/cores=16/nodes=3   67.5k ± 1%   67.2k ± 1%     ~     (p=0.421 n=5+5)
kv95/seq=false/cores=36/nodes=3    144k ± 1%    143k ± 1%     ~     (p=0.320 n=5+5)
kv0/seq=false/cores=16/nodes=3    41.2k ± 2%   42.3k ± 3%   +2.49%  (p=0.000 n=10+10)
kv0/seq=false/cores=36/nodes=3    66.8k ± 2%   69.1k ± 2%   +3.35%  (p=0.000 n=10+10)
kv95/seq=true/cores=16/nodes=3    59.3k ± 1%   62.1k ± 2%   +4.83%  (p=0.008 n=5+5)
kv95/seq=true/cores=36/nodes=3     100k ± 1%    125k ± 1%  +24.37%  (p=0.008 n=5+5)
kv0/seq=true/cores=16/nodes=3     16.1k ± 2%   21.8k ± 4%  +35.21%  (p=0.000 n=9+10)
kv0/seq=true/cores=36/nodes=3     18.4k ± 3%   24.8k ± 2%  +35.29%  (p=0.000 n=10+10)

name                             old p50(ms)  new p50(ms)  delta
kv95/seq=false/cores=16/nodes=3    0.70 ± 0%    0.70 ± 0%     ~     (all equal)
kv95/seq=false/cores=36/nodes=3    0.70 ± 0%    0.70 ± 0%     ~     (all equal)
kv0/seq=false/cores=16/nodes=3     2.86 ± 2%    2.80 ± 0%   -2.10%  (p=0.011 n=10+10)
kv0/seq=false/cores=36/nodes=3     3.87 ± 2%    3.80 ± 0%   -1.81%  (p=0.003 n=10+10)
kv95/seq=true/cores=16/nodes=3     0.70 ± 0%    0.70 ± 0%     ~     (all equal)
kv95/seq=true/cores=36/nodes=3     0.70 ± 0%    0.70 ± 0%     ~     (all equal)
kv0/seq=true/cores=16/nodes=3      7.97 ± 2%    5.86 ± 2%  -26.44%  (p=0.000 n=9+10)
kv0/seq=true/cores=36/nodes=3      15.7 ± 0%    11.7 ± 4%  -25.61%  (p=0.000 n=8+10)

name                             old p99(ms)  new p99(ms)  delta
kv95/seq=false/cores=16/nodes=3    2.90 ± 0%    2.94 ± 2%     ~     (p=0.444 n=5+5)
kv95/seq=false/cores=36/nodes=3    3.90 ± 0%    3.98 ± 3%     ~     (p=0.444 n=5+5)
kv0/seq=false/cores=16/nodes=3     8.90 ± 0%    8.40 ± 0%   -5.62%  (p=0.000 n=10+8)
kv0/seq=false/cores=36/nodes=3     11.0 ± 0%    10.4 ± 3%   -5.91%  (p=0.000 n=10+10)
kv95/seq=true/cores=16/nodes=3     4.50 ± 0%    3.18 ± 4%  -29.33%  (p=0.000 n=4+5)
kv95/seq=true/cores=36/nodes=3     11.2 ± 3%     4.7 ± 0%  -58.04%  (p=0.008 n=5+5)
kv0/seq=true/cores=16/nodes=3      11.5 ± 0%     9.4 ± 0%  -18.26%  (p=0.000 n=9+9)
kv0/seq=true/cores=36/nodes=3      19.9 ± 0%    15.3 ± 2%  -22.86%  (p=0.000 n=9+10)
```

As expected, the majority of the improvement from this change comes when writing
to a single Range (i.e. a write hotspot). In those cases, this change (and those
in the following two commits) improves performance by up to **35%**.

NOTE: the Raft proposal buffer hooks into the rest of the Storage package through
a fairly small and well-defined interface. The primary reason for doing so was
to make the structure easy to move to a `storage/replication` package if/when
we move in that direction.

Release note (performance improvement): Introduced new concurrent Raft
proposal buffer, which increases the degree of write concurrency supported
on a single Range.
This commit addresses a TODO added in the previous commit to remove
the commandSizes map in favor of using the quota size field tracked
directly on the ProposalData object. This allows us to avoid an extra
exclusive lock on each applied command, reducing below Raft lock contention
with above Raft processing (which often holds a shared lock).

Release note: None
This was expensive and unnecessary in order to accomplish what the code
wanted to accomplish. This reduces below Raft lock contention with above
Raft processing (which often holds a shared lock).

Release note: None
Copy link
Member Author

@nvanbenschoten nvanbenschoten left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @ajwerner and @tbg)


pkg/storage/replica_raft.go, line 232 at r3 (raw file):

Previously, tbg (Tobias Grieger) wrote…

The proposal won't necessarily be in the map though

Oh, right. Can you point that out? Personally I'd inline this just because I find that these random one-caller-and-only-valid-in-one-particular-situation methods just add a cognitive burden, but I'll defer to your preference.

Done.

@nvanbenschoten
Copy link
Member Author

bors r=tbg

craig bot pushed a commit that referenced this pull request Jun 26, 2019
38343: storage: introduce concurrent Raft proposal buffer r=tbg a=nvanbenschoten

This change introduces a new multi-producer, single-consumer buffer for Raft proposal ingestion into the Raft replication pipeline. This buffer becomes the new coordination point between "above Raft" goroutines, who have just finished evaluation and want to replicate a command, and a Replica's "below Raft" goroutine, which collects these commands and begins the replication process.

The structure improves upon the current approach to this interaction in three important ways. The first is that the structure supports concurrent insertion of proposals by multiple proposer goroutines. This significantly increases the amount of concurrency for non-conflicting writes within a single Range. The proposal buffer does this without exclusive locking using atomics to index into an array. This is complicated by the strong desire for proposals to be proposed in the same order that their MaxLeaseIndex is assigned. The buffer addresses this by selecting a slot in its array and selecting a MaxLeaseIndex for a proposal in a single atomic operation.

The second improvement is that the new structure allows RaftCommand marshaling to be lifted entirely out of any critical section. Previously, the allocation, marshaling, and encoding processes for a RaftCommand was performed under the exclusive Replica lock. Before 91abab1, there was even a second allocation and a copy under this lock. This locking interacted poorly with both "above Raft" processing (which repeatedly acquires a shared lock) and "below Raft" processing (which occasionally acquires an exclusive lock). The new concurrent Raft proposal buffer structure is able to push this allocation and marshaling completely outside of the exclusive or shared Replica lock. It does so despite the fact that the MaxLeaseIndex of the RaftCommand has not been assigned yet by splitting marshaling into two steps and using a new "footer" proto. The first step is to allocate and marshal the majority of the encoded Raft command outside of any lock. The second step is to marshal just the small "footer" proto with the MaxLeaseIndex field into the same byte slice, which has been pre-sized with a small amount of extra capacity, after the MaxLeaseIndex has been selected. This approach lifts a major expense out of the Replica mutex.

The final improvement is to increase the amount of batching performed between Raft proposals. This reduces the number of messages required to coordinate their replication throughout the entire replication pipeline. To start, batching allows multiple Raft entries to be sent in the same MsgApp from the leader to followers. Doing so then results in only a single MsgAppResp being sent for all of these entries back to the leader, instead of one per entry. Finally, a single MsgAppResp results in only a single empty MsgApp with the new commit index being sent from the leader to followers. All of this is made possible by `Step`ping the Raft `RawNode` with a `MsgProp` containing multiple entries instead of using the `Propose` API directly, which internally `Step`s the Raft `RawNode` with a `MsgProp` containing only one entry. Doing so demonstrated a very large improvement in `rafttoy` and is showing a similar win here. The proposal buffer provides a clean place to perform this batching, so this is a natural time to introduce it.

### Benchmark Results

```
name                             old ops/sec  new ops/sec  delta
kv95/seq=false/cores=16/nodes=3   67.5k ± 1%   67.2k ± 1%     ~     (p=0.421 n=5+5)
kv95/seq=false/cores=36/nodes=3    144k ± 1%    143k ± 1%     ~     (p=0.320 n=5+5)
kv0/seq=false/cores=16/nodes=3    41.2k ± 2%   42.3k ± 3%   +2.49%  (p=0.000 n=10+10)
kv0/seq=false/cores=36/nodes=3    66.8k ± 2%   69.1k ± 2%   +3.35%  (p=0.000 n=10+10)
kv95/seq=true/cores=16/nodes=3    59.3k ± 1%   62.1k ± 2%   +4.83%  (p=0.008 n=5+5)
kv95/seq=true/cores=36/nodes=3     100k ± 1%    125k ± 1%  +24.37%  (p=0.008 n=5+5)
kv0/seq=true/cores=16/nodes=3     16.1k ± 2%   21.8k ± 4%  +35.21%  (p=0.000 n=9+10)
kv0/seq=true/cores=36/nodes=3     18.4k ± 3%   24.8k ± 2%  +35.29%  (p=0.000 n=10+10)

name                             old p50(ms)  new p50(ms)  delta
kv95/seq=false/cores=16/nodes=3    0.70 ± 0%    0.70 ± 0%     ~     (all equal)
kv95/seq=false/cores=36/nodes=3    0.70 ± 0%    0.70 ± 0%     ~     (all equal)
kv0/seq=false/cores=16/nodes=3     2.86 ± 2%    2.80 ± 0%   -2.10%  (p=0.011 n=10+10)
kv0/seq=false/cores=36/nodes=3     3.87 ± 2%    3.80 ± 0%   -1.81%  (p=0.003 n=10+10)
kv95/seq=true/cores=16/nodes=3     0.70 ± 0%    0.70 ± 0%     ~     (all equal)
kv95/seq=true/cores=36/nodes=3     0.70 ± 0%    0.70 ± 0%     ~     (all equal)
kv0/seq=true/cores=16/nodes=3      7.97 ± 2%    5.86 ± 2%  -26.44%  (p=0.000 n=9+10)
kv0/seq=true/cores=36/nodes=3      15.7 ± 0%    11.7 ± 4%  -25.61%  (p=0.000 n=8+10)

name                             old p99(ms)  new p99(ms)  delta
kv95/seq=false/cores=16/nodes=3    2.90 ± 0%    2.94 ± 2%     ~     (p=0.444 n=5+5)
kv95/seq=false/cores=36/nodes=3    3.90 ± 0%    3.98 ± 3%     ~     (p=0.444 n=5+5)
kv0/seq=false/cores=16/nodes=3     8.90 ± 0%    8.40 ± 0%   -5.62%  (p=0.000 n=10+8)
kv0/seq=false/cores=36/nodes=3     11.0 ± 0%    10.4 ± 3%   -5.91%  (p=0.000 n=10+10)
kv95/seq=true/cores=16/nodes=3     4.50 ± 0%    3.18 ± 4%  -29.33%  (p=0.000 n=4+5)
kv95/seq=true/cores=36/nodes=3     11.2 ± 3%     4.7 ± 0%  -58.04%  (p=0.008 n=5+5)
kv0/seq=true/cores=16/nodes=3      11.5 ± 0%     9.4 ± 0%  -18.26%  (p=0.000 n=9+9)
kv0/seq=true/cores=36/nodes=3      19.9 ± 0%    15.3 ± 2%  -22.86%  (p=0.000 n=9+10)
```

As expected, the majority of the improvement from this change comes when writing to a single Range (i.e. a write hotspot). In those cases, this change (and those in the following two commits) improves performance by up to **35%**.

NOTE: the Raft proposal buffer hooks into the rest of the Storage package through a fairly small and well-defined interface. The primary reason for doing so was to make the structure easy to move to a `storage/replication` package if/when we move in that direction.

Co-authored-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
@craig
Copy link
Contributor

craig bot commented Jun 26, 2019

Build succeeded

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants