Skip to content

Commit

Permalink
etcdutl: Fix snapshot restore memory alloc issue
Browse files Browse the repository at this point in the history
When running the snapshot command, allow receiving an initial memory map
allocation for the database, avoiding future memory allocation issues.

Co-authored-by: Benjamin Wang <benjamin.wang@broadcom.com>
Signed-off-by: Fatih USTA <fatihusta86@gmail.com>
Signed-off-by: Ivan Valdes <ivan@vald.es>
  • Loading branch information
2 people authored and ivanvc committed Feb 9, 2024
1 parent e5665a7 commit 542c737
Show file tree
Hide file tree
Showing 18 changed files with 52 additions and 32 deletions.
2 changes: 1 addition & 1 deletion etcdutl/etcdutl/migrate_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (o *migrateOptions) Config() (*migrateConfig, error) {
}

dbPath := datadir.ToBackendFileName(o.dataDir)
c.be = backend.NewDefaultBackend(GetLogger(), dbPath)
c.be = backend.NewDefaultBackend(backend.BackendConfig{Logger: GetLogger(), Path: dbPath})

walPath := datadir.ToWalDir(o.dataDir)
w, err := wal.OpenForRead(c.lg, walPath, walpb.Snapshot{})
Expand Down
6 changes: 5 additions & 1 deletion etcdutl/etcdutl/snapshot_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ var (
restorePeerURLs string
restoreName string
skipHashCheck bool
initialMmapSize = uint64(10 * 1024 * 1024 * 1024)
markCompacted bool
revisionBump uint64
)
Expand Down Expand Up @@ -77,6 +78,7 @@ func NewSnapshotRestoreCommand() *cobra.Command {
cmd.Flags().StringVar(&restorePeerURLs, "initial-advertise-peer-urls", defaultInitialAdvertisePeerURLs, "List of this member's peer URLs to advertise to the rest of the cluster")
cmd.Flags().StringVar(&restoreName, "name", defaultName, "Human-readable name for this member")
cmd.Flags().BoolVar(&skipHashCheck, "skip-hash-check", false, "Ignore snapshot integrity hash value (required if copied from data directory)")
cmd.Flags().Uint64Var(&initialMmapSize, "initial-memory-map-size", initialMmapSize, "Initial memory map allocation size(bytes) for the DB. It uses the default value if not defined or defined to 0")
cmd.Flags().Uint64Var(&revisionBump, "bump-revision", 0, "How much to increase the latest revision after restore")
cmd.Flags().BoolVar(&markCompacted, "mark-compacted", false, "Mark the latest revision after restore as the point of scheduled compaction (required if --bump-revision > 0, disallowed otherwise)")

Expand Down Expand Up @@ -104,7 +106,7 @@ func SnapshotStatusCommandFunc(cmd *cobra.Command, args []string) {

func snapshotRestoreCommandFunc(_ *cobra.Command, args []string) {
SnapshotRestoreCommandFunc(restoreCluster, restoreClusterToken, restoreDataDir, restoreWalDir,
restorePeerURLs, restoreName, skipHashCheck, revisionBump, markCompacted, args)
restorePeerURLs, restoreName, skipHashCheck, initialMmapSize, revisionBump, markCompacted, args)
}

func SnapshotRestoreCommandFunc(restoreCluster string,
Expand All @@ -114,6 +116,7 @@ func SnapshotRestoreCommandFunc(restoreCluster string,
restorePeerURLs string,
restoreName string,
skipHashCheck bool,
initialMmapSize uint64,
revisionBump uint64,
markCompacted bool,
args []string) {
Expand Down Expand Up @@ -149,6 +152,7 @@ func SnapshotRestoreCommandFunc(restoreCluster string,
InitialCluster: restoreCluster,
InitialClusterToken: restoreClusterToken,
SkipHashCheck: skipHashCheck,
InitialMmapSize: initialMmapSize,
RevisionBump: revisionBump,
MarkCompacted: markCompacted,
}); err != nil {
Expand Down
20 changes: 14 additions & 6 deletions etcdutl/snapshot/v3_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ type v3Manager struct {
snapDir string
cl *membership.RaftCluster

skipHashCheck bool
skipHashCheck bool
initialMmapSize uint64
}

// hasChecksum returns "true" if the file size "n"
Expand Down Expand Up @@ -204,6 +205,9 @@ type RestoreConfig struct {
// (required if copied from data directory).
SkipHashCheck bool

// InitialMmapSize is the database initial memory map size.
InitialMmapSize uint64

// RevisionBump is the amount to increase the latest revision after restore,
// to allow administrators to trick clients into thinking that revision never decreased.
// If 0, revision bumping is skipped.
Expand Down Expand Up @@ -233,6 +237,7 @@ func (s *v3Manager) Restore(cfg RestoreConfig) error {
PeerURLs: pURLs,
InitialPeerURLsMap: ics,
InitialClusterToken: cfg.InitialClusterToken,
InitialMmapSize: cfg.InitialMmapSize,
}
if err = srv.VerifyBootstrap(); err != nil {
return err
Expand Down Expand Up @@ -263,13 +268,15 @@ func (s *v3Manager) Restore(cfg RestoreConfig) error {
s.walDir = walDir
s.snapDir = filepath.Join(dataDir, "member", "snap")
s.skipHashCheck = cfg.SkipHashCheck
s.initialMmapSize = cfg.InitialMmapSize

s.lg.Info(
"restoring snapshot",
zap.String("path", s.srcDbPath),
zap.String("wal-dir", s.walDir),
zap.String("data-dir", dataDir),
zap.String("snap-dir", s.snapDir),
zap.Uint64("initial-memory-map-size", s.initialMmapSize),
)

if err = s.saveDB(); err != nil {
Expand Down Expand Up @@ -297,6 +304,7 @@ func (s *v3Manager) Restore(cfg RestoreConfig) error {
zap.String("wal-dir", s.walDir),
zap.String("data-dir", dataDir),
zap.String("snap-dir", s.snapDir),
zap.Uint64("initial-memory-map-size", s.initialMmapSize),
)

return verify.VerifyIfEnabled(verify.Config{
Expand All @@ -317,7 +325,7 @@ func (s *v3Manager) saveDB() error {
return err
}

be := backend.NewDefaultBackend(s.lg, s.outDbPath())
be := backend.NewDefaultBackend(backend.BackendConfig{Logger: s.lg, Path: s.outDbPath(), MmapSize: s.initialMmapSize})
defer be.Close()

err = schema.NewMembershipBackend(s.lg, be).TrimMembershipFromBackend()
Expand All @@ -331,7 +339,7 @@ func (s *v3Manager) saveDB() error {
// modifyLatestRevision can increase the latest revision by the given amount and sets the scheduled compaction
// to that revision so that the server will consider this revision compacted.
func (s *v3Manager) modifyLatestRevision(bumpAmount uint64) error {
be := backend.NewDefaultBackend(s.lg, s.outDbPath())
be := backend.NewDefaultBackend(backend.BackendConfig{Logger: s.lg, Path: s.outDbPath()})
defer func() {
be.ForceCommit()
be.Close()
Expand Down Expand Up @@ -471,8 +479,8 @@ func (s *v3Manager) saveWALAndSnap() (*raftpb.HardState, error) {
return nil, err
}

// add members again to persist them to the backend we create.
be := backend.NewDefaultBackend(s.lg, s.outDbPath())
// add members again to persist them to the store we create.
be := backend.NewDefaultBackend(backend.BackendConfig{Logger: s.lg, Path: s.outDbPath(), MmapSize: s.initialMmapSize})
defer be.Close()
s.cl.SetBackend(schema.NewMembershipBackend(s.lg, be))
for _, m := range s.cl.Members() {
Expand Down Expand Up @@ -551,7 +559,7 @@ func (s *v3Manager) saveWALAndSnap() (*raftpb.HardState, error) {
}

func (s *v3Manager) updateCIndex(commit uint64, term uint64) error {
be := backend.NewDefaultBackend(s.lg, s.outDbPath())
be := backend.NewDefaultBackend(backend.BackendConfig{Logger: s.lg, Path: s.outDbPath(), MmapSize: s.initialMmapSize})
defer be.Close()

cindex.UpdateConsistentIndexForce(be.BatchTx(), commit, term)
Expand Down
1 change: 1 addition & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ type ServerConfig struct {
CompactionBatchLimit int
CompactionSleepInterval time.Duration
QuotaBackendBytes int64
InitialMmapSize uint64
MaxTxnOps uint

// MaxRequestBytes is the maximum request size to send over raft.
Expand Down
4 changes: 2 additions & 2 deletions server/etcdserver/cindex/cindex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestConsistentIndex(t *testing.T) {
be.ForceCommit()
be.Close()

b := backend.NewDefaultBackend(zaptest.NewLogger(t), tmpPath)
b := backend.NewDefaultBackend(backend.BackendConfig{Logger: zaptest.NewLogger(t), Path: tmpPath})
defer b.Close()
ci.SetBackend(b)
index = ci.ConsistentIndex()
Expand Down Expand Up @@ -108,7 +108,7 @@ func TestConsistentIndexDecrease(t *testing.T) {
be.ForceCommit()
be.Close()

be = backend.NewDefaultBackend(zaptest.NewLogger(t), tmpPath)
be = backend.NewDefaultBackend(backend.BackendConfig{Logger: zaptest.NewLogger(t), Path: tmpPath})
defer be.Close()
ci := NewConsistentIndex(be)
ci.SetConsistentIndex(tc.index, tc.term)
Expand Down
14 changes: 11 additions & 3 deletions server/storage/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,17 @@ func New(bcfg BackendConfig) Backend {
return newBackend(bcfg)
}

func NewDefaultBackend(lg *zap.Logger, path string) Backend {
bcfg := DefaultBackendConfig(lg)
bcfg.Path = path
func NewDefaultBackend(bc BackendConfig) Backend {

bcfg := DefaultBackendConfig(bc.Logger)
bcfg.Path = bc.Path

if bc.MmapSize > 0 {
bcfg.MmapSize = bc.MmapSize
} else {
bcfg.MmapSize = initialMmapSize
}

return newBackend(bcfg)
}

Expand Down
4 changes: 2 additions & 2 deletions server/storage/mvcc/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestScheduledCompact(t *testing.T) {
be.ForceCommit()
be.Close()

b := backend.NewDefaultBackend(lg, tmpPath)
b := backend.NewDefaultBackend(backend.BackendConfig{Logger: lg, Path: tmpPath})
defer b.Close()
v, found := UnsafeReadScheduledCompact(b.BatchTx())
assert.Equal(t, true, found)
Expand Down Expand Up @@ -103,7 +103,7 @@ func TestFinishedCompact(t *testing.T) {
be.ForceCommit()
be.Close()

b := backend.NewDefaultBackend(lg, tmpPath)
b := backend.NewDefaultBackend(backend.BackendConfig{Logger: lg, Path: tmpPath})
defer b.Close()
v, found := UnsafeReadFinishedCompact(b.BatchTx())
assert.Equal(t, true, found)
Expand Down
4 changes: 2 additions & 2 deletions server/storage/schema/auth_roles_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func TestGetAllRoles(t *testing.T) {
abe.ForceCommit()
be.Close()

be2 := backend.NewDefaultBackend(lg, tmpPath)
be2 := backend.NewDefaultBackend(backend.BackendConfig{Logger: lg, Path: tmpPath})
defer be2.Close()
abe2 := NewAuthBackend(lg, be2)
users := abe2.GetAllRoles()
Expand Down Expand Up @@ -219,7 +219,7 @@ func TestGetRole(t *testing.T) {
abe.ForceCommit()
be.Close()

be2 := backend.NewDefaultBackend(lg, tmpPath)
be2 := backend.NewDefaultBackend(backend.BackendConfig{Logger: lg, Path: tmpPath})
defer be2.Close()
abe2 := NewAuthBackend(lg, be2)
users := abe2.GetRole("role1")
Expand Down
4 changes: 2 additions & 2 deletions server/storage/schema/auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestAuthEnabled(t *testing.T) {
abe.ForceCommit()
be.Close()

be2 := backend.NewDefaultBackend(lg, tmpPath)
be2 := backend.NewDefaultBackend(backend.BackendConfig{Logger: lg, Path: tmpPath})
defer be2.Close()
abe2 := NewAuthBackend(lg, be2)
tx = abe2.BatchTx()
Expand Down Expand Up @@ -117,7 +117,7 @@ func TestAuthRevision(t *testing.T) {
abe.ForceCommit()
be.Close()

be2 := backend.NewDefaultBackend(lg, tmpPath)
be2 := backend.NewDefaultBackend(backend.BackendConfig{Logger: lg, Path: tmpPath})
defer be2.Close()
abe2 := NewAuthBackend(lg, be2)
tx := abe2.BatchTx()
Expand Down
4 changes: 2 additions & 2 deletions server/storage/schema/auth_users_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func TestGetAllUsers(t *testing.T) {
abe.ForceCommit()
be.Close()

be2 := backend.NewDefaultBackend(lg, tmpPath)
be2 := backend.NewDefaultBackend(backend.BackendConfig{Logger: lg, Path: tmpPath})
defer be2.Close()
abe2 := NewAuthBackend(lg, be2)
users := abe2.ReadTx().UnsafeGetAllUsers()
Expand Down Expand Up @@ -195,7 +195,7 @@ func TestGetUser(t *testing.T) {
abe.ForceCommit()
be.Close()

be2 := backend.NewDefaultBackend(lg, tmpPath)
be2 := backend.NewDefaultBackend(backend.BackendConfig{Logger: lg, Path: tmpPath})
defer be2.Close()
abe2 := NewAuthBackend(lg, be2)
users := abe2.GetUser("alice")
Expand Down
2 changes: 1 addition & 1 deletion server/storage/schema/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func TestLeaseBackend(t *testing.T) {
be.ForceCommit()
be.Close()

be2 := backend.NewDefaultBackend(lg, tmpPath)
be2 := backend.NewDefaultBackend(backend.BackendConfig{Logger: lg, Path: tmpPath})
defer be2.Close()
leases := MustUnsafeGetAllLeases(be2.ReadTx())

Expand Down
6 changes: 3 additions & 3 deletions server/storage/schema/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestValidate(t *testing.T) {
lg := zap.NewNop()
dataPath := setupBackendData(t, tc.version, tc.overrideKeys)

b := backend.NewDefaultBackend(lg, dataPath)
b := backend.NewDefaultBackend(backend.BackendConfig{Logger: lg, Path: dataPath})
defer b.Close()
err := Validate(lg, b.ReadTx())
if (err != nil) != tc.expectError {
Expand Down Expand Up @@ -211,7 +211,7 @@ func TestMigrate(t *testing.T) {
t.Fatal(err)
}

b := backend.NewDefaultBackend(lg, dataPath)
b := backend.NewDefaultBackend(backend.BackendConfig{Logger: lg, Path: dataPath})
defer b.Close()

err = Migrate(lg, b.BatchTx(), walVersion, tc.targetVersion)
Expand Down Expand Up @@ -255,7 +255,7 @@ func TestMigrateIsReversible(t *testing.T) {
lg := zap.NewNop()
dataPath := setupBackendData(t, tc.initialVersion, nil)

be := backend.NewDefaultBackend(lg, dataPath)
be := backend.NewDefaultBackend(backend.BackendConfig{Logger: lg, Path: dataPath})
defer be.Close()
tx := be.BatchTx()
tx.Lock()
Expand Down
2 changes: 1 addition & 1 deletion server/storage/schema/version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func TestVersion(t *testing.T) {
be.ForceCommit()
be.Close()

b := backend.NewDefaultBackend(lg, tmpPath)
b := backend.NewDefaultBackend(backend.BackendConfig{Logger: lg, Path: tmpPath})
defer b.Close()
v := UnsafeReadStorageVersion(b.BatchTx())

Expand Down
3 changes: 1 addition & 2 deletions server/verify/verify.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ func Verify(cfg Config) error {
lg.Info("verification of persisted state successful", zap.String("data-dir", cfg.DataDir))
}
}()

be := backend.NewDefaultBackend(lg, datadir.ToBackendFileName(cfg.DataDir))
be := backend.NewDefaultBackend(backend.BackendConfig{Logger: lg, Path: datadir.ToBackendFileName(cfg.DataDir)})
defer be.Close()

snapshot, hardstate, err := validateWal(cfg)
Expand Down
2 changes: 1 addition & 1 deletion tests/e2e/utl_migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func TestEtctlutlMigrate(t *testing.T) {
}

t.Log("etcdutl migrate...")
be := backend.NewDefaultBackend(lg, filepath.Join(memberDataDir, "member/snap/db"))
be := backend.NewDefaultBackend(backend.BackendConfig{Logger: lg, Path: filepath.Join(memberDataDir, "member/snap/db")})
defer be.Close()

ver := schema.ReadStorageVersion(be.ReadTx())
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/clientv3/maintenance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func testMaintenanceSnapshotErrorInflight(t *testing.T, snapshot func(context.Co
// take about 1-second to read snapshot
clus.Members[0].Stop(t)
dpath := filepath.Join(clus.Members[0].DataDir, "member", "snap", "db")
b := backend.NewDefaultBackend(lg, dpath)
b := backend.NewDefaultBackend(backend.BackendConfig{Logger: lg, Path: dpath})
s := mvcc.NewStore(lg, b, &lease.FakeLessor{}, mvcc.StoreConfig{CompactionBatchLimit: math.MaxInt32})
rev := 100000
for i := 2; i <= rev; i++ {
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/v3_alarm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func TestV3CorruptAlarm(t *testing.T) {
// Corrupt member 0 by modifying backend offline.
clus.Members[0].Stop(t)
fp := filepath.Join(clus.Members[0].DataDir, "member", "snap", "db")
be := backend.NewDefaultBackend(lg, fp)
be := backend.NewDefaultBackend(backend.BackendConfig{Logger: lg, Path: fp})
s := mvcc.NewStore(lg, be, nil, mvcc.StoreConfig{})
// NOTE: cluster_proxy mode with namespacing won't set 'k', but namespace/'k'.
s.Put([]byte("abc"), []byte("def"), 0)
Expand Down
2 changes: 1 addition & 1 deletion tools/etcd-dump-db/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func iterateBucket(dbPath, bucket string, limit uint64, decode bool) (err error)
}

func getHash(dbPath string) (hash uint32, err error) {
b := backend.NewDefaultBackend(zap.NewNop(), dbPath)
b := backend.NewDefaultBackend(backend.BackendConfig{Logger: zap.NewNop(), Path: dbPath})
return b.Hash(schema.DefaultIgnores)
}

Expand Down

0 comments on commit 542c737

Please sign in to comment.