From df49a510decc197591f09e81dfd494555bc10dc7 Mon Sep 17 00:00:00 2001 From: Benjamin Wang Date: Fri, 10 Jan 2025 17:50:56 +0000 Subject: [PATCH] Create a v2 snapshot when running etcdutl migrate command Also added test to cover the etcdutl migrate command Signed-off-by: Benjamin Wang --- etcdutl/etcdutl/common.go | 83 +++++++++++++++++++++ etcdutl/etcdutl/migrate_command.go | 67 ++++++++++++----- server/etcdserver/api/membership/cluster.go | 16 ++-- tests/e2e/utl_migrate_test.go | 55 ++++++++++++-- 4 files changed, 192 insertions(+), 29 deletions(-) diff --git a/etcdutl/etcdutl/common.go b/etcdutl/etcdutl/common.go index c7473cc7fd7b..be6fd5f61784 100644 --- a/etcdutl/etcdutl/common.go +++ b/etcdutl/etcdutl/common.go @@ -16,14 +16,19 @@ package etcdutl import ( "errors" + "fmt" "go.uber.org/zap" "go.uber.org/zap/zapcore" "go.etcd.io/etcd/client/pkg/v3/logutil" "go.etcd.io/etcd/pkg/v3/cobrautl" + "go.etcd.io/etcd/server/v3/etcdserver" + "go.etcd.io/etcd/server/v3/etcdserver/api/membership" "go.etcd.io/etcd/server/v3/etcdserver/api/snap" + "go.etcd.io/etcd/server/v3/storage/backend" "go.etcd.io/etcd/server/v3/storage/datadir" + "go.etcd.io/etcd/server/v3/storage/schema" "go.etcd.io/etcd/server/v3/storage/wal" "go.etcd.io/etcd/server/v3/storage/wal/walpb" "go.etcd.io/raft/v3/raftpb" @@ -68,3 +73,81 @@ func getLatestV2Snapshot(lg *zap.Logger, dataDir string) (*raftpb.Snapshot, erro return snapshot, nil } + +func createV2SnapshotFromV3Store(dataDir string, be backend.Backend) error { + var ( + lg = GetLogger() + + snapDir = datadir.ToSnapDir(dataDir) + walDir = datadir.ToWALDir(dataDir) + ) + + ci, term := schema.ReadConsistentIndex(be.ReadTx()) + + cl := membership.NewCluster(lg) + cl.SetBackend(schema.NewMembershipBackend(lg, be)) + cl.UnsafeLoad() + + latestWALSnap, err := getLatestWALSnap(lg, dataDir) + if err != nil { + return err + } + + // Each time before creating the v2 snapshot, etcdserver always flushes + // the backend storage (bbolt db), so the consistent 
index should never + // be less than the Index or term of the latest snapshot. + if ci < latestWALSnap.Index || term < latestWALSnap.Term { + // This should never happen + return fmt.Errorf("consistent_index [Index: %d, Term: %d] is less than the latest snapshot [Index: %d, Term: %d]", ci, term, latestWALSnap.Index, latestWALSnap.Term) + } + + voters, learners := getVotersAndLearners(cl) + confState := raftpb.ConfState{ + Voters: voters, + Learners: learners, + } + + // create the v2 snapshot file + raftSnap := raftpb.Snapshot{ + Data: etcdserver.GetMembershipInfoInV2Format(lg, cl), + Metadata: raftpb.SnapshotMetadata{ + Index: ci, + Term: term, + ConfState: confState, + }, + } + sn := snap.New(lg, snapDir) + if err = sn.SaveSnap(raftSnap); err != nil { + return err + } + + // save WAL snapshot record + w, err := wal.Open(lg, walDir, latestWALSnap) + if err != nil { + return err + } + defer w.Close() + // We must read all records to locate the tail of the last valid WAL file. + if _, _, _, err = w.ReadAll(); err != nil { + return err + } + + return w.SaveSnapshot(walpb.Snapshot{Index: ci, Term: term, ConfState: &confState}) +} + +func getVotersAndLearners(cl *membership.RaftCluster) ([]uint64, []uint64) { + var ( + voters []uint64 + learners []uint64 + ) + for _, m := range cl.Members() { + if m.IsLearner { + learners = append(learners, uint64(m.ID)) + continue + } + + voters = append(voters, uint64(m.ID)) + } + + return voters, learners +} diff --git a/etcdutl/etcdutl/migrate_command.go b/etcdutl/etcdutl/migrate_command.go index a265c7baae76..524b6e25169a 100644 --- a/etcdutl/etcdutl/migrate_command.go +++ b/etcdutl/etcdutl/migrate_command.go @@ -74,8 +74,9 @@ func (o *migrateOptions) AddFlags(cmd *cobra.Command) { func (o *migrateOptions) Config() (*migrateConfig, error) { c := &migrateConfig{ - force: o.force, - lg: GetLogger(), + force: o.force, + dataDir: o.dataDir, + lg: GetLogger(), } var err error dotCount := strings.Count(o.targetVersion, ".") @@ -90,47 
+91,73 @@ func (o *migrateOptions) Config() (*migrateConfig, error) { return nil, fmt.Errorf(`target version %q not supported. Minimal "3.5"`, storageVersionToString(c.targetVersion)) } - dbPath := datadir.ToBackendFileName(o.dataDir) - c.be = backend.NewDefaultBackend(GetLogger(), dbPath) + return c, nil +} + +type migrateConfig struct { + lg *zap.Logger + be backend.Backend + targetVersion *semver.Version + walVersion schema.WALVersion + dataDir string + force bool +} - walPath := datadir.ToWALDir(o.dataDir) - walSnap, err := getLatestWALSnap(c.lg, o.dataDir) +func (c *migrateConfig) finalize() error { + walPath := datadir.ToWALDir(c.dataDir) + walSnap, err := getLatestWALSnap(c.lg, c.dataDir) if err != nil { - return nil, fmt.Errorf("failed to get the lastest snapshot: %w", err) + return fmt.Errorf("failed to get the lastest snapshot: %w", err) } w, err := wal.OpenForRead(c.lg, walPath, walSnap) if err != nil { - return nil, fmt.Errorf(`failed to open wal: %w`, err) + return fmt.Errorf(`failed to open wal: %w`, err) } defer w.Close() c.walVersion, err = wal.ReadWALVersion(w) if err != nil { - return nil, fmt.Errorf(`failed to read wal: %w`, err) + return fmt.Errorf(`failed to read wal: %w`, err) } - return c, nil -} - -type migrateConfig struct { - lg *zap.Logger - be backend.Backend - targetVersion *semver.Version - walVersion schema.WALVersion - force bool + return nil } func migrateCommandFunc(c *migrateConfig) error { + dbPath := datadir.ToBackendFileName(c.dataDir) + c.be = backend.NewDefaultBackend(GetLogger(), dbPath) defer c.be.Close() + tx := c.be.BatchTx() current, err := schema.DetectSchemaVersion(c.lg, c.be.ReadTx()) if err != nil { - c.lg.Error("failed to detect storage version. Please make sure you are using data dir from etcd v3.5 and older") + c.lg.Error("failed to detect storage version. 
Please make sure you are using data dir from etcd v3.5 and older", zap.Error(err)) return err } if current == *c.targetVersion { c.lg.Info("storage version up-to-date", zap.String("storage-version", storageVersionToString(&current))) return nil } + + // only generate a v2 snapshot file for downgrade case + if c.targetVersion.LessThan(current) { + // Update cluster version + be := schema.NewMembershipBackend(c.lg, c.be) + be.MustSaveClusterVersionToBackend(c.targetVersion) + + // forcibly create a v2 snapshot file + // TODO: remove in 3.8 + if err = createV2SnapshotFromV3Store(c.dataDir, c.be); err != nil { + c.lg.Error("Failed to create v2 snapshot file", zap.Error(err)) + return err + } + c.lg.Info("Generated a v2 snapshot file") + } + + if err = c.finalize(); err != nil { + c.lg.Error("Failed to finalize config", zap.Error(err)) + return err + } + err = schema.Migrate(c.lg, tx, c.walVersion, *c.targetVersion) if err != nil { if !c.force { @@ -139,7 +166,9 @@ func migrateCommandFunc(c *migrateConfig) error { c.lg.Info("normal migrate failed, trying with force", zap.Error(err)) migrateForce(c.lg, tx, c.targetVersion) } + c.be.ForceCommit() + return nil } diff --git a/server/etcdserver/api/membership/cluster.go b/server/etcdserver/api/membership/cluster.go index 6539b977d233..c40a65c8a0af 100644 --- a/server/etcdserver/api/membership/cluster.go +++ b/server/etcdserver/api/membership/cluster.go @@ -256,10 +256,7 @@ func (c *RaftCluster) SetVersionChangedNotifier(n *notify.Notifier) { c.versionChanged = n } -func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) { - c.Lock() - defer c.Unlock() - +func (c *RaftCluster) UnsafeLoad() { if c.be != nil { c.version = c.be.ClusterVersionFromBackend() c.members, c.removed = c.be.MustReadMembersFromBackend() @@ -267,11 +264,20 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) { c.version = clusterVersionFromStore(c.lg, c.v2store) c.members, c.removed = membersFromStore(c.lg, c.v2store) } - 
c.buildMembershipMetric() if c.be != nil { c.downgradeInfo = c.be.DowngradeInfoFromBackend() } +} + +func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) { + c.Lock() + defer c.Unlock() + + c.UnsafeLoad() + + c.buildMembershipMetric() + sv := semver.Must(semver.NewVersion(version.Version)) if c.downgradeInfo != nil && c.downgradeInfo.Enabled { c.lg.Info( diff --git a/tests/e2e/utl_migrate_test.go b/tests/e2e/utl_migrate_test.go index 5ee933f0ef28..cee3fb87390c 100644 --- a/tests/e2e/utl_migrate_test.go +++ b/tests/e2e/utl_migrate_test.go @@ -49,6 +49,7 @@ func TestEtctlutlMigrate(t *testing.T) { expectLogsSubString string expectStorageVersion *semver.Version + expectTargetBinary string }{ { name: "Invalid target version string", @@ -81,23 +82,25 @@ func TestEtctlutlMigrate(t *testing.T) { { name: "Migrate v3.5 to v3.5 is no-op", clusterVersion: e2e.LastVersion, - clusterSize: 1, targetVersion: "3.5", + clusterSize: 1, expectLogsSubString: "storage version up-to-date\t" + `{"storage-version": "3.5"}`, }, { name: "Upgrade 1 member cluster from v3.5 to v3.6 should work", clusterVersion: e2e.LastVersion, - clusterSize: 1, targetVersion: "3.6", + clusterSize: 1, expectStorageVersion: &version.V3_6, + expectTargetBinary: e2e.BinPath.Etcd, }, { name: "Upgrade 3 member cluster from v3.5 to v3.6 should work", clusterVersion: e2e.LastVersion, - clusterSize: 3, targetVersion: "3.6", + clusterSize: 3, expectStorageVersion: &version.V3_6, + expectTargetBinary: e2e.BinPath.Etcd, }, { name: "Migrate v3.6 to v3.6 is no-op", @@ -112,6 +115,7 @@ func TestEtctlutlMigrate(t *testing.T) { clusterSize: 1, expectLogsSubString: "updated storage version", expectStorageVersion: nil, // 3.5 doesn't have the field `storageVersion`, so it returns nil. 
+ expectTargetBinary: e2e.BinPath.EtcdLastRelease, }, { name: "Downgrade 3 member cluster from v3.6 to v3.5 should work", @@ -119,6 +123,7 @@ func TestEtctlutlMigrate(t *testing.T) { clusterSize: 3, expectLogsSubString: "updated storage version", expectStorageVersion: nil, // 3.5 doesn't have the field `storageVersion`, so it returns nil. + expectTargetBinary: e2e.BinPath.EtcdLastRelease, }, { name: "Upgrade v3.6 to v3.7 with force should work", @@ -141,7 +146,7 @@ func TestEtctlutlMigrate(t *testing.T) { epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, e2e.WithVersion(tc.clusterVersion), e2e.WithDataDirPath(dataDirPath), - e2e.WithClusterSize(1), + e2e.WithClusterSize(tc.clusterSize), e2e.WithKeepDataDir(true), // Set low SnapshotCount to ensure wal snapshot is done e2e.WithSnapshotCount(1), @@ -163,7 +168,7 @@ func TestEtctlutlMigrate(t *testing.T) { require.NoError(t, e2e.SpawnWithExpect(append(prefixArgs, "put", fmt.Sprintf("%d", i), "value"), expect.ExpectedResponse{Value: "OK"})) } - t.Log("Stopping the the members") + t.Log("Stopping all the servers") for i := 0; i < len(epc.Procs); i++ { t.Logf("Stopping server %d: %v", i, epc.Procs[i].EndpointsGRPC()) err = epc.Procs[i].Stop() @@ -190,6 +195,46 @@ func TestEtctlutlMigrate(t *testing.T) { assert.Equal(t, tc.expectStorageVersion, ver) be.Close() } + + if len(tc.expectTargetBinary) == 0 || !fileutil.Exist(tc.expectTargetBinary) { + return + } + + t.Log("Start all members with new binary") + for i := 0; i < len(epc.Procs); i++ { + t.Logf("Replace binary for member %d: %v", i, epc.Procs[i].EndpointsGRPC()) + member := epc.Procs[i] + member.Config().ExecPath = tc.expectTargetBinary + } + require.NoError(t, epc.Start(context.TODO())) + + t.Log("Verify the versions of all members") + for i := 0; i < len(epc.Procs); i++ { + t.Logf("Verify the version of member %d: %v", i, epc.Procs[i].EndpointsGRPC()) + expectedVersion := tc.expectStorageVersion + if expectedVersion == nil { + expectedVersion = 
&version.V3_5 + } + + verifyVersion(t, epc, epc.Procs[i], expectedVersion, expectedVersion) + } }) } } + +func verifyVersion(t *testing.T, clus *e2e.EtcdProcessCluster, member e2e.EtcdProcess, expectedServerVersion, expectedClusterVersion *semver.Version) error { + var err error + expected := fmt.Sprintf(`"etcdserver":"%d.%d\..*"etcdcluster":"%d\.%d\.`, expectedServerVersion.Major, expectedServerVersion.Minor, expectedClusterVersion.Major, expectedClusterVersion.Minor) + for i := 0; i < 35; i++ { + if err = e2e.CURLGetFromMember(clus, member, e2e.CURLReq{Endpoint: "/version", Expected: expect.ExpectedResponse{Value: expected, IsRegularExpr: true}}); err != nil { + t.Logf("#%d: v3 is not ready yet (%v)", i, err) + time.Sleep(200 * time.Millisecond) + continue + } + break + } + if err != nil { + return fmt.Errorf("failed to verify version, expected %v got (%w)", expected, err) + } + return nil +}