From 0f9a53caf66f81509f3abe73562fec43691e7dc7 Mon Sep 17 00:00:00 2001 From: Fatih USTA Date: Mon, 12 Jun 2023 15:19:24 +0300 Subject: [PATCH] etcdutl: Fix snapshot restore memory alloc issue When running the snapshot command, allow receiving an initial memory map allocation for the database, avoiding future memory allocation issues. Signed-off-by: Fatih USTA --- etcdutl/etcdutl/migrate_command.go | 2 +- etcdutl/etcdutl/snapshot_command.go | 6 ++++- etcdutl/snapshot/v3_snapshot.go | 23 ++++++++++++++----- server/config/config.go | 1 + server/etcdserver/cindex/cindex_test.go | 4 ++-- server/storage/backend/backend.go | 14 ++++++++--- server/storage/mvcc/store_test.go | 4 ++-- server/storage/schema/auth_roles_test.go | 4 ++-- server/storage/schema/auth_test.go | 4 ++-- server/storage/schema/auth_users_test.go | 4 ++-- server/storage/schema/lease_test.go | 2 +- server/storage/schema/schema_test.go | 6 ++--- server/storage/schema/version_test.go | 2 +- server/verify/verify.go | 3 +-- tests/e2e/utl_migrate_test.go | 2 +- .../integration/clientv3/maintenance_test.go | 2 +- tests/integration/v3_alarm_test.go | 2 +- tools/etcd-dump-db/backend.go | 2 +- 18 files changed, 55 insertions(+), 32 deletions(-) diff --git a/etcdutl/etcdutl/migrate_command.go b/etcdutl/etcdutl/migrate_command.go index 521cf8ba80c8..45ea407e48b7 100644 --- a/etcdutl/etcdutl/migrate_command.go +++ b/etcdutl/etcdutl/migrate_command.go @@ -92,7 +92,7 @@ func (o *migrateOptions) Config() (*migrateConfig, error) { } dbPath := datadir.ToBackendFileName(o.dataDir) - c.be = backend.NewDefaultBackend(GetLogger(), dbPath) + c.be = backend.NewDefaultBackend(backend.BackendConfig{Logger: GetLogger(), Path: dbPath}) walPath := datadir.ToWalDir(o.dataDir) w, err := wal.OpenForRead(c.lg, walPath, walpb.Snapshot{}) diff --git a/etcdutl/etcdutl/snapshot_command.go b/etcdutl/etcdutl/snapshot_command.go index 9e557495e5c0..d9ae1781fc17 100644 --- a/etcdutl/etcdutl/snapshot_command.go +++ b/etcdutl/etcdutl/snapshot_command.go @@ -38,6 +38,7 @@ var ( restorePeerURLs string restoreName string skipHashCheck bool + initialMmapSize = uint64(10 * 1024 * 1024 * 1024) markCompacted bool revisionBump uint64 ) @@ -77,6 +78,7 @@ func NewSnapshotRestoreCommand() *cobra.Command { cmd.Flags().StringVar(&restorePeerURLs, "initial-advertise-peer-urls", defaultInitialAdvertisePeerURLs, "List of this member's peer URLs to advertise to the rest of the cluster") cmd.Flags().StringVar(&restoreName, "name", defaultName, "Human-readable name for this member") cmd.Flags().BoolVar(&skipHashCheck, "skip-hash-check", false, "Ignore snapshot integrity hash value (required if copied from data directory)") + cmd.Flags().Uint64Var(&initialMmapSize, "initial-memory-map-size", initialMmapSize, "Initial memory map allocation size(bytes) for the DB. if not defined, it uses the default. Also 0 means use default") cmd.Flags().Uint64Var(&revisionBump, "bump-revision", 0, "How much to increase the latest revision after restore") cmd.Flags().BoolVar(&markCompacted, "mark-compacted", false, "Mark the latest revision after restore as the point of scheduled compaction (required if --bump-revision > 0, disallowed otherwise)") @@ -104,7 +106,7 @@ func SnapshotStatusCommandFunc(cmd *cobra.Command, args []string) { func snapshotRestoreCommandFunc(_ *cobra.Command, args []string) { SnapshotRestoreCommandFunc(restoreCluster, restoreClusterToken, restoreDataDir, restoreWalDir, - restorePeerURLs, restoreName, skipHashCheck, revisionBump, markCompacted, args) + restorePeerURLs, restoreName, skipHashCheck, initialMmapSize, revisionBump, markCompacted, args) } func SnapshotRestoreCommandFunc(restoreCluster string, @@ -114,6 +116,7 @@ func SnapshotRestoreCommandFunc(restoreCluster string, restorePeerURLs string, restoreName string, skipHashCheck bool, + initialMmapSize uint64, revisionBump uint64, markCompacted bool, args []string) { @@ -149,6 +152,7 @@ func SnapshotRestoreCommandFunc(restoreCluster string, InitialCluster: restoreCluster, InitialClusterToken: restoreClusterToken, SkipHashCheck: skipHashCheck, + InitialMmapSize: initialMmapSize, RevisionBump: revisionBump, MarkCompacted: markCompacted, }); err != nil { diff --git a/etcdutl/snapshot/v3_snapshot.go b/etcdutl/snapshot/v3_snapshot.go index 64d16acd75c8..94b6caa3f3c3 100644 --- a/etcdutl/snapshot/v3_snapshot.go +++ b/etcdutl/snapshot/v3_snapshot.go @@ -38,6 +38,7 @@ import ( "go.etcd.io/etcd/server/v3/etcdserver" "go.etcd.io/etcd/server/v3/etcdserver/api/membership" "go.etcd.io/etcd/server/v3/etcdserver/api/snap" + "go.etcd.io/etcd/server/v3/etcdserver/api/v2store" "go.etcd.io/etcd/server/v3/etcdserver/cindex" "go.etcd.io/etcd/server/v3/storage/backend" "go.etcd.io/etcd/server/v3/storage/mvcc" @@ -83,7 +84,8 @@ type v3Manager struct { snapDir string cl *membership.RaftCluster - skipHashCheck bool + skipHashCheck bool + initialMmapSize uint64 } // hasChecksum returns "true" if the file size "n" @@ -204,6 +206,9 @@ type RestoreConfig struct { // (required if copied from data directory). SkipHashCheck bool + // Initial memory map size for the DB + InitialMmapSize uint64 + // RevisionBump is the amount to increase the latest revision after restore, // to allow administrators to trick clients into thinking that revision never decreased. // If 0, revision bumping is skipped. @@ -233,6 +238,7 @@ func (s *v3Manager) Restore(cfg RestoreConfig) error { PeerURLs: pURLs, InitialPeerURLsMap: ics, InitialClusterToken: cfg.InitialClusterToken, + InitialMmapSize: cfg.InitialMmapSize, } if err = srv.VerifyBootstrap(); err != nil { return err @@ -263,6 +269,7 @@ func (s *v3Manager) Restore(cfg RestoreConfig) error { s.walDir = walDir s.snapDir = filepath.Join(dataDir, "member", "snap") s.skipHashCheck = cfg.SkipHashCheck + s.initialMmapSize = cfg.InitialMmapSize s.lg.Info( "restoring snapshot", @@ -270,6 +277,7 @@ func (s *v3Manager) Restore(cfg RestoreConfig) error { zap.String("wal-dir", s.walDir), zap.String("data-dir", dataDir), zap.String("snap-dir", s.snapDir), + zap.Uint64("initial-memory-map-size", s.initialMmapSize), ) if err = s.saveDB(); err != nil { @@ -297,6 +305,7 @@ func (s *v3Manager) Restore(cfg RestoreConfig) error { zap.String("wal-dir", s.walDir), zap.String("data-dir", dataDir), zap.String("snap-dir", s.snapDir), + zap.Uint64("initial-memory-map-size", s.initialMmapSize), ) return verify.VerifyIfEnabled(verify.Config{ @@ -317,7 +326,7 @@ func (s *v3Manager) saveDB() error { return err } - be := backend.NewDefaultBackend(s.lg, s.outDbPath()) + be := backend.NewDefaultBackend(backend.BackendConfig{Logger: s.lg, Path: s.outDbPath(), MmapSize: s.initialMmapSize}) defer be.Close() err = schema.NewMembershipBackend(s.lg, be).TrimMembershipFromBackend() @@ -331,7 +340,7 @@ func (s *v3Manager) saveDB() error { // modifyLatestRevision can increase the latest revision by the given amount and sets the scheduled compaction // to that revision so that the server will consider this revision compacted. func (s *v3Manager) modifyLatestRevision(bumpAmount uint64) error { - be := backend.NewDefaultBackend(s.lg, s.outDbPath()) + be := backend.NewDefaultBackend(backend.BackendConfig{Logger: s.lg, Path: s.outDbPath()}) defer func() { be.ForceCommit() be.Close() @@ -471,8 +480,10 @@ func (s *v3Manager) saveWALAndSnap() (*raftpb.HardState, error) { return nil, err } - // add members again to persist them to the backend we create. - be := backend.NewDefaultBackend(s.lg, s.outDbPath()) + // add members again to persist them to the store we create. + st := v2store.New(etcdserver.StoreClusterPrefix, etcdserver.StoreKeysPrefix) + s.cl.SetStore(st) + be := backend.NewDefaultBackend(backend.BackendConfig{Logger: s.lg, Path: s.outDbPath(), MmapSize: s.initialMmapSize}) defer be.Close() s.cl.SetBackend(schema.NewMembershipBackend(s.lg, be)) for _, m := range s.cl.Members() { @@ -551,7 +562,7 @@ func (s *v3Manager) saveWALAndSnap() (*raftpb.HardState, error) { } func (s *v3Manager) updateCIndex(commit uint64, term uint64) error { - be := backend.NewDefaultBackend(s.lg, s.outDbPath()) + be := backend.NewDefaultBackend(backend.BackendConfig{Logger: s.lg, Path: s.outDbPath(), MmapSize: s.initialMmapSize}) defer be.Close() cindex.UpdateConsistentIndexForce(be.BatchTx(), commit, term) diff --git a/server/config/config.go b/server/config/config.go index af8604a621a4..3b59fb054961 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -119,6 +119,7 @@ type ServerConfig struct { CompactionBatchLimit int CompactionSleepInterval time.Duration QuotaBackendBytes int64 + InitialMmapSize uint64 MaxTxnOps uint // MaxRequestBytes is the maximum request size to send over raft. diff --git a/server/etcdserver/cindex/cindex_test.go b/server/etcdserver/cindex/cindex_test.go index a056ac3d759c..2f78daf0f1d5 100644 --- a/server/etcdserver/cindex/cindex_test.go +++ b/server/etcdserver/cindex/cindex_test.go @@ -56,7 +56,7 @@ func TestConsistentIndex(t *testing.T) { be.ForceCommit() be.Close() - b := backend.NewDefaultBackend(zaptest.NewLogger(t), tmpPath) + b := backend.NewDefaultBackend(backend.BackendConfig{Logger: zaptest.NewLogger(t), Path: tmpPath}) defer b.Close() ci.SetBackend(b) index = ci.ConsistentIndex() @@ -108,7 +108,7 @@ func TestConsistentIndexDecrease(t *testing.T) { be.ForceCommit() be.Close() - be = backend.NewDefaultBackend(zaptest.NewLogger(t), tmpPath) + be = backend.NewDefaultBackend(backend.BackendConfig{Logger: zaptest.NewLogger(t), Path: tmpPath}) defer be.Close() ci := NewConsistentIndex(be) ci.SetConsistentIndex(tc.index, tc.term) diff --git a/server/storage/backend/backend.go b/server/storage/backend/backend.go index 7aa4f846987f..7402edbdf0f7 100644 --- a/server/storage/backend/backend.go +++ b/server/storage/backend/backend.go @@ -164,9 +164,17 @@ func New(bcfg BackendConfig) Backend { return newBackend(bcfg) } -func NewDefaultBackend(lg *zap.Logger, path string) Backend { - bcfg := DefaultBackendConfig(lg) - bcfg.Path = path +func NewDefaultBackend(bc BackendConfig) Backend { + + bcfg := DefaultBackendConfig(bc.Logger) + bcfg.Path = bc.Path + + if bc.MmapSize > 0 { + bcfg.MmapSize = bc.MmapSize + } else { + bcfg.MmapSize = initialMmapSize + } + return newBackend(bcfg) } diff --git a/server/storage/mvcc/store_test.go b/server/storage/mvcc/store_test.go index bd6d25e171fa..3abcb0c76b8a 100644 --- a/server/storage/mvcc/store_test.go +++ b/server/storage/mvcc/store_test.go @@ -61,7 +61,7 @@ func TestScheduledCompact(t *testing.T) { be.ForceCommit() be.Close() - b := backend.NewDefaultBackend(lg, tmpPath) + b := backend.NewDefaultBackend(backend.BackendConfig{Logger: lg, Path: tmpPath}) defer b.Close() v, found := UnsafeReadScheduledCompact(b.BatchTx()) assert.Equal(t, true, found) @@ -103,7 +103,7 @@ func TestFinishedCompact(t *testing.T) { be.ForceCommit() be.Close() - b := backend.NewDefaultBackend(lg, tmpPath) + b := backend.NewDefaultBackend(backend.BackendConfig{Logger: lg, Path: tmpPath}) defer b.Close() v, found := UnsafeReadFinishedCompact(b.BatchTx()) assert.Equal(t, true, found) diff --git a/server/storage/schema/auth_roles_test.go b/server/storage/schema/auth_roles_test.go index d7323c6f4a6e..d86babc0708c 100644 --- a/server/storage/schema/auth_roles_test.go +++ b/server/storage/schema/auth_roles_test.go @@ -122,7 +122,7 @@ func TestGetAllRoles(t *testing.T) { abe.ForceCommit() be.Close() - be2 := backend.NewDefaultBackend(lg, tmpPath) + be2 := backend.NewDefaultBackend(backend.BackendConfig{Logger: lg, Path: tmpPath}) defer be2.Close() abe2 := NewAuthBackend(lg, be2) users := abe2.GetAllRoles() @@ -219,7 +219,7 @@ func TestGetRole(t *testing.T) { abe.ForceCommit() be.Close() - be2 := backend.NewDefaultBackend(lg, tmpPath) + be2 := backend.NewDefaultBackend(backend.BackendConfig{Logger: lg, Path: tmpPath}) defer be2.Close() abe2 := NewAuthBackend(lg, be2) users := abe2.GetRole("role1") diff --git a/server/storage/schema/auth_test.go b/server/storage/schema/auth_test.go index 96174e50ffc6..0f1277297165 100644 --- a/server/storage/schema/auth_test.go +++ b/server/storage/schema/auth_test.go @@ -66,7 +66,7 @@ func TestAuthEnabled(t *testing.T) { abe.ForceCommit() be.Close() - be2 := backend.NewDefaultBackend(lg, tmpPath) + be2 := backend.NewDefaultBackend(backend.BackendConfig{Logger: lg, Path: tmpPath}) defer be2.Close() abe2 := NewAuthBackend(lg, be2) tx = abe2.BatchTx() @@ -117,7 +117,7 @@ func TestAuthRevision(t *testing.T) { abe.ForceCommit() be.Close() - be2 := backend.NewDefaultBackend(lg, tmpPath) + be2 := backend.NewDefaultBackend(backend.BackendConfig{Logger: lg, Path: tmpPath}) defer be2.Close() abe2 := NewAuthBackend(lg, be2) tx := abe2.BatchTx() diff --git a/server/storage/schema/auth_users_test.go b/server/storage/schema/auth_users_test.go index 2261e57071b0..5911b795075d 100644 --- a/server/storage/schema/auth_users_test.go +++ b/server/storage/schema/auth_users_test.go @@ -110,7 +110,7 @@ func TestGetAllUsers(t *testing.T) { abe.ForceCommit() be.Close() - be2 := backend.NewDefaultBackend(lg, tmpPath) + be2 := backend.NewDefaultBackend(backend.BackendConfig{Logger: lg, Path: tmpPath}) defer be2.Close() abe2 := NewAuthBackend(lg, be2) users := abe2.ReadTx().UnsafeGetAllUsers() @@ -195,7 +195,7 @@ func TestGetUser(t *testing.T) { abe.ForceCommit() be.Close() - be2 := backend.NewDefaultBackend(lg, tmpPath) + be2 := backend.NewDefaultBackend(backend.BackendConfig{Logger: lg, Path: tmpPath}) defer be2.Close() abe2 := NewAuthBackend(lg, be2) users := abe2.GetUser("alice") diff --git a/server/storage/schema/lease_test.go b/server/storage/schema/lease_test.go index 88a1cd7e0633..68ffab204670 100644 --- a/server/storage/schema/lease_test.go +++ b/server/storage/schema/lease_test.go @@ -99,7 +99,7 @@ func TestLeaseBackend(t *testing.T) { be.ForceCommit() be.Close() - be2 := backend.NewDefaultBackend(lg, tmpPath) + be2 := backend.NewDefaultBackend(backend.BackendConfig{Logger: lg, Path: tmpPath}) defer be2.Close() leases := MustUnsafeGetAllLeases(be2.ReadTx()) diff --git a/server/storage/schema/schema_test.go b/server/storage/schema/schema_test.go index 3d9bd1065c3a..312286c4959d 100644 --- a/server/storage/schema/schema_test.go +++ b/server/storage/schema/schema_test.go @@ -83,7 +83,7 @@ func TestValidate(t *testing.T) { lg := zap.NewNop() dataPath := setupBackendData(t, tc.version, tc.overrideKeys) - b := backend.NewDefaultBackend(lg, dataPath) + b := backend.NewDefaultBackend(backend.BackendConfig{Logger: lg, Path: dataPath}) defer b.Close() err := Validate(lg, b.ReadTx()) if (err != nil) != tc.expectError { @@ -211,7 +211,7 @@ func TestMigrate(t *testing.T) { t.Fatal(err) } - b := backend.NewDefaultBackend(lg, dataPath) + b := backend.NewDefaultBackend(backend.BackendConfig{Logger: lg, Path: dataPath}) defer b.Close() err = Migrate(lg, b.BatchTx(), walVersion, tc.targetVersion) @@ -255,7 +255,7 @@ func TestMigrateIsReversible(t *testing.T) { lg := zap.NewNop() dataPath := setupBackendData(t, tc.initialVersion, nil) - be := backend.NewDefaultBackend(lg, dataPath) + be := backend.NewDefaultBackend(backend.BackendConfig{Logger: lg, Path: dataPath}) defer be.Close() tx := be.BatchTx() tx.Lock() diff --git a/server/storage/schema/version_test.go b/server/storage/schema/version_test.go index 576f063df777..59ccd774dc4a 100644 --- a/server/storage/schema/version_test.go +++ b/server/storage/schema/version_test.go @@ -69,7 +69,7 @@ func TestVersion(t *testing.T) { be.ForceCommit() be.Close() - b := backend.NewDefaultBackend(lg, tmpPath) + b := backend.NewDefaultBackend(backend.BackendConfig{Logger: lg, Path: tmpPath}) defer b.Close() v := UnsafeReadStorageVersion(b.BatchTx()) diff --git a/server/verify/verify.go b/server/verify/verify.go index 6aaf1422aee5..036a7c647396 100644 --- a/server/verify/verify.go +++ b/server/verify/verify.go @@ -74,8 +74,7 @@ func Verify(cfg Config) error { lg.Info("verification of persisted state successful", zap.String("data-dir", cfg.DataDir)) } }() - - be := backend.NewDefaultBackend(lg, datadir.ToBackendFileName(cfg.DataDir)) + be := backend.NewDefaultBackend(backend.BackendConfig{Logger: lg, Path: datadir.ToBackendFileName(cfg.DataDir)}) defer be.Close() snapshot, hardstate, err := validateWal(cfg) diff --git a/tests/e2e/utl_migrate_test.go b/tests/e2e/utl_migrate_test.go index a58d69a24c74..35d0b95fa94d 100644 --- a/tests/e2e/utl_migrate_test.go +++ b/tests/e2e/utl_migrate_test.go @@ -166,7 +166,7 @@ func TestEtctlutlMigrate(t *testing.T) { } t.Log("etcdutl migrate...") - be := backend.NewDefaultBackend(lg, filepath.Join(memberDataDir, "member/snap/db")) + be := backend.NewDefaultBackend(backend.BackendConfig{Logger: lg, Path: filepath.Join(memberDataDir, "member/snap/db")}) defer be.Close() ver := schema.ReadStorageVersion(be.ReadTx()) diff --git a/tests/integration/clientv3/maintenance_test.go b/tests/integration/clientv3/maintenance_test.go index 8aa275bb90c4..c99f7846eb4d 100644 --- a/tests/integration/clientv3/maintenance_test.go +++ b/tests/integration/clientv3/maintenance_test.go @@ -277,7 +277,7 @@ func testMaintenanceSnapshotErrorInflight(t *testing.T, snapshot func(context.Co // take about 1-second to read snapshot clus.Members[0].Stop(t) dpath := filepath.Join(clus.Members[0].DataDir, "member", "snap", "db") - b := backend.NewDefaultBackend(lg, dpath) + b := backend.NewDefaultBackend(backend.BackendConfig{Logger: lg, Path: dpath}) s := mvcc.NewStore(lg, b, &lease.FakeLessor{}, mvcc.StoreConfig{CompactionBatchLimit: math.MaxInt32}) rev := 100000 for i := 2; i <= rev; i++ { diff --git a/tests/integration/v3_alarm_test.go b/tests/integration/v3_alarm_test.go index 50a701e68095..8d9cd2b9d78b 100644 --- a/tests/integration/v3_alarm_test.go +++ b/tests/integration/v3_alarm_test.go @@ -194,7 +194,7 @@ func TestV3CorruptAlarm(t *testing.T) { // Corrupt member 0 by modifying backend offline. clus.Members[0].Stop(t) fp := filepath.Join(clus.Members[0].DataDir, "member", "snap", "db") - be := backend.NewDefaultBackend(lg, fp) + be := backend.NewDefaultBackend(backend.BackendConfig{Logger: lg, Path: fp}) s := mvcc.NewStore(lg, be, nil, mvcc.StoreConfig{}) // NOTE: cluster_proxy mode with namespacing won't set 'k', but namespace/'k'. s.Put([]byte("abc"), []byte("def"), 0) diff --git a/tools/etcd-dump-db/backend.go b/tools/etcd-dump-db/backend.go index 5c9df5e42e05..37aa5b9125eb 100644 --- a/tools/etcd-dump-db/backend.go +++ b/tools/etcd-dump-db/backend.go @@ -179,7 +179,7 @@ func iterateBucket(dbPath, bucket string, limit uint64, decode bool) (err error) } func getHash(dbPath string) (hash uint32, err error) { - b := backend.NewDefaultBackend(zap.NewNop(), dbPath) + b := backend.NewDefaultBackend(backend.BackendConfig{Logger: zap.NewNop(), Path: dbPath}) return b.Hash(schema.DefaultIgnores) }