Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: save restored snapshot locally #16060

Merged
merged 17 commits into from
May 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion store/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ Ref: https://keepachangelog.com/en/1.0.0/
- [#15712](https://github.com/cosmos/cosmos-sdk/pull/15712) Add `WorkingHash` function to the store interface to get the current app hash before commit.
* [#14645](https://github.com/cosmos/cosmos-sdk/pull/14645) Add limit to the length of key and value.
* [#15683](https://github.com/cosmos/cosmos-sdk/pull/15683) `rootmulti.Store.CacheMultiStoreWithVersion` now can handle loading archival states that don't persist any of the module stores the current state has.
* [#16060](https://github.com/cosmos/cosmos-sdk/pull/16060) Support saving restoring snapshot locally.

## [v0.1.0-alpha.1](https://github.com/cosmos/cosmos-sdk/releases/tag/store%2Fv0.1.0-alpha.1) - 2023-03-17

### Features

* [#14746](https://github.com/cosmos/cosmos-sdk/pull/14746) The `store` module is extracted to have a separate go.mod file which allows it be a standalone module.
* [#14410](https://github.com/cosmos/cosmos-sdk/pull/14410) `rootmulti.Store.loadVersion` has validation to check if all the module stores' height is correct, it will error if any module store has incorrect height.
* [#14410](https://github.com/cosmos/cosmos-sdk/pull/14410) `rootmulti.Store.loadVersion` has validation to check if all the module stores' height is correct, it will error if any module store has incorrect height.
83 changes: 63 additions & 20 deletions store/snapshots/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"io"
"math"
"os"
"sort"
"sync"

Expand Down Expand Up @@ -39,12 +40,12 @@ type Manager struct {
multistore types.Snapshotter
logger log.Logger

mtx sync.Mutex
operation operation
chRestore chan<- io.ReadCloser
chRestoreDone <-chan restoreDone
restoreChunkHashes [][]byte
restoreChunkIndex uint32
mtx sync.Mutex
operation operation
chRestore chan<- uint32
chRestoreDone <-chan restoreDone
restoreSnapshot *types.Snapshot
restoreChunkIndex uint32
}

// operation represents a Manager operation. Only one operation can be in progress at a time.
Expand All @@ -62,7 +63,8 @@ const (
opPrune operation = "prune"
opRestore operation = "restore"

chunkBufferSize = 4
chunkBufferSize = 4
chunkIDBufferSize = 1024

snapshotMaxItemSize = int(64e6) // SDK has no key/value size limit, so we set an arbitrary limit
)
Expand Down Expand Up @@ -135,7 +137,7 @@ func (m *Manager) endLocked() {
m.chRestore = nil
}
m.chRestoreDone = nil
m.restoreChunkHashes = nil
m.restoreSnapshot = nil
m.restoreChunkIndex = 0
}

Expand Down Expand Up @@ -291,29 +293,58 @@ func (m *Manager) Restore(snapshot types.Snapshot) error {
}

// Start an asynchronous snapshot restoration, passing chunks and completion status via channels.
chChunks := make(chan io.ReadCloser, chunkBufferSize)
chChunkIDs := make(chan uint32, chunkIDBufferSize)
chDone := make(chan restoreDone, 1)

dir := m.store.pathSnapshot(snapshot.Height, snapshot.Format)
if err := os.MkdirAll(dir, 0o750); err != nil {
return errorsmod.Wrapf(err, "failed to create snapshot directory %q", dir)
}

chChunks := m.loadChunkStream(snapshot.Height, snapshot.Format, chChunkIDs)

go func() {
err := m.restoreSnapshot(snapshot, chChunks)
err := m.doRestoreSnapshot(snapshot, chChunks)
chDone <- restoreDone{
complete: err == nil,
err: err,
}
close(chDone)
}()

m.chRestore = chChunks
m.chRestore = chChunkIDs
m.chRestoreDone = chDone
m.restoreChunkHashes = snapshot.Metadata.ChunkHashes
m.restoreSnapshot = &snapshot
m.restoreChunkIndex = 0
return nil
}

// restoreSnapshot do the heavy work of snapshot restoration after preliminary checks on request have passed.
func (m *Manager) restoreSnapshot(snapshot types.Snapshot, chChunks <-chan io.ReadCloser) error {
var nextItem types.SnapshotItem
func (m *Manager) loadChunkStream(height uint64, format uint32, chunkIDs <-chan uint32) <-chan io.ReadCloser {
chunks := make(chan io.ReadCloser, chunkBufferSize)
go func() {
defer close(chunks)

for chunkID := range chunkIDs {
chunk, err := m.store.loadChunkFile(height, format, chunkID)
if err != nil {
m.logger.Error("load chunk file failed", "height", height, "format", format, "chunk", chunkID, "err", err)
break
}
chunks <- chunk
}
}()

return chunks
}

// doRestoreSnapshot do the heavy work of snapshot restoration after preliminary checks on request have passed.
func (m *Manager) doRestoreSnapshot(snapshot types.Snapshot, chChunks <-chan io.ReadCloser) error {
dir := m.store.pathSnapshot(snapshot.Height, snapshot.Format)
if err := os.MkdirAll(dir, 0o750); err != nil {
return errorsmod.Wrapf(err, "failed to create snapshot directory %q", dir)
}

var nextItem types.SnapshotItem
streamReader, err := NewStreamReader(chChunks)
if err != nil {
return err
Expand Down Expand Up @@ -375,7 +406,7 @@ func (m *Manager) RestoreChunk(chunk []byte) (bool, error) {
return false, errorsmod.Wrap(storetypes.ErrLogic, "no restore operation in progress")
}

if int(m.restoreChunkIndex) >= len(m.restoreChunkHashes) {
if int(m.restoreChunkIndex) >= len(m.restoreSnapshot.Metadata.ChunkHashes) {
return false, errorsmod.Wrap(storetypes.ErrLogic, "received unexpected chunk")
}

Expand All @@ -392,19 +423,30 @@ func (m *Manager) RestoreChunk(chunk []byte) (bool, error) {

// Verify the chunk hash.
hash := sha256.Sum256(chunk)
expected := m.restoreChunkHashes[m.restoreChunkIndex]
expected := m.restoreSnapshot.Metadata.ChunkHashes[m.restoreChunkIndex]
if !bytes.Equal(hash[:], expected) {
return false, errorsmod.Wrapf(types.ErrChunkHashMismatch,
"expected %x, got %x", hash, expected)
}

if err := m.store.saveChunkContent(chunk, m.restoreChunkIndex, m.restoreSnapshot); err != nil {
return false, errorsmod.Wrapf(err, "save chunk content %d", m.restoreChunkIndex)
}

// Pass the chunk to the restore, and wait for completion if it was the final one.
m.chRestore <- io.NopCloser(bytes.NewReader(chunk))
m.chRestore <- m.restoreChunkIndex
m.restoreChunkIndex++

if int(m.restoreChunkIndex) >= len(m.restoreChunkHashes) {
if int(m.restoreChunkIndex) >= len(m.restoreSnapshot.Metadata.ChunkHashes) {
close(m.chRestore)
m.chRestore = nil

// the chunks are all written into files, we can save the snapshot to the db,
// even if the restoration may not completed yet.
if err := m.store.saveSnapshot(m.restoreSnapshot); err != nil {
return false, errorsmod.Wrap(err, "save restoring snapshot")
}

done := <-m.chRestoreDone
m.endLocked()
if done.err != nil {
Expand All @@ -413,6 +455,7 @@ func (m *Manager) RestoreChunk(chunk []byte) (bool, error) {
if !done.complete {
return false, errorsmod.Wrap(storetypes.ErrLogic, "restore ended prematurely")
}

return true, nil
}
return false, nil
Expand All @@ -438,7 +481,7 @@ func (m *Manager) RestoreLocalSnapshot(height uint64, format uint32) error {
}
defer m.endLocked()

return m.restoreSnapshot(*snapshot, ch)
return m.doRestoreSnapshot(*snapshot, ch)
}

// sortedExtensionNames sort extension names for deterministic iteration.
Expand Down
7 changes: 7 additions & 0 deletions store/snapshots/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,13 @@ func TestManager_Restore(t *testing.T) {
assert.Equal(t, expectItems, target.items)
assert.Equal(t, 10, len(extSnapshotter.state))

// The snapshot is saved in local snapshot store
snapshots, err := store.List()
require.NoError(t, err)
snapshot := snapshots[0]
require.Equal(t, uint64(3), snapshot.Height)
require.Equal(t, types.CurrentFormat, snapshot.Format)

// Starting a new restore should fail now, because the target already has contents.
err = manager.Restore(types.Snapshot{
Height: 3,
Expand Down
6 changes: 6 additions & 0 deletions store/snapshots/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,12 @@ func (s *Store) saveChunk(chunkBody io.ReadCloser, index uint32, snapshot *types
return nil
}

// saveChunkContent save the chunk to disk
func (s *Store) saveChunkContent(chunk []byte, index uint32, snapshot *types.Snapshot) error {
path := s.PathChunk(snapshot.Height, snapshot.Format, index)
return os.WriteFile(path, chunk, 0o600)
}

// saveSnapshot saves snapshot metadata to the database.
func (s *Store) saveSnapshot(snapshot *types.Snapshot) error {
value, err := proto.Marshal(snapshot)
Expand Down