Skip to content

Commit

Permalink
server: Ensure that cluster members stored in v2store and backend are…
Browse files Browse the repository at this point in the history
… in sync
  • Loading branch information
serathius committed Sep 14, 2021
1 parent 2fe94b1 commit 7ae97db
Show file tree
Hide file tree
Showing 3 changed files with 441 additions and 34 deletions.
92 changes: 80 additions & 12 deletions server/etcdserver/api/membership/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"crypto/sha1"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"path"
"sort"
Expand All @@ -32,6 +33,7 @@ import (
"go.etcd.io/etcd/pkg/v3/netutil"
"go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2error"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
Expand Down Expand Up @@ -254,12 +256,14 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {
c.Lock()
defer c.Unlock()

if c.be != nil {
c.version = clusterVersionFromBackend(c.lg, c.be)
c.members, c.removed = membersFromBackend(c.lg, c.be)
} else {
if c.v2store != nil {
c.version = clusterVersionFromStore(c.lg, c.v2store)
c.members, c.removed = membersFromStore(c.lg, c.v2store)
// Mitigates case where they can go out of sync. Example: https://github.com/etcd-io/etcd/issues/13196.
c.syncBackend(c.members, c.removed)
} else {
c.version = clusterVersionFromBackend(c.lg, c.be)
c.members, c.removed = membersFromBackend(c.lg, c.be)
}

if c.be != nil {
Expand Down Expand Up @@ -289,6 +293,19 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {
}
}

// syncBackend rewrites the membership data kept in the backend from the given
// members and removed sets. Recover calls it after loading membership from
// the v2store so that both stores describe the same cluster.
//
// It calls TrimMembershipFromBackend first, then saves every entry of members
// and issues a delete for every ID in removed. If trimming returns an error
// it is reported through c.lg.Panic.
//
// syncBackend is a no-op when the cluster has no backend configured.
func (c *RaftCluster) syncBackend(members map[types.ID]*Member, removed map[types.ID]bool) {
	// Every other backend access on RaftCluster is guarded by c.be != nil;
	// without the same guard here a v2store-only cluster would pass a nil
	// backend to the helpers below.
	if c.be == nil {
		return
	}

	if err := TrimMembershipFromBackend(c.lg, c.be); err != nil {
		c.lg.Panic("Failed to sync backend with v2store state", zap.Error(err))
	}

	// NOTE(review): both helpers return an error that AddMember/RemoveMember
	// compare against errMemberAlreadyExist / errMemberNotFound. The results
	// are discarded here exactly as before; presumably those "already exists" /
	// "not found" outcomes are expected right after a trim - confirm against
	// the helper implementations before tightening this.
	for _, m := range members {
		_ = unsafeSaveMemberToBackend(c.lg, c.be, m)
	}
	for mid := range removed {
		_ = unsafeDeleteMemberFromBackend(c.be, mid)
	}
}

// ValidateConfigurationChange takes a proposed ConfChange and
// ensures that it is still valid.
func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
Expand Down Expand Up @@ -381,11 +398,37 @@ func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
func (c *RaftCluster) AddMember(m *Member, shouldApplyV3 ShouldApplyV3) {
c.Lock()
defer c.Unlock()

var v2Err, beErr error
if c.v2store != nil {
mustSaveMemberToStore(c.lg, c.v2store, m)
v2Err = unsafeSaveMemberToStore(c.lg, c.v2store, m)
if v2Err != nil {
if e, ok := v2Err.(*v2error.Error); !ok || e.ErrorCode != v2error.EcodeNodeExist {
c.lg.Panic(
"failed to save member to store",
zap.String("member-id", m.ID.String()),
zap.Error(v2Err),
)
}
}
}
if c.be != nil && shouldApplyV3 {
mustSaveMemberToBackend(c.lg, c.be, m)
beErr = unsafeSaveMemberToBackend(c.lg, c.be, m)
if beErr != nil && !errors.Is(beErr, errMemberAlreadyExist) {
c.lg.Panic(
"failed to save member to backend",
zap.String("member-id", m.ID.String()),
zap.Error(beErr),
)
}
}
// Panic if both storeV2 and backend report member already exist.
if v2Err != nil && (beErr != nil || c.be == nil) {
c.lg.Panic(
"failed to save member to store",
zap.String("member-id", m.ID.String()),
zap.Error(v2Err),
)
}

c.members[m.ID] = m
Expand All @@ -404,11 +447,36 @@ func (c *RaftCluster) AddMember(m *Member, shouldApplyV3 ShouldApplyV3) {
func (c *RaftCluster) RemoveMember(id types.ID, shouldApplyV3 ShouldApplyV3) {
c.Lock()
defer c.Unlock()
var v2Err, beErr error
if c.v2store != nil {
mustDeleteMemberFromStore(c.lg, c.v2store, id)
v2Err = unsafeDeleteMemberFromStore(c.v2store, id)
if v2Err != nil {
if e, ok := v2Err.(*v2error.Error); !ok || e.ErrorCode != v2error.EcodeKeyNotFound {
c.lg.Panic(
"failed to delete member from store",
zap.String("member-id", id.String()),
zap.Error(v2Err),
)
}
}
}
if c.be != nil && shouldApplyV3 {
mustDeleteMemberFromBackend(c.be, id)
beErr = unsafeDeleteMemberFromBackend(c.be, id)
if beErr != nil && !errors.Is(beErr, errMemberNotFound) {
c.lg.Panic(
"failed to delete member from backend",
zap.String("member-id", id.String()),
zap.Error(beErr),
)
}
}
// Panic if both storeV2 and backend report member not found.
if v2Err != nil && (beErr != nil || c.be == nil) {
c.lg.Panic(
"failed to delete member from store",
zap.String("member-id", id.String()),
zap.Error(v2Err),
)
}

m, ok := c.members[id]
Expand Down Expand Up @@ -443,7 +511,7 @@ func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes, shouldApply
mustUpdateMemberAttrInStore(c.lg, c.v2store, m)
}
if c.be != nil && shouldApplyV3 {
mustSaveMemberToBackend(c.lg, c.be, m)
unsafeSaveMemberToBackend(c.lg, c.be, m)
}
return
}
Expand Down Expand Up @@ -476,7 +544,7 @@ func (c *RaftCluster) PromoteMember(id types.ID, shouldApplyV3 ShouldApplyV3) {
mustUpdateMemberInStore(c.lg, c.v2store, c.members[id])
}
if c.be != nil && shouldApplyV3 {
mustSaveMemberToBackend(c.lg, c.be, c.members[id])
unsafeSaveMemberToBackend(c.lg, c.be, c.members[id])
}

c.lg.Info(
Expand All @@ -495,7 +563,7 @@ func (c *RaftCluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes,
mustUpdateMemberInStore(c.lg, c.v2store, c.members[id])
}
if c.be != nil && shouldApplyV3 {
mustSaveMemberToBackend(c.lg, c.be, c.members[id])
unsafeSaveMemberToBackend(c.lg, c.be, c.members[id])
}

c.lg.Info(
Expand Down Expand Up @@ -870,7 +938,7 @@ func (c *RaftCluster) PushMembershipToStorage() {
if c.be != nil {
TrimMembershipFromBackend(c.lg, c.be)
for _, m := range c.members {
mustSaveMemberToBackend(c.lg, c.be, m)
unsafeSaveMemberToBackend(c.lg, c.be, m)
}
}
if c.v2store != nil {
Expand Down
Loading

0 comments on commit 7ae97db

Please sign in to comment.