Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: Snapshot after cluster version downgrade #13686

Merged
merged 1 commit into from
Feb 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion server/etcdserver/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -936,7 +936,20 @@ func (a *applierV3backend) RoleList(r *pb.AuthRoleListRequest) (*pb.AuthRoleList
}

func (a *applierV3backend) ClusterVersionSet(r *membershippb.ClusterVersionSetRequest, shouldApplyV3 membership.ShouldApplyV3) {
a.s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Ver)), api.UpdateCapability, shouldApplyV3)
prevVersion := a.s.Cluster().Version()
newVersion := semver.Must(semver.NewVersion(r.Ver))
a.s.cluster.SetVersion(newVersion, api.UpdateCapability, shouldApplyV3)
// Force snapshot after cluster version downgrade.
if prevVersion != nil && newVersion.LessThan(*prevVersion) {
lg := a.s.Logger()
if lg != nil {
lg.Info("Cluster version downgrade detected, forcing snapshot",
zap.String("prev-cluster-version", prevVersion.String()),
zap.String("new-cluster-version", newVersion.String()),
)
}
a.s.forceSnapshot = true
}
}

func (a *applierV3backend) ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest, shouldApplyV3 membership.ShouldApplyV3) {
Expand Down
12 changes: 10 additions & 2 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,9 @@ type EtcdServer struct {
clusterVersionChanged *notify.Notifier

*AccessController
// forceSnapshot can force snapshot be triggered after apply, independent of the snapshotCount.
// Should only be set within apply code path. Used to force snapshot after cluster version downgrade.
forceSnapshot bool
}

// NewServer creates a new EtcdServer from the supplied configuration. The
Expand Down Expand Up @@ -1079,23 +1082,28 @@ func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *apply) {
}

func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
if ep.appliedi-ep.snapi <= s.Cfg.SnapshotCount {
if !s.shouldSnapshot(ep) {
return
}

lg := s.Logger()
lg.Info(
"triggering snapshot",
zap.String("local-member-id", s.ID().String()),
zap.Uint64("local-member-applied-index", ep.appliedi),
zap.Uint64("local-member-snapshot-index", ep.snapi),
zap.Uint64("local-member-snapshot-count", s.Cfg.SnapshotCount),
zap.Bool("snapshot-forced", s.forceSnapshot),
)
s.forceSnapshot = false

s.snapshot(ep.appliedi, ep.confState)
ep.snapi = ep.appliedi
}

func (s *EtcdServer) shouldSnapshot(ep *etcdProgress) bool {
return (s.forceSnapshot && ep.appliedi != ep.snapi) || (ep.appliedi-ep.snapi > s.Cfg.SnapshotCount)
}

func (s *EtcdServer) hasMultipleVotingMembers() bool {
return s.cluster != nil && len(s.cluster.VotingMemberIDs()) > 1
}
Expand Down
13 changes: 0 additions & 13 deletions tests/e2e/cluster_downgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package e2e
import (
"context"
"fmt"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -66,8 +65,6 @@ func startEtcd(t *testing.T, execPath, dataDirPath string) *e2e.EtcdProcessClust
ClusterSize: 1,
InitialToken: "new",
KeepDataDir: true,
// TODO: REMOVE snapshot override when snapshotting is automated after lowering storage versiont l
SnapshotCount: 5,
})
if err != nil {
t.Fatalf("could not start etcd process cluster (%v)", err)
Expand All @@ -77,16 +74,6 @@ func startEtcd(t *testing.T, execPath, dataDirPath string) *e2e.EtcdProcessClust
t.Fatalf("error closing etcd processes (%v)", errC)
}
})

prefixArgs := []string{e2e.CtlBinPath, "--endpoints", strings.Join(epc.EndpointsV3(), ",")}
t.Log("Write keys to ensure wal snapshot is created so cluster version set is snapshotted")
e2e.ExecuteWithTimeout(t, 20*time.Second, func() {
for i := 0; i < 10; i++ {
if err := e2e.SpawnWithExpect(append(prefixArgs, "put", fmt.Sprintf("%d", i), "value"), "OK"); err != nil {
t.Fatal(err)
}
}
})
return epc
}

Expand Down