Skip to content

Commit 61f7b3c

Browse files
committed
kvserver: implement raftMu-based raft log snapshot
This commit implements the raft.LogStorageSnapshot interface in kvserver. The implementation behaves like a consistent log storage snapshot while Replica.raftMu is held, relying on the fact that all raft log storage writes are synchronized via this mutex. Epic: none Release note: none
1 parent 4994648 commit 61f7b3c

File tree

2 files changed

+100
-18
lines changed

2 files changed

+100
-18
lines changed

pkg/kv/kvserver/replica_raftlog.go

+100-15
Original file line numberDiff line numberDiff line change
@@ -175,23 +175,108 @@ func (r *Replica) GetFirstIndex() kvpb.RaftIndex {
175175
}
176176

177177
// LogSnapshot returns an immutable point-in-time snapshot of the log storage.
178-
// Requires that r.raftMu is held for writing, and r.mu for reading.
178+
//
179+
// Requires that r.raftMu is held for writing, and r.mu for reading. In
180+
// addition, r.raftMu must be held continuously throughout the lifetime of the
181+
// returned snapshot.
179182
func (r *replicaLogStorage) LogSnapshot() raft.LogStorageSnapshot {
180183
r.raftMu.AssertHeld()
181184
r.mu.AssertRHeld()
182-
// TODO(pav-kv): return a wrapper which, in all methods, checks that the log
183-
// storage hasn't been written to. A more relaxed version of it should assert
184-
// that only the relevant part of the log hasn't been overwritten, e.g. a new
185-
// term leader hasn't appended a log slice that truncated the log, or the log
186-
// hasn't been wiped.
187-
//
188-
// This would require auditing and integrating with the write paths. Today,
189-
// this type implements only reads, and writes are in various places like the
190-
// logstore.LogStore type, or the code in the split handler which creates an
191-
// empty range state.
192-
//
193-
// We don't need a fully fledged Pebble snapshot here. For our purposes, we
194-
// can also make sure that raftMu is held for the entire period of using the
195-
// LogSnapshot - this should guarantee its immutability.
185+
return (*replicaRaftMuLogSnap)(r)
186+
}
187+
188+
// replicaRaftMuLogSnap implements the raft.LogStorageSnapshot interface.
189+
//
190+
// The type implements a limited version of a raft log storage snapshot, without
191+
// needing a storage engine snapshot. It relies on r.raftMu being held
192+
// throughout the raft.LogStorageSnapshot lifetime. Since raft writes are
193+
// blocked while r.raftMu is held, this type behaves like a consistent storage
194+
// snapshot until r.raftMu is released.
195+
//
196+
// TODO(pav-kv): equip this wrapper with correctness checks, e.g. that the log
197+
// storage hasn't been written to while we hold a snapshot. A more relaxed
198+
// version of it should assert that only the relevant part of the log hasn't
199+
// been overwritten, e.g. a new term leader hasn't appended a log slice that
200+
// truncates the log and overwrites log indices in our snapshot.
201+
//
202+
// This would require auditing and integrating with the write paths. Today, this
203+
// type implements only reads, and writes are in various places like the
204+
// logstore.LogStore type, or applySnapshot.
205+
type replicaRaftMuLogSnap replicaLogStorage
206+
207+
// Entries implements the raft.LogStorageSnapshot interface.
208+
// Requires that r.raftMu is held.
209+
func (r *replicaRaftMuLogSnap) Entries(lo, hi, maxBytes uint64) ([]raftpb.Entry, error) {
210+
entries, err := r.entriesRaftMuLocked(
211+
kvpb.RaftIndex(lo), kvpb.RaftIndex(hi), maxBytes)
212+
if err != nil {
213+
(*replicaLogStorage)(r).reportRaftStorageError(err)
214+
}
215+
return entries, err
216+
}
217+
218+
// entriesRaftMuLocked implements the Entries() call.
219+
func (r *replicaRaftMuLogSnap) entriesRaftMuLocked(
220+
lo, hi kvpb.RaftIndex, maxBytes uint64,
221+
) ([]raftpb.Entry, error) {
222+
// NB: writes to the storage engine and the sideloaded storage are made under
223+
// raftMu only, so we are not racing with new writes. In addition, raft never
224+
// tries to read "unstable" entries that correspond to ongoing writes.
225+
r.raftMu.AssertHeld()
226+
// TODO(pav-kv): de-duplicate this code and the one where r.mu must be held.
227+
entries, _, loadedSize, err := logstore.LoadEntries(
228+
r.AnnotateCtx(context.TODO()),
229+
r.raftMu.stateLoader.StateLoader, r.store.TODOEngine(), r.RangeID,
230+
r.store.raftEntryCache, r.raftMu.sideloaded, lo, hi, maxBytes,
231+
&r.raftMu.bytesAccount,
232+
)
233+
r.store.metrics.RaftStorageReadBytes.Inc(int64(loadedSize))
234+
return entries, err
235+
}
236+
237+
// Term implements the raft.LogStorageSnapshot interface.
238+
// Requires that r.raftMu is held.
239+
func (r *replicaRaftMuLogSnap) Term(i uint64) (uint64, error) {
240+
term, err := r.termRaftMuLocked(kvpb.RaftIndex(i))
241+
if err != nil {
242+
(*replicaLogStorage)(r).reportRaftStorageError(err)
243+
}
244+
return uint64(term), err
245+
}
246+
247+
// termRaftMuLocked implements the Term() call.
248+
func (r *replicaRaftMuLogSnap) termRaftMuLocked(i kvpb.RaftIndex) (kvpb.RaftTerm, error) {
249+
r.raftMu.AssertHeld()
250+
// NB: the r.mu fields accessed here are always written under both r.raftMu
251+
// and r.mu, and the reads are safe under r.raftMu.
252+
if r.mu.lastIndexNotDurable == i && r.mu.lastTermNotDurable != invalidLastTerm {
253+
return r.mu.lastTermNotDurable, nil
254+
}
255+
return logstore.LoadTerm(r.AnnotateCtx(context.TODO()),
256+
r.raftMu.stateLoader.StateLoader, r.store.TODOEngine(), r.RangeID,
257+
r.store.raftEntryCache, i,
258+
)
259+
}
260+
261+
// LastIndex implements the raft.LogStorageSnapshot interface.
262+
// Requires that r.raftMu is held.
263+
func (r *replicaRaftMuLogSnap) LastIndex() uint64 {
264+
// NB: lastIndexNotDurable is updated under both r.raftMu and r.mu, so it is
265+
// safe to access while holding any of these mutexes. We enforce raftMu
266+
// because this is a raftMu-based snapshot.
267+
r.raftMu.AssertHeld()
268+
return uint64(r.mu.lastIndexNotDurable)
269+
}
270+
271+
// FirstIndex implements the raft.LogStorageSnapshot interface.
272+
// Requires that r.raftMu is held.
273+
func (r *replicaRaftMuLogSnap) FirstIndex() uint64 {
274+
r.raftMu.AssertHeld()
275+
// r.mu.state is mutated both under r.raftMu and r.mu, so the access is safe.
276+
return uint64(r.mu.state.TruncatedState.Index + 1)
277+
}
278+
279+
// LogSnapshot implements the raft.LogStorageSnapshot interface.
280+
func (r *replicaRaftMuLogSnap) LogSnapshot() raft.LogStorageSnapshot {
196281
return r
197282
}

pkg/kv/kvserver/replica_raftstorage.go

-3
Original file line numberDiff line numberDiff line change
@@ -110,9 +110,6 @@ var snapshotIngestAsWriteThreshold = settings.RegisterByteSizeSetting(
110110
// means that both the original and the snapshot RawNode remain consistent. The
111111
// shallow copy represents a valid past state of the RawNode.
112112
//
113-
// TODO(pav-kv): the snapshotting with only r.raftMu held is not implemented,
114-
// but should be done soon.
115-
//
116113
// All the implementation methods assume that the required locks are held, and
117114
// don't acquire them. The specific locking requirements are noted in each
118115
// method's comment. The method names do not follow our "Locked" naming

0 commit comments

Comments
 (0)