Skip to content

Commit

Permalink
feat: support saving restoring snapshot locally (backport: cosmos#16060)
Browse files Browse the repository at this point in the history
use cases:
- help to serve the chunks to other nodes on p2p network.
- do local restoration later without re-downloading the snapshot.
- copy the downloaded snapshot to other places for local restoration.
  • Loading branch information
yihuang committed May 9, 2023
1 parent 5e199ca commit b63095f
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 18 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
* (store) [#15683](https://github.com/cosmos/cosmos-sdk/pull/15683) `rootmulti.Store.CacheMultiStoreWithVersion` now can handle loading archival states that don't persist any of the module stores the current state has.
* (simapp) [#15903](https://github.com/cosmos/cosmos-sdk/pull/15903) Add `AppStateFnWithExtendedCbs` with moduleStateCb callback function to allow access moduleState. Note, this function is present in simtestutil from `v0.47.2+`.
* (gov) [#15979](https://github.com/cosmos/cosmos-sdk/pull/15979) Improve gov error message when failing to convert v1 proposal to v1beta1.
* (store) [#16060](https://github.com/cosmos/cosmos-sdk/pull/16060) support saving restoring snapshot locally

### Bug Fixes

Expand Down
74 changes: 56 additions & 18 deletions snapshots/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"io"
"math"
"os"
"sort"
"sync"

Expand Down Expand Up @@ -38,12 +39,12 @@ type Manager struct {
multistore types.Snapshotter
logger log.Logger

mtx sync.Mutex
operation operation
chRestore chan<- io.ReadCloser
chRestoreDone <-chan restoreDone
restoreChunkHashes [][]byte
restoreChunkIndex uint32
mtx sync.Mutex
operation operation
chRestore chan<- uint32
chRestoreDone <-chan restoreDone
restoreSnapshot *types.Snapshot
restoreChunkIndex uint32
}

// operation represents a Manager operation. Only one operation can be in progress at a time.
Expand All @@ -61,7 +62,7 @@ const (
opPrune operation = "prune"
opRestore operation = "restore"

chunkBufferSize = 4
chunkBufferSize = 1024

snapshotMaxItemSize = int(64e6) // SDK has no key/value size limit, so we set an arbitrary limit
)
Expand Down Expand Up @@ -131,7 +132,7 @@ func (m *Manager) endLocked() {
m.chRestore = nil
}
m.chRestoreDone = nil
m.restoreChunkHashes = nil
m.restoreSnapshot = nil
m.restoreChunkIndex = 0
}

Expand Down Expand Up @@ -284,27 +285,52 @@ func (m *Manager) Restore(snapshot types.Snapshot) error {
}

// Start an asynchronous snapshot restoration, passing chunks and completion status via channels.
chChunks := make(chan io.ReadCloser, chunkBufferSize)
chChunkIDs := make(chan uint32, chunkBufferSize)
chDone := make(chan restoreDone, 1)

dir := m.store.pathSnapshot(snapshot.Height, snapshot.Format)
if err := os.MkdirAll(dir, 0o755); err != nil {
return sdkerrors.Wrapf(err, "failed to create snapshot directory %q", dir)
}

chChunks := m.loadChunkStream(snapshot.Height, snapshot.Format, chChunkIDs)

go func() {
err := m.restoreSnapshot(snapshot, chChunks)
err := m.doRestoreSnapshot(snapshot, chChunks)
chDone <- restoreDone{
complete: err == nil,
err: err,
}
close(chDone)
}()

m.chRestore = chChunks
m.chRestore = chChunkIDs
m.chRestoreDone = chDone
m.restoreChunkHashes = snapshot.Metadata.ChunkHashes
m.restoreSnapshot = &snapshot
m.restoreChunkIndex = 0
return nil
}

// restoreSnapshot do the heavy work of snapshot restoration after preliminary checks on request have passed.
func (m *Manager) restoreSnapshot(snapshot types.Snapshot, chChunks <-chan io.ReadCloser) error {
func (m *Manager) loadChunkStream(height uint64, format uint32, chunkIDs <-chan uint32) <-chan io.ReadCloser {
chunks := make(chan io.ReadCloser)
go func() {
defer close(chunks)

for chunkID := range chunkIDs {
chunk, err := m.store.loadChunkFile(height, format, chunkID)
if err != nil {
m.logger.Error("load chunk file failed", "height", height, "format", format, "chunk", chunkID, "err", err)
break
}
chunks <- chunk
}
}()

return chunks
}

// doRestoreSnapshot do the heavy work of snapshot restoration after preliminary checks on request have passed.
func (m *Manager) doRestoreSnapshot(snapshot types.Snapshot, chChunks <-chan io.ReadCloser) error {
streamReader, err := NewStreamReader(chChunks)
if err != nil {
return err
Expand Down Expand Up @@ -348,7 +374,7 @@ func (m *Manager) RestoreChunk(chunk []byte) (bool, error) {
return false, sdkerrors.Wrap(sdkerrors.ErrLogic, "no restore operation in progress")
}

if int(m.restoreChunkIndex) >= len(m.restoreChunkHashes) {
if int(m.restoreChunkIndex) >= len(m.restoreSnapshot.Metadata.ChunkHashes) {
return false, sdkerrors.Wrap(sdkerrors.ErrLogic, "received unexpected chunk")
}

Expand All @@ -365,19 +391,30 @@ func (m *Manager) RestoreChunk(chunk []byte) (bool, error) {

// Verify the chunk hash.
hash := sha256.Sum256(chunk)
expected := m.restoreChunkHashes[m.restoreChunkIndex]
expected := m.restoreSnapshot.Metadata.ChunkHashes[m.restoreChunkIndex]
if !bytes.Equal(hash[:], expected) {
return false, sdkerrors.Wrapf(types.ErrChunkHashMismatch,
"expected %x, got %x", hash, expected)
}

if err := m.store.saveChunkContent(chunk, m.restoreChunkIndex, m.restoreSnapshot); err != nil {
return false, sdkerrors.Wrapf(err, "save chunk content %d", m.restoreChunkIndex)
}

// Pass the chunk to the restore, and wait for completion if it was the final one.
m.chRestore <- io.NopCloser(bytes.NewReader(chunk))
m.chRestore <- m.restoreChunkIndex
m.restoreChunkIndex++

if int(m.restoreChunkIndex) >= len(m.restoreChunkHashes) {
if int(m.restoreChunkIndex) >= len(m.restoreSnapshot.Metadata.ChunkHashes) {
close(m.chRestore)
m.chRestore = nil

// the chunks are written into files, we can now save the snapshot to the db,
// even if the restoration may not completed yet.
if err := m.store.saveSnapshot(m.restoreSnapshot); err != nil {
return false, sdkerrors.Wrap(err, "save restoring snapshot")
}

done := <-m.chRestoreDone
m.endLocked()
if done.err != nil {
Expand All @@ -386,6 +423,7 @@ func (m *Manager) RestoreChunk(chunk []byte) (bool, error) {
if !done.complete {
return false, sdkerrors.Wrap(sdkerrors.ErrLogic, "restore ended prematurely")
}

return true, nil
}
return false, nil
Expand Down
6 changes: 6 additions & 0 deletions snapshots/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,12 @@ func (s *Store) Save(
return snapshot, s.saveSnapshot(snapshot)
}

// saveChunkContent save the chunk to disk
func (s *Store) saveChunkContent(chunk []byte, index uint32, snapshot *types.Snapshot) error {
path := s.pathChunk(snapshot.Height, snapshot.Format, index)
return os.WriteFile(path, chunk, 0o644)
}

// saveSnapshot saves snapshot metadata to the database.
func (s *Store) saveSnapshot(snapshot *types.Snapshot) error {
value, err := proto.Marshal(snapshot)
Expand Down

0 comments on commit b63095f

Please sign in to comment.