From c7c9a0daa83506879b02d696cca899a355f2c57d Mon Sep 17 00:00:00 2001 From: HuangYi Date: Mon, 8 May 2023 17:01:06 +0800 Subject: [PATCH 01/11] feat: support saving restoring snapshot locally use cases: - help to serve the chunks to other nodes on p2p network. - do local restoration later without re-downloading the snapshot. - copy the downloaded snapshot to other places for local restoration. --- store/CHANGELOG.md | 3 +- store/snapshots/manager.go | 81 +++++++++++++++++++++++++++++--------- store/snapshots/store.go | 6 +++ 3 files changed, 70 insertions(+), 20 deletions(-) diff --git a/store/CHANGELOG.md b/store/CHANGELOG.md index 4bf90b279233..9511db293d04 100644 --- a/store/CHANGELOG.md +++ b/store/CHANGELOG.md @@ -30,10 +30,11 @@ Ref: https://keepachangelog.com/en/1.0.0/ - [#15712](https://github.com/cosmos/cosmos-sdk/pull/15712) Add `WorkingHash` function to the store interface to get the current app hash before commit. * [#14645](https://github.com/cosmos/cosmos-sdk/pull/14645) Add limit to the length of key and value. * [#15683](https://github.com/cosmos/cosmos-sdk/pull/15683) `rootmulti.Store.CacheMultiStoreWithVersion` now can handle loading archival states that don't persist any of the module stores the current state has. +* [#16060](https://github.com/cosmos/cosmos-sdk/pull/16060) feat: support saving restoring snapshot locally ## [v0.1.0-alpha.1](https://github.com/cosmos/cosmos-sdk/releases/tag/store%2Fv0.1.0-alpha.1) - 2023-03-17 ### Features * [#14746](https://github.com/cosmos/cosmos-sdk/pull/14746) The `store` module is extracted to have a separate go.mod file which allows it be a standalone module. -* [#14410](https://github.com/cosmos/cosmos-sdk/pull/14410) `rootmulti.Store.loadVersion` has validation to check if all the module stores' height is correct, it will error if any module store has incorrect height. \ No newline at end of file +* [#14410](https://github.com/cosmos/cosmos-sdk/pull/14410) `rootmulti.Store.loadVersion` has validation to check if all the module stores' height is correct, it will error if any module store has incorrect height. diff --git a/store/snapshots/manager.go b/store/snapshots/manager.go index 90e980d4d016..b70bc00777d0 100644 --- a/store/snapshots/manager.go +++ b/store/snapshots/manager.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "math" + "os" "sort" "sync" @@ -39,12 +40,12 @@ type Manager struct { multistore types.Snapshotter logger log.Logger - mtx sync.Mutex - operation operation - chRestore chan<- io.ReadCloser - chRestoreDone <-chan restoreDone - restoreChunkHashes [][]byte - restoreChunkIndex uint32 + mtx sync.Mutex + operation operation + chRestore chan<- uint32 + chRestoreDone <-chan restoreDone + restoreSnapshot *types.Snapshot + restoreChunkIndex uint32 } // operation represents a Manager operation. Only one operation can be in progress at a time. @@ -62,7 +63,8 @@ const ( opPrune operation = "prune" opRestore operation = "restore" - chunkBufferSize = 4 + // the channel is lightweight(`uint32`), so we can afford to have a big buffer + chunkBufferSize = 1024 snapshotMaxItemSize = int(64e6) // SDK has no key/value size limit, so we set an arbitrary limit ) @@ -135,7 +137,7 @@ func (m *Manager) endLocked() { m.chRestore = nil } m.chRestoreDone = nil - m.restoreChunkHashes = nil + m.restoreSnapshot = nil m.restoreChunkIndex = 0 } @@ -291,11 +293,18 @@ func (m *Manager) Restore(snapshot types.Snapshot) error { } // Start an asynchronous snapshot restoration, passing chunks and completion status via channels. - chChunks := make(chan io.ReadCloser, chunkBufferSize) + chChunkIDs := make(chan uint32, chunkBufferSize) chDone := make(chan restoreDone, 1) + dir := m.store.pathSnapshot(snapshot.Height, snapshot.Format) + if err := os.MkdirAll(dir, 0o755); err != nil { + return errorsmod.Wrapf(err, "failed to create snapshot directory %q", dir) + } + + chChunks := m.loadChunkStream(snapshot.Height, snapshot.Format, chChunkIDs) + go func() { - err := m.restoreSnapshot(snapshot, chChunks) + err := m.doRestoreSnapshot(snapshot, chChunks) chDone <- restoreDone{ complete: err == nil, err: err, @@ -303,17 +312,39 @@ func (m *Manager) Restore(snapshot types.Snapshot) error { close(chDone) }() - m.chRestore = chChunks + m.chRestore = chChunkIDs m.chRestoreDone = chDone - m.restoreChunkHashes = snapshot.Metadata.ChunkHashes + m.restoreSnapshot = &snapshot m.restoreChunkIndex = 0 return nil } -// restoreSnapshot do the heavy work of snapshot restoration after preliminary checks on request have passed. -func (m *Manager) restoreSnapshot(snapshot types.Snapshot, chChunks <-chan io.ReadCloser) error { - var nextItem types.SnapshotItem +func (m *Manager) loadChunkStream(height uint64, format uint32, chunkIDs <-chan uint32) <-chan io.ReadCloser { + chunks := make(chan io.ReadCloser) + go func() { + defer close(chunks) + + for chunkID := range chunkIDs { + chunk, err := m.store.loadChunkFile(height, format, chunkID) + if err != nil { + m.logger.Error("load chunk file failed", "height", height, "format", format, "chunk", chunkID, "err", err) + break + } + chunks <- chunk + } + }() + + return chunks +} +// doRestoreSnapshot do the heavy work of snapshot restoration after preliminary checks on request have passed. +func (m *Manager) doRestoreSnapshot(snapshot types.Snapshot, chChunks <-chan io.ReadCloser) error { + dir := m.store.pathSnapshot(snapshot.Height, snapshot.Format) + if err := os.MkdirAll(dir, 0o755); err != nil { + return errorsmod.Wrapf(err, "failed to create snapshot directory %q", dir) + } + + var nextItem types.SnapshotItem streamReader, err := NewStreamReader(chChunks) if err != nil { return err @@ -375,7 +406,7 @@ func (m *Manager) RestoreChunk(chunk []byte) (bool, error) { return false, errorsmod.Wrap(storetypes.ErrLogic, "no restore operation in progress") } - if int(m.restoreChunkIndex) >= len(m.restoreChunkHashes) { + if int(m.restoreChunkIndex) >= len(m.restoreSnapshot.Metadata.ChunkHashes) { return false, errorsmod.Wrap(storetypes.ErrLogic, "received unexpected chunk") } @@ -392,19 +423,30 @@ func (m *Manager) RestoreChunk(chunk []byte) (bool, error) { // Verify the chunk hash. hash := sha256.Sum256(chunk) - expected := m.restoreChunkHashes[m.restoreChunkIndex] + expected := m.restoreSnapshot.Metadata.ChunkHashes[m.restoreChunkIndex] if !bytes.Equal(hash[:], expected) { return false, errorsmod.Wrapf(types.ErrChunkHashMismatch, "expected %x, got %x", hash, expected) } + if err := m.store.saveChunkContent(chunk, m.restoreChunkIndex, m.restoreSnapshot); err != nil { + return false, errorsmod.Wrapf(err, "save chunk content %d", m.restoreChunkIndex) + } + // Pass the chunk to the restore, and wait for completion if it was the final one. - m.chRestore <- io.NopCloser(bytes.NewReader(chunk)) + m.chRestore <- m.restoreChunkIndex m.restoreChunkIndex++ - if int(m.restoreChunkIndex) >= len(m.restoreChunkHashes) { + if int(m.restoreChunkIndex) >= len(m.restoreSnapshot.Metadata.ChunkHashes) { close(m.chRestore) m.chRestore = nil + + // the chunks are all written into files, we can save the snapshot to the db, + // even if the restoration may not completed yet. + if err := m.store.saveSnapshot(m.restoreSnapshot); err != nil { + return false, errorsmod.Wrap(err, "save restoring snapshot") + } + done := <-m.chRestoreDone m.endLocked() if done.err != nil { @@ -413,6 +455,7 @@ func (m *Manager) RestoreChunk(chunk []byte) (bool, error) { if !done.complete { return false, errorsmod.Wrap(storetypes.ErrLogic, "restore ended prematurely") } + return true, nil } return false, nil diff --git a/store/snapshots/store.go b/store/snapshots/store.go index 91629eaaa148..d249259c4fd2 100644 --- a/store/snapshots/store.go +++ b/store/snapshots/store.go @@ -315,6 +315,12 @@ func (s *Store) saveChunk(chunkBody io.ReadCloser, index uint32, snapshot *types return nil } +// saveChunkContent save the chunk to disk +func (s *Store) saveChunkContent(chunk []byte, index uint32, snapshot *types.Snapshot) error { + path := s.pathChunk(snapshot.Height, snapshot.Format, index) + return os.WriteFile(path, chunk, 0o644) +} + // saveSnapshot saves snapshot metadata to the database. func (s *Store) saveSnapshot(snapshot *types.Snapshot) error { value, err := proto.Marshal(snapshot) From 6e91c7897891a822b12e26ba611814648e6f21a8 Mon Sep 17 00:00:00 2001 From: yihuang Date: Tue, 9 May 2023 09:51:01 +0800 Subject: [PATCH 02/11] Update store/snapshots/store.go --- store/snapshots/store.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/store/snapshots/store.go b/store/snapshots/store.go index d249259c4fd2..59089a1d810c 100644 --- a/store/snapshots/store.go +++ b/store/snapshots/store.go @@ -318,7 +318,7 @@ func (s *Store) saveChunk(chunkBody io.ReadCloser, index uint32, snapshot *types // saveChunkContent save the chunk to disk func (s *Store) saveChunkContent(chunk []byte, index uint32, snapshot *types.Snapshot) error { path := s.pathChunk(snapshot.Height, snapshot.Format, index) - return os.WriteFile(path, chunk, 0o644) + return os.WriteFile(path, chunk, 0o600) } // saveSnapshot saves snapshot metadata to the database. From e1c44dd7920e5c3d602513d0a30bf31f2a1eada3 Mon Sep 17 00:00:00 2001 From: yihuang Date: Tue, 9 May 2023 10:10:21 +0800 Subject: [PATCH 03/11] Update store/snapshots/manager.go --- store/snapshots/manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/store/snapshots/manager.go b/store/snapshots/manager.go index b70bc00777d0..f8e9444faf74 100644 --- a/store/snapshots/manager.go +++ b/store/snapshots/manager.go @@ -320,7 +320,7 @@ func (m *Manager) Restore(snapshot types.Snapshot) error { } func (m *Manager) loadChunkStream(height uint64, format uint32, chunkIDs <-chan uint32) <-chan io.ReadCloser { - chunks := make(chan io.ReadCloser) + chunks := make(chan io.ReadCloser, 4) go func() { defer close(chunks) From 3ea6c6fd3c67e7190ca20ee5e7414f8750aec342 Mon Sep 17 00:00:00 2001 From: yihuang Date: Tue, 9 May 2023 10:11:47 +0800 Subject: [PATCH 04/11] Apply suggestions from code review --- store/snapshots/manager.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/store/snapshots/manager.go b/store/snapshots/manager.go index f8e9444faf74..6f7166a67c7a 100644 --- a/store/snapshots/manager.go +++ b/store/snapshots/manager.go @@ -63,8 +63,8 @@ const ( opPrune operation = "prune" opRestore operation = "restore" - // the channel is lightweight(`uint32`), so we can afford to have a big buffer - chunkBufferSize = 1024 + chunkBufferSize = 4 + chunkIDBufferSize = 1024 snapshotMaxItemSize = int(64e6) // SDK has no key/value size limit, so we set an arbitrary limit ) @@ -293,7 +293,7 @@ func (m *Manager) Restore(snapshot types.Snapshot) error { } // Start an asynchronous snapshot restoration, passing chunks and completion status via channels. - chChunkIDs := make(chan uint32, chunkBufferSize) + chChunkIDs := make(chan uint32, chunkIDBufferSize) chDone := make(chan restoreDone, 1) dir := m.store.pathSnapshot(snapshot.Height, snapshot.Format) @@ -320,7 +320,7 @@ func (m *Manager) Restore(snapshot types.Snapshot) error { } func (m *Manager) loadChunkStream(height uint64, format uint32, chunkIDs <-chan uint32) <-chan io.ReadCloser { - chunks := make(chan io.ReadCloser, 4) + chunks := make(chan io.ReadCloser, chunkBufferSize) go func() { defer close(chunks) From 4c2f1cb5fb460b0f47621f3c950a87b6e9404dad Mon Sep 17 00:00:00 2001 From: yihuang Date: Tue, 9 May 2023 10:18:33 +0800 Subject: [PATCH 05/11] Update store/snapshots/manager.go --- store/snapshots/manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/store/snapshots/manager.go b/store/snapshots/manager.go index 6f7166a67c7a..08f207bb7d73 100644 --- a/store/snapshots/manager.go +++ b/store/snapshots/manager.go @@ -340,7 +340,7 @@ func (m *Manager) loadChunkStream(height uint64, format uint32, chunkIDs <-chan // doRestoreSnapshot do the heavy work of snapshot restoration after preliminary checks on request have passed. func (m *Manager) doRestoreSnapshot(snapshot types.Snapshot, chChunks <-chan io.ReadCloser) error { dir := m.store.pathSnapshot(snapshot.Height, snapshot.Format) - if err := os.MkdirAll(dir, 0o755); err != nil { + if err := os.MkdirAll(dir, 0o750); err != nil { return errorsmod.Wrapf(err, "failed to create snapshot directory %q", dir) } From 55d27c68966da297f42b2b790db9c93c957e9827 Mon Sep 17 00:00:00 2001 From: yihuang Date: Tue, 9 May 2023 10:18:54 +0800 Subject: [PATCH 06/11] Update store/snapshots/manager.go --- store/snapshots/manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/store/snapshots/manager.go b/store/snapshots/manager.go index 08f207bb7d73..443e30c266fa 100644 --- a/store/snapshots/manager.go +++ b/store/snapshots/manager.go @@ -297,7 +297,7 @@ func (m *Manager) Restore(snapshot types.Snapshot) error { chDone := make(chan restoreDone, 1) dir := m.store.pathSnapshot(snapshot.Height, snapshot.Format) - if err := os.MkdirAll(dir, 0o755); err != nil { + if err := os.MkdirAll(dir, 0o750); err != nil { return errorsmod.Wrapf(err, "failed to create snapshot directory %q", dir) } From 4a7b0892b9172e803d9d6d079e2e8773122d3945 Mon Sep 17 00:00:00 2001 From: HuangYi Date: Tue, 9 May 2023 17:34:39 +0800 Subject: [PATCH 07/11] fix lint --- store/snapshots/manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/store/snapshots/manager.go b/store/snapshots/manager.go index 443e30c266fa..d3b6a7889eb5 100644 --- a/store/snapshots/manager.go +++ b/store/snapshots/manager.go @@ -63,7 +63,7 @@ const ( opPrune operation = "prune" opRestore operation = "restore" - chunkBufferSize = 4 + chunkBufferSize = 4 chunkIDBufferSize = 1024 snapshotMaxItemSize = int(64e6) // SDK has no key/value size limit, so we set an arbitrary limit From f10c7b6cd4771d627056d8c50520b23c596e3cdd Mon Sep 17 00:00:00 2001 From: yihuang Date: Fri, 12 May 2023 11:49:47 +0800 Subject: [PATCH 08/11] Update store/CHANGELOG.md Co-authored-by: Aleksandr Bezobchuk --- store/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/store/CHANGELOG.md b/store/CHANGELOG.md index 9511db293d04..c5d108783cab 100644 --- a/store/CHANGELOG.md +++ b/store/CHANGELOG.md @@ -30,7 +30,7 @@ Ref: https://keepachangelog.com/en/1.0.0/ - [#15712](https://github.com/cosmos/cosmos-sdk/pull/15712) Add `WorkingHash` function to the store interface to get the current app hash before commit. * [#14645](https://github.com/cosmos/cosmos-sdk/pull/14645) Add limit to the length of key and value. * [#15683](https://github.com/cosmos/cosmos-sdk/pull/15683) `rootmulti.Store.CacheMultiStoreWithVersion` now can handle loading archival states that don't persist any of the module stores the current state has. -* [#16060](https://github.com/cosmos/cosmos-sdk/pull/16060) feat: support saving restoring snapshot locally +* [#16060](https://github.com/cosmos/cosmos-sdk/pull/16060) Support saving restoring snapshot locally. ## [v0.1.0-alpha.1](https://github.com/cosmos/cosmos-sdk/releases/tag/store%2Fv0.1.0-alpha.1) - 2023-03-17 From 106c137e0ac5d9f4477217c1501e27fe3110e7b5 Mon Sep 17 00:00:00 2001 From: HuangYi Date: Fri, 12 May 2023 15:01:48 +0800 Subject: [PATCH 09/11] add assert in unit test --- store/snapshots/manager_test.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/store/snapshots/manager_test.go b/store/snapshots/manager_test.go index a3e9e35957b3..d18696d12855 100644 --- a/store/snapshots/manager_test.go +++ b/store/snapshots/manager_test.go @@ -213,6 +213,13 @@ func TestManager_Restore(t *testing.T) { assert.Equal(t, expectItems, target.items) assert.Equal(t, 10, len(extSnapshotter.state)) + // The snapshot is saved in local snapshot store + snapshots, err := store.List() + require.NoError(t, err) + snapshot := snapshots[0] + require.Equal(t, uint64(3), snapshot.Height) + require.Equal(t, types.CurrentFormat, snapshot.Format) + // Starting a new restore should fail now, because the target already has contents. err = manager.Restore(types.Snapshot{ Height: 3, From cda3f5e385c97dbb167d439576ca15f2f61d53c2 Mon Sep 17 00:00:00 2001 From: HuangYi Date: Mon, 15 May 2023 22:07:10 +0800 Subject: [PATCH 10/11] fix build error --- store/snapshots/manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/store/snapshots/manager.go b/store/snapshots/manager.go index 7551439923f2..57e6a8f18af5 100644 --- a/store/snapshots/manager.go +++ b/store/snapshots/manager.go @@ -481,7 +481,7 @@ func (m *Manager) RestoreLocalSnapshot(height uint64, format uint32) error { } defer m.endLocked() - return m.restoreSnapshot(*snapshot, ch) + return m.doRestoreSnapshot(*snapshot, ch) } // sortedExtensionNames sort extension names for deterministic iteration. From fb143fa918087fa53cb9fbd90c53d94914e2dc6b Mon Sep 17 00:00:00 2001 From: HuangYi Date: Mon, 15 May 2023 22:09:00 +0800 Subject: [PATCH 11/11] fix build --- store/snapshots/store.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/store/snapshots/store.go b/store/snapshots/store.go index db4f6ee7c48e..2f08a6e6c463 100644 --- a/store/snapshots/store.go +++ b/store/snapshots/store.go @@ -317,7 +317,7 @@ func (s *Store) saveChunk(chunkBody io.ReadCloser, index uint32, snapshot *types // saveChunkContent save the chunk to disk func (s *Store) saveChunkContent(chunk []byte, index uint32, snapshot *types.Snapshot) error { - path := s.pathChunk(snapshot.Height, snapshot.Format, index) + path := s.PathChunk(snapshot.Height, snapshot.Format, index) return os.WriteFile(path, chunk, 0o600) }