From 32bfadbc7896d2ab763efc11bf905f9c09f1efaa Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Tue, 14 Sep 2021 11:08:45 +0200 Subject: [PATCH] server: Ensure that cluster members stored in v2store and backend are in sync --- server/etcdserver/api/membership/cluster.go | 90 ++++++++++++++++--- .../etcdserver/api/membership/cluster_test.go | 70 ++++++++------- server/etcdserver/api/membership/store.go | 61 ++++++++----- 3 files changed, 159 insertions(+), 62 deletions(-) diff --git a/server/etcdserver/api/membership/cluster.go b/server/etcdserver/api/membership/cluster.go index 3df9588be8c1..728b5adbe22f 100644 --- a/server/etcdserver/api/membership/cluster.go +++ b/server/etcdserver/api/membership/cluster.go @@ -20,6 +20,7 @@ import ( "crypto/sha1" "encoding/binary" "encoding/json" + "errors" "fmt" "path" "sort" @@ -32,6 +33,7 @@ import ( "go.etcd.io/etcd/pkg/v3/netutil" "go.etcd.io/etcd/raft/v3" "go.etcd.io/etcd/raft/v3/raftpb" + "go.etcd.io/etcd/server/v3/etcdserver/api/v2error" "go.etcd.io/etcd/server/v3/etcdserver/api/v2store" "go.etcd.io/etcd/server/v3/mvcc/backend" "go.etcd.io/etcd/server/v3/mvcc/buckets" @@ -254,12 +256,14 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) { c.Lock() defer c.Unlock() - if c.be != nil { - c.version = clusterVersionFromBackend(c.lg, c.be) - c.members, c.removed = membersFromBackend(c.lg, c.be) - } else { + if c.v2store != nil { c.version = clusterVersionFromStore(c.lg, c.v2store) c.members, c.removed = membersFromStore(c.lg, c.v2store) + // Mitigates backend becoming out of sync with v2store. Example: https://github.com/etcd-io/etcd/issues/13196. + c.syncBackendAndStore(c.members, c.removed) + } else { + c.version = clusterVersionFromBackend(c.lg, c.be) + c.members, c.removed = membersFromBackend(c.lg, c.be) } if c.be != nil { @@ -289,6 +293,19 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) { } } +func (c *RaftCluster) syncBackendAndStore(members map[types.ID]*Member, removed map[types.ID]bool) { + err := TrimMembershipFromBackend(c.lg, c.be) + if err != nil { + c.lg.Panic("Failed to sync backend with v2store state", zap.Error(err)) + } + for _, m := range members { + unsafeSaveMemberToBackend(c.lg, c.be, m) + } + for mid := range removed { + unsafeDeleteMemberFromBackend(c.be, mid) + } +} + // ValidateConfigurationChange takes a proposed ConfChange and // ensures that it is still valid. func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error { @@ -381,11 +398,36 @@ func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error { func (c *RaftCluster) AddMember(m *Member, shouldApplyV3 ShouldApplyV3) { c.Lock() defer c.Unlock() + + var v2Err, beErr error if c.v2store != nil { - mustSaveMemberToStore(c.lg, c.v2store, m) + v2Err = unsafeSaveMemberToStore(c.lg, c.v2store, m) + if v2Err != nil { + if e, ok := v2Err.(*v2error.Error); !ok || e.ErrorCode != v2error.EcodeNodeExist { + c.lg.Panic( + "failed to save member to store", + zap.String("member-id", m.ID.String()), + zap.Error(v2Err), + ) + } + } } if c.be != nil && shouldApplyV3 { - mustSaveMemberToBackend(c.lg, c.be, m) + beErr = unsafeSaveMemberToBackend(c.lg, c.be, m) + if beErr != nil && !errors.Is(beErr, memberAlreadyExistErr) { + c.lg.Panic( + "failed to save member to backend", + zap.String("member-id", m.ID.String()), + zap.Error(beErr), + ) + } + } + if v2Err != nil && (beErr != nil || c.be == nil) { + c.lg.Panic( + "failed to save member to store", + zap.String("member-id", m.ID.String()), + zap.Error(v2Err), + ) } c.members[m.ID] = m @@ -404,11 +446,35 @@ func (c *RaftCluster) AddMember(m *Member, shouldApplyV3 ShouldApplyV3) { func (c *RaftCluster) RemoveMember(id types.ID, shouldApplyV3 ShouldApplyV3) { c.Lock() defer c.Unlock() + var v2Err, beErr error if c.v2store != nil { - mustDeleteMemberFromStore(c.lg, c.v2store, id) + v2Err = unsafeDeleteMemberFromStore(c.v2store, id) + if v2Err != nil { + if e, ok := v2Err.(*v2error.Error); !ok || e.ErrorCode != v2error.EcodeKeyNotFound { + c.lg.Panic( + "failed to delete member from store", + zap.String("member-id", id.String()), + zap.Error(v2Err), + ) + } + } } if c.be != nil && shouldApplyV3 { - mustDeleteMemberFromBackend(c.be, id) + beErr = unsafeDeleteMemberFromBackend(c.be, id) + if beErr != nil && !errors.Is(beErr, memberNotFoundErr) { + c.lg.Panic( + "failed to delete member from backend", + zap.String("member-id", id.String()), + zap.Error(beErr), + ) + } + } + if v2Err != nil && (beErr != nil || c.be == nil) { + c.lg.Panic( + "failed to delete member from store", + zap.String("member-id", id.String()), + zap.Error(v2Err), + ) } m, ok := c.members[id] @@ -443,7 +509,7 @@ func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes, shouldApply mustUpdateMemberAttrInStore(c.lg, c.v2store, m) } if c.be != nil && shouldApplyV3 { - mustSaveMemberToBackend(c.lg, c.be, m) + unsafeSaveMemberToBackend(c.lg, c.be, m) } return } @@ -476,7 +542,7 @@ func (c *RaftCluster) PromoteMember(id types.ID, shouldApplyV3 ShouldApplyV3) { mustUpdateMemberInStore(c.lg, c.v2store, c.members[id]) } if c.be != nil && shouldApplyV3 { - mustSaveMemberToBackend(c.lg, c.be, c.members[id]) + unsafeSaveMemberToBackend(c.lg, c.be, c.members[id]) } c.lg.Info( @@ -495,7 +561,7 @@ func (c *RaftCluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes, mustUpdateMemberInStore(c.lg, c.v2store, c.members[id]) } if c.be != nil && shouldApplyV3 { - mustSaveMemberToBackend(c.lg, c.be, c.members[id]) + unsafeSaveMemberToBackend(c.lg, c.be, c.members[id]) } c.lg.Info( @@ -870,7 +936,7 @@ func (c *RaftCluster) PushMembershipToStorage() { if c.be != nil { TrimMembershipFromBackend(c.lg, c.be) for _, m := range c.members { - mustSaveMemberToBackend(c.lg, c.be, m) + unsafeSaveMemberToBackend(c.lg, c.be, m) } } if c.v2store != nil { diff --git a/server/etcdserver/api/membership/cluster_test.go b/server/etcdserver/api/membership/cluster_test.go index c033d4c2e578..f8ca19fa8f3f 100644 --- a/server/etcdserver/api/membership/cluster_test.go +++ b/server/etcdserver/api/membership/cluster_test.go @@ -1049,64 +1049,64 @@ func TestRecoverSyncsBackendAndStoreV2(t *testing.T) { storeV2Members: []*Member{alice}, storeV2MembersRemoved: []*Member{bob}, - expectMembers: []*Member{alice}, - expectMembersRemoved: []*Member{bob}, + expectMembers: []*Member{alice}, + expectMembersRemoved: []*Member{bob}, }, { - name: "v2store doesn't exist", - storeV2Nil: true, + name: "v2store doesn't exist", + storeV2Nil: true, backendMembers: []*Member{alice}, backendMembersRemoved: []*Member{bob}, - expectMembers: []*Member{alice}, - expectMembersRemoved: []*Member{bob}, + expectMembers: []*Member{alice}, + expectMembersRemoved: []*Member{bob}, }, { - name: "missing member add in backend", + name: "missing member add in backend", backendMembers: []*Member{alice}, storeV2Members: []*Member{alice, bob}, - expectMembers: []*Member{alice}, + expectMembers: []*Member{alice, bob}, }, { - name: "missing member add in storeV2", + name: "missing member add in storeV2", backendMembers: []*Member{alice, bob}, storeV2Members: []*Member{alice}, - expectMembers: []*Member{alice, bob}, + expectMembers: []*Member{alice}, }, { - name: "missing member remove in be", - backendMembers: []*Member{alice}, + name: "missing member remove in be", + backendMembers: []*Member{alice}, storeV2Members: []*Member{alice}, storeV2MembersRemoved: []*Member{bob}, - expectMembers: []*Member{alice}, + expectMembers: []*Member{alice}, + expectMembersRemoved: []*Member{bob}, }, { - name: "missing member remove in storev2", + name: "missing member remove in storev2", backendMembers: []*Member{alice}, backendMembersRemoved: []*Member{bob}, - storeV2Members: []*Member{alice}, + storeV2Members: []*Member{alice}, - expectMembers: []*Member{alice}, - expectMembersRemoved: []*Member{bob}, + expectMembers: []*Member{alice}, }, { - name: "All cases combined", + name: "All cases combined", backendMembers: []*Member{alice, bob}, backendMembersRemoved: []*Member{cecile, dan}, storeV2Members: []*Member{alice, cecile}, storeV2MembersRemoved: []*Member{bob, dan}, - expectMembers: []*Member{alice, bob}, - expectMembersRemoved: []*Member{cecile, dan}, + expectMembers: []*Member{alice, cecile}, + expectMembersRemoved: []*Member{bob, dan}, }, } for _, tc := range tcs { @@ -1121,17 +1121,17 @@ func TestRecoverSyncsBackendAndStoreV2(t *testing.T) { } for _, m := range tc.backendMembers { - mustSaveMemberToBackend(lg, be, m) + unsafeSaveMemberToBackend(lg, be, m) } for _, m := range tc.backendMembersRemoved { - mustDeleteMemberFromBackend(be, m.ID) + unsafeDeleteMemberFromBackend(be, m.ID) } for _, m := range tc.storeV2Members { mustSaveMemberToStore(lg, st, m) } for _, m := range tc.storeV2MembersRemoved { mustSaveMemberToStore(lg, st, m) - mustDeleteMemberFromStore(lg, st, m.ID) + unsafeDeleteMemberFromStore(st, m.ID) } be.ForceCommit() @@ -1176,9 +1176,8 @@ func TestAddMemberSyncsBackendAndStoreV2(t *testing.T) { name: "Adding new member should succeed", }, { - name: "Adding member should fail if it was only in storeV2", + name: "Adding member should succeed if it was only in storeV2", storeV2Members: []*Member{alice}, - expectPanics: true, }, { name: "Adding member should succeed if it was only in backend", @@ -1220,7 +1219,7 @@ func TestAddMemberSyncsBackendAndStoreV2(t *testing.T) { mustCreateBackendBuckets(be) st := v2store.New() for _, m := range tc.backendMembers { - mustSaveMemberToBackend(lg, be, m) + unsafeSaveMemberToBackend(lg, be, m) } be.ForceCommit() for _, m := range tc.storeV2Members { @@ -1244,6 +1243,11 @@ func TestAddMemberSyncsBackendAndStoreV2(t *testing.T) { storeV2Members, _ := membersFromStore(lg, st) assert.Equal(t, map[types.ID]*Member{alice.ID: alice}, storeV2Members) } + if !tc.backendNil { + be.ForceCommit() + beMembers, _ := mustReadMembersFromBackend(lg, be) + assert.Equal(t, map[types.ID]*Member{alice.ID: alice}, beMembers) + } }) } } @@ -1260,7 +1264,8 @@ func TestRemoveMemberSyncsBackendAndStoreV2(t *testing.T) { storeV2Members []*Member backendMembers []*Member - expectPanics bool + expectMembers []*Member + expectPanics bool }{ { name: "Removing new member should fail", @@ -1271,9 +1276,8 @@ func TestRemoveMemberSyncsBackendAndStoreV2(t *testing.T) { storeV2Members: []*Member{alice}, }, { - name: "Removing member should fail if it was only in backend", + name: "Removing member should succeed if it was only in backend", backendMembers: []*Member{alice}, - expectPanics: true, }, { name: "Removing member should succeed if it exists in both", @@ -1310,7 +1314,7 @@ func TestRemoveMemberSyncsBackendAndStoreV2(t *testing.T) { mustCreateBackendBuckets(be) st := v2store.New() for _, m := range tc.backendMembers { - mustSaveMemberToBackend(lg, be, m) + unsafeSaveMemberToBackend(lg, be, m) } be.ForceCommit() for _, m := range tc.storeV2Members { @@ -1334,6 +1338,12 @@ func TestRemoveMemberSyncsBackendAndStoreV2(t *testing.T) { storeV2Members, _ := membersFromStore(lg, st) assert.Equal(t, map[types.ID]*Member{}, storeV2Members) } + if !tc.backendNil { + be.ForceCommit() + beMembers, beRemoved := mustReadMembersFromBackend(lg, be) + assert.Equal(t, map[types.ID]*Member{}, beMembers) + assert.Equal(t, map[types.ID]bool{alice.ID: true}, beRemoved) + } }) } } diff --git a/server/etcdserver/api/membership/store.go b/server/etcdserver/api/membership/store.go index 0bab3e42ed4b..77d5166de0a0 100644 --- a/server/etcdserver/api/membership/store.go +++ b/server/etcdserver/api/membership/store.go @@ -15,6 +15,7 @@ package membership import ( + "bytes" "encoding/json" "fmt" "path" @@ -39,9 +40,11 @@ const ( var ( StoreMembersPrefix = path.Join(storePrefix, "members") storeRemovedMembersPrefix = path.Join(storePrefix, "removed_members") + memberAlreadyExistErr = fmt.Errorf("member already exists") + memberNotFoundErr = fmt.Errorf("member not found") ) -func mustSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) { +func unsafeSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) error { mkey := backendMemberKey(m.ID) mvalue, err := json.Marshal(m) if err != nil { @@ -51,7 +54,11 @@ func mustSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) { tx := be.BatchTx() tx.Lock() defer tx.Unlock() + if unsafeMemberExists(tx, mkey) { + return memberAlreadyExistErr + } tx.UnsafePut(buckets.Members, mkey, mvalue) + return nil } // TrimClusterFromBackend removes all information about cluster (versions) @@ -64,14 +71,29 @@ func TrimClusterFromBackend(be backend.Backend) error { return nil } -func mustDeleteMemberFromBackend(be backend.Backend, id types.ID) { +func unsafeDeleteMemberFromBackend(be backend.Backend, id types.ID) error { mkey := backendMemberKey(id) tx := be.BatchTx() tx.Lock() defer tx.Unlock() - tx.UnsafeDelete(buckets.Members, mkey) tx.UnsafePut(buckets.MembersRemoved, mkey, []byte("removed")) + if !unsafeMemberExists(tx, mkey) { + return memberNotFoundErr + } + tx.UnsafeDelete(buckets.Members, mkey) + return nil +} + +func unsafeMemberExists(tx backend.ReadTx, mkey []byte) bool { + var found bool + tx.UnsafeForEach(buckets.Members, func(k, v []byte) error { + if bytes.Equal(k, mkey) { + found = true + } + return nil + }) + return found } func readMembersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.ID]*Member, map[types.ID]bool, error) { @@ -182,35 +204,34 @@ func mustSaveDowngradeToBackend(lg *zap.Logger, be backend.Backend, downgrade *D } func mustSaveMemberToStore(lg *zap.Logger, s v2store.Store, m *Member) { - b, err := json.Marshal(m.RaftAttributes) + err := unsafeSaveMemberToStore(lg, s, m) if err != nil { - lg.Panic("failed to marshal raftAttributes", zap.Error(err)) - } - p := path.Join(MemberStoreKey(m.ID), raftAttributesSuffix) - if _, err := s.Create(p, false, string(b), false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil { lg.Panic( "failed to save member to store", - zap.String("path", p), + zap.String("member-id", m.ID.String()), zap.Error(err), ) } } -func mustDeleteMemberFromStore(lg *zap.Logger, s v2store.Store, id types.ID) { +func unsafeSaveMemberToStore(lg *zap.Logger, s v2store.Store, m *Member) error { + b, err := json.Marshal(m.RaftAttributes) + if err != nil { + lg.Panic("failed to marshal raftAttributes", zap.Error(err)) + } + p := path.Join(MemberStoreKey(m.ID), raftAttributesSuffix) + _, err = s.Create(p, false, string(b), false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent}) + return err +} + +func unsafeDeleteMemberFromStore(s v2store.Store, id types.ID) error { if _, err := s.Delete(MemberStoreKey(id), true, true); err != nil { - lg.Panic( - "failed to delete member from store", - zap.String("path", MemberStoreKey(id)), - zap.Error(err), - ) + return err } if _, err := s.Create(RemovedMemberStoreKey(id), false, "", false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil { - lg.Panic( - "failed to create removedMember", - zap.String("path", RemovedMemberStoreKey(id)), - zap.Error(err), - ) + return err } + return nil } func mustUpdateMemberInStore(lg *zap.Logger, s v2store.Store, m *Member) {