Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: Add hooks to allow app modules to add things to state-sync #10961

Merged
merged 13 commits into from
Feb 24, 2022
3,126 changes: 3,098 additions & 28 deletions api/cosmos/base/snapshots/v1beta1/snapshot.pulsar.go

Large diffs are not rendered by default.

13 changes: 13 additions & 0 deletions baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,19 @@ func DefaultStoreLoader(ms sdk.CommitMultiStore) error {
return ms.LoadLatestVersion()
}

// CommitMultiStore returns the root multi-store.
// App constructor can use this to access the `cms`.
// UNSAFE: only safe to use during app initialization.
func (app *BaseApp) CommitMultiStore() sdk.CommitMultiStore {
Copy link
Member

@tac0turtle tac0turtle Feb 16, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe prefix comment with UNSAFE to users know not to use this for just anything.

Im happy this is added, I was working on something that needs this

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we bundle store into SnapshotManager? I'm not sure if we should export this function.

Copy link
Collaborator Author

@yihuang yihuang Feb 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's for supporting code like this, and not all extension snapshotter need the cms, I don't see a more elegant way to support this:

func NewApp() *App {
  ...
  app := baseapp.NewBaseApp(...)
  app.SnapshotManager().RegisterExtension(
    NewWasmSnapshotter(app.CommitMultiStore(), wasmStoreKey),
  )

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found a function like this useful for debugging app hash non-determinism sources.

I think it should be there but needs a more extensive comment.

Copy link
Collaborator Author

@yihuang yihuang Feb 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what do you think if we add:

// UNSAFE: only safe to use during app initialization.
func (app *BaseApp) CommitMultiStore() sdk.CommitMultiStore {
	if app.sealed {
		panic("cannot call CommitMultiStore() after baseapp is sealed")
	}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

on second thought I have already started to use this for another feature and bez added it in on of his prs. Lets leave it as is.

return app.cms
}

// SnapshotManager returns the snapshot manager.
// application use this to register extra extension snapshotters.
func (app *BaseApp) SnapshotManager() *snapshots.Manager {
return app.snapshotManager
}

// LoadVersion loads the BaseApp application version. It will panic if called
// more than once on a running baseapp.
func (app *BaseApp) LoadVersion(version int64) error {
Expand Down
2 changes: 1 addition & 1 deletion baseapp/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (app *BaseApp) SetSnapshotStore(snapshotStore *snapshots.Store) {
app.snapshotManager = nil
return
}
app.snapshotManager = snapshots.NewManager(snapshotStore, app.cms)
app.snapshotManager = snapshots.NewManager(snapshotStore, app.cms, nil)
}

// SetSnapshotInterval sets the snapshot interval.
Expand Down
37 changes: 36 additions & 1 deletion proto/cosmos/base/snapshots/v1beta1/snapshot.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,39 @@ message Snapshot {
// Metadata contains SDK-specific snapshot metadata.
message Metadata {
repeated bytes chunk_hashes = 1; // SHA-256 chunk hashes
}
}

// SnapshotItem is an item contained in a rootmulti.Store snapshot.
message SnapshotItem {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's moved to snapshot package to avoid cyclic import.

// item is the specific type of snapshot item.
oneof item {
SnapshotStoreItem store = 1;
SnapshotIAVLItem iavl = 2 [(gogoproto.customname) = "IAVL"];
SnapshotExtensionMeta extension = 3;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to send SnapshotExtensionMeta as an item, or it can go to Snapshot - we have already Metadata there so it make sense to put it to Snapshot directly or reuse Metadata (latter preferable)

Copy link
Collaborator Author

@yihuang yihuang Feb 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SnapshotExtensionMeta also acts as a delimiter in the stream, the restoration process uses it to separate multiple snapshots.

SnapshotExtensionPayload extension_payload = 4;
}
}

// SnapshotStoreItem contains metadata about a snapshotted store.
message SnapshotStoreItem {
string name = 1;
}

// SnapshotIAVLItem is an exported IAVL node.
message SnapshotIAVLItem {
bytes key = 1;
bytes value = 2;
int64 version = 3;
int32 height = 4;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we really need height here? It is already in Snapshot message -- Snapshot.height == Item.version?

Copy link
Collaborator Author

@yihuang yihuang Feb 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this height is for the exported entries of iavl tree, where it could store states of multiple versions.

EDIT: I think height is for some internal structure of iavl tree. version is the block height, there could be multiple versions in same tree.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, let's add a comment please.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, I just copied the comments from #10794 shamelessly.

}

// SnapshotExtensionMeta contains metadata about an external snapshotter.
message SnapshotExtensionMeta {
string name = 1;
uint32 format = 2;
}

// SnapshotExtensionPayload contains payloads of an external snapshotter.
message SnapshotExtensionPayload {
bytes payload = 1;
}
28 changes: 0 additions & 28 deletions proto/cosmos/base/store/v1beta1/snapshot.proto

This file was deleted.

8 changes: 5 additions & 3 deletions server/mock/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package mock
import (
"io"

protoio "github.com/gogo/protobuf/io"
dbm "github.com/tendermint/tm-db"

snapshottypes "github.com/cosmos/cosmos-sdk/snapshots/types"
storetypes "github.com/cosmos/cosmos-sdk/store/types"
sdk "github.com/cosmos/cosmos-sdk/types"
)
Expand Down Expand Up @@ -122,13 +124,13 @@ func (ms multiStore) SetInitialVersion(version int64) error {
panic("not implemented")
}

func (ms multiStore) Snapshot(height uint64, format uint32) (<-chan io.ReadCloser, error) {
func (ms multiStore) Snapshot(height uint64, protoWriter protoio.Writer) error {
panic("not implemented")
}

func (ms multiStore) Restore(
height uint64, format uint32, chunks <-chan io.ReadCloser, ready chan<- struct{},
) error {
height uint64, format uint32, protoReader protoio.Reader,
) (snapshottypes.SnapshotItem, error) {
panic("not implemented")
}

Expand Down
File renamed without changes.
File renamed without changes.
105 changes: 74 additions & 31 deletions snapshots/helpers_test.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,24 @@
package snapshots_test

import (
"bufio"
"bytes"
"compress/zlib"
"crypto/sha256"
"errors"
"io"
"os"
"testing"
"time"

protoio "github.com/gogo/protobuf/io"
"github.com/stretchr/testify/require"
db "github.com/tendermint/tm-db"

"github.com/cosmos/cosmos-sdk/snapshots"
"github.com/cosmos/cosmos-sdk/snapshots/types"
snapshottypes "github.com/cosmos/cosmos-sdk/snapshots/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
)

func checksums(slice [][]byte) [][]byte {
Expand Down Expand Up @@ -56,45 +61,85 @@ func readChunks(chunks <-chan io.ReadCloser) [][]byte {
return bodies
}

// snapshotItems serialize a array of bytes as SnapshotItem_ExtensionPayload, and return the chunks.
func snapshotItems(items [][]byte) [][]byte {
// copy the same parameters from the code
snapshotChunkSize := uint64(10e6)
snapshotBufferSize := int(snapshotChunkSize)

ch := make(chan io.ReadCloser)
go func() {
chunkWriter := snapshots.NewChunkWriter(ch, snapshotChunkSize)
bufWriter := bufio.NewWriterSize(chunkWriter, snapshotBufferSize)
zWriter, _ := zlib.NewWriterLevel(bufWriter, 7)
protoWriter := protoio.NewDelimitedWriter(zWriter)
for _, item := range items {
types.WriteExtensionItem(protoWriter, item)
}
protoWriter.Close()
zWriter.Close()
bufWriter.Flush()
chunkWriter.Close()
}()

var chunks [][]byte
for chunkBody := range ch {
chunk, err := io.ReadAll(chunkBody)
if err != nil {
panic(err)
}
chunks = append(chunks, chunk)
}
return chunks
}

type mockSnapshotter struct {
chunks [][]byte
items [][]byte
}

func (m *mockSnapshotter) Restore(
height uint64, format uint32, chunks <-chan io.ReadCloser, ready chan<- struct{},
) error {
height uint64, format uint32, protoReader protoio.Reader,
) (snapshottypes.SnapshotItem, error) {
if format == 0 {
return types.ErrUnknownFormat
return snapshottypes.SnapshotItem{}, types.ErrUnknownFormat
}
if m.chunks != nil {
return errors.New("already has contents")
if m.items != nil {
return snapshottypes.SnapshotItem{}, errors.New("already has contents")
}
if ready != nil {
close(ready)

m.items = [][]byte{}
for {
item := &snapshottypes.SnapshotItem{}
err := protoReader.ReadMsg(item)
if err == io.EOF {
break
} else if err != nil {
return snapshottypes.SnapshotItem{}, sdkerrors.Wrap(err, "invalid protobuf message")
}
payload := item.GetExtensionPayload()
if payload == nil {
return snapshottypes.SnapshotItem{}, sdkerrors.Wrap(err, "invalid protobuf message")
}
m.items = append(m.items, payload.Payload)
}

m.chunks = [][]byte{}
for reader := range chunks {
chunk, err := io.ReadAll(reader)
if err != nil {
return snapshottypes.SnapshotItem{}, nil
}

func (m *mockSnapshotter) Snapshot(height uint64, protoWriter protoio.Writer) error {
for _, item := range m.items {
if err := types.WriteExtensionItem(protoWriter, item); err != nil {
return err
}
m.chunks = append(m.chunks, chunk)
}

return nil
}

func (m *mockSnapshotter) Snapshot(height uint64, format uint32) (<-chan io.ReadCloser, error) {
if format == 0 {
return nil, types.ErrUnknownFormat
}
ch := make(chan io.ReadCloser, len(m.chunks))
for _, chunk := range m.chunks {
ch <- io.NopCloser(bytes.NewReader(chunk))
}
close(ch)
return ch, nil
func (m *mockSnapshotter) SnapshotFormat() uint32 {
return 1
}
func (m *mockSnapshotter) SupportedFormats() []uint32 {
return []uint32{1}
}

// setupBusyManager creates a manager with an empty store that is busy creating a snapshot at height 1.
Expand All @@ -110,7 +155,7 @@ func setupBusyManager(t *testing.T) *snapshots.Manager {
store, err := snapshots.NewStore(db.NewMemDB(), tempdir)
require.NoError(t, err)
hung := newHungSnapshotter()
mgr := snapshots.NewManager(store, hung)
mgr := snapshots.NewManager(store, hung, nil)

go func() {
_, err := mgr.Create(1)
Expand All @@ -137,15 +182,13 @@ func (m *hungSnapshotter) Close() {
close(m.ch)
}

func (m *hungSnapshotter) Snapshot(height uint64, format uint32) (<-chan io.ReadCloser, error) {
func (m *hungSnapshotter) Snapshot(height uint64, protoWriter protoio.Writer) error {
<-m.ch
ch := make(chan io.ReadCloser, 1)
ch <- io.NopCloser(bytes.NewReader([]byte{}))
return ch, nil
return nil
}

func (m *hungSnapshotter) Restore(
height uint64, format uint32, chunks <-chan io.ReadCloser, ready chan<- struct{},
) error {
height uint64, format uint32, protoReader protoio.Reader,
) (snapshottypes.SnapshotItem, error) {
panic("not implemented")
}
Loading