storage: refactor log truncation index computation
This makes it a lot easier to log descriptive debug messages indicating
how a truncation decision was arrived at, and in particular allows
pointing the finger at truncations that lead to Raft snapshots, which
is relevant in the context of cockroachdb#32046.

Release note: None
tbg committed Nov 12, 2018
1 parent b085fa4 commit 4ac90bb
Showing 2 changed files with 283 additions and 122 deletions.
pkg/storage/raft_log_queue.go: 241 changes (168 additions, 73 deletions)
@@ -16,20 +16,23 @@ package storage

import (
"context"
"fmt"
"sort"
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"

"github.com/pkg/errors"
"go.etcd.io/etcd/raft"

"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

const (
@@ -55,12 +58,15 @@ const (
type raftLogQueue struct {
*baseQueue
db *client.DB

logSnapshots util.EveryN
}

// newRaftLogQueue returns a new instance of raftLogQueue.
func newRaftLogQueue(store *Store, db *client.DB, gossip *gossip.Gossip) *raftLogQueue {
rlq := &raftLogQueue{
db: db,
db: db,
logSnapshots: util.Every(10 * time.Second),
}
rlq.baseQueue = newBaseQueue(
"raftlog", rlq, store, gossip,
@@ -79,29 +85,11 @@ func newRaftLogQueue(store *Store, db *client.DB, gossip *gossip.Gossip) *raftLo
return rlq
}

func shouldTruncate(truncatableIndexes uint64, raftLogSize int64) bool {
return truncatableIndexes >= RaftLogQueueStaleThreshold ||
(truncatableIndexes > 0 && raftLogSize >= RaftLogQueueStaleSize)
}

// getTruncatableIndexes returns the number of truncatable indexes, the oldest
// index that cannot be truncated, and the current Raft log size. See
// computeTruncatableIndex.
func getTruncatableIndexes(ctx context.Context, r *Replica) (uint64, uint64, int64, error) {
// newTruncateDecision returns a truncateDecision for the given Replica if no
// error occurs. If no truncation can be carried out, a zero decision is
// returned.
func newTruncateDecision(ctx context.Context, r *Replica) (*truncateDecision, error) {
rangeID := r.RangeID
raftStatus := r.RaftStatus()
if raftStatus == nil {
if log.V(6) {
log.Infof(ctx, "the raft group doesn't exist for r%d", rangeID)
}
return 0, 0, 0, nil
}

// Is this the raft leader? We only perform log truncation on the raft leader
// which has the up to date info on followers.
if raftStatus.RaftState != raft.StateLeader {
return 0, 0, 0, nil
}

r.mu.Lock()
raftLogSize := r.mu.raftLogSize
@@ -119,21 +107,129 @@ func getTruncatableIndexes(ctx context.Context, r *Replica) (uint64, uint64, int
if targetSize > r.store.cfg.RaftLogTruncationThreshold {
targetSize = r.store.cfg.RaftLogTruncationThreshold
}
raftStatus := r.raftStatusRLocked()

firstIndex, err := r.raftFirstIndexLocked()
pendingSnapshotIndex := r.mu.pendingSnapshotIndex
lastIndex := r.mu.lastIndex
r.mu.Unlock()

if err != nil {
return 0, 0, 0, errors.Errorf("error retrieving first index for r%d: %s", rangeID, err)
return nil, errors.Errorf("error retrieving first index for r%d: %s", rangeID, err)
}

if raftStatus == nil {
if log.V(6) {
log.Infof(ctx, "the raft group doesn't exist for r%d", rangeID)
}
return &truncateDecision{}, nil
}

// Is this the raft leader? We only perform log truncation on the raft leader,
// which has up-to-date info on the followers.
if raftStatus.RaftState != raft.StateLeader {
return &truncateDecision{}, nil
}

input := truncateDecisionInput{
RaftStatus: raftStatus,
LogSize: raftLogSize,
MaxLogSize: targetSize,
FirstIndex: firstIndex,
LastIndex: lastIndex,
PendingPreemptiveSnapshotIndex: pendingSnapshotIndex,
}

decision := computeTruncateDecision(input)
return &decision, nil
}

const (
truncatableIndexChosenViaQuorumIndex = "quorum"
truncatableIndexChosenViaFollowers = "followers"
truncatableIndexChosenViaPendingSnap = "pending snapshot"
truncatableIndexChosenViaFirstIndex = "first index"
truncatableIndexChosenViaLastIndex = "last index"
)

type truncateDecisionInput struct {
RaftStatus *raft.Status // never nil
LogSize, MaxLogSize int64
FirstIndex, LastIndex uint64
PendingPreemptiveSnapshotIndex uint64
}

func (input truncateDecisionInput) LogTooLarge() bool {
return input.LogSize > input.MaxLogSize
}

type truncateDecision struct {
Input truncateDecisionInput
QuorumIndex uint64 // largest index known to be present on quorum

NewFirstIndex uint64 // first index of the resulting log after truncation
ChosenVia string
}

func (td *truncateDecision) raftSnapshotsForIndex(index uint64) int {
var n int
for _, p := range td.Input.RaftStatus.Progress {
if p.Match < index {
n++
}
}
if td.Input.PendingPreemptiveSnapshotIndex != 0 && td.Input.PendingPreemptiveSnapshotIndex < index {
n++
}

return n
}

truncatableIndex := computeTruncatableIndex(
raftStatus, raftLogSize, targetSize, firstIndex, lastIndex, pendingSnapshotIndex)
// Return the number of truncatable indexes.
return truncatableIndex - firstIndex, truncatableIndex, raftLogSize, nil
func (td *truncateDecision) NumNewRaftSnapshots() int {
return td.raftSnapshotsForIndex(td.NewFirstIndex) - td.raftSnapshotsForIndex(td.Input.FirstIndex)
}

// computeTruncatableIndex returns the oldest index that cannot be
func (td *truncateDecision) String() string {
var buf strings.Builder
_, _ = fmt.Fprintf(
&buf,
"truncate %d entries to first index %d (chosen via: %s)",
td.NumTruncatableIndexes(), td.NewFirstIndex, td.ChosenVia,
)
if td.Input.LogTooLarge() {
_, _ = fmt.Fprintf(
&buf,
"; log too large (%s > %s)",
humanizeutil.IBytes(td.Input.LogSize),
humanizeutil.IBytes(td.Input.MaxLogSize),
)
}
if n := td.NumNewRaftSnapshots(); n > 0 {
_, _ = fmt.Fprintf(&buf, "; implies %d Raft snapshot%s", n, util.Pluralize(int64(n)))
}

return buf.String()
}

func (td *truncateDecision) NumTruncatableIndexes() int {
if td.NewFirstIndex < td.Input.FirstIndex {
log.Fatalf(
context.Background(),
"invalid truncate decision: first index would move from %d to %d",
td.Input.FirstIndex,
td.NewFirstIndex,
)
}
return int(td.NewFirstIndex - td.Input.FirstIndex)
}

func (td *truncateDecision) ShouldTruncate() bool {
n := td.NumTruncatableIndexes()
return n >= RaftLogQueueStaleThreshold ||
(n > 0 && td.Input.LogSize >= RaftLogQueueStaleSize)
}

// computeTruncateDecision returns the oldest index that cannot be
// truncated. If there is a behind node, we want to keep old raft logs so it
// can catch up without having to send a full snapshot. However, if a node
// is down long enough, sending a snapshot is more efficient and we should
@@ -150,53 +246,57 @@ func getTruncatableIndexes(ctx context.Context, r *Replica) (uint64, uint64, int
// the behind node will be caught up to a point behind the current first index
// and thus require another snapshot, likely entering a never ending loop of
// snapshots. See #8629.
func computeTruncatableIndex(
raftStatus *raft.Status,
raftLogSize int64,
targetSize int64,
firstIndex uint64,
lastIndex uint64,
pendingSnapshotIndex uint64,
) uint64 {
quorumIndex := getQuorumIndex(raftStatus, pendingSnapshotIndex)
truncatableIndex := quorumIndex

if raftLogSize <= targetSize {
func computeTruncateDecision(input truncateDecisionInput) truncateDecision {
decision := truncateDecision{Input: input}
decision.QuorumIndex = getQuorumIndex(input.RaftStatus, input.PendingPreemptiveSnapshotIndex)

decision.NewFirstIndex = decision.QuorumIndex
decision.ChosenVia = truncatableIndexChosenViaQuorumIndex

if !input.LogTooLarge() {
// Only truncate to one of the follower indexes if the raft log is less
// than the target size. If the raft log is greater than the target size we
// always truncate to the quorum commit index.
for _, progress := range raftStatus.Progress {
for _, progress := range input.RaftStatus.Progress {
index := progress.Match
if truncatableIndex > index {
truncatableIndex = index
if decision.NewFirstIndex > index {
decision.NewFirstIndex = index
decision.ChosenVia = truncatableIndexChosenViaFollowers
}
}
// The pending snapshot index acts as a placeholder for a replica that is
// about to be added to the range. We don't want to truncate the log in a
// way that will require that new replica to be caught up via a Raft
// snapshot.
if pendingSnapshotIndex > 0 && truncatableIndex > pendingSnapshotIndex {
truncatableIndex = pendingSnapshotIndex
if input.PendingPreemptiveSnapshotIndex > 0 && decision.NewFirstIndex > input.PendingPreemptiveSnapshotIndex {
decision.NewFirstIndex = input.PendingPreemptiveSnapshotIndex
decision.ChosenVia = truncatableIndexChosenViaPendingSnap
}
}

if truncatableIndex < firstIndex {
truncatableIndex = firstIndex
}
// Never truncate past the quorum commit index (this can only occur if
// firstIndex > quorumIndex).
if truncatableIndex > quorumIndex {
truncatableIndex = quorumIndex
// Advance to the first index, but never truncate past the quorum commit
// index.
if decision.NewFirstIndex < input.FirstIndex && input.FirstIndex <= decision.QuorumIndex {
decision.NewFirstIndex = input.FirstIndex
decision.ChosenVia = truncatableIndexChosenViaFirstIndex
}
// Never truncate past the last index. Naively, you would expect lastIndex to
// never be smaller than quorumIndex, but RaftStatus.Progress.Match is
// updated on the leader when a command is proposed and in a single replica
// Raft group this also means that RaftStatus.Commit is updated at propose
// time.
if truncatableIndex > lastIndex {
truncatableIndex = lastIndex
if decision.NewFirstIndex > input.LastIndex {
decision.NewFirstIndex = input.LastIndex
decision.ChosenVia = truncatableIndexChosenViaLastIndex
}

// If new first index dropped below first index, make them equal (resulting
// in a no-op).
if decision.NewFirstIndex < decision.Input.FirstIndex {
decision.NewFirstIndex = decision.Input.FirstIndex
decision.ChosenVia = truncatableIndexChosenViaFirstIndex
}
return truncatableIndex
return decision
}
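
As an aside, not part of this commit's diff: a minimal sketch of how the new helper behaves when the raft log is still under its target size. It assumes it sits in package storage next to the code above; the function name is made up and all numbers are invented for illustration.

// Sketch only: a leader and one follower fully caught up at index 200, a
// lagging follower at 150, and a raft log well below its target size.
func exampleDecisionUnderTargetSize() truncateDecision {
	status := &raft.Status{
		Progress: map[uint64]raft.Progress{
			1: {Match: 200},
			2: {Match: 200},
			3: {Match: 150}, // lagging follower
		},
	}
	input := truncateDecisionInput{
		RaftStatus: status,
		LogSize:    512 << 10, // 512 KiB, below MaxLogSize
		MaxLogSize: 4 << 20,   // 4 MiB
		FirstIndex: 100,
		LastIndex:  200,
	}
	// Because the log is not too large, the truncation is held back to the
	// slowest follower: NewFirstIndex == 150, ChosenVia == "followers", so 50
	// entries are truncated and no follower is cut off from the log.
	return computeTruncateDecision(input)
}

Compare this with the over-size case sketched after the process function below.
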

// getQuorumIndex returns the index which a quorum of the nodes have
@@ -229,45 +329,40 @@ func getQuorumIndex(raftStatus *raft.Status, pendingSnapshotIndex uint64) uint64
func (rlq *raftLogQueue) shouldQueue(
ctx context.Context, now hlc.Timestamp, r *Replica, _ *config.SystemConfig,
) (shouldQ bool, priority float64) {
truncatableIndexes, _, raftLogSize, err := getTruncatableIndexes(ctx, r)
decision, err := newTruncateDecision(ctx, r)
if err != nil {
log.Warning(ctx, err)
return false, 0
}

return shouldTruncate(truncatableIndexes, raftLogSize), float64(raftLogSize)
return decision.ShouldTruncate(), float64(decision.Input.LogSize)
}

// process truncates the raft log of the range if the replica is the raft
// leader and if the total number of the range's raft log's stale entries
// exceeds RaftLogQueueStaleThreshold.
func (rlq *raftLogQueue) process(ctx context.Context, r *Replica, _ *config.SystemConfig) error {
truncatableIndexes, oldestIndex, raftLogSize, err := getTruncatableIndexes(ctx, r)
decision, err := newTruncateDecision(ctx, r)
if err != nil {
return err
}

// Can and should the raft logs be truncated?
if shouldTruncate(truncatableIndexes, raftLogSize) {
r.mu.Lock()
raftLogSize := r.mu.raftLogSize
lastIndex := r.mu.lastIndex
r.mu.Unlock()

if log.V(1) {
log.Infof(ctx, "truncating raft log entries [%d-%d], resulting in log [%d,%d], reclaiming ~%s",
oldestIndex-truncatableIndexes, oldestIndex-1, oldestIndex, lastIndex, humanizeutil.IBytes(raftLogSize))
if decision.ShouldTruncate() {
if n := decision.NumNewRaftSnapshots(); log.V(1) || n > 0 && rlq.logSnapshots.ShouldProcess(timeutil.Now()) {
log.Info(ctx, decision)
} else {
log.VEvent(ctx, 1, decision.String())
}
b := &client.Batch{}
b.AddRawRequest(&roachpb.TruncateLogRequest{
RequestHeader: roachpb.RequestHeader{Key: r.Desc().StartKey.AsRawKey()},
Index: oldestIndex,
Index: decision.NewFirstIndex,
RangeID: r.RangeID,
})
if err := rlq.db.Run(ctx, b); err != nil {
return err
}
r.store.metrics.RaftLogTruncated.Inc(int64(truncatableIndexes))
r.store.metrics.RaftLogTruncated.Inc(int64(decision.NumTruncatableIndexes()))
}
return nil
}
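
A companion sketch, again not part of the diff and with an invented helper name and numbers: the case the new log message is aimed at, where the log has outgrown its target size and the decision truncates to the quorum index even though a lagging follower will then need a Raft snapshot.

// Sketch only: the log is over its target size, so the decision ignores the
// follower that is far behind and truncates to the quorum index.
func exampleDecisionOverTargetSize() {
	status := &raft.Status{
		Progress: map[uint64]raft.Progress{
			1: {Match: 100},
			2: {Match: 100},
			3: {Match: 30}, // far behind; will need a snapshot after truncating
		},
	}
	input := truncateDecisionInput{
		RaftStatus: status,
		LogSize:    2 << 20, // 2 MiB
		MaxLogSize: 1 << 20, // 1 MiB, so LogTooLarge() is true
		FirstIndex: 10,
		LastIndex:  100,
	}
	decision := computeTruncateDecision(input)
	// With these numbers ShouldTruncate() comes out true (the 2 MiB log is
	// well past RaftLogQueueStaleSize), and decision.String() reads roughly:
	//   truncate 90 entries to first index 100 (chosen via: quorum);
	//   log too large (2.0 MiB > 1.0 MiB); implies 1 Raft snapshot
	fmt.Println(decision.ShouldTruncate(), decision.String())
}

In process above, this is the decision whose String() lands in the log, throttled by the new logSnapshots EveryN so that snapshot-inducing truncations are called out at most every ten seconds unless verbose logging is enabled.
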