Skip to content

Commit

Permalink
chore: backport Add hooks to allow app modules to add things to state…
Browse files Browse the repository at this point in the history
…-sync cosmos#10961 (#237)

* backport: Add hooks to allow app modules to add things to state-sync (backport cosmos#10961)

* remove TestTraceConcurrency
  • Loading branch information
p0mvn committed May 17, 2022
1 parent 0041954 commit 949254a
Show file tree
Hide file tree
Showing 17 changed files with 2,320 additions and 1,549 deletions.
16 changes: 16 additions & 0 deletions baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,22 @@ func DefaultStoreLoader(ms sdk.CommitMultiStore) error {
return ms.LoadLatestVersion()
}

// CommitMultiStore returns the root multi-store.
// App constructor can use this to access the `cms`.
// UNSAFE: only safe to use during app initialization.
func (app *BaseApp) CommitMultiStore() sdk.CommitMultiStore {
if app.sealed {
panic("cannot call CommitMultiStore() after baseapp is sealed")
}
return app.cms
}

// SnapshotManager returns the snapshot manager.
// application use this to register extra extension snapshotters.
func (app *BaseApp) SnapshotManager() *snapshots.Manager {
return app.snapshotManager
}

// LoadVersion loads the BaseApp application version. It will panic if called
// more than once on a running baseapp.
func (app *BaseApp) LoadVersion(version int64) error {
Expand Down
39 changes: 38 additions & 1 deletion proto/cosmos/base/snapshots/v1beta1/snapshot.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,41 @@ message Snapshot {
// Metadata contains SDK-specific snapshot metadata.
message Metadata {
repeated bytes chunk_hashes = 1; // SHA-256 chunk hashes
}
}

// SnapshotItem is an item contained in a rootmulti.Store snapshot.
message SnapshotItem {
// item is the specific type of snapshot item.
oneof item {
SnapshotStoreItem store = 1;
SnapshotIAVLItem iavl = 2 [(gogoproto.customname) = "IAVL"];
SnapshotExtensionMeta extension = 3;
SnapshotExtensionPayload extension_payload = 4;
}
}

// SnapshotStoreItem contains metadata about a snapshotted store.
message SnapshotStoreItem {
string name = 1;
}

// SnapshotIAVLItem is an exported IAVL node.
message SnapshotIAVLItem {
bytes key = 1;
bytes value = 2;
// version is block height
int64 version = 3;
// height is depth of the tree.
int32 height = 4;
}

// SnapshotExtensionMeta contains metadata about an external snapshotter.
message SnapshotExtensionMeta {
string name = 1;
uint32 format = 2;
}

// SnapshotExtensionPayload contains payloads of an external snapshotter.
message SnapshotExtensionPayload {
bytes payload = 1;
}
28 changes: 0 additions & 28 deletions proto/cosmos/base/store/v1beta1/snapshot.proto

This file was deleted.

8 changes: 5 additions & 3 deletions server/mock/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package mock
import (
"io"

protoio "github.com/gogo/protobuf/io"
dbm "github.com/tendermint/tm-db"

pruningtypes "github.com/cosmos/cosmos-sdk/pruning/types"
snapshottypes "github.com/cosmos/cosmos-sdk/snapshots/types"
store "github.com/cosmos/cosmos-sdk/store/types"
sdk "github.com/cosmos/cosmos-sdk/types"
)
Expand Down Expand Up @@ -131,13 +133,13 @@ func (ms multiStore) SetSnapshotInterval(snapshotInterval uint64) {
panic("not implemented")
}

func (ms multiStore) Snapshot(height uint64, format uint32) (<-chan io.ReadCloser, error) {
func (ms multiStore) Snapshot(height uint64, protoWriter protoio.Writer) error {
panic("not implemented")
}

func (ms multiStore) Restore(
height uint64, format uint32, chunks <-chan io.ReadCloser, ready chan<- struct{},
) error {
height uint64, format uint32, protoReader protoio.Reader,
) (snapshottypes.SnapshotItem, error) {
panic("not implemented")
}

Expand Down
File renamed without changes.
File renamed without changes.
104 changes: 74 additions & 30 deletions snapshots/helpers_test.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,26 @@
package snapshots_test

import (
"bufio"
"bytes"
"compress/zlib"
"crypto/sha256"
"errors"
"io"
"io/ioutil"
"testing"
"time"

protoio "github.com/gogo/protobuf/io"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/libs/log"
db "github.com/tendermint/tm-db"

"github.com/cosmos/cosmos-sdk/snapshots"
"github.com/cosmos/cosmos-sdk/snapshots/types"
snapshottypes "github.com/cosmos/cosmos-sdk/snapshots/types"
snaphotsTestUtil "github.com/cosmos/cosmos-sdk/testutil/snapshots"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
)

func checksums(slice [][]byte) [][]byte {
Expand Down Expand Up @@ -58,34 +63,79 @@ func readChunks(chunks <-chan io.ReadCloser) [][]byte {
return bodies
}

// snapshotItems serialize a array of bytes as SnapshotItem_ExtensionPayload, and return the chunks.
func snapshotItems(items [][]byte) [][]byte {
// copy the same parameters from the code
snapshotChunkSize := uint64(10e6)
snapshotBufferSize := int(snapshotChunkSize)

ch := make(chan io.ReadCloser)
go func() {
chunkWriter := snapshots.NewChunkWriter(ch, snapshotChunkSize)
bufWriter := bufio.NewWriterSize(chunkWriter, snapshotBufferSize)
zWriter, _ := zlib.NewWriterLevel(bufWriter, 7)
protoWriter := protoio.NewDelimitedWriter(zWriter)
for _, item := range items {
types.WriteExtensionItem(protoWriter, item)
}
protoWriter.Close()
zWriter.Close()
bufWriter.Flush()
chunkWriter.Close()
}()

var chunks [][]byte
for chunkBody := range ch {
chunk, err := io.ReadAll(chunkBody)
if err != nil {
panic(err)
}
chunks = append(chunks, chunk)
}
return chunks
}

type mockSnapshotter struct {
chunks [][]byte
prunedHeights map[int64]struct{}
snapshotInterval uint64
items [][]byte
}

func (m *mockSnapshotter) Restore(
height uint64, format uint32, chunks <-chan io.ReadCloser, ready chan<- struct{},
) error {
height uint64, format uint32, protoReader protoio.Reader,
) (snapshottypes.SnapshotItem, error) {
if format == 0 {
return types.ErrUnknownFormat
return snapshottypes.SnapshotItem{}, types.ErrUnknownFormat
}
if m.chunks != nil {
return errors.New("already has contents")
if m.items != nil {
return snapshottypes.SnapshotItem{}, errors.New("already has contents")
}
if ready != nil {
close(ready)

m.items = [][]byte{}
for {
item := &snapshottypes.SnapshotItem{}
err := protoReader.ReadMsg(item)
if err == io.EOF {
break
} else if err != nil {
return snapshottypes.SnapshotItem{}, sdkerrors.Wrap(err, "invalid protobuf message")
}
payload := item.GetExtensionPayload()
if payload == nil {
return snapshottypes.SnapshotItem{}, sdkerrors.Wrap(err, "invalid protobuf message")
}
m.items = append(m.items, payload.Payload)
}

m.chunks = [][]byte{}
for reader := range chunks {
chunk, err := ioutil.ReadAll(reader)
if err != nil {
return snapshottypes.SnapshotItem{}, nil
}

func (m *mockSnapshotter) Snapshot(height uint64, protoWriter protoio.Writer) error {
for _, item := range m.items {
if err := types.WriteExtensionItem(protoWriter, item); err != nil {
return err
}
m.chunks = append(m.chunks, chunk)
}

return nil
}

Expand All @@ -101,16 +151,12 @@ func (m *mockSnapshotter) SetSnapshotInterval(snapshotInterval uint64) {
m.snapshotInterval = snapshotInterval
}

func (m *mockSnapshotter) Snapshot(height uint64, format uint32) (<-chan io.ReadCloser, error) {
if format == 0 {
return nil, types.ErrUnknownFormat
}
ch := make(chan io.ReadCloser, len(m.chunks))
for _, chunk := range m.chunks {
ch <- ioutil.NopCloser(bytes.NewReader(chunk))
}
close(ch)
return ch, nil
func (m *mockSnapshotter) SnapshotFormat() uint32 {
return 1
}

func (m *mockSnapshotter) SupportedFormats() []uint32 {
return []uint32{1}
}

// setupBusyManager creates a manager with an empty store that is busy creating a snapshot at height 1.
Expand Down Expand Up @@ -155,11 +201,9 @@ func (m *hungSnapshotter) Close() {
close(m.ch)
}

func (m *hungSnapshotter) Snapshot(height uint64, format uint32) (<-chan io.ReadCloser, error) {
func (m *hungSnapshotter) Snapshot(height uint64, protoWriter protoio.Writer) error {
<-m.ch
ch := make(chan io.ReadCloser, 1)
ch <- ioutil.NopCloser(bytes.NewReader([]byte{}))
return ch, nil
return nil
}

func (m *hungSnapshotter) PruneSnapshotHeight(height int64) {
Expand All @@ -171,7 +215,7 @@ func (m *hungSnapshotter) SetSnapshotInterval(snapshotInterval uint64) {
}

func (m *hungSnapshotter) Restore(
height uint64, format uint32, chunks <-chan io.ReadCloser, ready chan<- struct{},
) error {
height uint64, format uint32, protoReader protoio.Reader,
) (snapshottypes.SnapshotItem, error) {
panic("not implemented")
}
Loading

0 comments on commit 949254a

Please sign in to comment.