diff --git a/CHANGELOG.md b/CHANGELOG.md index ed426c69a3ce..ba5eb0c7507a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -45,6 +45,7 @@ Ref: https://keepachangelog.com/en/1.0.0/ * (gov) [#15979](https://github.com/cosmos/cosmos-sdk/pull/15979) Improve gov error message when failing to convert v1 proposal to v1beta1. * (store) [#16067](https://github.com/cosmos/cosmos-sdk/pull/16067) Add local snapshots management commands. * (server) [#16061](https://github.com/cosmos/cosmos-sdk/pull/16061) Add Comet bootstrap command. +* (snapshots) [#16060](https://github.com/cosmos/cosmos-sdk/pull/16060) Support saving restoring snapshot locally. * (x/staking) [#16068](https://github.com/cosmos/cosmos-sdk/pull/16068) Update simulation to allow non-EOA accounts to stake. * (server) [#16142](https://github.com/cosmos/cosmos-sdk/pull/16142) Remove JSON Indentation from the GRPC to REST gateway's responses. (Saving bandwidth) * (types) [#16145](https://github.com/cosmos/cosmos-sdk/pull/16145) Rename interface `ExtensionOptionI` back to `TxExtensionOptionI` to avoid breaking change. diff --git a/snapshots/manager.go b/snapshots/manager.go index 22e453346af4..ca57eab91429 100644 --- a/snapshots/manager.go +++ b/snapshots/manager.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "math" + "os" "sort" "sync" @@ -38,12 +39,12 @@ type Manager struct { multistore types.Snapshotter logger log.Logger - mtx sync.Mutex - operation operation - chRestore chan<- io.ReadCloser - chRestoreDone <-chan restoreDone - restoreChunkHashes [][]byte - restoreChunkIndex uint32 + mtx sync.Mutex + operation operation + chRestore chan<- uint32 + chRestoreDone <-chan restoreDone + restoreSnapshot *types.Snapshot + restoreChunkIndex uint32 } // operation represents a Manager operation. Only one operation can be in progress at a time. @@ -61,7 +62,8 @@ const ( opPrune operation = "prune" opRestore operation = "restore" - chunkBufferSize = 4 + chunkBufferSize = 4 + chunkIDBufferSize = 1024 snapshotMaxItemSize = int(64e6) // SDK has no key/value size limit, so we set an arbitrary limit ) @@ -134,7 +136,7 @@ func (m *Manager) endLocked() { m.chRestore = nil } m.chRestoreDone = nil - m.restoreChunkHashes = nil + m.restoreSnapshot = nil m.restoreChunkIndex = 0 } @@ -290,11 +292,18 @@ func (m *Manager) Restore(snapshot types.Snapshot) error { } // Start an asynchronous snapshot restoration, passing chunks and completion status via channels. - chChunks := make(chan io.ReadCloser, chunkBufferSize) + chChunkIDs := make(chan uint32, chunkIDBufferSize) chDone := make(chan restoreDone, 1) + dir := m.store.pathSnapshot(snapshot.Height, snapshot.Format) + if err := os.MkdirAll(dir, 0o750); err != nil { + return sdkerrors.Wrapf(err, "failed to create snapshot directory %q", dir) + } + + chChunks := m.loadChunkStream(snapshot.Height, snapshot.Format, chChunkIDs) + go func() { - err := m.restoreSnapshot(snapshot, chChunks) + err := m.doRestoreSnapshot(snapshot, chChunks) chDone <- restoreDone{ complete: err == nil, err: err, @@ -302,17 +311,39 @@ func (m *Manager) Restore(snapshot types.Snapshot) error { close(chDone) }() - m.chRestore = chChunks + m.chRestore = chChunkIDs m.chRestoreDone = chDone - m.restoreChunkHashes = snapshot.Metadata.ChunkHashes + m.restoreSnapshot = &snapshot m.restoreChunkIndex = 0 return nil } -// restoreSnapshot do the heavy work of snapshot restoration after preliminary checks on request have passed. -func (m *Manager) restoreSnapshot(snapshot types.Snapshot, chChunks <-chan io.ReadCloser) error { - var nextItem types.SnapshotItem +func (m *Manager) loadChunkStream(height uint64, format uint32, chunkIDs <-chan uint32) <-chan io.ReadCloser { + chunks := make(chan io.ReadCloser, chunkBufferSize) + go func() { + defer close(chunks) + + for chunkID := range chunkIDs { + chunk, err := m.store.loadChunkFile(height, format, chunkID) + if err != nil { + m.logger.Error("load chunk file failed", "height", height, "format", format, "chunk", chunkID, "err", err) + break + } + chunks <- chunk + } + }() + + return chunks +} +// doRestoreSnapshot do the heavy work of snapshot restoration after preliminary checks on request have passed. +func (m *Manager) doRestoreSnapshot(snapshot types.Snapshot, chChunks <-chan io.ReadCloser) error { + dir := m.store.pathSnapshot(snapshot.Height, snapshot.Format) + if err := os.MkdirAll(dir, 0o750); err != nil { + return sdkerrors.Wrapf(err, "failed to create snapshot directory %q", dir) + } + + var nextItem types.SnapshotItem streamReader, err := NewStreamReader(chChunks) if err != nil { return err @@ -374,7 +405,7 @@ func (m *Manager) RestoreChunk(chunk []byte) (bool, error) { return false, sdkerrors.Wrap(sdkerrors.ErrLogic, "no restore operation in progress") } - if int(m.restoreChunkIndex) >= len(m.restoreChunkHashes) { + if int(m.restoreChunkIndex) >= len(m.restoreSnapshot.Metadata.ChunkHashes) { return false, sdkerrors.Wrap(sdkerrors.ErrLogic, "received unexpected chunk") } @@ -391,19 +422,30 @@ func (m *Manager) RestoreChunk(chunk []byte) (bool, error) { // Verify the chunk hash. hash := sha256.Sum256(chunk) - expected := m.restoreChunkHashes[m.restoreChunkIndex] + expected := m.restoreSnapshot.Metadata.ChunkHashes[m.restoreChunkIndex] if !bytes.Equal(hash[:], expected) { return false, sdkerrors.Wrapf(types.ErrChunkHashMismatch, "expected %x, got %x", hash, expected) } + if err := m.store.saveChunkContent(chunk, m.restoreChunkIndex, m.restoreSnapshot); err != nil { + return false, sdkerrors.Wrapf(err, "save chunk content %d", m.restoreChunkIndex) + } + // Pass the chunk to the restore, and wait for completion if it was the final one. - m.chRestore <- io.NopCloser(bytes.NewReader(chunk)) + m.chRestore <- m.restoreChunkIndex m.restoreChunkIndex++ - if int(m.restoreChunkIndex) >= len(m.restoreChunkHashes) { + if int(m.restoreChunkIndex) >= len(m.restoreSnapshot.Metadata.ChunkHashes) { close(m.chRestore) m.chRestore = nil + + // the chunks are all written into files, we can save the snapshot to the db, + // even if the restoration may not completed yet. + if err := m.store.saveSnapshot(m.restoreSnapshot); err != nil { + return false, sdkerrors.Wrap(err, "save restoring snapshot") + } + done := <-m.chRestoreDone m.endLocked() if done.err != nil { @@ -412,6 +454,7 @@ func (m *Manager) RestoreChunk(chunk []byte) (bool, error) { if !done.complete { return false, sdkerrors.Wrap(sdkerrors.ErrLogic, "restore ended prematurely") } + return true, nil } return false, nil @@ -437,7 +480,7 @@ func (m *Manager) RestoreLocalSnapshot(height uint64, format uint32) error { } defer m.endLocked() - return m.restoreSnapshot(*snapshot, ch) + return m.doRestoreSnapshot(*snapshot, ch) } // sortedExtensionNames sort extension names for deterministic iteration. diff --git a/snapshots/manager_test.go b/snapshots/manager_test.go index a5343f759fcd..9b60691508a1 100644 --- a/snapshots/manager_test.go +++ b/snapshots/manager_test.go @@ -213,6 +213,13 @@ func TestManager_Restore(t *testing.T) { assert.Equal(t, expectItems, target.items) assert.Equal(t, 10, len(extSnapshotter.state)) + // The snapshot is saved in local snapshot store + snapshots, err := store.List() + require.NoError(t, err) + snapshot := snapshots[0] + require.Equal(t, uint64(3), snapshot.Height) + require.Equal(t, types.CurrentFormat, snapshot.Format) + // Starting a new restore should fail now, because the target already has contents. err = manager.Restore(types.Snapshot{ Height: 3, diff --git a/snapshots/store.go b/snapshots/store.go index 0c5b295a14e1..1087c826fab2 100644 --- a/snapshots/store.go +++ b/snapshots/store.go @@ -307,6 +307,12 @@ func (s *Store) saveChunk(chunkBody io.ReadCloser, index uint32, snapshot *types return nil } +// saveChunkContent save the chunk to disk +func (s *Store) saveChunkContent(chunk []byte, index uint32, snapshot *types.Snapshot) error { + path := s.PathChunk(snapshot.Height, snapshot.Format, index) + return os.WriteFile(path, chunk, 0o600) +} + // saveSnapshot saves snapshot metadata to the database. func (s *Store) saveSnapshot(snapshot *types.Snapshot) error { value, err := proto.Marshal(snapshot)