Skip to content

Commit

Permalink
Merge branch 'main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
mukeshjc authored Nov 19, 2024
2 parents fb360eb + 017ad34 commit 8368671
Show file tree
Hide file tree
Showing 13 changed files with 27 additions and 27 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ CHANGES

go-msgpack v2.1.1 is by default binary compatible with v0.5.5 ("non-builtin" encoding of `time.Time`), but can decode messages produced by v1.1.5 as well ("builtin" encoding of `time.Time`).

However, if users of this libary overrode the version of go-msgpack (especially to v1), this **could break** compatibility if raft nodes are running a mix of versions.
However, if users of this library overrode the version of go-msgpack (especially to v1), this **could break** compatibility if raft nodes are running a mix of versions.

This compatibility can be configured at runtime in Raft using `NetworkTransportConfig.MsgpackUseNewTimeFormat` -- the default is `false`, which maintains compatibility with `go-msgpack` v0.5.5, but if set to `true`, will be compatible with `go-msgpack` v1.1.5.

Expand Down
6 changes: 3 additions & 3 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ var (
ErrEnqueueTimeout = errors.New("timed out enqueuing operation")

// ErrNothingNewToSnapshot is returned when trying to create a snapshot
// but there's nothing new commited to the FSM since we started.
// but there's nothing new committed to the FSM since we started.
ErrNothingNewToSnapshot = errors.New("nothing new to snapshot")

// ErrUnsupportedProtocol is returned when an operation is attempted
Expand Down Expand Up @@ -1099,12 +1099,12 @@ func (r *Raft) State() RaftState {
// lose it.
//
// Receivers can expect to receive a notification only if leadership
// transition has occured.
// transition has occurred.
//
// If receivers aren't ready for the signal, signals may drop and only the
// latest leadership transition. For example, if a receiver receives subsequent
// `true` values, they may deduce that leadership was lost and regained while
// the the receiver was processing first leadership transition.
// the receiver was processing first leadership transition.
func (r *Raft) LeaderCh() <-chan bool {
return r.leaderCh
}
Expand Down
2 changes: 1 addition & 1 deletion bench/bench.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func StoreLogs(b *testing.B, store raft.LogStore) {
func DeleteRange(b *testing.B, store raft.LogStore) {
// Create some fake data. In this case, we create 3 new log entries for each
// test case, and separate them by index in multiples of 10. This allows
// some room so that we can test deleting ranges with "extra" logs to
// some room so that we can test deleting ranges with "extra" logs
// to ensure we stop going to the database once our max index is hit.
var logs []*raft.Log
for n := 0; n < b.N; n++ {
Expand Down
2 changes: 1 addition & 1 deletion configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func hasVote(configuration Configuration, id ServerID) bool {
return false
}

// inConfiguration returns true if the server identified by 'id' is in in the
// inConfiguration returns true if the server identified by 'id' is in the
// provided Configuration.
func inConfiguration(configuration Configuration, id ServerID) bool {
for _, server := range configuration.Servers {
Expand Down
10 changes: 5 additions & 5 deletions docs/apply.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Raft Apply

Apply is the primary operation provided by raft. A client calls `raft.Apply` to apply
a command to the FSM. A command will first be commited, i.e., durably stored on a
a command to the FSM. A command will first be committed, i.e., durably stored on a
quorum of raft nodes. Then, the committed command is applied to fsm.

This sequence diagram shows the steps involved in a `raft.Apply` operation. Each box
Expand Down Expand Up @@ -63,7 +63,7 @@ leader's lastIndex). Another parameter to AppendEntries is the LeaderCommitIndex
is some examples:

```
AppenEntries(Log: 1..5, LeaderCommitIndex: 0) // Replicating log entries 1..5,
AppendEntries(Log: 1..5, LeaderCommitIndex: 0) // Replicating log entries 1..5,
// the leader hasn't committed any log entry;
AppendEntries(Log: 6..8, LeaderCommitIndex: 4) // Replicating log entries 6..8,
// log 0..4 are committed after the leader receives
Expand Down Expand Up @@ -92,7 +92,7 @@ Therefore, it's possible that a very small window of time exists when all follow
committed the log to disk, the write has been realized in the FSM of the leader but the
followers have not yet applied the log to their FSM.

7. The peer applies the commited entries to the FSM.
7. The peer applies the committed entries to the FSM.

8. If all went well, the follower responds success (`resp.Success = true`) to the
`appendEntries` RPC call.
Expand All @@ -108,9 +108,9 @@ grouping the entries that can be applied to the fsm.

11. `processLogs` applies all the committed entries that haven't been applied by batching the log entries and forwarding them through the `fsmMutateCh` channel to fsm.

12. The actual place applying the commited log entries is in the main loop of `runFSM()`.
12. The actual place applying the committed log entries is in the main loop of `runFSM()`.

13. After the log entries that contains the client req are applied to the fsm, the fsm
module will set the reponses to the client request (`req.future.respond(nil)`). From the
module will set the responses to the client request (`req.future.respond(nil)`). From the
client's point of view, the future returned by `raft.Apply` should now be unblocked and
calls to `Error()` or `Response()` should return the data at this point.
2 changes: 1 addition & 1 deletion fuzzy/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (c *cluster) Stop(t *testing.T, maxWait time.Duration) {
}

// WaitTilUptoDate blocks until all nodes in the cluster have gotten their
// commitedIndex upto the Index from the last successful call to Apply
// committedIndex upto the Index from the last successful call to Apply
func (c *cluster) WaitTilUptoDate(t *testing.T, maxWait time.Duration) {
idx := c.lastApplySuccess.Index()
start := time.Now()
Expand Down
2 changes: 1 addition & 1 deletion fuzzy/verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (v *appendEntriesVerifier) PreAppendEntries(src, target string, req *raft.A
if ldr != src {
v.Lock()
defer v.Unlock()
v.errors = append(v.errors, fmt.Sprintf("Node %v sent an appendEnties request for term %d that said the leader was some other node %v", src, term, ldr))
v.errors = append(v.errors, fmt.Sprintf("Node %v sent an appendEntries request for term %d that said the leader was some other node %v", src, term, ldr))
}
v.RLock()
tl, exists := v.leaderForTerm[term]
Expand Down
2 changes: 1 addition & 1 deletion log_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func NewLogCache(capacity int, store LogStore) (*LogCache, error) {
}

// IsMonotonic implements the MonotonicLogStore interface. This is a shim to
// expose the underyling store as monotonically indexed or not.
// expose the underlying store as monotonically indexed or not.
func (c *LogCache) IsMonotonic() bool {
if store, ok := c.store.(MonotonicLogStore); ok {
return store.IsMonotonic()
Expand Down
2 changes: 1 addition & 1 deletion net_transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ func TestNetworkTransport_AppendEntriesPipeline_MaxRPCsInFlight(t *testing.T) {

for i := 0; i < expectedMax-1; i++ {
// We should be able to send `max - 1` rpcs before `AppendEntries`
// blocks. It blocks on the `max` one because it it sends before pushing
// blocks. It blocks on the `max` one because it sends before pushing
// to the chan. It will block forever when it does because nothing is
// responding yet.
out := new(AppendEntriesResponse)
Expand Down
6 changes: 3 additions & 3 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,9 +300,9 @@ func (r *Raft) runCandidate() {
}

// Make sure the leadership transfer flag is reset after each run. Having this
// flag will set the field LeadershipTransfer in a RequestVoteRequst to true,
// flag will set the field LeadershipTransfer in a RequestVoteRequest to true,
// which will make other servers vote even though they have a leader already.
// It is important to reset that flag, because this priviledge could be abused
// It is important to reset that flag, because this privilege could be abused
// otherwise.
defer func() { r.candidateFromLeadershipTransfer.Store(false) }()

Expand Down Expand Up @@ -474,7 +474,7 @@ func (r *Raft) runLeader() {

// Store the notify chan. It's not reloadable so shouldn't change before the
// defer below runs, but this makes sure we always notify the same chan if
// ever for both gaining and loosing leadership.
// ever for both gaining and losing leadership.
notify := r.config().NotifyCh

// Push to the notify channel if given
Expand Down
14 changes: 7 additions & 7 deletions raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -907,7 +907,7 @@ func TestRaft_AddKnownPeer(t *testing.T) {
newConfig := configReq.configurations.committed
newConfigIdx := configReq.configurations.committedIndex
if newConfigIdx <= startingConfigIdx {
t.Fatalf("AddVoter should have written a new config entry, but configurations.commitedIndex still %d", newConfigIdx)
t.Fatalf("AddVoter should have written a new config entry, but configurations.committedIndex still %d", newConfigIdx)
}
if !reflect.DeepEqual(newConfig, startingConfig) {
t.Fatalf("[ERR} AddVoter with existing peer shouldn't have changed config, was %#v, but now %#v", startingConfig, newConfig)
Expand Down Expand Up @@ -946,7 +946,7 @@ func TestRaft_RemoveUnknownPeer(t *testing.T) {
newConfig := configReq.configurations.committed
newConfigIdx := configReq.configurations.committedIndex
if newConfigIdx <= startingConfigIdx {
t.Fatalf("RemoveServer should have written a new config entry, but configurations.commitedIndex still %d", newConfigIdx)
t.Fatalf("RemoveServer should have written a new config entry, but configurations.committedIndex still %d", newConfigIdx)
}
if !reflect.DeepEqual(newConfig, startingConfig) {
t.Fatalf("[ERR} RemoveServer with unknown peer shouldn't of changed config, was %#v, but now %#v", startingConfig, newConfig)
Expand Down Expand Up @@ -1515,7 +1515,7 @@ func snapshotAndRestore(t *testing.T, offset uint64, monotonicLogStore bool, res
expected = preIndex + 2
} else {
// restoring onto a new cluster should always have a last index based
// off of the snaphsot meta index
// off of the snapshot meta index
expected = meta.Index + 2
}

Expand All @@ -1527,7 +1527,7 @@ func snapshotAndRestore(t *testing.T, offset uint64, monotonicLogStore bool, res
// Ensure raft logs are removed for monotonic log stores but remain
// untouched for non-monotic (BoltDB) logstores.
// When first index = 1, then logs have remained untouched.
// When first indext is set to the next commit index / last index, then
// When first index is set to the next commit index / last index, then
// it means logs have been removed.
raftNodes := make([]*Raft, 0, numPeers+1)
raftNodes = append(raftNodes, leader)
Expand Down Expand Up @@ -2778,7 +2778,7 @@ func TestRaft_CacheLogWithStoreError(t *testing.T) {

// Shutdown follower
if f := follower.Shutdown(); f.Error() != nil {
t.Fatalf("error shuting down follower: %v", f.Error())
t.Fatalf("error shutting down follower: %v", f.Error())
}

// Try to restart the follower and make sure it does not fail with a LogNotFound error
Expand Down Expand Up @@ -2953,7 +2953,7 @@ func TestRaft_VoteNotGranted_WhenNodeNotInCluster(t *testing.T) {
// a follower that thinks there's a leader should vote for that leader.
var resp RequestVoteResponse

// partiton the leader to simulate an unstable cluster
// partition the leader to simulate an unstable cluster
c.Partition([]ServerAddress{leader.localAddr})
time.Sleep(c.propagateTimeout)

Expand Down Expand Up @@ -3097,7 +3097,7 @@ func TestRaft_FollowerRemovalNoElection(t *testing.T) {
t.Logf("[INFO] restarting %v", follower)
// Shutdown follower
if f := follower.Shutdown(); f.Error() != nil {
t.Fatalf("error shuting down follower: %v", f.Error())
t.Fatalf("error shutting down follower: %v", f.Error())
}

_, trans := NewInmemTransport(follower.localAddr)
Expand Down
2 changes: 1 addition & 1 deletion replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type followerReplication struct {
triggerCh chan struct{}

// triggerDeferErrorCh is used to provide a backchannel. By sending a
// deferErr, the sender can be notifed when the replication is done.
// deferErr, the sender can be notified when the replication is done.
triggerDeferErrorCh chan *deferError

// lastContact is updated to the current time whenever any response is
Expand Down
2 changes: 1 addition & 1 deletion snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func (r *Raft) takeSnapshot() (string, error) {
}

// compactLogsWithTrailing takes the last inclusive index of a snapshot,
// the lastLogIdx, and and the trailingLogs and trims the logs that
// the lastLogIdx, and the trailingLogs and trims the logs that
// are no longer needed.
func (r *Raft) compactLogsWithTrailing(snapIdx uint64, lastLogIdx uint64, trailingLogs uint64) error {
// Determine log ranges to compact
Expand Down

0 comments on commit 8368671

Please sign in to comment.