// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvserver

import (
"bytes"
"context"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/raft/v3/raftpb"
)

// splitPreApply is called when the raft command is applied. Any
// changes to the given ReadWriter will be written atomically with the
// split commit.
func splitPreApply(
ctx context.Context,
readWriter storage.ReadWriter,
split roachpb.SplitTrigger,
r *Replica,
// The closed timestamp used to initialize the RHS.
closedTS hlc.Timestamp,
) {
// Sanity check that the store is in the split.
//
// The exception to that is if the DisableEagerReplicaRemoval testing flag is
// enabled.
//
// TODO(ajwerner): rethink DisableEagerReplicaRemoval and remove this in
// 20.1 after there are no more preemptive snapshots.
_, hasRightDesc := split.RightDesc.GetReplicaDescriptor(r.StoreID())
_, hasLeftDesc := split.LeftDesc.GetReplicaDescriptor(r.StoreID())
if !hasRightDesc || !hasLeftDesc {
log.Fatalf(ctx, "cannot process split on s%s which does not exist in the split: %+v",
r.StoreID(), split)
}
// Check the RHS: we need to ensure that it exists and that its minReplicaID
// is less than or equal to the replica ID we're about to initialize it with.
//
// The right hand side of the split was already created (and its raftMu
// acquired) in Replica.acquireSplitLock. It must be present here if it hasn't
// been removed in the meantime (handled below).
rightRepl, err := r.store.GetReplica(split.RightDesc.RangeID)
if roachpb.IsRangeNotFoundError(err) {
// The right hand side we were planning to populate has already been removed.
// We handle this below.
rightRepl = nil
} else if err != nil {
log.Fatalf(ctx, "failed to get RHS replica: %v", err)
}
// Check to see if we know that the RHS has already been removed from this
// store at the replica ID implied by the split.
if rightRepl == nil || rightRepl.isNewerThanSplit(&split) {
// We're in the rare case where we know that the RHS has been removed
// and re-added with a higher replica ID (and then maybe removed again).
//
// To apply the split, we need to "throw away" the data that would belong to
// the RHS, i.e. we clear the user data the RHS would have inherited from the
// LHS due to the split and additionally clear all of the range ID local state
// that the split trigger writes into the RHS.
//
// We know we've never processed a snapshot for the right range because the
// LHS prevents any incoming snapshots until the split has executed (i.e. now).
// However, it is important to preserve the HardState because we might have
// already voted at a higher term. In general this shouldn't happen because
// we add learners and only promote them after we snapshot, but we're
// going to be extra careful in case future versions of cockroach somehow
// promote replicas without ensuring that a snapshot has been received.
//
// Rather than specifically deleting around the data we want to preserve
// we read the HardState to preserve it, clear everything and write back
// the HardState and tombstone. Note that we only do this if rightRepl
// exists; if it doesn't, there's no Raft state to massage (when rightRepl
// was removed, a tombstone was written instead).
var hs raftpb.HardState
if rightRepl != nil {
// Assert that the rightRepl is not initialized. We're about to clear out
// the data of the RHS of the split; we cannot have already accepted a
// snapshot to initialize this newer RHS.
if rightRepl.IsInitialized() {
log.Fatalf(ctx, "unexpectedly found initialized newer RHS of split: %v", rightRepl.Desc())
}
hs, err = rightRepl.raftMu.stateLoader.LoadHardState(ctx, readWriter)
if err != nil {
log.Fatalf(ctx, "failed to load hard state for removed rhs: %v", err)
}
}
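// Clear both the range ID local state that the split trigger would have
// written and the user data the RHS would have inherited from the LHS
// (rangeIDLocalOnly=false); we don't force the use of a storage ClearRange
// here (mustUseClearRange=false).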
const rangeIDLocalOnly = false
const mustUseClearRange = false
if err := clearRangeData(&split.RightDesc, readWriter, readWriter, rangeIDLocalOnly, mustUseClearRange); err != nil {
log.Fatalf(ctx, "failed to clear range data for removed rhs: %v", err)
}
if rightRepl != nil {
if err := rightRepl.raftMu.stateLoader.SetHardState(ctx, readWriter, hs); err != nil {
log.Fatalf(ctx, "failed to set hard state with 0 commit index for removed rhs: %v", err)
}
}
return
}
// Update the raft HardState with the new Commit value now that the
// replica is initialized (combining it with existing or default
// Term and Vote). This is the common case.
rsl := stateloader.Make(split.RightDesc.RangeID)
if err := rsl.SynthesizeRaftState(ctx, readWriter); err != nil {
log.Fatalf(ctx, "%v", err)
}
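// Persist the closed timestamp that the RHS is initialized with (the
// closedTS argument above).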
rsl.SetClosedTimestamp(ctx, readWriter, closedTS)
// The initialMaxClosed is assigned to the RHS replica to ensure that
// follower reads do not regress following the split. After the split occurs
// there will be no information in the closedts subsystem about the newly
// minted RHS range from its leaseholder's store. Furthermore, the RHS will
// have a lease start time equal to that of the LHS which might be quite
// old. This means that timestamps which follow the lease start time for the
// LHS and are below the current closed timestamp for the LHS would no
// longer be readable on the RHS after the split.
//
// It is necessary for correctness that the call to MaxClosedTimestamp used to
// determine the current closed timestamp happens during splitPreApply
// so that it uses an LAI that is _before_ the index at which this split is
// applied. If it were to refer to an LAI equal to or after the split then
// the value of initialMaxClosed might be unsafe.
//
// Concretely, any closed timestamp based on an LAI that is equal to or
// above the split index might be larger than the initial closed timestamp
// assigned to the RHS range's initial leaseholder. This is because the LHS
// range's leaseholder could continue closing out timestamps at the split's
// LAI after applying the split. Slow followers in that range could hear
// about these closed timestamp notifications before applying the split
// themselves. If these slow followers were allowed to pass these closed
// timestamps created after the split to the RHS replicas they create during
// the application of the split then these RHS replicas might end up with
// initialMaxClosed values above their current range's official closed
// timestamp. The leaseholder of the RHS range could then propose a write at
// a timestamp below this initialMaxClosed, violating the closed timestamp
// system's most important property.
//
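// For example, say the split applies at LAI 10 on the LHS. After applying
// the split, the LHS leaseholder can keep closing out timestamps at LAI 10.
// A slow follower that has not yet applied the split can hear about such a
// closed timestamp (say T2 at LAI 10) and, if it were allowed to use it,
// would create its RHS replica with initialMaxClosed = T2. The RHS
// leaseholder, whose own closed timestamp may still be below T2, could then
// propose a write below T2 and regress a follower read served off that slow
// follower's RHS replica.
//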
// Using an LAI from before the index at which this split is applied avoids
// the hazard and ensures that no replica on the RHS is created with an
// initialMaxClosed that could be violated by a proposal on the RHS's
// initial leaseholder. See #44878.
initialMaxClosed, _ := r.MaxClosedTimestamp(ctx)
rightRepl.mu.Lock()
rightRepl.mu.initialMaxClosed = initialMaxClosed
// NB: we don't initialize propBuf.closedTS here; splitPostApply() is
// expected to do that via leasePostApply().
rightRepl.mu.Unlock()
}

// splitPostApply is the part of the split trigger which coordinates the actual
// split with the Store. Requires that Replica.raftMu is held. The deltaMS are
// the MVCC stats which apply to the RHS and have already been removed from the
// LHS.
func splitPostApply(
ctx context.Context, deltaMS enginepb.MVCCStats, split *roachpb.SplitTrigger, r *Replica,
) {
// rightReplOrNil will be nil if the RHS replica at the ID of the split is
// already known to be removed, generally because we know that this store has
// been re-added at a higher replica ID.
rightReplOrNil := prepareRightReplicaForSplit(ctx, split, r)
// Add the RHS replica to the store. This step atomically updates
// the EndKey of the LHS replica and also adds the RHS replica
// to the store's replica map.
if err := r.store.SplitRange(ctx, r, rightReplOrNil, split); err != nil {
// Our in-memory state has diverged from the on-disk state.
log.Fatalf(ctx, "%s: failed to update Store after split: %+v", r, err)
}
// Update store stats with difference in stats before and after split.
if rightReplOrNil != nil {
if tenantID, ok := rightReplOrNil.TenantID(); ok {
rightReplOrNil.store.metrics.addMVCCStats(ctx, tenantID, deltaMS)
} else {
log.Fatalf(ctx, "%s: found replica which is RHS of a split "+
"without a valid tenant ID", rightReplOrNil)
}
}
now := r.store.Clock().NowAsClockTimestamp()
// While performing the split, zone config changes or a newly created table
// might require the range to be split again. Enqueue both the left and right
// ranges to speed up such splits. See #10160.
r.store.splitQueue.MaybeAddAsync(ctx, r, now)
// If the range was not properly replicated before the split, the replicate
// queue may not have picked it up (due to the need for a split). Enqueue
// both the left and right ranges to speed up a potentially necessary
// replication. See #7022 and #7800.
r.store.replicateQueue.MaybeAddAsync(ctx, r, now)
if rightReplOrNil != nil {
r.store.splitQueue.MaybeAddAsync(ctx, rightReplOrNil, now)
r.store.replicateQueue.MaybeAddAsync(ctx, rightReplOrNil, now)
if len(split.RightDesc.Replicas().Descriptors()) == 1 {
// TODO(peter): In single-node clusters, we enqueue the right-hand side of
// the split (the new range) for Raft processing so that the corresponding
// Raft group is created. This shouldn't be necessary for correctness, but
// some tests rely on this (e.g. server.TestNodeStatusWritten).
r.store.enqueueRaftUpdateCheck(rightReplOrNil.RangeID)
}
}
}

// prepareRightReplicaForSplit is a helper for splitPostApply.
// Requires that r.raftMu is held.
func prepareRightReplicaForSplit(
ctx context.Context, split *roachpb.SplitTrigger, r *Replica,
) (rightReplicaOrNil *Replica) {
// The right hand side of the split was already created (and its raftMu
// acquired) in Replica.acquireSplitLock. It must be present here.
rightRepl, err := r.store.GetReplica(split.RightDesc.RangeID)
// If the RHS replica at the point of the split was known to be removed
// during the application of the split then we may not find it here. That's
// fine, carry on. See also:
_, _ = r.acquireSplitLock, splitPostApply
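// (The blank assignments above only create compile-time references to the
// code mentioned in the comment; they have no runtime effect.)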
if roachpb.IsRangeNotFoundError(err) {
return nil
}
if err != nil {
log.Fatalf(ctx, "unable to find RHS replica: %+v", err)
}
// Already holding raftMu, see above.
rightRepl.mu.Lock()
// If we know that the RHS has already been removed at this replica ID
// then we also know that its data has already been removed by the preApply
// so we skip initializing it as the RHS of the split.
if rightRepl.isNewerThanSplitRLocked(split) {
rightRepl.mu.Unlock()
return nil
}
// Finish initialization of the RHS.
err = rightRepl.loadRaftMuLockedReplicaMuLocked(&split.RightDesc)
rightRepl.mu.Unlock()
if err != nil {
log.Fatalf(ctx, "%v", err)
}
// Copy the minLeaseProposedTS from the LHS and grab the RHS's lease.
r.mu.RLock()
rightRepl.mu.Lock()
rightRepl.mu.minLeaseProposedTS = r.mu.minLeaseProposedTS
rightLease := *rightRepl.mu.state.Lease
rightRepl.mu.Unlock()
r.mu.RUnlock()
// We need to explicitly wake up the Raft group on the right-hand range or
// else the range could be underreplicated for an indefinite period of time.
//
// Specifically, suppose one of the replicas of the left-hand range never
// applies this split trigger, e.g., because it catches up via a snapshot that
// advances it past this split. That store won't create the right-hand replica
// until it receives a Raft message addressed to the right-hand range. But
// since new replicas start out quiesced, unless we explicitly awaken the
// Raft group, there might not be any Raft traffic for quite a while.
err = rightRepl.withRaftGroup(true, func(r *raft.RawNode) (unquiesceAndWakeLeader bool, _ error) {
return true, nil
})
if err != nil {
log.Fatalf(ctx, "unable to create raft group for right-hand range in split: %+v", err)
}
// Invoke the leasePostApply method to ensure we properly initialize
// the replica according to whether it holds the lease. This enables
// the txnWaitQueue.
rightRepl.mu.Lock()
defer rightRepl.mu.Unlock()
rightRepl.leasePostApplyLocked(ctx, rightLease, false /* permitJump */)
return rightRepl
}

// SplitRange shortens the original range to accommodate the new range. The new
// range is added to the ranges map and the replicasByKey btree. The raftMu of
// both leftRepl and rightReplOrNil (when non-nil) must be held.
//
// This is only called from the split trigger in the context of the execution
// of a Raft command. Note that rightRepl will be nil if the replica described
// by rightDesc is known to have been removed.
func (s *Store) SplitRange(
ctx context.Context, leftRepl, rightReplOrNil *Replica, split *roachpb.SplitTrigger,
) error {
rightDesc := &split.RightDesc
newLeftDesc := &split.LeftDesc
oldLeftDesc := leftRepl.Desc()
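// Sanity-check that the right-hand descriptor is carved out of the end of
// the original left-hand range: it must share the old end key and start
// strictly after the old start key.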
if !bytes.Equal(oldLeftDesc.EndKey, rightDesc.EndKey) ||
bytes.Compare(oldLeftDesc.StartKey, rightDesc.StartKey) >= 0 {
return errors.Errorf("left range is not splittable by right range: %+v, %+v", oldLeftDesc, rightDesc)
}
s.mu.Lock()
defer s.mu.Unlock()
if exRng, ok := s.mu.uninitReplicas[rightDesc.RangeID]; rightReplOrNil != nil && ok {
// If we have an uninitialized replica of the new range we require pointer
// equivalence with rightRepl. See splitPostApply().
if exRng != rightReplOrNil {
log.Fatalf(ctx, "found unexpected uninitialized replica: %s vs %s", exRng, rightReplOrNil)
}
// NB: We only remove from uninitReplicas and the replicaQueues maps here
// so that we don't leave open a window where a replica is temporarily not
// present in Store.mu.replicas.
delete(s.mu.uninitReplicas, rightDesc.RangeID)
s.replicaQueues.Delete(int64(rightDesc.RangeID))
}
leftRepl.setDescRaftMuLocked(ctx, newLeftDesc)
// Clear the LHS lock and txn wait-queues, to redirect to the RHS if
// appropriate. We do this after setDescWithoutProcessUpdate to ensure
// that no pre-split commands are inserted into the wait-queues after we
// clear them.
leftRepl.concMgr.OnRangeSplit()
// Clear the original range's request stats, since they include requests for
// spans that are now owned by the new range.
leftRepl.leaseholderStats.resetRequestCounts()
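// Split the LHS's write stats between the LHS and the RHS. If the RHS
// replica is already known to have been removed, split into a throwaway
// container so that the LHS's counts still reflect the split.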
if rightReplOrNil == nil {
throwawayRightWriteStats := new(replicaStats)
leftRepl.writeStats.splitRequestCounts(throwawayRightWriteStats)
} else {
rightRepl := rightReplOrNil
leftRepl.writeStats.splitRequestCounts(rightRepl.writeStats)
if err := s.addReplicaInternalLocked(rightRepl); err != nil {
return errors.Errorf("unable to add replica %v: %s", rightRepl, err)
}
// Update the replica's cached byte thresholds. This is a no-op if the system
// config is not available, in which case we rely on the next gossip update
// to perform the update.
if err := rightRepl.updateRangeInfo(rightRepl.Desc()); err != nil {
return err
}
// Add the range to metrics and maybe gossip on capacity change.
s.metrics.ReplicaCount.Inc(1)
s.maybeGossipOnCapacityChange(ctx, rangeAddEvent)
}
return nil
}