Skip to content

Commit

Permalink
server: Ensure that cluster members stored in v2store and backend are…
Browse files Browse the repository at this point in the history
… in sync
  • Loading branch information
serathius committed Sep 14, 2021
1 parent 079cea0 commit 32bfadb
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 62 deletions.
90 changes: 78 additions & 12 deletions server/etcdserver/api/membership/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"crypto/sha1"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"path"
"sort"
Expand All @@ -32,6 +33,7 @@ import (
"go.etcd.io/etcd/pkg/v3/netutil"
"go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2error"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
Expand Down Expand Up @@ -254,12 +256,14 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {
c.Lock()
defer c.Unlock()

if c.be != nil {
c.version = clusterVersionFromBackend(c.lg, c.be)
c.members, c.removed = membersFromBackend(c.lg, c.be)
} else {
if c.v2store != nil {
c.version = clusterVersionFromStore(c.lg, c.v2store)
c.members, c.removed = membersFromStore(c.lg, c.v2store)
// Mitigates backend becoming out of sync with v2store. Example: https://github.com/etcd-io/etcd/issues/13196.
c.syncBackendAndStore(c.members, c.removed)
} else {
c.version = clusterVersionFromBackend(c.lg, c.be)
c.members, c.removed = membersFromBackend(c.lg, c.be)
}

if c.be != nil {
Expand Down Expand Up @@ -289,6 +293,19 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {
}
}

func (c *RaftCluster) syncBackendAndStore(members map[types.ID]*Member, removed map[types.ID]bool) {
err := TrimMembershipFromBackend(c.lg, c.be)
if err != nil {
c.lg.Panic("Failed to sync backend with v2store state", zap.Error(err))
}
for _, m := range members {
unsafeSaveMemberToBackend(c.lg, c.be, m)
}
for mid := range removed {
unsafeDeleteMemberFromBackend(c.be, mid)
}
}

// ValidateConfigurationChange takes a proposed ConfChange and
// ensures that it is still valid.
func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
Expand Down Expand Up @@ -381,11 +398,36 @@ func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
func (c *RaftCluster) AddMember(m *Member, shouldApplyV3 ShouldApplyV3) {
c.Lock()
defer c.Unlock()

var v2Err, beErr error
if c.v2store != nil {
mustSaveMemberToStore(c.lg, c.v2store, m)
v2Err = unsafeSaveMemberToStore(c.lg, c.v2store, m)
if v2Err != nil {
if e, ok := v2Err.(*v2error.Error); !ok || e.ErrorCode != v2error.EcodeNodeExist {
c.lg.Panic(
"failed to save member to store",
zap.String("member-id", m.ID.String()),
zap.Error(v2Err),
)
}
}
}
if c.be != nil && shouldApplyV3 {
mustSaveMemberToBackend(c.lg, c.be, m)
beErr = unsafeSaveMemberToBackend(c.lg, c.be, m)
if beErr != nil && !errors.Is(beErr, memberAlreadyExistErr) {
c.lg.Panic(
"failed to save member to backend",
zap.String("member-id", m.ID.String()),
zap.Error(beErr),
)
}
}
if v2Err != nil && (beErr != nil || c.be == nil) {
c.lg.Panic(
"failed to save member to store",
zap.String("member-id", m.ID.String()),
zap.Error(v2Err),
)
}

c.members[m.ID] = m
Expand All @@ -404,11 +446,35 @@ func (c *RaftCluster) AddMember(m *Member, shouldApplyV3 ShouldApplyV3) {
func (c *RaftCluster) RemoveMember(id types.ID, shouldApplyV3 ShouldApplyV3) {
c.Lock()
defer c.Unlock()
var v2Err, beErr error
if c.v2store != nil {
mustDeleteMemberFromStore(c.lg, c.v2store, id)
v2Err = unsafeDeleteMemberFromStore(c.v2store, id)
if v2Err != nil {
if e, ok := v2Err.(*v2error.Error); !ok || e.ErrorCode != v2error.EcodeKeyNotFound {
c.lg.Panic(
"failed to delete member from store",
zap.String("member-id", id.String()),
zap.Error(v2Err),
)
}
}
}
if c.be != nil && shouldApplyV3 {
mustDeleteMemberFromBackend(c.be, id)
beErr = unsafeDeleteMemberFromBackend(c.be, id)
if beErr != nil && !errors.Is(beErr, memberNotFoundErr) {
c.lg.Panic(
"failed to delete member from backend",
zap.String("member-id", id.String()),
zap.Error(beErr),
)
}
}
if v2Err != nil && (beErr != nil || c.be == nil) {
c.lg.Panic(
"failed to delete member from store",
zap.String("member-id", id.String()),
zap.Error(v2Err),
)
}

m, ok := c.members[id]
Expand Down Expand Up @@ -443,7 +509,7 @@ func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes, shouldApply
mustUpdateMemberAttrInStore(c.lg, c.v2store, m)
}
if c.be != nil && shouldApplyV3 {
mustSaveMemberToBackend(c.lg, c.be, m)
unsafeSaveMemberToBackend(c.lg, c.be, m)
}
return
}
Expand Down Expand Up @@ -476,7 +542,7 @@ func (c *RaftCluster) PromoteMember(id types.ID, shouldApplyV3 ShouldApplyV3) {
mustUpdateMemberInStore(c.lg, c.v2store, c.members[id])
}
if c.be != nil && shouldApplyV3 {
mustSaveMemberToBackend(c.lg, c.be, c.members[id])
unsafeSaveMemberToBackend(c.lg, c.be, c.members[id])
}

c.lg.Info(
Expand All @@ -495,7 +561,7 @@ func (c *RaftCluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes,
mustUpdateMemberInStore(c.lg, c.v2store, c.members[id])
}
if c.be != nil && shouldApplyV3 {
mustSaveMemberToBackend(c.lg, c.be, c.members[id])
unsafeSaveMemberToBackend(c.lg, c.be, c.members[id])
}

c.lg.Info(
Expand Down Expand Up @@ -870,7 +936,7 @@ func (c *RaftCluster) PushMembershipToStorage() {
if c.be != nil {
TrimMembershipFromBackend(c.lg, c.be)
for _, m := range c.members {
mustSaveMemberToBackend(c.lg, c.be, m)
unsafeSaveMemberToBackend(c.lg, c.be, m)
}
}
if c.v2store != nil {
Expand Down
70 changes: 40 additions & 30 deletions server/etcdserver/api/membership/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1049,64 +1049,64 @@ func TestRecoverSyncsBackendAndStoreV2(t *testing.T) {
storeV2Members: []*Member{alice},
storeV2MembersRemoved: []*Member{bob},

expectMembers: []*Member{alice},
expectMembersRemoved: []*Member{bob},
expectMembers: []*Member{alice},
expectMembersRemoved: []*Member{bob},
},
{
name: "v2store doesn't exist",
storeV2Nil: true,
name: "v2store doesn't exist",
storeV2Nil: true,

backendMembers: []*Member{alice},
backendMembersRemoved: []*Member{bob},

expectMembers: []*Member{alice},
expectMembersRemoved: []*Member{bob},
expectMembers: []*Member{alice},
expectMembersRemoved: []*Member{bob},
},
{
name: "missing member add in backend",
name: "missing member add in backend",
backendMembers: []*Member{alice},

storeV2Members: []*Member{alice, bob},

expectMembers: []*Member{alice},
expectMembers: []*Member{alice, bob},
},
{
name: "missing member add in storeV2",
name: "missing member add in storeV2",
backendMembers: []*Member{alice, bob},

storeV2Members: []*Member{alice},

expectMembers: []*Member{alice, bob},
expectMembers: []*Member{alice},
},
{
name: "missing member remove in be",
backendMembers: []*Member{alice},
name: "missing member remove in be",
backendMembers: []*Member{alice},

storeV2Members: []*Member{alice},
storeV2MembersRemoved: []*Member{bob},

expectMembers: []*Member{alice},
expectMembers: []*Member{alice},
expectMembersRemoved: []*Member{bob},
},
{
name: "missing member remove in storev2",
name: "missing member remove in storev2",
backendMembers: []*Member{alice},
backendMembersRemoved: []*Member{bob},

storeV2Members: []*Member{alice},
storeV2Members: []*Member{alice},

expectMembers: []*Member{alice},
expectMembersRemoved: []*Member{bob},
expectMembers: []*Member{alice},
},
{
name: "All cases combined",
name: "All cases combined",
backendMembers: []*Member{alice, bob},
backendMembersRemoved: []*Member{cecile, dan},

storeV2Members: []*Member{alice, cecile},
storeV2MembersRemoved: []*Member{bob, dan},

expectMembers: []*Member{alice, bob},
expectMembersRemoved: []*Member{cecile, dan},
expectMembers: []*Member{alice, cecile},
expectMembersRemoved: []*Member{bob, dan},
},
}
for _, tc := range tcs {
Expand All @@ -1121,17 +1121,17 @@ func TestRecoverSyncsBackendAndStoreV2(t *testing.T) {
}

for _, m := range tc.backendMembers {
mustSaveMemberToBackend(lg, be, m)
unsafeSaveMemberToBackend(lg, be, m)
}
for _, m := range tc.backendMembersRemoved {
mustDeleteMemberFromBackend(be, m.ID)
unsafeDeleteMemberFromBackend(be, m.ID)
}
for _, m := range tc.storeV2Members {
mustSaveMemberToStore(lg, st, m)
}
for _, m := range tc.storeV2MembersRemoved {
mustSaveMemberToStore(lg, st, m)
mustDeleteMemberFromStore(lg, st, m.ID)
unsafeDeleteMemberFromStore(st, m.ID)
}
be.ForceCommit()

Expand Down Expand Up @@ -1176,9 +1176,8 @@ func TestAddMemberSyncsBackendAndStoreV2(t *testing.T) {
name: "Adding new member should succeed",
},
{
name: "Adding member should fail if it was only in storeV2",
name: "Adding member should succeed if it was only in storeV2",
storeV2Members: []*Member{alice},
expectPanics: true,
},
{
name: "Adding member should succeed if it was only in backend",
Expand Down Expand Up @@ -1220,7 +1219,7 @@ func TestAddMemberSyncsBackendAndStoreV2(t *testing.T) {
mustCreateBackendBuckets(be)
st := v2store.New()
for _, m := range tc.backendMembers {
mustSaveMemberToBackend(lg, be, m)
unsafeSaveMemberToBackend(lg, be, m)
}
be.ForceCommit()
for _, m := range tc.storeV2Members {
Expand All @@ -1244,6 +1243,11 @@ func TestAddMemberSyncsBackendAndStoreV2(t *testing.T) {
storeV2Members, _ := membersFromStore(lg, st)
assert.Equal(t, map[types.ID]*Member{alice.ID: alice}, storeV2Members)
}
if !tc.backendNil {
be.ForceCommit()
beMembers, _ := mustReadMembersFromBackend(lg, be)
assert.Equal(t, map[types.ID]*Member{alice.ID: alice}, beMembers)
}
})
}
}
Expand All @@ -1260,7 +1264,8 @@ func TestRemoveMemberSyncsBackendAndStoreV2(t *testing.T) {
storeV2Members []*Member
backendMembers []*Member

expectPanics bool
expectMembers []*Member
expectPanics bool
}{
{
name: "Removing new member should fail",
Expand All @@ -1271,9 +1276,8 @@ func TestRemoveMemberSyncsBackendAndStoreV2(t *testing.T) {
storeV2Members: []*Member{alice},
},
{
name: "Removing member should fail if it was only in backend",
name: "Removing member should succeed if it was only in backend",
backendMembers: []*Member{alice},
expectPanics: true,
},
{
name: "Removing member should succeed if it exists in both",
Expand Down Expand Up @@ -1310,7 +1314,7 @@ func TestRemoveMemberSyncsBackendAndStoreV2(t *testing.T) {
mustCreateBackendBuckets(be)
st := v2store.New()
for _, m := range tc.backendMembers {
mustSaveMemberToBackend(lg, be, m)
unsafeSaveMemberToBackend(lg, be, m)
}
be.ForceCommit()
for _, m := range tc.storeV2Members {
Expand All @@ -1334,6 +1338,12 @@ func TestRemoveMemberSyncsBackendAndStoreV2(t *testing.T) {
storeV2Members, _ := membersFromStore(lg, st)
assert.Equal(t, map[types.ID]*Member{}, storeV2Members)
}
if !tc.backendNil {
be.ForceCommit()
beMembers, beRemoved := mustReadMembersFromBackend(lg, be)
assert.Equal(t, map[types.ID]*Member{}, beMembers)
assert.Equal(t, map[types.ID]bool{alice.ID: true}, beRemoved)
}
})
}
}
Loading

0 comments on commit 32bfadb

Please sign in to comment.