From 250f708e8f1ae3d57a6bb9dfef2923c1eb875bbb Mon Sep 17 00:00:00 2001 From: Benjamin Wang Date: Fri, 10 Jan 2025 17:50:56 +0000 Subject: [PATCH] Create a v2 snapshot when running etcdutl migrate command Signed-off-by: Benjamin Wang --- etcdutl/ctl.go | 1 + etcdutl/etcdutl/common.go | 120 ++++++++++++++++++++++++++ etcdutl/etcdutl/migrate_command.go | 86 +++++++++--------- etcdutl/etcdutl/v2snapshot_command.go | 61 +++++++++++++ 4 files changed, 226 insertions(+), 42 deletions(-) create mode 100644 etcdutl/etcdutl/v2snapshot_command.go diff --git a/etcdutl/ctl.go b/etcdutl/ctl.go index 8418729bfd1..72a8223eacc 100644 --- a/etcdutl/ctl.go +++ b/etcdutl/ctl.go @@ -45,6 +45,7 @@ func init() { etcdutl.NewVersionCommand(), etcdutl.NewCompletionCommand(), etcdutl.NewMigrateCommand(), + etcdutl.NewV2SnapshotCommand(), // TODO: remove in 3.8 ) } diff --git a/etcdutl/etcdutl/common.go b/etcdutl/etcdutl/common.go index d54827d0457..7fabdb2fc58 100644 --- a/etcdutl/etcdutl/common.go +++ b/etcdutl/etcdutl/common.go @@ -15,11 +15,24 @@ package etcdutl import ( + "errors" + "fmt" + + "github.com/coreos/go-semver/semver" "go.uber.org/zap" "go.uber.org/zap/zapcore" "go.etcd.io/etcd/client/pkg/v3/logutil" "go.etcd.io/etcd/pkg/v3/cobrautl" + "go.etcd.io/etcd/server/v3/etcdserver" + "go.etcd.io/etcd/server/v3/etcdserver/api/membership" + "go.etcd.io/etcd/server/v3/etcdserver/api/snap" + "go.etcd.io/etcd/server/v3/storage/backend" + "go.etcd.io/etcd/server/v3/storage/datadir" + "go.etcd.io/etcd/server/v3/storage/schema" + "go.etcd.io/etcd/server/v3/storage/wal" + "go.etcd.io/etcd/server/v3/storage/wal/walpb" + "go.etcd.io/raft/v3/raftpb" ) func GetLogger() *zap.Logger { @@ -32,3 +45,110 @@ func GetLogger() *zap.Logger { } return lg } + +func getLatestWALSnap(lg *zap.Logger, dataDir string) (walpb.Snapshot, error) { + snapshot, err := getLatestV2Snapshot(lg, dataDir) + if err != nil { + return walpb.Snapshot{}, err + } + + var walsnap walpb.Snapshot + if snapshot != nil { + walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term + } + return walsnap, nil +} + +func getLatestV2Snapshot(lg *zap.Logger, dataDir string) (*raftpb.Snapshot, error) { + walPath := datadir.ToWALDir(dataDir) + walSnaps, err := wal.ValidSnapshotEntries(lg, walPath) + if err != nil { + return nil, err + } + + ss := snap.New(lg, datadir.ToSnapDir(dataDir)) + snapshot, err := ss.LoadNewestAvailable(walSnaps) + if err != nil && !errors.Is(err, snap.ErrNoSnapshot) { + return nil, err + } + + return snapshot, nil +} + +func createV2SnapshotFromV3Store(dataDir string, be backend.Backend) error { + var ( + lg = GetLogger() + + snapDir = datadir.ToSnapDir(dataDir) + walDir = datadir.ToWALDir(dataDir) + ) + + ci, term := schema.ReadConsistentIndex(be.ReadTx()) + + cl := membership.NewCluster(lg) + cl.SetBackend(schema.NewMembershipBackend(lg, be)) + cl.Recover(func(*zap.Logger, *semver.Version) {}) + + latestWALSnap, err := getLatestWALSnap(lg, dataDir) + if err != nil { + return err + } + + // Each time before creating the v2 snapshot, etcdserve always flush + // the backend storage (bbolt db), so the consistent index should never + // less than the Index or term of the latest snapshot. + if ci < latestWALSnap.Index || term < latestWALSnap.Term { + // This should never happen + return fmt.Errorf("consistent_index [Index: %d, Term: %d] is less than the latest snapshot [Index: %d, Term: %d]", ci, term, latestWalSnap.Index, latestWalSnap.Term) + } + + voters, learners := getVotersAndLearners(cl) + confState := raftpb.ConfState{ + Voters: voters, + Learners: learners, + } + + // create the v2 snaspshot file + raftSnap := raftpb.Snapshot{ + Data: etcdserver.GetMembershipInfoInV2Format(lg, cl), + Metadata: raftpb.SnapshotMetadata{ + Index: ci, + Term: term, + ConfState: confState, + }, + } + sn := snap.New(lg, snapDir) + if err := sn.SaveSnap(raftSnap); err != nil { + return err + } + + // save WAL snapshot record + w, err := wal.Open(lg, walDir, latestWalSnap) + if err != nil { + return err + } + defer w.Close() + // We must read all records to locate the tail of the last valid WAL file. + if _, _, _, err = w.ReadAll(); err != nil { + return err + } + + return w.SaveSnapshot(walpb.Snapshot{Index: ci, Term: term, ConfState: &confState}) +} + +func getVotersAndLearners(cl *membership.RaftCluster) ([]uint64, []uint64) { + var ( + voters []uint64 + learners []uint64 + ) + for _, m := range cl.Members() { + if m.IsLearner { + learners = append(learners, uint64(m.ID)) + continue + } + + voters = append(voters, uint64(m.ID)) + } + + return voters, learners +} diff --git a/etcdutl/etcdutl/migrate_command.go b/etcdutl/etcdutl/migrate_command.go index 7f435b80a28..cda07e1f977 100644 --- a/etcdutl/etcdutl/migrate_command.go +++ b/etcdutl/etcdutl/migrate_command.go @@ -15,7 +15,6 @@ package etcdutl import ( - "errors" "fmt" "strings" @@ -25,12 +24,10 @@ import ( "go.etcd.io/etcd/api/v3/version" "go.etcd.io/etcd/pkg/v3/cobrautl" - "go.etcd.io/etcd/server/v3/etcdserver/api/snap" "go.etcd.io/etcd/server/v3/storage/backend" "go.etcd.io/etcd/server/v3/storage/datadir" "go.etcd.io/etcd/server/v3/storage/schema" "go.etcd.io/etcd/server/v3/storage/wal" - "go.etcd.io/etcd/server/v3/storage/wal/walpb" ) // NewMigrateCommand prints out the version of etcd. @@ -77,8 +74,9 @@ func (o *migrateOptions) AddFlags(cmd *cobra.Command) { func (o *migrateOptions) Config() (*migrateConfig, error) { c := &migrateConfig{ - force: o.force, - lg: GetLogger(), + force: o.force, + dataDir: o.dataDir, + lg: GetLogger(), } var err error dotCount := strings.Count(o.targetVersion, ".") @@ -93,67 +91,69 @@ func (o *migrateOptions) Config() (*migrateConfig, error) { return nil, fmt.Errorf(`target version %q not supported. Minimal "3.5"`, storageVersionToString(c.targetVersion)) } - dbPath := datadir.ToBackendFileName(o.dataDir) - c.be = backend.NewDefaultBackend(GetLogger(), dbPath) + return c, nil +} - walPath := datadir.ToWALDir(o.dataDir) - walSnap, err := getLatestWALSnap(c.lg, o.dataDir) +type migrateConfig struct { + lg *zap.Logger + be backend.Backend + targetVersion *semver.Version + walVersion schema.WALVersion + dataDir string + force bool +} + +func (c *migrateConfig) finalize() error { + walPath := datadir.ToWALDir(c.dataDir) + walSnap, err := getLatestWALSnap(c.lg, c.dataDir) if err != nil { - return nil, fmt.Errorf("failed to get the lastest snapshot: %w", err) + return fmt.Errorf("failed to get the lastest snapshot: %w", err) } w, err := wal.OpenForRead(c.lg, walPath, walSnap) if err != nil { - return nil, fmt.Errorf(`failed to open wal: %w`, err) + return fmt.Errorf(`failed to open wal: %w`, err) } defer w.Close() c.walVersion, err = wal.ReadWALVersion(w) if err != nil { - return nil, fmt.Errorf(`failed to read wal: %w`, err) - } - - return c, nil -} - -func getLatestWALSnap(lg *zap.Logger, dataDir string) (walpb.Snapshot, error) { - walPath := datadir.ToWALDir(dataDir) - walSnaps, err := wal.ValidSnapshotEntries(lg, walPath) - if err != nil { - return walpb.Snapshot{}, err - } - - ss := snap.New(lg, datadir.ToSnapDir(dataDir)) - snapshot, err := ss.LoadNewestAvailable(walSnaps) - if err != nil && !errors.Is(err, snap.ErrNoSnapshot) { - return walpb.Snapshot{}, err - } - - var walsnap walpb.Snapshot - if snapshot != nil { - walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term + return fmt.Errorf(`failed to read wal: %w`, err) } - return walsnap, nil -} -type migrateConfig struct { - lg *zap.Logger - be backend.Backend - targetVersion *semver.Version - walVersion schema.WALVersion - force bool + return nil } func migrateCommandFunc(c *migrateConfig) error { + dbPath := datadir.ToBackendFileName(c.dataDir) + c.be = backend.NewDefaultBackend(GetLogger(), dbPath) defer c.be.Close() + tx := c.be.BatchTx() current, err := schema.DetectSchemaVersion(c.lg, c.be.ReadTx()) if err != nil { - c.lg.Error("failed to detect storage version. Please make sure you are using data dir from etcd v3.5 and older") + c.lg.Error("failed to detect storage version. Please make sure you are using data dir from etcd v3.5 and older", zap.Error(err)) return err } if current == *c.targetVersion { c.lg.Info("storage version up-to-date", zap.String("storage-version", storageVersionToString(¤t))) return nil } + + // Update cluster version + be := schema.NewMembershipBackend(c.lg, c.be) + be.MustSaveClusterVersionToBackend(c.targetVersion) + + // forcibly create a v2 snapshot file + // TODO: remove in 3.8 + if err = createV2SnapshotFromV3Store(c.dataDir, c.be); err != nil { + c.lg.Error("Failed to create v2 snapshot file", zap.Error(err)) + return err + } + + if err = c.finalize(); err != nil { + c.lg.Error("Failed to finalize config", zap.Error(err)) + return err + } + err = schema.Migrate(c.lg, tx, c.walVersion, *c.targetVersion) if err != nil { if !c.force { @@ -162,7 +162,9 @@ func migrateCommandFunc(c *migrateConfig) error { c.lg.Info("normal migrate failed, trying with force", zap.Error(err)) migrateForce(c.lg, tx, c.targetVersion) } + c.be.ForceCommit() + return nil } diff --git a/etcdutl/etcdutl/v2snapshot_command.go b/etcdutl/etcdutl/v2snapshot_command.go new file mode 100644 index 00000000000..0084c5d0afd --- /dev/null +++ b/etcdutl/etcdutl/v2snapshot_command.go @@ -0,0 +1,61 @@ +// Copyright 2025 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package etcdutl + +import ( + "fmt" + "path/filepath" + + "github.com/spf13/cobra" + + "go.etcd.io/etcd/pkg/v3/cobrautl" + "go.etcd.io/etcd/server/v3/storage/backend" + "go.etcd.io/etcd/server/v3/storage/datadir" +) + +var createV2SnapDataDir string + +// NewV2SnapshotCommand returns the cobra command for "v2snapshot". +// TODO: remove the command in 3.8 +func NewV2SnapshotCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "v2snapshot ", + Short: "Manages etcd node v2snapshots", + } + cmd.AddCommand(newV2SnapshotCreateCommand()) + return cmd +} + +func newV2SnapshotCreateCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "create", + Short: "Create a new v2 snapshot file", + Run: v2SnapshotCreateFunc, + } + cmd.Flags().StringVar(&createV2SnapDataDir, "data-dir", "", "Required. Path to the etcd data directory.") + cmd.MarkFlagRequired("data-dir") + cmd.MarkFlagDirname("data-dir") + return cmd +} + +func v2SnapshotCreateFunc(_ *cobra.Command, _ []string) { + be := backend.NewDefaultBackend(GetLogger(), filepath.Join(datadir.ToSnapDir(createV2SnapDataDir), "db")) + defer be.Close() + + if err := createV2SnapshotFromV3Store(createV2SnapDataDir, be); err != nil { + cobrautl.ExitWithError(cobrautl.ExitError, err) + } + fmt.Println("Created a v2 snapshot file.") +}