Skip to content

Commit

Permalink
kvserver: use must in replica_raft.go
Browse files Browse the repository at this point in the history
Epic: none
Release note: None
  • Loading branch information
erikgrinaker committed Jul 21, 2023
1 parent cd612cb commit 20bef10
Show file tree
Hide file tree
Showing 10 changed files with 159 additions and 111 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ go_library(
"//pkg/util/metric",
"//pkg/util/metric/aggmetric",
"//pkg/util/mon",
"//pkg/util/must",
"//pkg/util/pprofutil",
"//pkg/util/protoutil",
"//pkg/util/quotapool",
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/client_manual_proposal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ type wgSyncCallback sync.WaitGroup

func (w *wgSyncCallback) OnLogSync(
ctx context.Context, messages []raftpb.Message, stats storage.BatchCommitStats,
) {
) error {
(*sync.WaitGroup)(w).Done()
return nil
}
11 changes: 7 additions & 4 deletions pkg/kv/kvserver/logstore/logstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ type LogStore struct {
// are associated with the MsgStorageAppend that triggered the fsync.
// commitStats is populated iff this was a non-blocking sync.
type SyncCallback interface {
OnLogSync(context.Context, []raftpb.Message, storage.BatchCommitStats)
OnLogSync(context.Context, []raftpb.Message, storage.BatchCommitStats) error
}

func newStoreEntriesBatch(eng storage.Engine) storage.Batch {
Expand Down Expand Up @@ -279,7 +279,9 @@ func (s *LogStore) storeEntriesAndCommitBatch(
if wantsSync {
logCommitEnd := stats.PebbleEnd
s.Metrics.RaftLogCommitLatency.RecordValue(logCommitEnd.Sub(stats.PebbleBegin).Nanoseconds())
cb.OnLogSync(ctx, m.Responses, storage.BatchCommitStats{})
if err := cb.OnLogSync(ctx, m.Responses, storage.BatchCommitStats{}); err != nil {
return RaftState{}, err
}
}
}
stats.Sync = wantsSync
Expand Down Expand Up @@ -340,12 +342,13 @@ type nonBlockingSyncWaiterCallback struct {
}

// run is the callback's logic. It is executed on the SyncWaiterLoop goroutine.
func (cb *nonBlockingSyncWaiterCallback) run() {
func (cb *nonBlockingSyncWaiterCallback) run() error {
dur := timeutil.Since(cb.logCommitBegin).Nanoseconds()
cb.metrics.RaftLogCommitLatency.RecordValue(dur)
commitStats := cb.batch.CommitStats()
cb.cb.OnLogSync(cb.ctx, cb.msgs, commitStats)
err := cb.cb.OnLogSync(cb.ctx, cb.msgs, commitStats)
cb.release()
return err
}

func (cb *nonBlockingSyncWaiterCallback) release() {
Expand Down
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/logstore/logstore_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@ func (b *discardBatch) Commit(bool) error {

type noopSyncCallback struct{}

func (noopSyncCallback) OnLogSync(context.Context, []raftpb.Message, storage.BatchCommitStats) {}
func (noopSyncCallback) OnLogSync(
context.Context, []raftpb.Message, storage.BatchCommitStats,
) error {
return nil
}

func BenchmarkLogStore_StoreEntries(b *testing.B) {
defer log.Scope(b).Close(b)
Expand Down
24 changes: 0 additions & 24 deletions pkg/kv/kvserver/logstore/sideload.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,30 +224,6 @@ func MaybeInlineSideloadedRaftCommand(
return &ent, nil
}

// AssertSideloadedRaftCommandInlined asserts that if the provided entry is a
// sideloaded entry, then its payload has already been inlined. Doing so
// requires unmarshalling the raft command, so this assertion should be kept out
// of performance critical paths.
func AssertSideloadedRaftCommandInlined(ctx context.Context, ent *raftpb.Entry) {
typ, err := raftlog.EncodingOf(*ent)
if err != nil {
log.Fatalf(ctx, "%v", err)
}
if !typ.IsSideloaded() {
return
}

e, err := raftlog.NewEntry(*ent)
if err != nil {
log.Fatalf(ctx, "%v", err)
}

if len(e.Cmd.ReplicatedEvalResult.AddSSTable.Data) == 0 {
// The entry is "thin", which is what this assertion is checking for.
log.Fatalf(ctx, "found thin sideloaded raft command: %+v", e.Cmd)
}
}

// maybePurgeSideloaded removes [firstIndex, ..., lastIndex] at the given term
// and returns the total number of bytes removed. Nonexistent entries are
// silently skipped over.
Expand Down
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/logstore/sync_waiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ var _ syncWaiter = storage.Batch(nil)
// then pool the allocation of that object.
type syncWaiterCallback interface {
// run executes the callback.
run()
run() error
}

// SyncWaiterLoop waits on a sequence of in-progress disk writes, notifying
Expand Down Expand Up @@ -95,7 +95,9 @@ func (w *SyncWaiterLoop) waitLoop(ctx context.Context, stopper *stop.Stopper) {
if err := w.wg.SyncWait(); err != nil {
log.Fatalf(ctx, "SyncWait error: %+v", err)
}
w.cb.run()
if err := w.cb.run(); err != nil {
log.Fatalf(ctx, "%+v", err)
}
w.wg.Close()
case <-stopper.ShouldQuiesce():
return
Expand Down
5 changes: 4 additions & 1 deletion pkg/kv/kvserver/logstore/sync_waiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,7 @@ func (c chanSyncWaiter) Close() {}
// funcSyncWaiterCallback implements the syncWaiterCallback interface.
type funcSyncWaiterCallback func()

func (f funcSyncWaiterCallback) run() { f() }
func (f funcSyncWaiterCallback) run() error {
f()
return nil
}
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/raft_snapshot_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,10 @@ func (rq *raftSnapshotQueue) processRaftSnapshot(
// described above, this will cause raft to keep asking for a snap and at
// some point the snapshot lock above will be released and we'll fall
// through to the logic below.
repl.reportSnapshotStatus(ctx, repDesc.ReplicaID, err)
return false, nil
if reportErr := repl.reportSnapshotStatus(ctx, repDesc.ReplicaID, err); reportErr != nil {
return false, reportErr
}
return false, err
}
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -2782,7 +2782,9 @@ func (r *Replica) sendSnapshotUsingDelegate(
defer func() {
// Report the snapshot status to Raft, which expects us to do this once we
// finish sending the snapshot.
r.reportSnapshotStatus(ctx, recipient.ReplicaID, retErr)
if err := r.reportSnapshotStatus(ctx, recipient.ReplicaID, retErr); err != nil {
retErr = err
}
}()

r.mu.RLock()
Expand Down
Loading

0 comments on commit 20bef10

Please sign in to comment.