From 76c8ca5a55569c3fc902a1f86f6d8c317eb4572d Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Fri, 14 Jun 2019 22:15:01 +0200 Subject: [PATCH 1/4] quorum: introduce library for majority and joint quorums The quorum package contains logic to reason about committed indexes as well as vote outcomes for both majority and joint quorums. The package is oblivious to the existence of learner replicas. The plan is to hook this up to etcd/raft in subsequent commits. --- go.mod | 1 + go.sum | 2 + raft/quorum/bench_test.go | 40 ++ raft/quorum/datadriven_test.go | 250 +++++++++ raft/quorum/joint.go | 68 +++ raft/quorum/majority.go | 184 +++++++ raft/quorum/quick_test.go | 122 +++++ raft/quorum/quorum.go | 57 +++ raft/quorum/testdata/joint_commit.txt | 481 ++++++++++++++++++ raft/quorum/testdata/joint_vote.txt | 165 ++++++ raft/quorum/testdata/majority_commit.txt | 153 ++++++ raft/quorum/testdata/majority_vote.txt | 97 ++++ raft/quorum/voteresult_string.go | 26 + .../github.com/cockroachdb/datadriven/LICENSE | 201 ++++++++ .../cockroachdb/datadriven/datadriven.go | 318 ++++++++++++ .../cockroachdb/datadriven/line_scanner.go | 40 ++ .../datadriven/test_data_reader.go | 202 ++++++++ 17 files changed, 2407 insertions(+) create mode 100644 raft/quorum/bench_test.go create mode 100644 raft/quorum/datadriven_test.go create mode 100644 raft/quorum/joint.go create mode 100644 raft/quorum/majority.go create mode 100644 raft/quorum/quick_test.go create mode 100644 raft/quorum/quorum.go create mode 100644 raft/quorum/testdata/joint_commit.txt create mode 100644 raft/quorum/testdata/joint_vote.txt create mode 100644 raft/quorum/testdata/majority_commit.txt create mode 100644 raft/quorum/testdata/majority_vote.txt create mode 100644 raft/quorum/voteresult_string.go create mode 100644 vendor/github.com/cockroachdb/datadriven/LICENSE create mode 100644 vendor/github.com/cockroachdb/datadriven/datadriven.go create mode 100644 vendor/github.com/cockroachdb/datadriven/line_scanner.go 
create mode 100644 vendor/github.com/cockroachdb/datadriven/test_data_reader.go diff --git a/go.mod b/go.mod index cb28a0d8b8d..31eabdb8a4a 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module go.etcd.io/etcd require ( github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 // indirect github.com/bgentry/speakeasy v0.1.0 + github.com/cockroachdb/datadriven v0.0.0-20190531201743-edce55837238 github.com/coreos/go-semver v0.2.0 github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7 github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf diff --git a/go.sum b/go.sum index 660f98f0d45..213796aa75c 100644 --- a/go.sum +++ b/go.sum @@ -2,6 +2,8 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLM github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/bgentry/speakeasy v0.1.0 h1:ByYyxL9InA1OWqxJqqp2A5pYHUrCiAL6K3J+LKSsQkY= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= +github.com/cockroachdb/datadriven v0.0.0-20190531201743-edce55837238 h1:uNljlOxtOHrPnRoPPx+JanqjAGZpNiqAGVBfGskd/pg= +github.com/cockroachdb/datadriven v0.0.0-20190531201743-edce55837238/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8= github.com/coreos/go-semver v0.2.0 h1:3Jm3tLmsgAYcjC+4Up7hJrFBPr+n7rAqYeSw/SZazuY= github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7 h1:u9SHYsPQNyt5tgDm3YN7+9dYrpK96E5wFilTFWIDZOM= diff --git a/raft/quorum/bench_test.go b/raft/quorum/bench_test.go new file mode 100644 index 00000000000..5c7961ed6cf --- /dev/null +++ b/raft/quorum/bench_test.go @@ -0,0 +1,40 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package quorum + +import ( + "fmt" + "math" + "math/rand" + "testing" +) + +func BenchmarkMajorityConfig_CommittedIndex(b *testing.B) { + // go test -run - -bench . -benchmem ./raft/quorum + for _, n := range []int{1, 3, 5, 7, 9, 11} { + b.Run(fmt.Sprintf("voters=%d", n), func(b *testing.B) { + c := MajorityConfig{} + l := mapAckIndexer{} + for i := uint64(0); i < uint64(n); i++ { + c[i+1] = struct{}{} + l[i+1] = Index(rand.Int63n(math.MaxInt64)) + } + + for i := 0; i < b.N; i++ { + _ = c.CommittedIndex(l) + } + }) + } +} diff --git a/raft/quorum/datadriven_test.go b/raft/quorum/datadriven_test.go new file mode 100644 index 00000000000..58bfd7234ca --- /dev/null +++ b/raft/quorum/datadriven_test.go @@ -0,0 +1,250 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package quorum + +import ( + "fmt" + "strings" + "testing" + + "github.com/cockroachdb/datadriven" +) + +// TestDataDriven parses and executes the test cases in ./testdata/*. 
An entry +// in such a file specifies the command, which is either of "committed" to check +// CommittedIndex or "vote" to verify a VoteResult. The underlying configuration +// and inputs are specified via the arguments 'cfg' and 'cfgj' (for the majority +// config and, optionally, majority config joint to the first one) and 'idx' +// (for CommittedIndex) and 'votes' (for VoteResult). +// +// Internally, the harness runs some additional checks on each test case for +// which it is known that the result shouldn't change. For example, +// interchanging the majority configurations of a joint quorum must not +// influence the result; if it does, this is noted in the test's output. +func TestDataDriven(t *testing.T) { + datadriven.Walk(t, "testdata", func(t *testing.T, path string) { + datadriven.RunTest(t, path, func(d *datadriven.TestData) string { + // Two majority configs. The first one is always used (though it may + // be empty) and the second one is used iff joint is true. + var joint bool + var ids, idsj []uint64 + // The committed indexes for the nodes in the config in the order in + // which they appear in (ids,idsj), without repetition. An underscore + // denotes an omission (i.e. no information for this voter); this is + // different from 0. For example, + // + // cfg=(1,2) cfgj=(2,3,4) idxs=(_,5,_,7) initializes the idx for voter 2 + // to 5 and that for voter 4 to 7 (and no others). + // + // cfgj=zero is specified to instruct the test harness to treat cfgj + // as zero instead of not specified (i.e. it will trigger a joint + // quorum test instead of a majority quorum test for cfg only). + var idxs []Index + // Votes. These are initialized similar to idxs except the only values + // used are 1 (voted against) and 2 (voted for). This looks awkward, + // but is convenient because it allows sharing code between the two. + var votes []Index + + // Parse the args. 
+ for _, arg := range d.CmdArgs { + for i := range arg.Vals { + switch arg.Key { + case "cfg": + var n uint64 + arg.Scan(t, i, &n) + ids = append(ids, n) + case "cfgj": + joint = true + if arg.Vals[i] == "zero" { + if len(arg.Vals) != 1 { + t.Fatalf("cannot mix 'zero' into configuration") + } + } else { + var n uint64 + arg.Scan(t, i, &n) + idsj = append(idsj, n) + } + case "idx": + var n uint64 + // Register placeholders as zeroes. + if arg.Vals[i] != "_" { + arg.Scan(t, i, &n) + if n == 0 { + // This is a restriction caused by the above + // special-casing for _. + t.Fatalf("cannot use 0 as idx") + } + } + idxs = append(idxs, Index(n)) + case "votes": + var s string + arg.Scan(t, i, &s) + switch s { + case "y": + votes = append(votes, 2) + case "n": + votes = append(votes, 1) + case "_": + votes = append(votes, 0) + default: + t.Fatalf("unknown vote: %s", s) + } + default: + t.Fatalf("unknown arg %s", arg.Key) + } + } + } + + // Build the two majority configs. + c := MajorityConfig{} + for _, id := range ids { + c[id] = struct{}{} + } + cj := MajorityConfig{} + for _, id := range idsj { + cj[id] = struct{}{} + } + + // Helper that returns an AckedIndexer which has the specified indexes + // mapped to the right IDs. + makeLookuper := func(idxs []Index, ids, idsj []uint64) mapAckIndexer { + l := mapAckIndexer{} + var p int // next to consume from idxs + for _, id := range append(append([]uint64(nil), ids...), idsj...) { + if _, ok := l[id]; ok { + continue + } + if p < len(idxs) { + // NB: this creates zero entries for placeholders that we remove later. + // The upshot of doing it that way is to avoid having to specify place- + // holders multiple times when omitting voters present in both halves of + // a joint config. + l[id] = idxs[p] + p++ + } + } + + for id := range l { + // Zero entries are created by _ placeholders; we don't want + // them in the lookuper because "no entry" is different from + // "zero entry". 
Note that we prevent tests from specifying + // zero commit indexes, so that there's no confusion between + // the two concepts. + if l[id] == 0 { + delete(l, id) + } + } + return l + } + + { + input := idxs + if d.Cmd == "vote" { + input = votes + } + if voters := JointConfig([2]MajorityConfig{c, cj}).IDs(); len(voters) != len(input) { + return fmt.Sprintf("error: mismatched input (explicit or _) for voters %v: %v", + voters, input) + } + } + + var buf strings.Builder + switch d.Cmd { + case "committed": + l := makeLookuper(idxs, ids, idsj) + + // Branch based on whether this is a majority or joint quorum + // test case. + if !joint { + idx := c.CommittedIndex(l) + fmt.Fprint(&buf, c.Describe(l)) + // These alternative computations should return the same + // result. If not, print to the output. + if aIdx := alternativeMajorityCommittedIndex(c, l); aIdx != idx { + fmt.Fprintf(&buf, "%s <-- via alternative computation\n", aIdx) + } + // Joining a majority with the empty majority should give same result. + if aIdx := JointConfig([2]MajorityConfig{c, {}}).CommittedIndex(l); aIdx != idx { + fmt.Fprintf(&buf, "%s <-- via zero-joint quorum\n", aIdx) + } + // Joining a majority with itself should give same result. + if aIdx := JointConfig([2]MajorityConfig{c, c}).CommittedIndex(l); aIdx != idx { + fmt.Fprintf(&buf, "%s <-- via self-joint quorum\n", aIdx) + } + overlay := func(c MajorityConfig, l AckedIndexer, id uint64, idx Index) AckedIndexer { + ll := mapAckIndexer{} + for iid := range c { + if iid == id { + ll[iid] = idx + } else if idx, ok := l.AckedIndex(iid); ok { + ll[iid] = idx + } + } + return ll + } + for id := range c { + iidx, _ := l.AckedIndex(id) + if idx > iidx && iidx > 0 { + // If the committed index is strictly above this voter's acked + // index (iidx), lowering that voter's index must not change + // the result. 
+ lo := overlay(c, l, id, iidx-1) + if aIdx := c.CommittedIndex(lo); aIdx != idx { + fmt.Fprintf(&buf, "%s <-- overlaying %d->%d\n", aIdx, id, iidx-1) + } + lo = overlay(c, l, id, 0) + if aIdx := c.CommittedIndex(lo); aIdx != idx { + fmt.Fprintf(&buf, "%s <-- overlaying %d->0\n", aIdx, id) + } + } + } + fmt.Fprintf(&buf, "%s\n", idx) + } else { + cc := JointConfig([2]MajorityConfig{c, cj}) + fmt.Fprint(&buf, cc.Describe(l)) + idx := cc.CommittedIndex(l) + // Interchanging the majorities shouldn't make a difference. If it does, print. + if aIdx := JointConfig([2]MajorityConfig{cj, c}).CommittedIndex(l); aIdx != idx { + fmt.Fprintf(&buf, "%s <-- via symmetry\n", aIdx) + } + fmt.Fprintf(&buf, "%s\n", idx) + } + case "vote": + ll := makeLookuper(votes, ids, idsj) + l := map[uint64]bool{} + for id, v := range ll { + l[id] = v != 1 // NB: 1 == false, 2 == true + } + + if !joint { + // Test a majority quorum. + r := c.VoteResult(l) + fmt.Fprintf(&buf, "%v\n", r) + } else { + // Run a joint quorum test case. + r := JointConfig([2]MajorityConfig{c, cj}).VoteResult(l) + // Interchanging the majorities shouldn't make a difference. If it does, print. + if ar := JointConfig([2]MajorityConfig{cj, c}).VoteResult(l); ar != r { + fmt.Fprintf(&buf, "%v <-- via symmetry\n", ar) + } + fmt.Fprintf(&buf, "%v\n", r) + } + default: + t.Fatalf("unknown command: %s", d.Cmd) + } + return buf.String() + }) + }) +} diff --git a/raft/quorum/joint.go b/raft/quorum/joint.go new file mode 100644 index 00000000000..9f8f484dc57 --- /dev/null +++ b/raft/quorum/joint.go @@ -0,0 +1,68 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package quorum + +// JointConfig is a configuration of two groups of (possibly overlapping) +// majority configurations. Decisions require the support of both majorities. +type JointConfig [2]MajorityConfig + +// IDs returns a newly initialized map representing the set of voters present +// in the joint configuration. +func (c JointConfig) IDs() map[uint64]struct{} { + m := map[uint64]struct{}{} + for _, cc := range c { + for id := range cc { + m[id] = struct{}{} + } + } + return m +} + +// Describe returns a (multi-line) representation of the commit indexes for the +// given lookuper. +func (c JointConfig) Describe(l AckedIndexer) string { + return MajorityConfig(c.IDs()).Describe(l) +} + +// CommittedIndex returns the largest committed index for the given joint +// quorum. An index is jointly committed if it is committed in both constituent +// majorities. +func (c JointConfig) CommittedIndex(l AckedIndexer) Index { + idx0 := c[0].CommittedIndex(l) + idx1 := c[1].CommittedIndex(l) + if idx0 < idx1 { + return idx0 + } + return idx1 +} + +// VoteResult takes a mapping of voters to yes/no (true/false) votes and returns +// a result indicating whether the vote is pending, lost, or won. A joint quorum +// requires both majority quorums to vote in favor. +func (c JointConfig) VoteResult(votes map[uint64]bool) VoteResult { + r1 := c[0].VoteResult(votes) + r2 := c[1].VoteResult(votes) + + if r1 == r2 { + // If they agree, return the agreed state. 
+ return r1 + } + if r1 == VoteLost || r2 == VoteLost { + // If either config has lost, loss is the only possible outcome. + return VoteLost + } + // One side won, the other one is pending, so the whole outcome is. + return VotePending +} diff --git a/raft/quorum/majority.go b/raft/quorum/majority.go new file mode 100644 index 00000000000..3d7bf82335a --- /dev/null +++ b/raft/quorum/majority.go @@ -0,0 +1,184 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package quorum + +import ( + "fmt" + "math" + "sort" + "strings" +) + +// MajorityConfig is a set of IDs that uses majority quorums to make decisions. +type MajorityConfig map[uint64]struct{} + +// Describe returns a (multi-line) representation of the commit indexes for the +// given lookuper. +func (c MajorityConfig) Describe(l AckedIndexer) string { + if len(c) == 0 { + return "" + } + type tup struct { + id uint64 + idx Index + ok bool // idx found? + bar int // length of bar displayed for this tup + } + + // Below, populate .bar so that the i-th largest commit index has bar i (we + // plot this as sort of a progress bar). The actual code is a bit more + // complicated and also makes sure that equal index => equal bar. 
+ + n := len(c) + info := make([]tup, 0, n) + for id := range c { + idx, ok := l.AckedIndex(id) + info = append(info, tup{id: id, idx: idx, ok: ok}) + } + + // Sort by index + sort.Slice(info, func(i, j int) bool { + if info[i].idx == info[j].idx { + return info[i].id < info[j].id + } + return info[i].idx < info[j].idx + }) + + // Populate .bar. + for i := range info { + if i > 0 && info[i-1].idx < info[i].idx { + info[i].bar = i + } + } + + // Sort by ID. + sort.Slice(info, func(i, j int) bool { + return info[i].id < info[j].id + }) + + var buf strings.Builder + + // Print. + fmt.Fprint(&buf, strings.Repeat(" ", n)+" idx\n") + for i := range info { + bar := info[i].bar + if !info[i].ok { + fmt.Fprint(&buf, "?"+strings.Repeat(" ", n)) + } else { + fmt.Fprint(&buf, strings.Repeat("x", bar)+">"+strings.Repeat(" ", n-bar)) + } + fmt.Fprintf(&buf, " %5d (id=%d)\n", info[i].idx, info[i].id) + } + return buf.String() +} + +type uint64Slice []uint64 + +func insertionSort(sl uint64Slice) { + a, b := 0, len(sl) + for i := a + 1; i < b; i++ { + for j := i; j > a && sl[j] < sl[j-1]; j-- { + sl[j], sl[j-1] = sl[j-1], sl[j] + } + } +} + +// CommittedIndex computes the committed index from those supplied via the +// provided AckedIndexer (for the active config). +func (c MajorityConfig) CommittedIndex(l AckedIndexer) Index { + n := len(c) + if n == 0 { + // This plays well with joint quorums which, when one half is the zero + // MajorityConfig, should behave like the other half. + return math.MaxUint64 + } + + // Use an on-stack slice to collect the committed indexes when n <= 7 + // (otherwise we alloc). The alternative is to stash a slice on + // MajorityConfig, but this impairs usability (as is, MajorityConfig is just + // a map, and that's nice). 
The assumption is that running with a + // replication factor of >7 is rare, and in cases in which it happens + // performance is a lesser concern (additionally the performance + // implications of an allocation here are far from drastic). + var stk [7]uint64 + srt := uint64Slice(stk[:]) + + if cap(srt) < n { + srt = make([]uint64, n) + } + srt = srt[:n] + + { + // Fill the slice with the indexes observed. Any unused slots will be + // left as zero; these correspond to voters that may report in, but + // haven't yet. We fill from the right (since the zeroes will end up on + // the left after sorting below anyway). + i := n - 1 + for id := range c { + if idx, ok := l.AckedIndex(id); ok { + srt[i] = uint64(idx) + i-- + } + } + } + + // Sort by index. Use a bespoke algorithm (copied from the stdlib's sort + // package) to keep srt on the stack. + insertionSort(srt) + + // The smallest index into the array for which the value is acked by a + // quorum. In other words, from the end of the slice, move n/2+1 to the + // left (accounting for zero-indexing). + pos := n - (n/2 + 1) + return Index(srt[pos]) +} + +// VoteResult takes a mapping of voters to yes/no (true/false) votes and returns +// a result indicating whether the vote is pending (i.e. neither a quorum of +// yes/no has been reached), won (a quorum of yes has been reached), or lost (a +// quorum of no has been reached). +func (c MajorityConfig) VoteResult(votes map[uint64]bool) VoteResult { + if len(c) == 0 { + // By convention, the elections on an empty config win. This comes in + // handy with joint quorums because it'll make a half-populated joint + // quorum behave like a majority quorum. 
+ return VoteWon + } + + ny := [2]int{} // vote counts for no and yes, respectively + + var missing int + for id := range c { + v, ok := votes[id] + if !ok { + missing++ + continue + } + if v { + ny[1]++ + } else { + ny[0]++ + } + } + + q := len(c)/2 + 1 + if ny[1] >= q { + return VoteWon + } + if ny[1]+missing >= q { + return VotePending + } + return VoteLost +} diff --git a/raft/quorum/quick_test.go b/raft/quorum/quick_test.go new file mode 100644 index 00000000000..45fb6b00a74 --- /dev/null +++ b/raft/quorum/quick_test.go @@ -0,0 +1,122 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package quorum + +import ( + "math" + "math/rand" + "reflect" + "testing" + "testing/quick" +) + +// TestQuick uses quickcheck to heuristically assert that the main +// implementation of (MajorityConfig).CommittedIndex agrees with a "dumb" +// alternative version. +func TestQuick(t *testing.T) { + cfg := &quick.Config{ + MaxCount: 50000, + } + + t.Run("majority_commit", func(t *testing.T) { + fn1 := func(c memberMap, l idxMap) uint64 { + return uint64(MajorityConfig(c).CommittedIndex(mapAckIndexer(l))) + } + fn2 := func(c memberMap, l idxMap) uint64 { + return uint64(alternativeMajorityCommittedIndex(MajorityConfig(c), mapAckIndexer(l))) + } + if err := quick.CheckEqual(fn1, fn2, cfg); err != nil { + t.Fatal(err) + } + }) +} + +// smallRandIdxMap returns a reasonably sized map of ids to commit indexes. 
+func smallRandIdxMap(rand *rand.Rand, size int) map[uint64]Index { + // Hard-code a reasonably small size here (quick will hard-code 50, which + // is not useful here). + size = 10 + + n := rand.Intn(size) + ids := rand.Perm(2 * n)[:n] + idxs := make([]int, len(ids)) + for i := range idxs { + idxs[i] = rand.Intn(n) + } + + m := map[uint64]Index{} + for i := range ids { + m[uint64(ids[i])] = Index(idxs[i]) + } + return m +} + +type idxMap map[uint64]Index + +func (idxMap) Generate(rand *rand.Rand, size int) reflect.Value { + m := smallRandIdxMap(rand, size) + return reflect.ValueOf(m) +} + +type memberMap map[uint64]struct{} + +func (memberMap) Generate(rand *rand.Rand, size int) reflect.Value { + m := smallRandIdxMap(rand, size) + mm := map[uint64]struct{}{} + for id := range m { + mm[id] = struct{}{} + } + return reflect.ValueOf(mm) +} + +// This is an alternative implementation of (MajorityConfig).CommittedIndex(l). +func alternativeMajorityCommittedIndex(c MajorityConfig, l AckedIndexer) Index { + if len(c) == 0 { + return math.MaxUint64 + } + + idToIdx := map[uint64]Index{} + for id := range c { + if idx, ok := l.AckedIndex(id); ok { + idToIdx[id] = idx + } + } + + // Build a map from index to voters who have acked that or any higher index. + idxToVotes := map[Index]int{} + for _, idx := range idToIdx { + idxToVotes[idx] = 0 + } + + for _, idx := range idToIdx { + for idy := range idxToVotes { + if idy > idx { + continue + } + idxToVotes[idy]++ + } + } + + // Find the maximum index that has achieved quorum. 
+ q := len(c)/2 + 1 + var maxQuorumIdx Index + for idx, n := range idxToVotes { + if n >= q && idx > maxQuorumIdx { + maxQuorumIdx = idx + } + } + + return maxQuorumIdx +} diff --git a/raft/quorum/quorum.go b/raft/quorum/quorum.go new file mode 100644 index 00000000000..ff9c6f48d89 --- /dev/null +++ b/raft/quorum/quorum.go @@ -0,0 +1,57 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package quorum + +import ( + "math" + "strconv" +) + +type Index uint64 + +func (i Index) String() string { + if i == math.MaxUint64 { + return "∞" + } + return strconv.FormatUint(uint64(i), 10) +} + +// AckedIndexer allows looking up a commit index for a given ID of a voter +// from a corresponding MajorityConfig. +type AckedIndexer interface { + AckedIndex(voterID uint64) (idx Index, found bool) +} + +type mapAckIndexer map[uint64]Index + +func (m mapAckIndexer) AckedIndex(id uint64) (Index, bool) { + idx, ok := m[id] + return idx, ok +} + +// VoteResult indicates the outcome of a vote. +// +//go:generate stringer -type=VoteResult +type VoteResult uint8 + +const ( + // VotePending indicates that the decision of the vote depends on future + // votes, i.e. neither "yes" nor "no" has reached quorum yet. 
+ VotePending VoteResult = 1 + iota + // VoteLost indicates that the quorum has voted "no". + VoteLost + // VoteWon indicates that the quorum has voted "yes". 
+ VoteWon +) diff --git a/raft/quorum/testdata/joint_commit.txt b/raft/quorum/testdata/joint_commit.txt new file mode 100644 index 00000000000..12f19fb331c --- /dev/null +++ b/raft/quorum/testdata/joint_commit.txt @@ -0,0 +1,481 @@ +# No difference between a simple majority quorum and a simple majority quorum +# joint with an empty majority quorum. (This is asserted for all datadriven tests +# by the framework, so we don't dwell on it more). +# +# Note that by specifying cfgj explicitly we tell the test harness to treat the +# input as a joint quorum and not a majority quorum. If we didn't specify +# cfgj=zero the test would pass just the same, but it wouldn't be exercising the +# joint quorum path. +committed cfg=(1,2,3) cfgj=zero idx=(100,101,99) +---- + idx +x> 100 (id=1) +xx> 101 (id=2) +> 99 (id=3) +100 + +# Joint nonoverlapping singleton quorums. + +committed cfg=(1) cfgj=(2) idx=(_,_) +---- + idx +? 0 (id=1) +? 0 (id=2) +0 + +# Voter 1 has 100 committed, 2 nothing. This means we definitely won't commit +# past 100. +committed cfg=(1) cfgj=(2) idx=(100,_) +---- + idx +x> 100 (id=1) +? 0 (id=2) +0 + +# Committed index collapses once both majorities do, to the lower index. +committed cfg=(1) cfgj=(2) idx=(13, 100) +---- + idx +> 13 (id=1) +x> 100 (id=2) +13 + +# Joint overlapping (i.e. identical) singleton quorum. + +committed cfg=(1) cfgj=(1) idx=(_) +---- + idx +? 0 (id=1) +0 + +committed cfg=(1) cfgj=(1) idx=(100) +---- + idx +> 100 (id=1) +100 + + + +# Two-node config joint with non-overlapping single node config +committed cfg=(1,3) cfgj=(2) idx=(_,_,_) +---- + idx +? 0 (id=1) +? 0 (id=2) +? 0 (id=3) +0 + +committed cfg=(1,3) cfgj=(2) idx=(100,_,_) +---- + idx +xx> 100 (id=1) +? 0 (id=2) +? 0 (id=3) +0 + +# 1 has 100 committed, 2 has 50 (collapsing half of the joint quorum to 50). +committed cfg=(1,3) cfgj=(2) idx=(100,_,50) +---- + idx +xx> 100 (id=1) +x> 50 (id=2) +? 
0 (id=3) +0 + +# 3 reports 45, collapsing the other half (to 45). 
+committed cfg=(1,3) cfgj=(2) idx=(100,45,50) +---- + idx +xx> 100 (id=1) +x> 50 (id=2) +> 45 (id=3) +45 + +# Two-node config with overlapping single-node config. + +committed cfg=(1,2) cfgj=(2) idx=(_,_) +---- + idx +? 0 (id=1) +? 0 (id=2) +0 + +# 1 reports 100. +committed cfg=(1,2) cfgj=(2) idx=(100,_) +---- + idx +x> 100 (id=1) +? 0 (id=2) +0 + +# 2 reports 100. +committed cfg=(1,2) cfgj=(2) idx=(_,100) +---- + idx +? 0 (id=1) +x> 100 (id=2) +0 + +committed cfg=(1,2) cfgj=(2) idx=(50,100) +---- + idx +> 50 (id=1) +x> 100 (id=2) +50 + +committed cfg=(1,2) cfgj=(2) idx=(100,50) +---- + idx +x> 100 (id=1) +> 50 (id=2) +50 + + + +# Joint non-overlapping two-node configs. + +committed cfg=(1,2) cfgj=(3,4) idx=(50,_,_,_) +---- + idx +xxx> 50 (id=1) +? 0 (id=2) +? 0 (id=3) +? 0 (id=4) +0 + +committed cfg=(1,2) cfgj=(3,4) idx=(50,_,49,_) +---- + idx +xxx> 50 (id=1) +? 0 (id=2) +xx> 49 (id=3) +? 0 (id=4) +0 + +committed cfg=(1,2) cfgj=(3,4) idx=(50,48,49,_) +---- + idx +xxx> 50 (id=1) +x> 48 (id=2) +xx> 49 (id=3) +? 0 (id=4) +0 + +committed cfg=(1,2) cfgj=(3,4) idx=(50,48,49,47) +---- + idx +xxx> 50 (id=1) +x> 48 (id=2) +xx> 49 (id=3) +> 47 (id=4) +47 + +# Joint overlapping two-node configs. +committed cfg=(1,2) cfgj=(2,3) idx=(_,_,_) +---- + idx +? 0 (id=1) +? 0 (id=2) +? 0 (id=3) +0 + +committed cfg=(1,2) cfgj=(2,3) idx=(100,_,_) +---- + idx +xx> 100 (id=1) +? 0 (id=2) +? 0 (id=3) +0 + +committed cfg=(1,2) cfgj=(2,3) idx=(_,100,_) +---- + idx +? 0 (id=1) +xx> 100 (id=2) +? 0 (id=3) +0 + +committed cfg=(1,2) cfgj=(2,3) idx=(_,100,99) +---- + idx +? 0 (id=1) +xx> 100 (id=2) +x> 99 (id=3) +0 + +committed cfg=(1,2) cfgj=(2,3) idx=(101,100,99) +---- + idx +xx> 101 (id=1) +x> 100 (id=2) +> 99 (id=3) +99 + +# Joint identical two-node configs. +committed cfg=(1,2) cfgj=(1,2) idx=(_,_) +---- + idx +? 0 (id=1) +? 0 (id=2) +0 + +committed cfg=(1,2) cfgj=(1,2) idx=(_,40) +---- + idx +? 
0 (id=1) +x> 40 (id=2) +0 + +committed cfg=(1,2) cfgj=(1,2) idx=(41,40) +---- + idx +x> 41 (id=1) +> 40 (id=2) +40 + + + +# Joint disjoint three-node configs. + +committed cfg=(1,2,3) cfgj=(4,5,6) idx=(_,_,_,_,_,_) +---- + idx +? 0 (id=1) +? 0 (id=2) +? 0 (id=3) +? 0 (id=4) +? 0 (id=5) +? 0 (id=6) +0 + +committed cfg=(1,2,3) cfgj=(4,5,6) idx=(100,_,_,_,_,_) +---- + idx +xxxxx> 100 (id=1) +? 0 (id=2) +? 0 (id=3) +? 0 (id=4) +? 0 (id=5) +? 0 (id=6) +0 + +committed cfg=(1,2,3) cfgj=(4,5,6) idx=(100,_,_,90,_,_) +---- + idx +xxxxx> 100 (id=1) +? 0 (id=2) +? 0 (id=3) +xxxx> 90 (id=4) +? 0 (id=5) +? 0 (id=6) +0 + +committed cfg=(1,2,3) cfgj=(4,5,6) idx=(100,99,_,_,_,_) +---- + idx +xxxxx> 100 (id=1) +xxxx> 99 (id=2) +? 0 (id=3) +? 0 (id=4) +? 0 (id=5) +? 0 (id=6) +0 + +# First quorum <= 99, second one <= 97. Both quorums guarantee that 90 is +# committed. +committed cfg=(1,2,3) cfgj=(4,5,6) idx=(_,99,90,97,95,_) +---- + idx +? 0 (id=1) +xxxxx> 99 (id=2) +xx> 90 (id=3) +xxxx> 97 (id=4) +xxx> 95 (id=5) +? 0 (id=6) +90 + +# First quorum collapsed to 92. Second one already had at least 95 committed, +# so the result also collapses. +committed cfg=(1,2,3) cfgj=(4,5,6) idx=(92,99,90,97,95,_) +---- + idx +xx> 92 (id=1) +xxxxx> 99 (id=2) +x> 90 (id=3) +xxxx> 97 (id=4) +xxx> 95 (id=5) +? 0 (id=6) +92 + +# Second quorum collapses, but nothing changes in the output. +committed cfg=(1,2,3) cfgj=(4,5,6) idx=(92,99,90,97,95,77) +---- + idx +xx> 92 (id=1) +xxxxx> 99 (id=2) +x> 90 (id=3) +xxxx> 97 (id=4) +xxx> 95 (id=5) +> 77 (id=6) +92 + + +# Joint overlapping three-node configs. + +committed cfg=(1,2,3) cfgj=(1,4,5) idx=(_,_,_,_,_) +---- + idx +? 0 (id=1) +? 0 (id=2) +? 0 (id=3) +? 0 (id=4) +? 0 (id=5) +0 + +committed cfg=(1,2,3) cfgj=(1,4,5) idx=(100,_,_,_,_) +---- + idx +xxxx> 100 (id=1) +? 0 (id=2) +? 0 (id=3) +? 0 (id=4) +? 0 (id=5) +0 + +committed cfg=(1,2,3) cfgj=(1,4,5) idx=(100,101,_,_,_) +---- + idx +xxx> 100 (id=1) +xxxx> 101 (id=2) +? 0 (id=3) +? 0 (id=4) +? 
0 (id=5) +0 + +committed cfg=(1,2,3) cfgj=(1,4,5) idx=(100,101,100,_,_) +---- + idx +xx> 100 (id=1) +xxxx> 101 (id=2) +> 100 (id=3) +? 0 (id=4) +? 0 (id=5) +0 + +# Second quorum could commit either 98 or 99, but first quorum is open. +committed cfg=(1,2,3) cfgj=(1,4,5) idx=(_,100,_,99,98) +---- + idx +? 0 (id=1) +xxxx> 100 (id=2) +? 0 (id=3) +xxx> 99 (id=4) +xx> 98 (id=5) +0 + +# Additionally, first quorum can commit either 100 or 99 +committed cfg=(1,2,3) cfgj=(1,4,5) idx=(_,100,99,99,98) +---- + idx +? 0 (id=1) +xxxx> 100 (id=2) +xx> 99 (id=3) +> 99 (id=4) +x> 98 (id=5) +98 + +committed cfg=(1,2,3) cfgj=(1,4,5) idx=(1,100,99,99,98) +---- + idx +> 1 (id=1) +xxxx> 100 (id=2) +xx> 99 (id=3) +> 99 (id=4) +x> 98 (id=5) +98 + +committed cfg=(1,2,3) cfgj=(1,4,5) idx=(100,100,99,99,98) +---- + idx +xxx> 100 (id=1) +> 100 (id=2) +x> 99 (id=3) +> 99 (id=4) +> 98 (id=5) +99 + + +# More overlap. + +committed cfg=(1,2,3) cfgj=(2,3,4) idx=(_,_,_,_) +---- + idx +? 0 (id=1) +? 0 (id=2) +? 0 (id=3) +? 0 (id=4) +0 + +committed cfg=(1,2,3) cfgj=(2,3,4) idx=(_,100,99,_) +---- + idx +? 0 (id=1) +xxx> 100 (id=2) +xx> 99 (id=3) +? 0 (id=4) +99 + +committed cfg=(1,2,3) cfgj=(2,3,4) idx=(98,100,99,_) +---- + idx +x> 98 (id=1) +xxx> 100 (id=2) +xx> 99 (id=3) +? 0 (id=4) +99 + +committed cfg=(1,2,3) cfgj=(2,3,4) idx=(100,100,99,_) +---- + idx +xx> 100 (id=1) +> 100 (id=2) +x> 99 (id=3) +? 0 (id=4) +99 + +committed cfg=(1,2,3) cfgj=(2,3,4) idx=(100,100,99,98) +---- + idx +xx> 100 (id=1) +> 100 (id=2) +x> 99 (id=3) +> 98 (id=4) +99 + +committed cfg=(1,2,3) cfgj=(2,3,4) idx=(100,_,_,101) +---- + idx +xx> 100 (id=1) +? 0 (id=2) +? 0 (id=3) +xxx> 101 (id=4) +0 + +committed cfg=(1,2,3) cfgj=(2,3,4) idx=(100,99,_,101) +---- + idx +xx> 100 (id=1) +x> 99 (id=2) +? 0 (id=3) +xxx> 101 (id=4) +99 + +# Identical. This is also exercised in the test harness, so it's listed here +# only briefly. +committed cfg=(1,2,3) cfgj=(1,2,3) idx=(50,45,_) +---- + idx +xx> 50 (id=1) +x> 45 (id=2) +? 
0 (id=3) +45 diff --git a/raft/quorum/testdata/joint_vote.txt b/raft/quorum/testdata/joint_vote.txt new file mode 100644 index 00000000000..36cd0cabcff --- /dev/null +++ b/raft/quorum/testdata/joint_vote.txt @@ -0,0 +1,165 @@ +# Empty joint config wins all votes. This isn't used in production. Note that +# by specifying cfgj explicitly we tell the test harness to treat the input as +# a joint quorum and not a majority quorum. +vote cfgj=zero +---- +VoteWon + +# More examples with close to trivial configs. + +vote cfg=(1) cfgj=zero votes=(_) +---- +VotePending + +vote cfg=(1) cfgj=zero votes=(y) +---- +VoteWon + +vote cfg=(1) cfgj=zero votes=(n) +---- +VoteLost + +vote cfg=(1) cfgj=(1) votes=(_) +---- +VotePending + +vote cfg=(1) cfgj=(1) votes=(y) +---- +VoteWon + +vote cfg=(1) cfgj=(1) votes=(n) +---- +VoteLost + +vote cfg=(1) cfgj=(2) votes=(_,_) +---- +VotePending + +vote cfg=(1) cfgj=(2) votes=(y,_) +---- +VotePending + +vote cfg=(1) cfgj=(2) votes=(y,y) +---- +VoteWon + +vote cfg=(1) cfgj=(2) votes=(y,n) +---- +VoteLost + +vote cfg=(1) cfgj=(2) votes=(n,_) +---- +VoteLost + +vote cfg=(1) cfgj=(2) votes=(n,n) +---- +VoteLost + +vote cfg=(1) cfgj=(2) votes=(n,y) +---- +VoteLost + +# Two node configs. 
+ +vote cfg=(1,2) cfgj=(3,4) votes=(_,_,_,_) +---- +VotePending + +vote cfg=(1,2) cfgj=(3,4) votes=(y,_,_,_) +---- +VotePending + +vote cfg=(1,2) cfgj=(3,4) votes=(y,y,_,_) +---- +VotePending + +vote cfg=(1,2) cfgj=(3,4) votes=(y,y,n,_) +---- +VoteLost + +vote cfg=(1,2) cfgj=(3,4) votes=(y,y,n,n) +---- +VoteLost + +vote cfg=(1,2) cfgj=(3,4) votes=(y,y,y,n) +---- +VoteLost + +vote cfg=(1,2) cfgj=(3,4) votes=(y,y,y,y) +---- +VoteWon + +vote cfg=(1,2) cfgj=(2,3) votes=(_,_,_) +---- +VotePending + +vote cfg=(1,2) cfgj=(2,3) votes=(_,n,_) +---- +VoteLost + +vote cfg=(1,2) cfgj=(2,3) votes=(y,y,_) +---- +VotePending + +vote cfg=(1,2) cfgj=(2,3) votes=(y,y,n) +---- +VoteLost + +vote cfg=(1,2) cfgj=(2,3) votes=(y,y,y) +---- +VoteWon + +vote cfg=(1,2) cfgj=(1,2) votes=(_,_) +---- +VotePending + +vote cfg=(1,2) cfgj=(1,2) votes=(y,_) +---- +VotePending + +vote cfg=(1,2) cfgj=(1,2) votes=(y,n) +---- +VoteLost + +vote cfg=(1,2) cfgj=(1,2) votes=(n,_) +---- +VoteLost + +vote cfg=(1,2) cfgj=(1,2) votes=(n,n) +---- +VoteLost + + +# Simple example for overlapping three node configs. + +vote cfg=(1,2,3) cfgj=(2,3,4) votes=(_,_,_,_) +---- +VotePending + +vote cfg=(1,2,3) cfgj=(2,3,4) votes=(_,n,_,_) +---- +VotePending + +vote cfg=(1,2,3) cfgj=(2,3,4) votes=(_,n,n,_) +---- +VoteLost + +vote cfg=(1,2,3) cfgj=(2,3,4) votes=(_,y,y,_) +---- +VoteWon + +vote cfg=(1,2,3) cfgj=(2,3,4) votes=(y,y,_,_) +---- +VotePending + +vote cfg=(1,2,3) cfgj=(2,3,4) votes=(y,y,n,_) +---- +VotePending + +vote cfg=(1,2,3) cfgj=(2,3,4) votes=(y,y,n,n) +---- +VoteLost + +vote cfg=(1,2,3) cfgj=(2,3,4) votes=(y,y,n,y) +---- +VoteWon diff --git a/raft/quorum/testdata/majority_commit.txt b/raft/quorum/testdata/majority_commit.txt new file mode 100644 index 00000000000..6ff5d0b89e0 --- /dev/null +++ b/raft/quorum/testdata/majority_commit.txt @@ -0,0 +1,153 @@ +# The empty quorum commits "everything". This is useful for its use in joint +# quorums. 
+committed +---- +∞ + + + +# A single voter quorum is not final when no index is known. +committed cfg=(1) idx=(_) +---- + idx +? 0 (id=1) +0 + +# When an index is known, that's the committed index, and that's final. +committed cfg=(1) idx=(12) +---- + idx +> 12 (id=1) +12 + + + + +# With two nodes, start out similarly. +committed cfg=(1, 2) idx=(_,_) +---- + idx +? 0 (id=1) +? 0 (id=2) +0 + +# The first committed index becomes known (for n1). Nothing changes in the +# output because idx=12 is not known to be on a quorum (which is both nodes). +committed cfg=(1, 2) idx=(12,_) +---- + idx +x> 12 (id=1) +? 0 (id=2) +0 + +# The second index comes in and finalizes the decision. The result will be the +# smaller of the two indexes. +committed cfg=(1,2) idx=(12,5) +---- + idx +x> 12 (id=1) +> 5 (id=2) +5 + + + + +# No surprises for three nodes. +committed cfg=(1,2,3) idx=(_,_,_) +---- + idx +? 0 (id=1) +? 0 (id=2) +? 0 (id=3) +0 + +committed cfg=(1,2,3) idx=(12,_,_) +---- + idx +xx> 12 (id=1) +? 0 (id=2) +? 0 (id=3) +0 + +# We see a committed index, but a higher committed index for the last pending +# votes could change (increment) the outcome, so not final yet. +committed cfg=(1,2,3) idx=(12,5,_) +---- + idx +xx> 12 (id=1) +x> 5 (id=2) +? 0 (id=3) +5 + +# a) the case in which it does: +committed cfg=(1,2,3) idx=(12,5,6) +---- + idx +xx> 12 (id=1) +> 5 (id=2) +x> 6 (id=3) +6 + +# b) the case in which it does not: +committed cfg=(1,2,3) idx=(12,5,4) +---- + idx +xx> 12 (id=1) +x> 5 (id=2) +> 4 (id=3) +5 + +# c) a different case in which the last index is pending but it has no chance of +# swaying the outcome (because nobody in the current quorum agrees on anything +# higher than the candidate): +committed cfg=(1,2,3) idx=(5,5,_) +---- + idx +x> 5 (id=1) +> 5 (id=2) +? 0 (id=3) +5 + +# c) continued: Doesn't matter what shows up last. The result is final.
+committed cfg=(1,2,3) idx=(5,5,12) +---- + idx +> 5 (id=1) +> 5 (id=2) +xx> 12 (id=3) +5 + +# With all committed idx known, the result is final. +committed cfg=(1, 2, 3) idx=(100, 101, 103) +---- + idx +> 100 (id=1) +x> 101 (id=2) +xx> 103 (id=3) +101 + + + +# Some more complicated examples. Similar to case c) above. The result is +# already final because no index higher than 103 is one short of quorum. +committed cfg=(1, 2, 3, 4, 5) idx=(101, 104, 103, 103,_) +---- + idx +x> 101 (id=1) +xxxx> 104 (id=2) +xx> 103 (id=3) +> 103 (id=4) +? 0 (id=5) +103 + +# A similar case which is not final because another vote for >= 103 would change +# the outcome. +committed cfg=(1, 2, 3, 4, 5) idx=(101, 102, 103, 103,_) +---- + idx +x> 101 (id=1) +xx> 102 (id=2) +xxx> 103 (id=3) +> 103 (id=4) +? 0 (id=5) +102 diff --git a/raft/quorum/testdata/majority_vote.txt b/raft/quorum/testdata/majority_vote.txt new file mode 100644 index 00000000000..5f9564b4f51 --- /dev/null +++ b/raft/quorum/testdata/majority_vote.txt @@ -0,0 +1,97 @@ +# The empty config always announces a won vote. +vote +---- +VoteWon + +vote cfg=(1) votes=(_) +---- +VotePending + +vote cfg=(1) votes=(n) +---- +VoteLost + +vote cfg=(123) votes=(y) +---- +VoteWon + + + + +vote cfg=(4,8) votes=(_,_) +---- +VotePending + +# With two voters, a single rejection loses the vote. +vote cfg=(4,8) votes=(n,_) +---- +VoteLost + +vote cfg=(4,8) votes=(y,_) +---- +VotePending + +vote cfg=(4,8) votes=(n,y) +---- +VoteLost + +vote cfg=(4,8) votes=(y,y) +---- +VoteWon + + + +vote cfg=(2,4,7) votes=(_,_,_) +---- +VotePending + +vote cfg=(2,4,7) votes=(n,_,_) +---- +VotePending + +vote cfg=(2,4,7) votes=(y,_,_) +---- +VotePending + +vote cfg=(2,4,7) votes=(n,n,_) +---- +VoteLost + +vote cfg=(2,4,7) votes=(y,n,_) +---- +VotePending + +vote cfg=(2,4,7) votes=(y,y,_) +---- +VoteWon + +vote cfg=(2,4,7) votes=(y,y,n) +---- +VoteWon + +vote cfg=(2,4,7) votes=(n,y,n) +---- +VoteLost + + + +# Test some random example with seven nodes (why not). 
+vote cfg=(1,2,3,4,5,6,7) votes=(y,y,n,y,_,_,_) +---- +VotePending + +vote cfg=(1,2,3,4,5,6,7) votes=(_,y,y,_,n,y,n) +---- +VotePending + +vote cfg=(1,2,3,4,5,6,7) votes=(y,y,n,y,_,n,y) +---- +VoteWon + +vote cfg=(1,2,3,4,5,6,7) votes=(y,y,_,n,y,n,n) +---- +VotePending + +vote cfg=(1,2,3,4,5,6,7) votes=(y,y,n,y,n,n,n) +---- +VoteLost diff --git a/raft/quorum/voteresult_string.go b/raft/quorum/voteresult_string.go new file mode 100644 index 00000000000..9eca8fd0c96 --- /dev/null +++ b/raft/quorum/voteresult_string.go @@ -0,0 +1,26 @@ +// Code generated by "stringer -type=VoteResult"; DO NOT EDIT. + +package quorum + +import "strconv" + +func _() { + // An "invalid array index" compiler error signifies that the constant values have changed. + // Re-run the stringer command to generate them again. + var x [1]struct{} + _ = x[VotePending-1] + _ = x[VoteLost-2] + _ = x[VoteWon-3] +} + +const _VoteResult_name = "VotePendingVoteLostVoteWon" + +var _VoteResult_index = [...]uint8{0, 11, 19, 26} + +func (i VoteResult) String() string { + i -= 1 + if i >= VoteResult(len(_VoteResult_index)-1) { + return "VoteResult(" + strconv.FormatInt(int64(i+1), 10) + ")" + } + return _VoteResult_name[_VoteResult_index[i]:_VoteResult_index[i+1]] +} diff --git a/vendor/github.com/cockroachdb/datadriven/LICENSE b/vendor/github.com/cockroachdb/datadriven/LICENSE new file mode 100644 index 00000000000..261eeb9e9f8 --- /dev/null +++ b/vendor/github.com/cockroachdb/datadriven/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. 
+ + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. 
+ + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. 
Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of 
the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. 
Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. 
+ + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/vendor/github.com/cockroachdb/datadriven/datadriven.go b/vendor/github.com/cockroachdb/datadriven/datadriven.go new file mode 100644 index 00000000000..49e73ce380f --- /dev/null +++ b/vendor/github.com/cockroachdb/datadriven/datadriven.go @@ -0,0 +1,318 @@ +// Copyright 2018 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. 
See the License for the specific language governing +// permissions and limitations under the License. + +package datadriven + +import ( + "bufio" + "flag" + "fmt" + "io" + "io/ioutil" + "os" + "path/filepath" + "strconv" + "strings" + "testing" +) + +var ( + rewriteTestFiles = flag.Bool( + "rewrite", false, + "ignore the expected results and rewrite the test files with the actual results from this "+ + "run. Used to update tests when a change affects many cases; please verify the testfile "+ + "diffs carefully!", + ) +) + +// RunTest invokes a data-driven test. The test cases are contained in a +// separate test file and are dynamically loaded, parsed, and executed by this +// testing framework. By convention, test files are typically located in a +// sub-directory called "testdata". Each test file has the following format: +// +// <command>[,<command>...] [arg | arg=val | arg=(val1, val2, ...)]... +// <input to the command> +// ---- +// <expected results> +// +// The command input can contain blank lines. However, by default, the expected +// results cannot contain blank lines. This alternate syntax allows the use of +// blank lines: +// +// <command>[,<command>...] [arg | arg=val | arg=(val1, val2, ...)]... +// <input to the command> +// ---- +// ---- +// <expected results> +// +// <more expected results> +// ---- +// ---- +// +// To execute data-driven tests, pass the path of the test file as well as a +// function which can interpret and execute whatever commands are present in +// the test file. The framework invokes the function, passing it information +// about the test case in a TestData struct. The function then returns the +// actual results of the case, which this function compares with the expected +// results, and either succeeds or fails the test.
+func RunTest(t *testing.T, path string, f func(d *TestData) string) { + t.Helper() + file, err := os.OpenFile(path, os.O_RDWR, 0644 /* irrelevant */) + if err != nil { + t.Fatal(err) + } + defer func() { + _ = file.Close() + }() + + runTestInternal(t, path, file, f, *rewriteTestFiles) +} + +// RunTestFromString is a version of RunTest which takes the contents of a test +// directly. +func RunTestFromString(t *testing.T, input string, f func(d *TestData) string) { + t.Helper() + runTestInternal(t, "<string>" /* optionalPath */, strings.NewReader(input), f, *rewriteTestFiles) +} + +func runTestInternal( + t *testing.T, sourceName string, reader io.Reader, f func(d *TestData) string, rewrite bool, +) { + t.Helper() + + r := newTestDataReader(t, sourceName, reader, rewrite) + for r.Next(t) { + d := &r.data + actual := func() string { + defer func() { + if r := recover(); r != nil { + fmt.Printf("\npanic during %s:\n%s\n", d.Pos, d.Input) + panic(r) + } + }() + return f(d) + }() + + if r.rewrite != nil { + r.emit("----") + if hasBlankLine(actual) { + r.emit("----") + r.rewrite.WriteString(actual) + r.emit("----") + r.emit("----") + } else { + r.emit(actual) + } + } else if d.Expected != actual { + t.Fatalf("\n%s: %s\nexpected:\n%s\nfound:\n%s", d.Pos, d.Input, d.Expected, actual) + } else if testing.Verbose() { + input := d.Input + if input == "" { + input = "<no input to command>" + } + // TODO(tbg): it's awkward to reproduce the args, but it would be helpful.
+ fmt.Printf("\n%s:\n%s [%d args]\n%s\n----\n%s", d.Pos, d.Cmd, len(d.CmdArgs), input, actual) + } + } + + if r.rewrite != nil { + data := r.rewrite.Bytes() + if l := len(data); l > 2 && data[l-1] == '\n' && data[l-2] == '\n' { + data = data[:l-1] + } + if dest, ok := reader.(*os.File); ok { + if _, err := dest.WriteAt(data, 0); err != nil { + t.Fatal(err) + } + if err := dest.Truncate(int64(len(data))); err != nil { + t.Fatal(err) + } + if err := dest.Sync(); err != nil { + t.Fatal(err) + } + } else { + t.Logf("input is not a file; rewritten output is:\n%s", data) + } + } +} + +// Walk goes through all the files in a subdirectory, creating subtests to match +// the file hierarchy; for each "leaf" file, the given function is called. +// +// This can be used in conjunction with RunTest. For example: +// +// datadriven.Walk(t, path, func (t *testing.T, path string) { +// // initialize per-test state +// datadriven.RunTest(t, path, func (d *datadriven.TestData) string { +// // ... +// }) +// }) +// +// Files: +// testdata/typing +// testdata/logprops/scan +// testdata/logprops/select +// +// If path is "testdata/typing", the function is called once and no subtests +// are created. +// +// If path is "testdata/logprops", the function is called two times, in +// separate subtests /scan, /select. +// +// If path is "testdata", the function is called three times, in subtest +// hierarchy /typing, /logprops/scan, /logprops/select. +// +func Walk(t *testing.T, path string, f func(t *testing.T, path string)) { + finfo, err := os.Stat(path) + if err != nil { + t.Fatal(err) + } + if !finfo.IsDir() { + f(t, path) + return + } + files, err := ioutil.ReadDir(path) + if err != nil { + t.Fatal(err) + } + for _, file := range files { + t.Run(file.Name(), func(t *testing.T) { + Walk(t, filepath.Join(path, file.Name()), f) + }) + } +} + +// TestData contains information about one data-driven test case that was +// parsed from the test file.
+type TestData struct { + Pos string // reader and line number + + // Cmd is the first string on the directive line (up to the first whitespace). + Cmd string + + CmdArgs []CmdArg + + Input string + Expected string +} + +// ScanArgs looks up the first CmdArg matching the given key and scans it into +// the given destinations in order. If the arg does not exist, the number of +// destinations does not match that of the arguments, or a destination can not +// be populated from its matching value, a fatal error results. +// +// For example, for a TestData originating from +// +// cmd arg1=50 arg2=yoruba arg3=(50, 50, 50) +// +// the following would be valid: +// +// var i1, i2, i3, i4 int +// var s string +// td.ScanArgs(t, "arg1", &i1) +// td.ScanArgs(t, "arg2", &s) +// td.ScanArgs(t, "arg3", &i2, &i3, &i4) +func (td *TestData) ScanArgs(t *testing.T, key string, dests ...interface{}) { + t.Helper() + var arg CmdArg + for i := range td.CmdArgs { + if td.CmdArgs[i].Key == key { + arg = td.CmdArgs[i] + break + } + } + if arg.Key == "" { + t.Fatalf("missing argument: %s", key) + } + if len(dests) != len(arg.Vals) { + t.Fatalf("%s: got %d destinations, but %d values", arg.Key, len(dests), len(arg.Vals)) + } + + for i := range dests { + arg.Scan(t, i, dests[i]) + + } +} + +// CmdArg contains information about an argument on the directive line. An +// argument is specified in one of the following forms: +// - argument +// - argument=value +// - argument=(values, ...) +type CmdArg struct { + Key string + Vals []string +} + +func (arg CmdArg) String() string { + switch len(arg.Vals) { + case 0: + return arg.Key + + case 1: + return fmt.Sprintf("%s=%s", arg.Key, arg.Vals[0]) + + default: + return fmt.Sprintf("%s=(%s)", arg.Key, strings.Join(arg.Vals, ", ")) + } +} + +// Scan attempts to parse the value at index i into the dest. 
+func (arg CmdArg) Scan(t *testing.T, i int, dest interface{}) { + if i < 0 || i >= len(arg.Vals) { + t.Fatalf("cannot scan index %d of key %s", i, arg.Key) + } + val := arg.Vals[i] + switch dest := dest.(type) { + case *string: + *dest = val + case *int: + n, err := strconv.ParseInt(val, 10, 64) + if err != nil { + t.Fatal(err) + } + *dest = int(n) // assume 64bit ints + case *uint64: + n, err := strconv.ParseUint(val, 10, 64) + if err != nil { + t.Fatal(err) + } + *dest = n + case *bool: + b, err := strconv.ParseBool(val) + if err != nil { + t.Fatal(err) + } + *dest = b + default: + t.Fatalf("unsupported type %T for destination #%d (might be easy to add it)", dest, i+1) + } +} + +// Fatalf wraps a fatal testing error with test file position information, so +// that it's easy to locate the source of the error. +func (td TestData) Fatalf(tb testing.TB, format string, args ...interface{}) { + tb.Helper() + tb.Fatalf("%s: %s", td.Pos, fmt.Sprintf(format, args...)) +} + +func hasBlankLine(s string) bool { + scanner := bufio.NewScanner(strings.NewReader(s)) + for scanner.Scan() { + if strings.TrimSpace(scanner.Text()) == "" { + return true + } + } + return false +} diff --git a/vendor/github.com/cockroachdb/datadriven/line_scanner.go b/vendor/github.com/cockroachdb/datadriven/line_scanner.go new file mode 100644 index 00000000000..67681dcfda6 --- /dev/null +++ b/vendor/github.com/cockroachdb/datadriven/line_scanner.go @@ -0,0 +1,40 @@ +// Copyright 2018 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. 
See the License for the specific language governing +// permissions and limitations under the License. + +package datadriven + +import ( + "bufio" + "io" +) + +type lineScanner struct { + *bufio.Scanner + line int +} + +func newLineScanner(r io.Reader) *lineScanner { + return &lineScanner{ + Scanner: bufio.NewScanner(r), + line: 0, + } +} + +func (l *lineScanner) Scan() bool { + ok := l.Scanner.Scan() + if ok { + l.line++ + } + return ok +} diff --git a/vendor/github.com/cockroachdb/datadriven/test_data_reader.go b/vendor/github.com/cockroachdb/datadriven/test_data_reader.go new file mode 100644 index 00000000000..315fbf2dd51 --- /dev/null +++ b/vendor/github.com/cockroachdb/datadriven/test_data_reader.go @@ -0,0 +1,202 @@ +// Copyright 2018 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. 
+ +package datadriven + +import ( + "bytes" + "fmt" + "io" + "regexp" + "strings" + "testing" +) + +type testDataReader struct { + sourceName string + reader io.Reader + scanner *lineScanner + data TestData + rewrite *bytes.Buffer +} + +func newTestDataReader( + t *testing.T, sourceName string, file io.Reader, record bool, +) *testDataReader { + t.Helper() + + var rewrite *bytes.Buffer + if record { + rewrite = &bytes.Buffer{} + } + return &testDataReader{ + sourceName: sourceName, + reader: file, + scanner: newLineScanner(file), + rewrite: rewrite, + } +} + +func (r *testDataReader) Next(t *testing.T) bool { + t.Helper() + + r.data = TestData{} + for r.scanner.Scan() { + line := r.scanner.Text() + r.emit(line) + + line = strings.TrimSpace(line) + if strings.HasPrefix(line, "#") { + // Skip comment lines. + continue + } + // Support wrapping directive lines using \, for example: + // build-scalar \ + // vars(int) + for strings.HasSuffix(line, `\`) && r.scanner.Scan() { + nextLine := r.scanner.Text() + r.emit(nextLine) + line = strings.TrimSuffix(line, `\`) + " " + strings.TrimSpace(nextLine) + } + + fields := splitDirectives(t, line) + if len(fields) == 0 { + continue + } + cmd := fields[0] + r.data.Pos = fmt.Sprintf("%s:%d", r.sourceName, r.scanner.line) + r.data.Cmd = cmd + + for _, arg := range fields[1:] { + key := arg + var vals []string + if pos := strings.IndexByte(key, '='); pos >= 0 { + key = arg[:pos] + val := arg[pos+1:] + + if len(val) > 2 && val[0] == '(' && val[len(val)-1] == ')' { + vals = strings.Split(val[1:len(val)-1], ",") + for i := range vals { + vals[i] = strings.TrimSpace(vals[i]) + } + } else { + vals = []string{val} + } + } + r.data.CmdArgs = append(r.data.CmdArgs, CmdArg{Key: key, Vals: vals}) + } + + var buf bytes.Buffer + var separator bool + for r.scanner.Scan() { + line := r.scanner.Text() + if line == "----" { + separator = true + break + } + + r.emit(line) + fmt.Fprintln(&buf, line) + } + + r.data.Input = 
strings.TrimSpace(buf.String()) + + if separator { + r.readExpected() + } + return true + } + return false +} + +func (r *testDataReader) readExpected() { + var buf bytes.Buffer + var line string + var allowBlankLines bool + + if r.scanner.Scan() { + line = r.scanner.Text() + if line == "----" { + allowBlankLines = true + } + } + + if allowBlankLines { + // Look for two successive lines of "----" before terminating. + for r.scanner.Scan() { + line = r.scanner.Text() + + if line == "----" { + if r.scanner.Scan() { + line2 := r.scanner.Text() + if line2 == "----" { + break + } + + fmt.Fprintln(&buf, line) + fmt.Fprintln(&buf, line2) + continue + } + } + + fmt.Fprintln(&buf, line) + } + } else { + // Terminate on first blank line. + for { + if strings.TrimSpace(line) == "" { + break + } + + fmt.Fprintln(&buf, line) + + if !r.scanner.Scan() { + break + } + + line = r.scanner.Text() + } + } + + r.data.Expected = buf.String() +} + +func (r *testDataReader) emit(s string) { + if r.rewrite != nil { + r.rewrite.WriteString(s) + r.rewrite.WriteString("\n") + } +} + +var splitDirectivesRE = regexp.MustCompile(`^ *[a-zA-Z0-9_,-\.]+(|=[-a-zA-Z0-9_@]+|=\([^)]*\))( |$)`) + +// splits a directive line into tokens, where each token is +// either: +// - a,list,of,things +// - argument +// - argument=value +// - argument=(values, ...) 
+func splitDirectives(t *testing.T, line string) []string { + var res []string + + for line != "" { + str := splitDirectivesRE.FindString(line) + if len(str) == 0 { + t.Fatalf("cannot parse directive %s\n", line) + } + res = append(res, strings.TrimSpace(line[0:len(str)])) + line = line[len(str):] + } + return res +} From 3def2364e48a1f07abb12d5d29dc9e5050feacd4 Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Mon, 3 Jun 2019 13:16:28 +0200 Subject: [PATCH 2/4] raft: use membership sets in progress tracking Instead of having disjoint mappings of ID to *Progress for voters and learners, use a map[id]struct{} for each and share a map of *Progress among them. This is easier to handle when joint quorums are introduced, at which point a node may be a voting member of two quorums. --- raft/progress.go | 50 +++++++------- raft/raft_flow_control_test.go | 6 +- raft/raft_snap_test.go | 58 ++++++++--------- raft/raft_test.go | 116 +++++++++++++++++---------------- 4 files changed, 115 insertions(+), 115 deletions(-) diff --git a/raft/progress.go b/raft/progress.go index fa4d63edfba..fc4afb2bb0a 100644 --- a/raft/progress.go +++ b/raft/progress.go @@ -291,8 +291,9 @@ func (in *inflights) reset() { // known about the nodes and learners in it. In particular, it tracks the match // index for each peer which in turn allows reasoning about the committed index. 
type progressTracker struct { - nodes map[uint64]*Progress - learners map[uint64]*Progress + nodes map[uint64]struct{} + learners map[uint64]struct{} + prs map[uint64]*Progress votes map[uint64]bool @@ -303,8 +304,9 @@ type progressTracker struct { func makePRS(maxInflight int) progressTracker { p := progressTracker{ maxInflight: maxInflight, - nodes: map[uint64]*Progress{}, - learners: map[uint64]*Progress{}, + prs: map[uint64]*Progress{}, + nodes: map[uint64]struct{}{}, + learners: map[uint64]struct{}{}, votes: map[uint64]bool{}, } return p @@ -334,8 +336,8 @@ func (p *progressTracker) committed() uint64 { } p.matchBuf = p.matchBuf[:len(p.nodes)] idx := 0 - for _, pr := range p.nodes { - p.matchBuf[idx] = pr.Match + for id := range p.nodes { + p.matchBuf[idx] = p.prs[id].Match idx++ } sort.Sort(&p.matchBuf) @@ -343,50 +345,44 @@ func (p *progressTracker) committed() uint64 { } func (p *progressTracker) removeAny(id uint64) { - pN := p.nodes[id] - pL := p.learners[id] + _, okPR := p.prs[id] + _, okV := p.nodes[id] + _, okL := p.learners[id] - if pN == nil && pL == nil { + if !okPR { + panic("attempting to remove unknown peer %x") + } else if !okV && !okL { panic("attempting to remove unknown peer %x") - } else if pN != nil && pL != nil { + } else if okV && okL { panic(fmt.Sprintf("peer %x is both voter and learner", id)) } delete(p.nodes, id) delete(p.learners, id) + delete(p.prs, id) } // initProgress initializes a new progress for the given node or learner. The // node may not exist yet in either form or a panic will ensue. 
func (p *progressTracker) initProgress(id, match, next uint64, isLearner bool) { - if pr := p.nodes[id]; pr != nil { + if pr := p.prs[id]; pr != nil { panic(fmt.Sprintf("peer %x already tracked as node %v", id, pr)) } - if pr := p.learners[id]; pr != nil { - panic(fmt.Sprintf("peer %x already tracked as learner %v", id, pr)) - } if !isLearner { - p.nodes[id] = &Progress{Next: next, Match: match, ins: newInflights(p.maxInflight)} - return + p.nodes[id] = struct{}{} + } else { + p.learners[id] = struct{}{} } - p.learners[id] = &Progress{Next: next, Match: match, ins: newInflights(p.maxInflight), IsLearner: true} + p.prs[id] = &Progress{Next: next, Match: match, ins: newInflights(p.maxInflight), IsLearner: isLearner} } func (p *progressTracker) getProgress(id uint64) *Progress { - if pr, ok := p.nodes[id]; ok { - return pr - } - - return p.learners[id] + return p.prs[id] } // visit invokes the supplied closure for all tracked progresses. func (p *progressTracker) visit(f func(id uint64, pr *Progress)) { - for id, pr := range p.nodes { - f(id, pr) - } - - for id, pr := range p.learners { + for id, pr := range p.prs { f(id, pr) } } diff --git a/raft/raft_flow_control_test.go b/raft/raft_flow_control_test.go index 699bb5b0780..033e336921c 100644 --- a/raft/raft_flow_control_test.go +++ b/raft/raft_flow_control_test.go @@ -29,7 +29,7 @@ func TestMsgAppFlowControlFull(t *testing.T) { r.becomeCandidate() r.becomeLeader() - pr2 := r.prs.nodes[2] + pr2 := r.prs.prs[2] // force the progress to be in replicate state pr2.becomeReplicate() // fill in the inflights window @@ -65,7 +65,7 @@ func TestMsgAppFlowControlMoveForward(t *testing.T) { r.becomeCandidate() r.becomeLeader() - pr2 := r.prs.nodes[2] + pr2 := r.prs.prs[2] // force the progress to be in replicate state pr2.becomeReplicate() // fill in the inflights window @@ -110,7 +110,7 @@ func TestMsgAppFlowControlRecvHeartbeat(t *testing.T) { r.becomeCandidate() r.becomeLeader() - pr2 := r.prs.nodes[2] + pr2 := r.prs.prs[2] 
// force the progress to be in replicate state pr2.becomeReplicate() // fill in the inflights window diff --git a/raft/raft_snap_test.go b/raft/raft_snap_test.go index 145473824c6..246ed07e207 100644 --- a/raft/raft_snap_test.go +++ b/raft/raft_snap_test.go @@ -40,11 +40,11 @@ func TestSendingSnapshotSetPendingSnapshot(t *testing.T) { // force set the next of node 2, so that // node 2 needs a snapshot - sm.prs.nodes[2].Next = sm.raftLog.firstIndex() + sm.prs.prs[2].Next = sm.raftLog.firstIndex() - sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs.nodes[2].Next - 1, Reject: true}) - if sm.prs.nodes[2].PendingSnapshot != 11 { - t.Fatalf("PendingSnapshot = %d, want 11", sm.prs.nodes[2].PendingSnapshot) + sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs.prs[2].Next - 1, Reject: true}) + if sm.prs.prs[2].PendingSnapshot != 11 { + t.Fatalf("PendingSnapshot = %d, want 11", sm.prs.prs[2].PendingSnapshot) } } @@ -56,7 +56,7 @@ func TestPendingSnapshotPauseReplication(t *testing.T) { sm.becomeCandidate() sm.becomeLeader() - sm.prs.nodes[2].becomeSnapshot(11) + sm.prs.prs[2].becomeSnapshot(11) sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) msgs := sm.readMessages() @@ -73,18 +73,18 @@ func TestSnapshotFailure(t *testing.T) { sm.becomeCandidate() sm.becomeLeader() - sm.prs.nodes[2].Next = 1 - sm.prs.nodes[2].becomeSnapshot(11) + sm.prs.prs[2].Next = 1 + sm.prs.prs[2].becomeSnapshot(11) sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: true}) - if sm.prs.nodes[2].PendingSnapshot != 0 { - t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.nodes[2].PendingSnapshot) + if sm.prs.prs[2].PendingSnapshot != 0 { + t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.prs[2].PendingSnapshot) } - if sm.prs.nodes[2].Next != 1 { - t.Fatalf("Next = %d, want 1", sm.prs.nodes[2].Next) + if sm.prs.prs[2].Next != 1 { + t.Fatalf("Next = %d, want 1", sm.prs.prs[2].Next) } - if 
!sm.prs.nodes[2].Paused { - t.Errorf("Paused = %v, want true", sm.prs.nodes[2].Paused) + if !sm.prs.prs[2].Paused { + t.Errorf("Paused = %v, want true", sm.prs.prs[2].Paused) } } @@ -96,18 +96,18 @@ func TestSnapshotSucceed(t *testing.T) { sm.becomeCandidate() sm.becomeLeader() - sm.prs.nodes[2].Next = 1 - sm.prs.nodes[2].becomeSnapshot(11) + sm.prs.prs[2].Next = 1 + sm.prs.prs[2].becomeSnapshot(11) sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: false}) - if sm.prs.nodes[2].PendingSnapshot != 0 { - t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.nodes[2].PendingSnapshot) + if sm.prs.prs[2].PendingSnapshot != 0 { + t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.prs[2].PendingSnapshot) } - if sm.prs.nodes[2].Next != 12 { - t.Fatalf("Next = %d, want 12", sm.prs.nodes[2].Next) + if sm.prs.prs[2].Next != 12 { + t.Fatalf("Next = %d, want 12", sm.prs.prs[2].Next) } - if !sm.prs.nodes[2].Paused { - t.Errorf("Paused = %v, want true", sm.prs.nodes[2].Paused) + if !sm.prs.prs[2].Paused { + t.Errorf("Paused = %v, want true", sm.prs.prs[2].Paused) } } @@ -206,7 +206,7 @@ func TestSnapshotSucceedViaAppResp(t *testing.T) { mustSend(n2, n1, pb.MsgAppResp) // Leader has correct state for follower. - pr := n1.prs.nodes[2] + pr := n1.prs.prs[2] if pr.State != ProgressStateReplicate { t.Fatalf("unexpected state %v", pr) } @@ -227,23 +227,23 @@ func TestSnapshotAbort(t *testing.T) { sm.becomeCandidate() sm.becomeLeader() - sm.prs.nodes[2].Next = 1 - sm.prs.nodes[2].becomeSnapshot(11) + sm.prs.prs[2].Next = 1 + sm.prs.prs[2].becomeSnapshot(11) // A successful msgAppResp that has a higher/equal index than the // pending snapshot should abort the pending snapshot. 
sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: 11}) - if sm.prs.nodes[2].PendingSnapshot != 0 { - t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.nodes[2].PendingSnapshot) + if sm.prs.prs[2].PendingSnapshot != 0 { + t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.prs[2].PendingSnapshot) } // The follower entered ProgressStateReplicate and the leader send an append // and optimistically updated the progress (so we see 13 instead of 12). // There is something to append because the leader appended an empty entry // to the log at index 12 when it assumed leadership. - if sm.prs.nodes[2].Next != 13 { - t.Fatalf("Next = %d, want 13", sm.prs.nodes[2].Next) + if sm.prs.prs[2].Next != 13 { + t.Fatalf("Next = %d, want 13", sm.prs.prs[2].Next) } - if n := sm.prs.nodes[2].ins.count; n != 1 { + if n := sm.prs.prs[2].ins.count; n != 1 { t.Fatalf("expected an inflight message, got %d", n) } } diff --git a/raft/raft_test.go b/raft/raft_test.go index d4f0fa26828..31ad3f1aedc 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -271,12 +271,12 @@ func TestProgressLeader(t *testing.T) { r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage()) r.becomeCandidate() r.becomeLeader() - r.prs.nodes[2].becomeReplicate() + r.prs.prs[2].becomeReplicate() // Send proposals to r1. The first 5 entries should be appended to the log. 
propMsg := pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("foo")}}} for i := 0; i < 5; i++ { - if pr := r.prs.nodes[r.id]; pr.State != ProgressStateReplicate || pr.Match != uint64(i+1) || pr.Next != pr.Match+1 { + if pr := r.prs.prs[r.id]; pr.State != ProgressStateReplicate || pr.Match != uint64(i+1) || pr.Next != pr.Match+1 { t.Errorf("unexpected progress %v", pr) } if err := r.Step(propMsg); err != nil { @@ -291,17 +291,17 @@ func TestProgressResumeByHeartbeatResp(t *testing.T) { r.becomeCandidate() r.becomeLeader() - r.prs.nodes[2].Paused = true + r.prs.prs[2].Paused = true r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) - if !r.prs.nodes[2].Paused { - t.Errorf("paused = %v, want true", r.prs.nodes[2].Paused) + if !r.prs.prs[2].Paused { + t.Errorf("paused = %v, want true", r.prs.prs[2].Paused) } - r.prs.nodes[2].becomeReplicate() + r.prs.prs[2].becomeReplicate() r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp}) - if r.prs.nodes[2].Paused { - t.Errorf("paused = %v, want false", r.prs.nodes[2].Paused) + if r.prs.prs[2].Paused { + t.Errorf("paused = %v, want false", r.prs.prs[2].Paused) } } @@ -331,7 +331,7 @@ func TestProgressFlowControl(t *testing.T) { r.readMessages() // While node 2 is in probe state, propose a bunch of entries. - r.prs.nodes[2].becomeProbe() + r.prs.prs[2].becomeProbe() blob := []byte(strings.Repeat("a", 1000)) for i := 0; i < 10; i++ { r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: blob}}}) @@ -409,8 +409,8 @@ func TestUncommittedEntryLimit(t *testing.T) { // Set the two followers to the replicate state. Commit to tail of log. const numFollowers = 2 - r.prs.nodes[2].becomeReplicate() - r.prs.nodes[3].becomeReplicate() + r.prs.prs[2].becomeReplicate() + r.prs.prs[3].becomeReplicate() r.uncommittedSize = 0 // Send proposals to r1. The first 5 entries should be appended to the log. 
@@ -2632,7 +2632,7 @@ func TestLeaderAppResp(t *testing.T) { sm.readMessages() sm.Step(pb.Message{From: 2, Type: pb.MsgAppResp, Index: tt.index, Term: sm.Term, Reject: tt.reject, RejectHint: tt.index}) - p := sm.prs.nodes[2] + p := sm.prs.prs[2] if p.Match != tt.wmatch { t.Errorf("#%d match = %d, want %d", i, p.Match, tt.wmatch) } @@ -2679,9 +2679,9 @@ func TestBcastBeat(t *testing.T) { mustAppendEntry(sm, pb.Entry{Index: uint64(i) + 1}) } // slow follower - sm.prs.nodes[2].Match, sm.prs.nodes[2].Next = 5, 6 + sm.prs.prs[2].Match, sm.prs.prs[2].Next = 5, 6 // normal follower - sm.prs.nodes[3].Match, sm.prs.nodes[3].Next = sm.raftLog.lastIndex(), sm.raftLog.lastIndex()+1 + sm.prs.prs[3].Match, sm.prs.prs[3].Next = sm.raftLog.lastIndex(), sm.raftLog.lastIndex()+1 sm.Step(pb.Message{Type: pb.MsgBeat}) msgs := sm.readMessages() @@ -2689,8 +2689,8 @@ func TestBcastBeat(t *testing.T) { t.Fatalf("len(msgs) = %v, want 2", len(msgs)) } wantCommitMap := map[uint64]uint64{ - 2: min(sm.raftLog.committed, sm.prs.nodes[2].Match), - 3: min(sm.raftLog.committed, sm.prs.nodes[3].Match), + 2: min(sm.raftLog.committed, sm.prs.prs[2].Match), + 3: min(sm.raftLog.committed, sm.prs.prs[3].Match), } for i, m := range msgs { if m.Type != pb.MsgHeartbeat { @@ -2776,11 +2776,11 @@ func TestLeaderIncreaseNext(t *testing.T) { sm.raftLog.append(previousEnts...) 
sm.becomeCandidate() sm.becomeLeader() - sm.prs.nodes[2].State = tt.state - sm.prs.nodes[2].Next = tt.next + sm.prs.prs[2].State = tt.state + sm.prs.prs[2].Next = tt.next sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) - p := sm.prs.nodes[2] + p := sm.prs.prs[2] if p.Next != tt.wnext { t.Errorf("#%d next = %d, want %d", i, p.Next, tt.wnext) } @@ -2792,7 +2792,7 @@ func TestSendAppendForProgressProbe(t *testing.T) { r.becomeCandidate() r.becomeLeader() r.readMessages() - r.prs.nodes[2].becomeProbe() + r.prs.prs[2].becomeProbe() // each round is a heartbeat for i := 0; i < 3; i++ { @@ -2811,8 +2811,8 @@ func TestSendAppendForProgressProbe(t *testing.T) { } } - if !r.prs.nodes[2].Paused { - t.Errorf("paused = %v, want true", r.prs.nodes[2].Paused) + if !r.prs.prs[2].Paused { + t.Errorf("paused = %v, want true", r.prs.prs[2].Paused) } for j := 0; j < 10; j++ { mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) @@ -2826,8 +2826,8 @@ func TestSendAppendForProgressProbe(t *testing.T) { for j := 0; j < r.heartbeatTimeout; j++ { r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) } - if !r.prs.nodes[2].Paused { - t.Errorf("paused = %v, want true", r.prs.nodes[2].Paused) + if !r.prs.prs[2].Paused { + t.Errorf("paused = %v, want true", r.prs.prs[2].Paused) } // consume the heartbeat @@ -2849,8 +2849,8 @@ func TestSendAppendForProgressProbe(t *testing.T) { if msg[0].Index != 0 { t.Errorf("index = %d, want %d", msg[0].Index, 0) } - if !r.prs.nodes[2].Paused { - t.Errorf("paused = %v, want true", r.prs.nodes[2].Paused) + if !r.prs.prs[2].Paused { + t.Errorf("paused = %v, want true", r.prs.prs[2].Paused) } } @@ -2859,7 +2859,7 @@ func TestSendAppendForProgressReplicate(t *testing.T) { r.becomeCandidate() r.becomeLeader() r.readMessages() - r.prs.nodes[2].becomeReplicate() + r.prs.prs[2].becomeReplicate() for i := 0; i < 10; i++ { mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) @@ -2876,7 +2876,7 @@ func 
TestSendAppendForProgressSnapshot(t *testing.T) { r.becomeCandidate() r.becomeLeader() r.readMessages() - r.prs.nodes[2].becomeSnapshot(10) + r.prs.prs[2].becomeSnapshot(10) for i := 0; i < 10; i++ { mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) @@ -2897,17 +2897,17 @@ func TestRecvMsgUnreachable(t *testing.T) { r.becomeLeader() r.readMessages() // set node 2 to state replicate - r.prs.nodes[2].Match = 3 - r.prs.nodes[2].becomeReplicate() - r.prs.nodes[2].optimisticUpdate(5) + r.prs.prs[2].Match = 3 + r.prs.prs[2].becomeReplicate() + r.prs.prs[2].optimisticUpdate(5) r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgUnreachable}) - if r.prs.nodes[2].State != ProgressStateProbe { - t.Errorf("state = %s, want %s", r.prs.nodes[2].State, ProgressStateProbe) + if r.prs.prs[2].State != ProgressStateProbe { + t.Errorf("state = %s, want %s", r.prs.prs[2].State, ProgressStateProbe) } - if wnext := r.prs.nodes[2].Match + 1; r.prs.nodes[2].Next != wnext { - t.Errorf("next = %d, want %d", r.prs.nodes[2].Next, wnext) + if wnext := r.prs.prs[2].Match + 1; r.prs.prs[2].Next != wnext { + t.Errorf("next = %d, want %d", r.prs.prs[2].Next, wnext) } } @@ -2973,13 +2973,13 @@ func TestRestoreWithLearner(t *testing.T) { t.Errorf("sm.LearnerNodes = %+v, length not equal with %+v", sg, s.Metadata.ConfState.Learners) } for _, n := range s.Metadata.ConfState.Nodes { - if sm.prs.nodes[n].IsLearner { - t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs.nodes[n], false) + if sm.prs.prs[n].IsLearner { + t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs.prs[n], false) } } for _, n := range s.Metadata.ConfState.Learners { - if !sm.prs.learners[n].IsLearner { - t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs.nodes[n], true) + if !sm.prs.prs[n].IsLearner { + t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs.prs[n], true) } } @@ -3121,8 +3121,8 @@ func TestProvideSnap(t *testing.T) { sm.becomeLeader() // force set the next of node 2, so that node 2 needs a snapshot - 
sm.prs.nodes[2].Next = sm.raftLog.firstIndex() - sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs.nodes[2].Next - 1, Reject: true}) + sm.prs.prs[2].Next = sm.raftLog.firstIndex() + sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs.prs[2].Next - 1, Reject: true}) msgs := sm.readMessages() if len(msgs) != 1 { @@ -3152,8 +3152,8 @@ func TestIgnoreProvidingSnap(t *testing.T) { // force set the next of node 2, so that node 2 needs a snapshot // change node 2 to be inactive, expect node 1 ignore sending snapshot to 2 - sm.prs.nodes[2].Next = sm.raftLog.firstIndex() - 1 - sm.prs.nodes[2].RecentActive = false + sm.prs.prs[2].Next = sm.raftLog.firstIndex() - 1 + sm.prs.prs[2].RecentActive = false sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) @@ -3201,7 +3201,7 @@ func TestSlowNodeRestore(t *testing.T) { // node 3 will only be considered as active when node 1 receives a reply from it. for { nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) - if lead.prs.nodes[3].RecentActive { + if lead.prs.prs[3].RecentActive { break } } @@ -3304,8 +3304,8 @@ func TestAddLearner(t *testing.T) { if !reflect.DeepEqual(nodes, wnodes) { t.Errorf("nodes = %v, want %v", nodes, wnodes) } - if !r.prs.learners[2].IsLearner { - t.Errorf("node 2 is learner %t, want %t", r.prs.nodes[2].IsLearner, true) + if !r.prs.prs[2].IsLearner { + t.Errorf("node 2 is learner %t, want %t", r.prs.prs[2].IsLearner, true) } } @@ -3619,8 +3619,8 @@ func TestLeaderTransferToSlowFollower(t *testing.T) { nt.recover() lead := nt.peers[1].(*raft) - if lead.prs.nodes[3].Match != 1 { - t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs.nodes[3].Match, 1) + if lead.prs.prs[3].Match != 1 { + t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs.prs[3].Match, 1) } // Transfer leadership to 3 when node 3 is lack of log. 
@@ -3642,8 +3642,8 @@ func TestLeaderTransferAfterSnapshot(t *testing.T) { nt.storage[1].Compact(lead.raftLog.applied) nt.recover() - if lead.prs.nodes[3].Match != 1 { - t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs.nodes[3].Match, 1) + if lead.prs.prs[3].Match != 1 { + t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs.prs[3].Match, 1) } // Transfer leadership to 3 when node 3 is lack of snapshot. @@ -3722,8 +3722,8 @@ func TestLeaderTransferIgnoreProposal(t *testing.T) { t.Fatalf("should return drop proposal error while transferring") } - if lead.prs.nodes[1].Match != 1 { - t.Fatalf("node 1 has match %x, want %x", lead.prs.nodes[1].Match, 1) + if lead.prs.prs[1].Match != 1 { + t.Fatalf("node 1 has match %x, want %x", lead.prs.prs[1].Match, 1) } } @@ -4334,14 +4334,18 @@ func newNetworkWithConfig(configFunc func(*Config), peers ...stateMachine) *netw learners[i] = true } v.id = id - v.prs.nodes = make(map[uint64]*Progress) - v.prs.learners = make(map[uint64]*Progress) + v.prs.nodes = make(map[uint64]struct{}) + v.prs.learners = make(map[uint64]struct{}) + v.prs.prs = make(map[uint64]*Progress) for i := 0; i < size; i++ { + pr := &Progress{} if _, ok := learners[peerAddrs[i]]; ok { - v.prs.learners[peerAddrs[i]] = &Progress{IsLearner: true} + pr.IsLearner = true + v.prs.learners[peerAddrs[i]] = struct{}{} } else { - v.prs.nodes[peerAddrs[i]] = &Progress{} + v.prs.nodes[peerAddrs[i]] = struct{}{} } + v.prs.prs[peerAddrs[i]] = pr } v.reset(v.Term) npeers[id] = v From 0384c587eb96acaddcab38ed3ae17319994e68f0 Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Mon, 3 Jun 2019 14:23:04 +0200 Subject: [PATCH 3/4] raft: rename makeP{RS,rogressTracker} --- raft/progress.go | 2 +- raft/raft.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/raft/progress.go b/raft/progress.go index fc4afb2bb0a..b0bd817a9c9 100644 --- a/raft/progress.go +++ b/raft/progress.go @@ -301,7 +301,7 @@ type progressTracker struct { matchBuf 
uint64Slice } -func makePRS(maxInflight int) progressTracker { +func makeProgressTracker(maxInflight int) progressTracker { p := progressTracker{ maxInflight: maxInflight, prs: map[uint64]*Progress{}, diff --git a/raft/raft.go b/raft/raft.go index 61df549b96d..780a77b1552 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -343,7 +343,7 @@ func newRaft(c *Config) *raft { raftLog: raftlog, maxMsgSize: c.MaxSizePerMsg, maxUncommittedSize: c.MaxUncommittedEntriesSize, - prs: makePRS(c.MaxInflightMsgs), + prs: makeProgressTracker(c.MaxInflightMsgs), electionTimeout: c.ElectionTick, heartbeatTimeout: c.HeartbeatTick, logger: c.Logger, @@ -1346,7 +1346,7 @@ func (r *raft) restore(s pb.Snapshot) bool { r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term) r.raftLog.restore(s) - r.prs = makePRS(r.prs.maxInflight) + r.prs = makeProgressTracker(r.prs.maxInflight) r.restoreNode(s.Metadata.ConfState.Nodes, false) r.restoreNode(s.Metadata.ConfState.Learners, true) return true From e03962990731caaaac7cb8fd892f856086506753 Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Fri, 14 Jun 2019 21:47:49 +0200 Subject: [PATCH 4/4] raft: use half-populated joint quorum To ease a future transition into joint quorums, this commit replaces the previous "ad-hoc" majority-based quorum and vote computations with those introduced in the `raft/quorum` package. More specifically, the progressTracker now uses a quorum.JointConfig for which the "second" majority quorum is always empty; in this case the quorum behaves like the one quorum.MajorityConfig that is actually present. Or, more briefly, this change is a no-op, but it will take the busywork out of actually starting to make use of joint quorums in the future. On a side note, I suspect that this might've fixed a bug regarding the read index, though I haven't been able to explicitly come up with a counter-example. 
The problem was that the acks collected for the read index weren't taking into account membership changes, so they'd run the danger of using acks from nodes since removed to claim that a quorum of acks had been received. There's a chance that there isn't a counter-example (the only guarantee extracted from the "quorum" is that there isn't another leader, but even if there's another leader all that matters is that that leader doesn't have a divergent history from the stale leader in the hypothetical counter-example), but either way there is morally a bug here that is now fixed because VoteCommitted doesn't care about votes from members that are not voters known to the currently active configuration. --- raft/progress.go | 93 ++++++++++++++++++++++++----------------------- raft/raft.go | 27 ++++++-------- raft/raft_test.go | 5 ++- raft/read_only.go | 12 ++++-- 4 files changed, 69 insertions(+), 68 deletions(-) diff --git a/raft/progress.go b/raft/progress.go index b0bd817a9c9..37663562493 100644 --- a/raft/progress.go +++ b/raft/progress.go @@ -17,6 +17,8 @@ package raft import ( "fmt" "sort" + + "go.etcd.io/etcd/raft/quorum" ) const ( @@ -291,23 +293,25 @@ func (in *inflights) reset() { // known about the nodes and learners in it. In particular, it tracks the match // index for each peer which in turn allows reasoning about the committed index. 
type progressTracker struct { - nodes map[uint64]struct{} + voters quorum.JointConfig learners map[uint64]struct{} prs map[uint64]*Progress votes map[uint64]bool maxInflight int - matchBuf uint64Slice } func makeProgressTracker(maxInflight int) progressTracker { p := progressTracker{ maxInflight: maxInflight, - prs: map[uint64]*Progress{}, - nodes: map[uint64]struct{}{}, - learners: map[uint64]struct{}{}, - votes: map[uint64]bool{}, + voters: quorum.JointConfig{ + quorum.MajorityConfig{}, + quorum.MajorityConfig{}, + }, + learners: map[uint64]struct{}{}, + votes: map[uint64]bool{}, + prs: map[uint64]*Progress{}, } return p } @@ -315,40 +319,35 @@ func makeProgressTracker(maxInflight int) progressTracker { // isSingleton returns true if (and only if) there is only one voting member // (i.e. the leader) in the current configuration. func (p *progressTracker) isSingleton() bool { - return len(p.nodes) == 1 + return len(p.voters[0]) == 1 && len(p.voters[1]) == 0 } -func (p *progressTracker) quorum() int { - return len(p.nodes)/2 + 1 -} +type progressAckIndexer map[uint64]*Progress -func (p *progressTracker) hasQuorum(m map[uint64]struct{}) bool { - return len(m) >= p.quorum() +var _ quorum.AckedIndexer = progressAckIndexer(nil) + +func (l progressAckIndexer) AckedIndex(id uint64) (quorum.Index, bool) { + pr, ok := l[id] + if !ok { + return 0, false + } + return quorum.Index(pr.Match), true } // committed returns the largest log index known to be committed based on what // the voting members of the group have acknowledged. func (p *progressTracker) committed() uint64 { - // Preserving matchBuf across calls is an optimization - // used to avoid allocating a new slice on each call. 
- if cap(p.matchBuf) < len(p.nodes) { - p.matchBuf = make(uint64Slice, len(p.nodes)) - } - p.matchBuf = p.matchBuf[:len(p.nodes)] - idx := 0 - for id := range p.nodes { - p.matchBuf[idx] = p.prs[id].Match - idx++ - } - sort.Sort(&p.matchBuf) - return p.matchBuf[len(p.matchBuf)-p.quorum()] + return uint64(p.voters.CommittedIndex(progressAckIndexer(p.prs))) } func (p *progressTracker) removeAny(id uint64) { _, okPR := p.prs[id] - _, okV := p.nodes[id] + _, okV1 := p.voters[0][id] + _, okV2 := p.voters[1][id] _, okL := p.learners[id] + okV := okV1 || okV2 + if !okPR { panic("attempting to remove unknown peer %x") } else if !okV && !okL { @@ -357,7 +356,8 @@ func (p *progressTracker) removeAny(id uint64) { panic(fmt.Sprintf("peer %x is both voter and learner", id)) } - delete(p.nodes, id) + delete(p.voters[0], id) + delete(p.voters[1], id) delete(p.learners, id) delete(p.prs, id) } @@ -369,7 +369,7 @@ func (p *progressTracker) initProgress(id, match, next uint64, isLearner bool) { panic(fmt.Sprintf("peer %x already tracked as node %v", id, pr)) } if !isLearner { - p.nodes[id] = struct{}{} + p.voters[0][id] = struct{}{} } else { p.learners[id] = struct{}{} } @@ -391,19 +391,21 @@ func (p *progressTracker) visit(f func(id uint64, pr *Progress)) { // the view of the local raft state machine. Otherwise, it returns // false. 
func (p *progressTracker) quorumActive() bool { - var act int + votes := map[uint64]bool{} p.visit(func(id uint64, pr *Progress) { - if pr.RecentActive && !pr.IsLearner { - act++ + if pr.IsLearner { + return } + votes[id] = pr.RecentActive }) - return act >= p.quorum() + return p.voters.VoteResult(votes) == quorum.VoteWon } func (p *progressTracker) voterNodes() []uint64 { - nodes := make([]uint64, 0, len(p.nodes)) - for id := range p.nodes { + m := p.voters.IDs() + nodes := make([]uint64, 0, len(m)) + for id := range m { nodes = append(nodes, id) } sort.Sort(uint64Slice(nodes)) @@ -435,22 +437,21 @@ func (p *progressTracker) recordVote(id uint64, v bool) { // tallyVotes returns the number of granted and rejected votes, and whether the // election outcome is known. -func (p *progressTracker) tallyVotes() (granted int, rejected int, result electionResult) { - for _, v := range p.votes { - if v { +func (p *progressTracker) tallyVotes() (granted int, rejected int, _ quorum.VoteResult) { + // Make sure to populate granted/rejected correctly even if the votes slice + // contains members no longer part of the configuration. This doesn't really + // matter in the way the numbers are used (they're informational), but might + // as well get it right. 
+ for id, pr := range p.prs { + if pr.IsLearner { + continue + } + if p.votes[id] { granted++ } else { rejected++ } } - - q := p.quorum() - - result = electionIndeterminate - if granted >= q { - result = electionWon - } else if rejected >= q { - result = electionLost - } + result := p.voters.VoteResult(p.votes) return granted, rejected, result } diff --git a/raft/raft.go b/raft/raft.go index 780a77b1552..06ba6bf12b2 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -24,6 +24,7 @@ import ( "sync" "time" + "go.etcd.io/etcd/raft/quorum" pb "go.etcd.io/etcd/raft/raftpb" ) @@ -744,7 +745,7 @@ func (r *raft) campaign(t CampaignType) { voteMsg = pb.MsgVote term = r.Term } - if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == electionWon { + if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == quorum.VoteWon { // We won the election after voting for ourselves (which must mean that // this is a single-node cluster). Advance to the next state. if t == campaignPreElection { @@ -754,7 +755,7 @@ func (r *raft) campaign(t CampaignType) { } return } - for id := range r.prs.nodes { + for id := range r.prs.voters.IDs() { if id == r.id { continue } @@ -769,15 +770,7 @@ func (r *raft) campaign(t CampaignType) { } } -type electionResult byte - -const ( - electionIndeterminate electionResult = iota - electionLost - electionWon -) - -func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int, rejected int, result electionResult) { +func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int, rejected int, result quorum.VoteResult) { if v { r.logger.Infof("%x received %s from %x at term %d", r.id, t, id, r.Term) } else { @@ -999,7 +992,9 @@ func stepLeader(r *raft, m pb.Message) error { r.bcastAppend() return nil case pb.MsgReadIndex: - if !r.prs.isSingleton() { // more than one voting member in cluster + // If more than the local vote is needed, go through a full broadcast, + // otherwise optimize. 
+ if !r.prs.isSingleton() { if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term { // Reject read only request when this leader has not committed any log entry at its term. return nil @@ -1110,7 +1105,7 @@ func stepLeader(r *raft, m pb.Message) error { return nil } - if !r.prs.hasQuorum(r.readOnly.recvAck(m.From, m.Context)) { + if r.prs.voters.VoteResult(r.readOnly.recvAck(m.From, m.Context)) != quorum.VoteWon { return nil } @@ -1210,14 +1205,14 @@ func stepCandidate(r *raft, m pb.Message) error { gr, rj, res := r.poll(m.From, m.Type, !m.Reject) r.logger.Infof("%x has received %d %s votes and %d vote rejections", r.id, gr, m.Type, rj) switch res { - case electionWon: + case quorum.VoteWon: if r.state == StatePreCandidate { r.campaign(campaignElection) } else { r.becomeLeader() r.bcastAppend() } - case electionLost: + case quorum.VoteLost: // pb.MsgPreVoteResp contains future term of pre-candidate // m.Term > r.Term; reuse r.Term r.becomeFollower(r.Term, None) @@ -1417,7 +1412,7 @@ func (r *raft) removeNode(id uint64) { r.prs.removeAny(id) // Do not try to commit or abort transferring if the cluster is now empty. 
- if len(r.prs.nodes) == 0 && len(r.prs.learners) == 0 { + if len(r.prs.voters[0]) == 0 && len(r.prs.learners) == 0 { return } diff --git a/raft/raft_test.go b/raft/raft_test.go index 31ad3f1aedc..40be17cfc8f 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -4334,7 +4334,8 @@ func newNetworkWithConfig(configFunc func(*Config), peers ...stateMachine) *netw learners[i] = true } v.id = id - v.prs.nodes = make(map[uint64]struct{}) + v.prs.voters[0] = make(map[uint64]struct{}) + v.prs.voters[1] = make(map[uint64]struct{}) v.prs.learners = make(map[uint64]struct{}) v.prs.prs = make(map[uint64]*Progress) for i := 0; i < size; i++ { @@ -4343,7 +4344,7 @@ func newNetworkWithConfig(configFunc func(*Config), peers ...stateMachine) *netw pr.IsLearner = true v.prs.learners[peerAddrs[i]] = struct{}{} } else { - v.prs.nodes[peerAddrs[i]] = struct{}{} + v.prs.voters[0][peerAddrs[i]] = struct{}{} } v.prs.prs[peerAddrs[i]] = pr } diff --git a/raft/read_only.go b/raft/read_only.go index 39eb2b06515..6987f1bd7d7 100644 --- a/raft/read_only.go +++ b/raft/read_only.go @@ -29,7 +29,11 @@ type ReadState struct { type readIndexStatus struct { req pb.Message index uint64 - acks map[uint64]struct{} + // NB: this never records 'false', but it's more convenient to use this + // instead of a map[uint64]struct{} due to the API of quorum.VoteResult. If + // this becomes performance sensitive enough (doubtful), quorum.VoteResult + // can change to an API that is closer to that of CommittedIndex. 
+ acks map[uint64]bool } type readOnly struct { @@ -54,20 +58,20 @@ func (ro *readOnly) addRequest(index uint64, m pb.Message) { if _, ok := ro.pendingReadIndex[s]; ok { return } - ro.pendingReadIndex[s] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]struct{})} + ro.pendingReadIndex[s] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]bool)} ro.readIndexQueue = append(ro.readIndexQueue, s) } // recvAck notifies the readonly struct that the raft state machine received // an acknowledgment of the heartbeat that attached with the read only request // context. -func (ro *readOnly) recvAck(id uint64, context []byte) map[uint64]struct{} { +func (ro *readOnly) recvAck(id uint64, context []byte) map[uint64]bool { rs, ok := ro.pendingReadIndex[string(context)] if !ok { return nil } - rs.acks[id] = struct{}{} + rs.acks[id] = true return rs.acks }