-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
Copy pathasserter.go
414 lines (372 loc) · 14 KB
/
asserter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package apply
import (
fmt "fmt"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)
// Asserter is a test utility that tracks application of Raft commands, and
// primarily asserts that:
//
//   - A request is only applied once, at a single log index and lease applied
//     index across all replicas (i.e. no double-applies or replays).
//
//   - Commands do not regress the Raft index/term, lease applied index, and
//     closed timestamp (assuming no node restarts).
//
//   - All replicas apply the same commands in the same order at the same
//     positions.
//
// Violations are reported by panicking. State is tracked per range; the
// embedded mutex only guards the ranges map, and each per-range tracker is
// synchronized by its own mutex.
type Asserter struct {
	// Mutex guards ranges (held by forRange while looking up/creating entries).
	syncutil.Mutex
	// ranges holds the per-range trackers, created lazily by forRange.
	ranges map[roachpb.RangeID]*rangeAsserter
}
// rangeAsserter tracks and asserts application for an individual range. We
// don't make cross-range assertions.
//
// The embedded mutex is taken by propose, apply and applySnapshot before they
// touch the log or any of the maps below. rangeID is set once at construction
// (in forRange) and is also read outside the lock when formatting failures.
type rangeAsserter struct {
	syncutil.Mutex
	rangeID roachpb.RangeID
	// log tracks applied commands by their Raft log index (log[i] is the entry
	// applied at Raft index i). It may have gaps, represented by empty entries
	// -- typically rejected commands that don't apply, or noop commands on Raft
	// leader changes.
	log []applyState
	// proposedCmds tracks proposed commands by ID, along with the proposer. LAI
	// reproposals under new command IDs are considered separate proposals here.
	proposedCmds map[kvserverbase.CmdIDKey]roachpb.ReplicaID
	// seedCmds maps a LAI reproposal command ID back to its original seed proposal.
	seedCmds map[kvserverbase.CmdIDKey]kvserverbase.CmdIDKey
	// seedAppliedAs tracks which command actually applied an original seed
	// command, for LAI reproposals which use separate command IDs. It is tracked
	// for all seed proposals, regardless of whether they are reproposed.
	seedAppliedAs map[kvserverbase.CmdIDKey]kvserverbase.CmdIDKey
	// appliedCmds tracks the log index at which a command is applied. LAI
	// reproposals under new command IDs are tracked as separate commands here
	// (only one should apply).
	appliedCmds map[kvserverbase.CmdIDKey]kvpb.RaftIndex
	// replicaAppliedIndex tracks the applied index of each replica. It is
	// advanced both by applied log entries and by applied snapshots.
	replicaAppliedIndex map[roachpb.ReplicaID]kvpb.RaftIndex
}
// applyState describes one applied log entry (or, for snapshots, the applied
// state at a Raft index). The zero value represents a gap: a log position for
// which no applied command has been recorded.
type applyState struct {
	// cmdID is the ID of the applied command; empty for gaps and snapshots.
	cmdID kvserverbase.CmdIDKey
	// raftIndex and raftTerm identify the Raft log entry.
	raftIndex kvpb.RaftIndex
	raftTerm  kvpb.RaftTerm
	// leaseAppliedIndex is the LAI of the entry; zero for entries that carry
	// none (e.g. lease requests).
	leaseAppliedIndex kvpb.LeaseAppliedIndex
	// closedTS is the closed timestamp of the entry; empty when not carried.
	closedTS hlc.Timestamp
}
// gap is the zero applyState. It fills rangeAsserter.log at Raft indexes for
// which no applied command has been recorded (e.g. rejected commands or Raft
// noop entries), and such positions are skipped when checking that replicas
// apply entries sequentially.
var gap = applyState{}
// String implements fmt.Stringer. A zero-valued state (a gap in the tracked
// log) is rendered as "<empty>"; anything else shows the command ID, Raft
// position, lease applied index and closed timestamp.
func (l applyState) String() string {
	var empty applyState
	if l != empty {
		return fmt.Sprintf("cmd %q index %d term %d (LAI=%d CTS=%s)",
			l.cmdID, l.raftIndex, l.raftTerm, l.leaseAppliedIndex, l.closedTS)
	}
	return "<empty>"
}
// NewAsserter returns an Asserter with no tracked ranges. Per-range state is
// allocated on first use.
func NewAsserter() *Asserter {
	a := new(Asserter)
	a.ranges = make(map[roachpb.RangeID]*rangeAsserter)
	return a
}
// forRange returns the tracker for rangeID, allocating and registering an
// empty one on first access. The Asserter's lock is held only for the
// lookup/insert; callers synchronize on the returned tracker's own mutex.
func (a *Asserter) forRange(rangeID roachpb.RangeID) *rangeAsserter {
	a.Lock()
	defer a.Unlock()

	ra := a.ranges[rangeID]
	if ra == nil {
		ra = &rangeAsserter{
			rangeID:             rangeID,
			proposedCmds:        make(map[kvserverbase.CmdIDKey]roachpb.ReplicaID),
			seedCmds:            make(map[kvserverbase.CmdIDKey]kvserverbase.CmdIDKey),
			seedAppliedAs:       make(map[kvserverbase.CmdIDKey]kvserverbase.CmdIDKey),
			appliedCmds:         make(map[kvserverbase.CmdIDKey]kvpb.RaftIndex),
			replicaAppliedIndex: make(map[roachpb.ReplicaID]kvpb.RaftIndex),
		}
		a.ranges[rangeID] = ra
	}
	return ra
}
// stateAt rebuilds the applied state as of the given log index. Walking
// backwards from index, each field takes the first non-zero value seen. This
// matters for the LAI and closed timestamp, which some entries (e.g. lease
// requests) omit. Gap entries contribute nothing, so the resulting raftIndex
// can be lower than index.
//
// The walk stops as soon as both a LAI and a closed timestamp have been found,
// which usually means inspecting only the entry at index and its predecessor.
// It panics if index lies beyond the end of the tracked log.
func (r *rangeAsserter) stateAt(index kvpb.RaftIndex) applyState {
	if int(index) >= len(r.log) {
		panic(fmt.Sprintf("index %d is beyond end of log at %d", index, len(r.log)-1))
	}

	// Non-gap entries always carry cmdID, raftIndex and raftTerm, so only the
	// LAI and closed timestamp determine when we can stop.
	var st applyState
	for pos := int(index); pos >= 0; pos-- {
		if st.leaseAppliedIndex != 0 && !st.closedTS.IsEmpty() {
			break
		}
		cur := r.log[pos]
		if st.cmdID == "" {
			st.cmdID = cur.cmdID
		}
		if st.raftIndex == 0 {
			st.raftIndex = cur.raftIndex
		}
		if st.raftTerm == 0 {
			st.raftTerm = cur.raftTerm
		}
		if st.leaseAppliedIndex == 0 {
			st.leaseAppliedIndex = cur.leaseAppliedIndex
		}
		if st.closedTS.IsEmpty() {
			st.closedTS = cur.closedTS
		}
	}
	return st
}
// Propose records that replicaID proposed command cmdID on rangeID, and panics
// if the proposal violates an invariant. seedID is empty for original
// proposals; for LAI reproposals it names the original (seed) command ID.
func (a *Asserter) Propose(
	rangeID roachpb.RangeID,
	replicaID roachpb.ReplicaID,
	cmdID, seedID kvserverbase.CmdIDKey,
	cmd *kvserverpb.RaftCommand,
	req *kvpb.BatchRequest,
) {
	ra := a.forRange(rangeID)
	ra.propose(replicaID, cmdID, seedID, cmd, req)
}
// propose records a proposal of command cmdID by replicaID, and panics if the
// proposal violates any invariant. seedID is non-empty only for LAI
// reproposals, where it names the command ID of the original seed proposal.
func (r *rangeAsserter) propose(
	replicaID roachpb.ReplicaID,
	cmdID, seedID kvserverbase.CmdIDKey,
	cmd *kvserverpb.RaftCommand,
	req *kvpb.BatchRequest,
) {
	fail := func(msg string, args ...interface{}) {
		panic(fmt.Sprintf("r%d/%d cmd %s: %s (%s)",
			r.rangeID, replicaID, cmdID, fmt.Sprintf(msg, args...), req))
	}

	// INVARIANT: all proposals have a command ID.
	if len(cmdID) == 0 {
		fail("proposed command without ID")
	}

	if req.IsSingleRequestLeaseRequest() {
		// INVARIANT: lease requests never set a LAI or closed timestamp. These have
		// their own replay protection (using a conditional write).
		//
		// TODO(erikgrinaker): consider assertions around lease request replays.
		if cmd.MaxLeaseIndex != 0 || cmd.ClosedTimestamp != nil {
			// MaxLeaseIndex is an integer index, so it is formatted with %d (as
			// LAIs are elsewhere); %s would render it as %!s(...).
			fail("lease request proposal with LAI=%d CTS=%s", cmd.MaxLeaseIndex, cmd.ClosedTimestamp)
		}
	} else {
		// INVARIANT: all non-lease-request proposals have a LAI.
		if cmd.MaxLeaseIndex == 0 {
			fail("proposal without LAI")
		}
	}

	r.Lock()
	defer r.Unlock()

	// INVARIANT: a command can only be proposed by a single replica. The same
	// command may be reproposed under the same ID by the same replica.
	if proposedBy, ok := r.proposedCmds[cmdID]; ok && replicaID != proposedBy {
		fail("originally proposed by different replica %d", proposedBy)
	}
	r.proposedCmds[cmdID] = replicaID

	// Check and track LAI reproposals. These use a different command ID than the
	// original seed proposal.
	if seedID != "" {
		if seedProposedBy, ok := r.proposedCmds[seedID]; !ok {
			// INVARIANT: a LAI reproposal must reference a previous seed proposal.
			fail("unknown seed proposal %s", seedID)
		} else if seedProposedBy != replicaID {
			// INVARIANT: a LAI reproposal must be made by the seed replica.
			fail("seed proposal %s by different replica %d", seedID, seedProposedBy)
		}
		if s, ok := r.seedCmds[cmdID]; ok && s != seedID {
			// INVARIANT: a reproposal of a LAI reproposal must always reference the
			// same seed proposal.
			fail("expected seed proposal %s, got %s", s, seedID)
		}
		r.seedCmds[cmdID] = seedID
	}
}
// Apply records that replicaID applied command cmdID from the given Raft entry
// on rangeID, and panics on any invariant violation. Only commands that
// actually applied should be reported; rejected ones (e.g. by the LAI check)
// must not be. The command must already have been reported via Propose().
func (a *Asserter) Apply(
	rangeID roachpb.RangeID,
	replicaID roachpb.ReplicaID,
	cmdID kvserverbase.CmdIDKey,
	entry raftpb.Entry,
	leaseAppliedIndex kvpb.LeaseAppliedIndex,
	closedTS hlc.Timestamp,
) {
	ra := a.forRange(rangeID)
	ra.apply(replicaID, cmdID, entry, leaseAppliedIndex, closedTS)
}
// apply records that replicaID applied cmdID at raftEntry's index/term with the
// given lease applied index and closed timestamp. It panics if the application
// contradicts the range's tracked log, regresses state, skips entries, or
// double-applies a command (including via LAI reproposals).
func (r *rangeAsserter) apply(
	replicaID roachpb.ReplicaID,
	cmdID kvserverbase.CmdIDKey,
	raftEntry raftpb.Entry,
	leaseAppliedIndex kvpb.LeaseAppliedIndex,
	closedTS hlc.Timestamp,
) {
	entry := applyState{
		cmdID:             cmdID,
		raftIndex:         kvpb.RaftIndex(raftEntry.Index),
		raftTerm:          kvpb.RaftTerm(raftEntry.Term),
		leaseAppliedIndex: leaseAppliedIndex,
		closedTS:          closedTS,
	}
	fail := func(msg string, args ...interface{}) {
		panic(fmt.Sprintf("r%d/%d: %s (%s)\ndata: %x",
			r.rangeID, replicaID, fmt.Sprintf(msg, args...), entry, raftEntry.Data))
	}

	// INVARIANT: all commands have a command ID. etcd/raft may commit noop
	// proposals on leader changes that do not have a command ID, but we skip
	// these during application.
	if len(cmdID) == 0 {
		fail("applied command without ID")
	}

	r.Lock()
	defer r.Unlock()

	// INVARIANT: a command must be proposed before it can apply.
	if _, ok := r.proposedCmds[cmdID]; !ok {
		fail("command was not proposed")
	}

	if int(entry.raftIndex) < len(r.log) {
		// INVARIANT: all replicas must apply the same log entry at the same index.
		if e := r.log[int(entry.raftIndex)]; e != entry {
			fail("applied entry differs from existing log entry: %s", e)
		}
	} else {
		// The entry is not yet tracked in the log.
		// INVARIANT: entries may not regress the applied state.
		if len(r.log) > 0 {
			s := r.stateAt(kvpb.RaftIndex(len(r.log) - 1))
			// INVARIANT: the Raft index must progress.
			//
			// This is trivially true since we're appending to the log, which is
			// indexed by the Raft log index. We assert it anyway, as documentation.
			if entry.raftIndex <= s.raftIndex {
				fail("Raft index regression %d -> %d", s.raftIndex, entry.raftIndex)
			}
			// INVARIANT: the Raft term must not regress.
			if entry.raftTerm < s.raftTerm {
				fail("Raft term regression %d -> %d", s.raftTerm, entry.raftTerm)
			}
			// INVARIANT: the lease applied index must progress. Lease requests don't
			// carry a LAI (asserted in Propose).
			if entry.leaseAppliedIndex > 0 && entry.leaseAppliedIndex <= s.leaseAppliedIndex {
				fail("lease applied index regression %d -> %d",
					s.leaseAppliedIndex, entry.leaseAppliedIndex)
			}
			// INVARIANT: the closed timestamp must not regress. Lease requests don't
			// carry a closed timestamp (asserted in Propose).
			if entry.closedTS.IsSet() && entry.closedTS.Less(s.closedTS) {
				fail("closed timestamp regression %s -> %s", s.closedTS, entry.closedTS)
			}
		}

		// Append the entry, and insert gaps as necessary -- e.g. due to rejected
		// commands or etcd/raft noop commands on leader changes.
		for i := len(r.log); i < int(entry.raftIndex); i++ {
			r.log = append(r.log, applyState{}) // insert gap
		}
		r.log = append(r.log, entry)
	}

	// INVARIANT: the replica's applied index must strictly progress. Applying an
	// entry at (or below) the index the replica has already applied would mean
	// the same entry is applied twice on this replica, so equality is also a
	// violation (snapshots are held to the same strict check).
	if i := r.replicaAppliedIndex[replicaID]; entry.raftIndex <= i {
		fail("applied index regression %d -> %d", i, entry.raftIndex)
	}

	// INVARIANT: the replica must apply all commands, sequentially (except
	// when applying a snapshot).
	for i := r.replicaAppliedIndex[replicaID] + 1; i < entry.raftIndex; i++ {
		if e := r.log[i]; e != gap { // ignore gaps
			fail("replica skipped log entry: %s", e)
		}
	}
	r.replicaAppliedIndex[replicaID] = entry.raftIndex

	// INVARIANT: a given command must at most apply at a single index.
	if appliedIndex, ok := r.appliedCmds[cmdID]; ok && appliedIndex != entry.raftIndex {
		fail("command already applied at %s", r.log[appliedIndex])
	}
	r.appliedCmds[cmdID] = entry.raftIndex

	// INVARIANT: a command is only applied under a single command ID, even across
	// multiple LAI reproposals using different command IDs.
	seedID, ok := r.seedCmds[cmdID]
	if !ok {
		// This is not a LAI reproposal, so it's a seed proposal. It may or may not
		// have seen LAI reproposals.
		seedID = cmdID
	}
	if appliedAs, ok := r.seedAppliedAs[seedID]; ok && appliedAs != cmdID {
		fail("command already applied as %s at %s", appliedAs, r.log[r.appliedCmds[appliedAs]])
	}
	r.seedAppliedAs[seedID] = cmdID
}
// ApplySnapshot records that replicaID applied a snapshot sent by sender, whose
// state corresponds to the given Raft index/term, lease applied index and
// closed timestamp. It panics if the snapshot contradicts the tracked log or
// regresses the replica's applied index.
func (a *Asserter) ApplySnapshot(
	rangeID roachpb.RangeID,
	replicaID roachpb.ReplicaID,
	sender roachpb.ReplicaID,
	index kvpb.RaftIndex,
	term kvpb.RaftTerm,
	leaseAppliedIndex kvpb.LeaseAppliedIndex,
	closedTS hlc.Timestamp,
) {
	ra := a.forRange(rangeID)
	ra.applySnapshot(replicaID, sender, index, term, leaseAppliedIndex, closedTS)
}
// applySnapshot checks a snapshot applied by replicaID against the tracked log
// and advances the replica's applied index to the snapshot index. It panics if
// no log entries have been applied yet, if the snapshot does not move the
// replica's applied index forward, or if the snapshot state does not match the
// log state at that index.
func (r *rangeAsserter) applySnapshot(
	replicaID roachpb.ReplicaID,
	sender roachpb.ReplicaID,
	index kvpb.RaftIndex,
	term kvpb.RaftTerm,
	leaseAppliedIndex kvpb.LeaseAppliedIndex,
	closedTS hlc.Timestamp,
) {
	snap := applyState{
		raftIndex:         index,
		raftTerm:          term,
		leaseAppliedIndex: leaseAppliedIndex,
		closedTS:          closedTS,
	}
	// The closure captures snap itself, so a failure reports any adjustment
	// made to it below.
	fail := func(format string, args ...interface{}) {
		detail := fmt.Sprintf(format, args...)
		panic(fmt.Sprintf("r%d/%d snapshot from %d at index %d: %s (%s)",
			r.rangeID, replicaID, sender, index, detail, snap))
	}

	r.Lock()
	defer r.Unlock()

	// INVARIANT: we can't have a snapshot without any applied log entries.
	if len(r.log) == 0 {
		fail("snapshot before any log entries applied")
	}

	// INVARIANT: a snapshot must progress the replica's applied index.
	prevApplied := r.replicaAppliedIndex[replicaID]
	if index <= prevApplied {
		fail("replica applied index regression: %d -> %d", prevApplied, index)
	}
	r.replicaAppliedIndex[replicaID] = index

	// INVARIANT: a snapshot must match the applied state at the given Raft index.
	//
	// The snapshot index may lie past the end of the tracked log, or on a gap
	// (rejected or noop commands). In both cases it is compared against the
	// latest applied command at or before the snapshot index.
	lastIndex := kvpb.RaftIndex(len(r.log) - 1)
	logIndex := index
	if logIndex > lastIndex {
		logIndex = lastIndex
	}
	expected := r.stateAt(logIndex)
	expected.cmdID = "" // snapshots carry no command ID
	if snap.raftIndex > expected.raftIndex && snap.raftTerm >= expected.raftTerm {
		// The snapshot points at an entry we have no record of (most likely a
		// rejected command), so align it with the latest applied command.
		snap.raftIndex, snap.raftTerm = expected.raftIndex, expected.raftTerm
	}
	if snap != expected {
		fail("state differs from log state: %s", snap)
	}
}