diff --git a/CHANGELOG.md b/CHANGELOG.md index c18a83af005c..911bf5f5d570 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -56,6 +56,7 @@ Ref: https://keepachangelog.com/en/1.0.0/ * [\#9576](https://github.com/cosmos/cosmos-sdk/pull/9576) Add debug error message to query result when enabled * (types) [\#11200](https://github.com/cosmos/cosmos-sdk/pull/11200) Added `Min()` and `Max()` operations on sdk.Coins. +* [#11267](https://github.com/cosmos/cosmos-sdk/pull/11267) Add hooks to allow app modules to add things to state-sync (backport #10961). ## [v0.45.1](https://github.com/cosmos/cosmos-sdk/releases/tag/v0.45.1) - 2022-02-03 diff --git a/baseapp/baseapp.go b/baseapp/baseapp.go index d9981ec5f8ad..06543047e209 100644 --- a/baseapp/baseapp.go +++ b/baseapp/baseapp.go @@ -366,8 +366,11 @@ func DefaultStoreLoader(ms storetypes.CommitMultiStore) error { // CommitMultiStore returns the root multi-store. // App constructor can use this to access the `cms`. -// UNSAFE: must not be used during the abci life cycle. -func (app *BaseApp) CommitMultiStore() storetypes.CommitMultiStore { +// UNSAFE: only safe to use during app initialization. +func (app *BaseApp) CommitMultiStore() sdk.CommitMultiStore { + if app.sealed { + panic("cannot call CommitMultiStore() after baseapp is sealed") + } return app.cms } diff --git a/server/mock/store.go b/server/mock/store.go index fa7721f44ea5..4dbb418b623c 100644 --- a/server/mock/store.go +++ b/server/mock/store.go @@ -3,13 +3,12 @@ package mock import ( "io" - protoio "github.com/cosmos/gogoproto/io" + protoio "github.com/gogo/protobuf/io" + dbm "github.com/tendermint/tm-db" - corestore "cosmossdk.io/core/store" - "cosmossdk.io/store/metrics" - pruningtypes "cosmossdk.io/store/pruning/types" - snapshottypes "cosmossdk.io/store/snapshots/types" - storetypes "cosmossdk.io/store/types" + snapshottypes "github.com/cosmos/cosmos-sdk/snapshots/types" + store "github.com/cosmos/cosmos-sdk/store/types" + sdk "github.com/cosmos/cosmos-sdk/types" ) var _ storetypes.MultiStore = multiStore{} @@ -159,10 +158,6 @@ func (ms multiStore) Restore( panic("not implemented") } -func (ms multiStore) RollbackToVersion(version int64) error { - panic("not implemented") -} - func (ms multiStore) LatestVersion() int64 { panic("not implemented") } diff --git a/store/rootmulti/snapshot_test.go b/store/rootmulti/snapshot_test.go index fc00f5be25d9..cbdb481a05f7 100644 --- a/store/rootmulti/snapshot_test.go +++ b/store/rootmulti/snapshot_test.go @@ -13,19 +13,16 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - corestore "cosmossdk.io/core/store" - coretesting "cosmossdk.io/core/testing" - "cosmossdk.io/log" - "cosmossdk.io/store/iavl" - "cosmossdk.io/store/metrics" - "cosmossdk.io/store/rootmulti" - "cosmossdk.io/store/snapshots" - snapshottypes "cosmossdk.io/store/snapshots/types" - "cosmossdk.io/store/types" + "github.com/cosmos/cosmos-sdk/snapshots" + snapshottypes "github.com/cosmos/cosmos-sdk/snapshots/types" + "github.com/cosmos/cosmos-sdk/store/iavl" + "github.com/cosmos/cosmos-sdk/store/rootmulti" + "github.com/cosmos/cosmos-sdk/store/types" + dbm "github.com/tendermint/tm-db" ) -func newMultiStoreWithGeneratedData(db corestore.KVStoreWithBatch, stores uint8, storeKeys uint64) *rootmulti.Store { - multiStore := rootmulti.NewStore(db, log.NewNopLogger(), metrics.NewNoOpMetrics()) +func newMultiStoreWithGeneratedData(db dbm.DB, stores uint8, storeKeys uint64) *rootmulti.Store { + multiStore := rootmulti.NewStore(db) r := rand.New(rand.NewSource(49872768940)) // Fixed seed for deterministic tests keys := []*types.KVStoreKey{} @@ -34,10 +31,7 @@ func newMultiStoreWithGeneratedData(db corestore.KVStoreWithBatch, stores uint8, multiStore.MountStoreWithDB(key, types.StoreTypeIAVL, nil) keys = append(keys, key) } - err := multiStore.LoadLatestVersion() - if err != nil { - panic(err) - } + multiStore.LoadLatestVersion() for _, key := range keys { store := multiStore.GetCommitKVStore(key).(*iavl.Store) @@ -54,27 +48,23 @@ func newMultiStoreWithGeneratedData(db corestore.KVStoreWithBatch, stores uint8, } multiStore.Commit() - err = multiStore.LoadLatestVersion() - if err != nil { - panic(err) - } + multiStore.LoadLatestVersion() return multiStore } -func newMultiStoreWithMixedMounts(db corestore.KVStoreWithBatch) *rootmulti.Store { - store := rootmulti.NewStore(db, log.NewNopLogger(), metrics.NewNoOpMetrics()) +func newMultiStoreWithMixedMounts(db dbm.DB) *rootmulti.Store { + store := rootmulti.NewStore(db) store.MountStoreWithDB(types.NewKVStoreKey("iavl1"), types.StoreTypeIAVL, nil) store.MountStoreWithDB(types.NewKVStoreKey("iavl2"), types.StoreTypeIAVL, nil) store.MountStoreWithDB(types.NewKVStoreKey("iavl3"), types.StoreTypeIAVL, nil) store.MountStoreWithDB(types.NewTransientStoreKey("trans1"), types.StoreTypeTransient, nil) - if err := store.LoadLatestVersion(); err != nil { - panic(err) - } + store.LoadLatestVersion() + return store } -func newMultiStoreWithMixedMountsAndBasicData(db corestore.KVStoreWithBatch) *rootmulti.Store { +func newMultiStoreWithMixedMountsAndBasicData(db dbm.DB) *rootmulti.Store { store := newMultiStoreWithMixedMounts(db) store1 := store.GetStoreByName("iavl1").(types.CommitKVStore) store2 := store.GetStoreByName("iavl2").(types.CommitKVStore) @@ -101,7 +91,6 @@ func newMultiStoreWithMixedMountsAndBasicData(db corestore.KVStoreWithBatch) *ro } func assertStoresEqual(t *testing.T, expect, actual types.CommitKVStore, msgAndArgs ...interface{}) { - t.Helper() assert.Equal(t, expect.LastCommitID(), actual.LastCommitID()) expectIter := expect.Iterator(nil, nil) expectMap := map[string][]byte{} @@ -125,7 +114,7 @@ func TestMultistoreSnapshot_Checksum(t *testing.T) { // This checksum test makes sure that the byte stream remains identical. If the test fails // without having changed the data (e.g. because the Protobuf or zlib encoding changes), // snapshottypes.CurrentFormat must be bumped. - store := newMultiStoreWithGeneratedData(coretesting.NewMemDB(), 5, 10000) + store := newMultiStoreWithGeneratedData(dbm.NewMemDB(), 5, 10000) version := uint64(store.LastCommitID().Version) testcases := []struct { @@ -138,10 +127,11 @@ func TestMultistoreSnapshot_Checksum(t *testing.T) { "aa048b4ee0f484965d7b3b06822cf0772cdcaad02f3b1b9055e69f2cb365ef3c", "7921eaa3ed4921341e504d9308a9877986a879fe216a099c86e8db66fcba4c63", "a4a864e6c02c9fca5837ec80dc84f650b25276ed7e4820cf7516ced9f9901b86", - "980925390cc50f14998ecb1e87de719ca9dd7e72f5fefbe445397bf670f36c31", + "ca2879ac6e7205d257440131ba7e72bef784cd61642e32b847729e543c1928b9", }}, } for _, tc := range testcases { + tc := tc t.Run(fmt.Sprintf("Format %v", tc.format), func(t *testing.T) { ch := make(chan io.ReadCloser) go func() { @@ -166,7 +156,7 @@ func TestMultistoreSnapshot_Checksum(t *testing.T) { } func TestMultistoreSnapshot_Errors(t *testing.T) { - store := newMultiStoreWithMixedMountsAndBasicData(coretesting.NewMemDB()) + store := newMultiStoreWithMixedMountsAndBasicData(dbm.NewMemDB()) testcases := map[string]struct { height uint64 @@ -176,6 +166,7 @@ func TestMultistoreSnapshot_Errors(t *testing.T) { "unknown height": {9, nil}, } for name, tc := range testcases { + tc := tc t.Run(name, func(t *testing.T) { err := store.Snapshot(tc.height, nil) require.Error(t, err) @@ -187,8 +178,8 @@ func TestMultistoreSnapshot_Errors(t *testing.T) { } func TestMultistoreSnapshotRestore(t *testing.T) { - source := newMultiStoreWithMixedMountsAndBasicData(coretesting.NewMemDB()) - target := newMultiStoreWithMixedMounts(coretesting.NewMemDB()) + source := newMultiStoreWithMixedMountsAndBasicData(dbm.NewMemDB()) + target := newMultiStoreWithMixedMounts(dbm.NewMemDB()) version := uint64(source.LastCommitID().Version) require.EqualValues(t, 3, version) dummyExtensionItem := snapshottypes.SnapshotItem{ @@ -219,8 +210,7 @@ func TestMultistoreSnapshotRestore(t *testing.T) { require.Equal(t, *dummyExtensionItem.GetExtension(), *nextItem.GetExtension()) assert.Equal(t, source.LastCommitID(), target.LastCommitID()) - for _, key := range source.StoreKeysByName() { - sourceStore := source.GetStoreByName(key.Name()).(types.CommitKVStore) + for key, sourceStore := range source.GetStores() { targetStore := target.GetStoreByName(key.Name()).(types.CommitKVStore) switch sourceStore.GetStoreType() { case types.StoreTypeTransient: @@ -233,19 +223,18 @@ func TestMultistoreSnapshotRestore(t *testing.T) { } func benchmarkMultistoreSnapshot(b *testing.B, stores uint8, storeKeys uint64) { - b.Helper() b.Skip("Noisy with slow setup time, please see https://github.com/cosmos/cosmos-sdk/issues/8855.") b.ReportAllocs() b.StopTimer() - source := newMultiStoreWithGeneratedData(coretesting.NewMemDB(), stores, storeKeys) + source := newMultiStoreWithGeneratedData(dbm.NewMemDB(), stores, storeKeys) version := source.LastCommitID().Version require.EqualValues(b, 1, version) b.StartTimer() for i := 0; i < b.N; i++ { - target := rootmulti.NewStore(coretesting.NewMemDB(), log.NewNopLogger(), metrics.NewNoOpMetrics()) - for _, key := range source.StoreKeysByName() { + target := rootmulti.NewStore(dbm.NewMemDB()) + for key := range source.GetStores() { target.MountStoreWithDB(key, types.StoreTypeIAVL, nil) } err := target.LoadLatestVersion() @@ -269,19 +258,18 @@ func benchmarkMultistoreSnapshot(b *testing.B, stores uint8, storeKeys uint64) { } func benchmarkMultistoreSnapshotRestore(b *testing.B, stores uint8, storeKeys uint64) { - b.Helper() b.Skip("Noisy with slow setup time, please see https://github.com/cosmos/cosmos-sdk/issues/8855.") b.ReportAllocs() b.StopTimer() - source := newMultiStoreWithGeneratedData(coretesting.NewMemDB(), stores, storeKeys) + source := newMultiStoreWithGeneratedData(dbm.NewMemDB(), stores, storeKeys) version := uint64(source.LastCommitID().Version) require.EqualValues(b, 1, version) b.StartTimer() for i := 0; i < b.N; i++ { - target := rootmulti.NewStore(coretesting.NewMemDB(), log.NewNopLogger(), metrics.NewNoOpMetrics()) - for _, key := range source.StoreKeysByName() { + target := rootmulti.NewStore(dbm.NewMemDB()) + for key := range source.GetStores() { target.MountStoreWithDB(key, types.StoreTypeIAVL, nil) } err := target.LoadLatestVersion() diff --git a/store/rootmulti/store.go b/store/rootmulti/store.go index 777eefed8c86..e751528fb0e9 100644 --- a/store/rootmulti/store.go +++ b/store/rootmulti/store.go @@ -1,8 +1,7 @@ package rootmulti import ( - "crypto/sha256" - "errors" + "encoding/binary" "fmt" "io" "math" @@ -14,23 +13,22 @@ import ( protoio "github.com/cosmos/gogoproto/io" gogotypes "github.com/cosmos/gogoproto/types" iavltree "github.com/cosmos/iavl" - - corestore "cosmossdk.io/core/store" - coretesting "cosmossdk.io/core/testing" - errorsmod "cosmossdk.io/errors" - "cosmossdk.io/store/cachemulti" - dbm "cosmossdk.io/store/db" - "cosmossdk.io/store/dbadapter" - "cosmossdk.io/store/iavl" - "cosmossdk.io/store/listenkv" - "cosmossdk.io/store/mem" - "cosmossdk.io/store/metrics" - "cosmossdk.io/store/pruning" - pruningtypes "cosmossdk.io/store/pruning/types" - snapshottypes "cosmossdk.io/store/snapshots/types" - "cosmossdk.io/store/tracekv" - "cosmossdk.io/store/transient" - "cosmossdk.io/store/types" + protoio "github.com/gogo/protobuf/io" + gogotypes "github.com/gogo/protobuf/types" + "github.com/pkg/errors" + abci "github.com/tendermint/tendermint/abci/types" + dbm "github.com/tendermint/tm-db" + + snapshottypes "github.com/cosmos/cosmos-sdk/snapshots/types" + "github.com/cosmos/cosmos-sdk/store/cachemulti" + "github.com/cosmos/cosmos-sdk/store/dbadapter" + "github.com/cosmos/cosmos-sdk/store/iavl" + "github.com/cosmos/cosmos-sdk/store/listenkv" + "github.com/cosmos/cosmos-sdk/store/mem" + "github.com/cosmos/cosmos-sdk/store/tracekv" + "github.com/cosmos/cosmos-sdk/store/transient" + "github.com/cosmos/cosmos-sdk/store/types" + sdkerrors "github.com/cosmos/cosmos-sdk/types/errors" ) const ( @@ -161,9 +159,9 @@ func (rs *Store) GetCommitKVStore(key types.StoreKey) types.CommitKVStore { return rs.stores[key] } -// StoreKeysByName returns mapping storeNames -> StoreKeys -func (rs *Store) StoreKeysByName() map[string]types.StoreKey { - return rs.keysByName +// GetStores returns mounted stores +func (rs *Store) GetStores() map[types.StoreKey]types.CommitKVStore { + return rs.stores } // LoadLatestVersionAndUpgrade implements CommitMultiStore @@ -697,47 +695,6 @@ func (rs *Store) GetKVStore(key types.StoreKey) types.KVStore { return store } -func (rs *Store) handlePruning(version int64) error { - pruneHeight := rs.pruningManager.GetPruningHeight(version) - rs.logger.Debug("prune start", "height", version) - defer rs.logger.Debug("prune end", "height", version) - return rs.PruneStores(pruneHeight) -} - -// PruneStores prunes all history up to the specific height of the multi store. -func (rs *Store) PruneStores(pruningHeight int64) (err error) { - if pruningHeight <= 0 { - rs.logger.Debug("pruning skipped, height is less than or equal to 0") - return nil - } - - rs.logger.Debug("pruning store", "heights", pruningHeight) - - for key, store := range rs.stores { - rs.logger.Debug("pruning store", "key", key) // Also log store.name (a private variable)? - - // If the store is wrapped with an inter-block cache, we must first unwrap - // it to get the underlying IAVL store. - if store.GetStoreType() != types.StoreTypeIAVL { - continue - } - - store = rs.GetCommitKVStore(key) - - err := store.(*iavl.Store).DeleteVersionsTo(pruningHeight) - if err == nil { - continue - } - - if errors.Is(err, iavltree.ErrVersionDoesNotExist) { - return err - } - - rs.logger.Error("failed to prune store", "key", key, "err", err) - } - return nil -} - // GetStoreByName performs a lookup of a StoreKey given a store name typically // provided in a path. The StoreKey is then used to perform a lookup and return // a Store. If the Store is wrapped in an inter-block cache, it will be unwrapped @@ -849,10 +806,10 @@ func parsePath(path string) (storeName, subpath string, err error) { // TestMultistoreSnapshot_Checksum test. func (rs *Store) Snapshot(height uint64, protoWriter protoio.Writer) error { if height == 0 { - return errorsmod.Wrap(types.ErrLogic, "cannot snapshot height 0") + return sdkerrors.Wrap(sdkerrors.ErrLogic, "cannot snapshot height 0") } - if height > uint64(GetLatestVersion(rs.db)) { - return errorsmod.Wrapf(types.ErrLogic, "cannot snapshot future height %v", height) + if height > uint64(rs.LastCommitID().Version) { + return sdkerrors.Wrapf(sdkerrors.ErrLogic, "cannot snapshot future height %v", height) } // Collect stores to snapshot (only IAVL stores are supported) @@ -870,7 +827,7 @@ func (rs *Store) Snapshot(height uint64, protoWriter protoio.Writer) error { // Non-persisted stores shouldn't be snapshotted continue default: - return errorsmod.Wrapf(types.ErrLogic, + return sdkerrors.Wrapf(sdkerrors.ErrLogic, "don't know how to snapshot store %q of type %T", key.Name(), store) } } @@ -883,58 +840,44 @@ func (rs *Store) Snapshot(height uint64, protoWriter protoio.Writer) error { // and the following messages contain a SnapshotNode (i.e. an ExportNode). Store changes // are demarcated by new SnapshotStore items. for _, store := range stores { - rs.logger.Debug("starting snapshot", "store", store.name, "height", height) exporter, err := store.Export(int64(height)) if err != nil { - rs.logger.Error("snapshot failed; exporter error", "store", store.name, "err", err) + return err + } + defer exporter.Close() + err = protoWriter.WriteMsg(&snapshottypes.SnapshotItem{ + Item: &snapshottypes.SnapshotItem_Store{ + Store: &snapshottypes.SnapshotStoreItem{ + Name: store.name, + }, + }, + }) + if err != nil { return err } - err = func() error { - defer exporter.Close() - - err := protoWriter.WriteMsg(&snapshottypes.SnapshotItem{ - Item: &snapshottypes.SnapshotItem_Store{ - Store: &snapshottypes.SnapshotStoreItem{ - Name: store.name, + for { + node, err := exporter.Next() + if err == iavltree.ExportDone { + break + } else if err != nil { + return err + } + err = protoWriter.WriteMsg(&snapshottypes.SnapshotItem{ + Item: &snapshottypes.SnapshotItem_IAVL{ + IAVL: &snapshottypes.SnapshotIAVLItem{ + Key: node.Key, + Value: node.Value, + Height: int32(node.Height), + Version: node.Version, }, }, }) if err != nil { - rs.logger.Error("snapshot failed; item store write failed", "store", store.name, "err", err) return err } - - nodeCount := 0 - for { - node, err := exporter.Next() - if errors.Is(err, iavltree.ErrorExportDone) { - rs.logger.Debug("snapshot Done", "store", store.name, "nodeCount", nodeCount) - break - } else if err != nil { - return err - } - err = protoWriter.WriteMsg(&snapshottypes.SnapshotItem{ - Item: &snapshottypes.SnapshotItem_IAVL{ - IAVL: &snapshottypes.SnapshotIAVLItem{ - Key: node.Key, - Value: node.Value, - Height: int32(node.Height), - Version: node.Version, - }, - }, - }) - if err != nil { - return err - } - nodeCount++ - } - - return nil - }() - if err != nil { - return err } + exporter.Close() } return nil @@ -954,10 +897,10 @@ loop: for { snapshotItem = snapshottypes.SnapshotItem{} err := protoReader.ReadMsg(&snapshotItem) - if errors.Is(err, io.EOF) { + if err == io.EOF { break } else if err != nil { - return snapshottypes.SnapshotItem{}, errorsmod.Wrap(err, "invalid protobuf message") + return snapshottypes.SnapshotItem{}, sdkerrors.Wrap(err, "invalid protobuf message") } switch item := snapshotItem.Item.(type) { @@ -965,17 +908,17 @@ loop: if importer != nil { err = importer.Commit() if err != nil { - return snapshottypes.SnapshotItem{}, errorsmod.Wrap(err, "IAVL commit failed") + return snapshottypes.SnapshotItem{}, sdkerrors.Wrap(err, "IAVL commit failed") } importer.Close() } store, ok := rs.GetStoreByName(item.Store.Name).(*iavl.Store) if !ok || store == nil { - return snapshottypes.SnapshotItem{}, errorsmod.Wrapf(types.ErrLogic, "cannot import into non-IAVL store %q", item.Store.Name) + return snapshottypes.SnapshotItem{}, sdkerrors.Wrapf(sdkerrors.ErrLogic, "cannot import into non-IAVL store %q", item.Store.Name) } importer, err = store.Import(int64(height)) if err != nil { - return snapshottypes.SnapshotItem{}, errorsmod.Wrap(err, "import failed") + return snapshottypes.SnapshotItem{}, sdkerrors.Wrap(err, "import failed") } defer importer.Close() // Importer height must reflect the node height (which usually matches the block height, but not always) @@ -983,11 +926,10 @@ loop: case *snapshottypes.SnapshotItem_IAVL: if importer == nil { - rs.logger.Error("failed to restore; received IAVL node item before store item") - return snapshottypes.SnapshotItem{}, errorsmod.Wrap(types.ErrLogic, "received IAVL node item before store item") + return snapshottypes.SnapshotItem{}, sdkerrors.Wrap(sdkerrors.ErrLogic, "received IAVL node item before store item") } if item.IAVL.Height > math.MaxInt8 { - return snapshottypes.SnapshotItem{}, errorsmod.Wrapf(types.ErrLogic, "node height %v cannot exceed %v", + return snapshottypes.SnapshotItem{}, sdkerrors.Wrapf(sdkerrors.ErrLogic, "node height %v cannot exceed %v", item.IAVL.Height, math.MaxInt8) } node := &iavltree.ExportNode{ @@ -1006,7 +948,7 @@ loop: } err := importer.Add(node) if err != nil { - return snapshottypes.SnapshotItem{}, errorsmod.Wrap(err, "IAVL node import failed") + return snapshottypes.SnapshotItem{}, sdkerrors.Wrap(err, "IAVL node import failed") } default: @@ -1017,12 +959,12 @@ loop: if importer != nil { err := importer.Commit() if err != nil { - return snapshottypes.SnapshotItem{}, errorsmod.Wrap(err, "IAVL commit failed") + return snapshottypes.SnapshotItem{}, sdkerrors.Wrap(err, "IAVL commit failed") } importer.Close() } - rs.flushMetadata(rs.db, int64(height), rs.buildCommitInfo(int64(height))) + flushMetadata(rs.db, int64(height), rs.buildCommitInfo(int64(height)), []int64{}) return snapshotItem, rs.LoadLatestVersion() } diff --git a/store/rootmulti/store_test.go b/store/rootmulti/store_test.go index 4e45adc49f9d..46e2ab63194d 100644 --- a/store/rootmulti/store_test.go +++ b/store/rootmulti/store_test.go @@ -2,23 +2,22 @@ package rootmulti import ( "bytes" - "crypto/sha256" "fmt" "testing" "time" "github.com/stretchr/testify/require" - - corestore "cosmossdk.io/core/store" - coretesting "cosmossdk.io/core/testing" - "cosmossdk.io/errors" - "cosmossdk.io/log" - "cosmossdk.io/store/cachemulti" - "cosmossdk.io/store/iavl" - sdkmaps "cosmossdk.io/store/internal/maps" - "cosmossdk.io/store/metrics" - pruningtypes "cosmossdk.io/store/pruning/types" - "cosmossdk.io/store/types" + abci "github.com/tendermint/tendermint/abci/types" + dbm "github.com/tendermint/tm-db" + + "github.com/cosmos/cosmos-sdk/codec" + codecTypes "github.com/cosmos/cosmos-sdk/codec/types" + "github.com/cosmos/cosmos-sdk/store/cachemulti" + "github.com/cosmos/cosmos-sdk/store/iavl" + sdkmaps "github.com/cosmos/cosmos-sdk/store/internal/maps" + "github.com/cosmos/cosmos-sdk/store/listenkv" + "github.com/cosmos/cosmos-sdk/store/types" + sdkerrors "github.com/cosmos/cosmos-sdk/types/errors" ) func TestStoreType(t *testing.T) { @@ -625,71 +624,6 @@ func TestMultiStore_PruningRestart(t *testing.T) { actualHeightToPrune = ms.pruningManager.GetPruningHeight(ms.LatestVersion()) require.Equal(t, int64(8), actualHeightToPrune) - // Ensure async pruning is done - isPruned := func() bool { - ms.Commit() // to flush the batch with the pruned heights - for v := int64(1); v <= actualHeightToPrune; v++ { - if _, err := ms.CacheMultiStoreWithVersion(v); err == nil { - return false - } - } - return true - } - - require.Eventually(t, isPruned, 1*time.Second, 10*time.Millisecond, "expected error when loading pruned heights") -} - -var _ types.PausablePruner = &pauseableCommitKVStoreStub{} - -type pauseableCommitKVStoreStub struct { - types.CommitKVStore - pauseCalled []bool -} - -func (p *pauseableCommitKVStoreStub) PausePruning(b bool) { - p.pauseCalled = append(p.pauseCalled, b) -} - -func TestPausePruningOnCommit(t *testing.T) { - store := NewStore(coretesting.NewMemDB(), log.NewNopLogger(), metrics.NewNoOpMetrics()) - store.SetPruning(pruningtypes.NewCustomPruningOptions(2, 11)) - store.MountStoreWithDB(testStoreKey1, types.StoreTypeIAVL, nil) - require.NoError(t, store.LoadLatestVersion()) - - myStub := &pauseableCommitKVStoreStub{CommitKVStore: store.stores[testStoreKey1]} - store.stores[testStoreKey1] = myStub - // when - store.Commit() - // then - require.Equal(t, []bool{true, false}, myStub.pauseCalled) -} - -// TestUnevenStoresHeightCheck tests if loading root store correctly errors when -// there's any module store with the wrong height -func TestUnevenStoresHeightCheck(t *testing.T) { - db := coretesting.NewMemDB() - store := newMultiStoreWithMounts(db, pruningtypes.NewPruningOptions(pruningtypes.PruningNothing)) - err := store.LoadLatestVersion() - require.Nil(t, err) - - // commit to increment store's height - store.Commit() - - // mount store4 to root store - store.MountStoreWithDB(types.NewKVStoreKey("store4"), types.StoreTypeIAVL, nil) - - // load the stores without upgrades - err = store.LoadLatestVersion() - require.Error(t, err) - - // now, let's load with upgrades... - upgrades := &types.StoreUpgrades{ - Added: []string{"store4"}, - } - err = store.LoadLatestVersionAndUpgrade(upgrades) - require.Nil(t, err) -} - func TestSetInitialVersion(t *testing.T) { db := coretesting.NewMemDB() multi := newMultiStoreWithMounts(db, pruningtypes.NewPruningOptions(pruningtypes.PruningNothing)) @@ -789,96 +723,6 @@ func TestTraceConcurrency(t *testing.T) { stopW <- struct{}{} } -func BenchmarkMultistoreSnapshot100K(b *testing.B) { - benchmarkMultistoreSnapshot(b, 10, 10000) -} - -func TestTraceConcurrency(t *testing.T) { - db := coretesting.NewMemDB() - multi := newMultiStoreWithMounts(db, pruningtypes.NewPruningOptions(pruningtypes.PruningNothing)) - err := multi.LoadLatestVersion() - require.NoError(t, err) - - b := &bytes.Buffer{} - key := multi.keysByName["store1"] - tc := types.TraceContext(map[string]interface{}{"blockHeight": 64}) - - multi.SetTracer(b) - multi.SetTracingContext(tc) - - cms := multi.CacheMultiStore() - store1 := cms.GetKVStore(key) - cw := store1.CacheWrapWithTrace(b, tc) - _ = cw - require.NotNil(t, store1) - - stop := make(chan struct{}) - stopW := make(chan struct{}) - - go func(stop chan struct{}) { - for { - select { - case <-stop: - return - default: - store1.Set([]byte{1}, []byte{1}) - cms.Write() - } - } - }(stop) - - go func(stop chan struct{}) { - for { - select { - case <-stop: - return - default: - multi.SetTracingContext(tc) - } - } - }(stopW) - - time.Sleep(3 * time.Second) - stop <- struct{}{} - stopW <- struct{}{} -} - -func TestCommitOrdered(t *testing.T) { - db := coretesting.NewMemDB() - multi := newMultiStoreWithMounts(db, pruningtypes.NewPruningOptions(pruningtypes.PruningNothing)) - err := multi.LoadLatestVersion() - require.Nil(t, err) - - emptyHash := sha256.Sum256([]byte{}) - appHash := emptyHash[:] - commitID := types.CommitID{Hash: appHash} - checkStore(t, multi, commitID, commitID) - - k, v := []byte("wind"), []byte("blows") - k2, v2 := []byte("water"), []byte("flows") - k3, v3 := []byte("fire"), []byte("burns") - - store1 := multi.GetStoreByName("store1").(types.KVStore) - store1.Set(k, v) - - store2 := multi.GetStoreByName("store2").(types.KVStore) - store2.Set(k2, v2) - - store3 := multi.GetStoreByName("store3").(types.KVStore) - store3.Set(k3, v3) - - typeID := multi.Commit() - require.Equal(t, int64(1), typeID.Version) - - ci, err := multi.GetCommitInfo(1) - require.NoError(t, err) - require.Equal(t, int64(1), ci.Version) - require.Equal(t, 3, len(ci.StoreInfos)) - for i, s := range ci.StoreInfos { - require.Equal(t, s.Name, fmt.Sprintf("store%d", i+1)) - } -} - //----------------------------------------------------------------------- // utils @@ -899,9 +743,9 @@ func newMultiStoreWithMounts(db corestore.KVStoreWithBatch, pruningOpts pruningt return store } -func newMultiStoreWithModifiedMounts(db corestore.KVStoreWithBatch, pruningOpts pruningtypes.PruningOptions) (*Store, *types.StoreUpgrades) { - store := NewStore(db, log.NewNopLogger(), metrics.NewNoOpMetrics()) - store.SetPruning(pruningOpts) +func newMultiStoreWithModifiedMounts(db dbm.DB, pruningOpts types.PruningOptions) (*Store, *types.StoreUpgrades) { + store := NewStore(db) + store.pruningOpts = pruningOpts store.MountStoreWithDB(types.NewKVStoreKey("store1"), types.StoreTypeIAVL, nil) store.MountStoreWithDB(types.NewKVStoreKey("restore2"), types.StoreTypeIAVL, nil) @@ -920,13 +764,6 @@ func newMultiStoreWithModifiedMounts(db corestore.KVStoreWithBatch, pruningOpts return store, upgrades } -func unmountStore(rootStore *Store, storeKeyName string) { - sk := rootStore.keysByName[storeKeyName] - delete(rootStore.stores, sk) - delete(rootStore.storesParams, sk) - delete(rootStore.keysByName, storeKeyName) -} - func checkStore(t *testing.T, store *Store, expect, got types.CommitID) { t.Helper() require.Equal(t, expect, got) diff --git a/store/snapshots/manager_test.go b/store/snapshots/manager_test.go index 911fcf75490f..e7f07c282738 100644 --- a/store/snapshots/manager_test.go +++ b/store/snapshots/manager_test.go @@ -68,15 +68,10 @@ func TestManager_Take(t *testing.T) { {7, 8, 9}, } snapshotter := &mockSnapshotter{ - items: items, - prunedHeights: make(map[int64]struct{}), + items: items, } - extSnapshotter := newExtSnapshotter(10) - - expectChunks := snapshotItems(items, extSnapshotter) - manager := snapshots.NewManager(store, opts, snapshotter, nil, log.NewNopLogger()) - err := manager.RegisterExtensions(extSnapshotter) - require.NoError(t, err) + expectChunks := snapshotItems(items) + manager := snapshots.NewManager(store, snapshotter) // nil manager should return error _, err = (*snapshots.Manager)(nil).Create(1) @@ -98,7 +93,7 @@ func TestManager_Take(t *testing.T) { Height: 5, Format: snapshotter.SnapshotFormat(), Chunks: 1, - Hash: []uint8{0xc5, 0xf7, 0xfe, 0xea, 0xd3, 0x4d, 0x3e, 0x87, 0xff, 0x41, 0xa2, 0x27, 0xfa, 0xcb, 0x38, 0x17, 0xa, 0x5, 0xeb, 0x27, 0x4e, 0x16, 0x5e, 0xf3, 0xb2, 0x8b, 0x47, 0xd1, 0xe6, 0x94, 0x7e, 0x8b}, + Hash: []uint8{0xcd, 0x17, 0x9e, 0x7f, 0x28, 0xb6, 0x82, 0x90, 0xc7, 0x25, 0xf3, 0x42, 0xac, 0x65, 0x73, 0x50, 0xaa, 0xa0, 0x10, 0x5c, 0x40, 0x8c, 0xd5, 0x1, 0xed, 0x82, 0xb5, 0xca, 0x8b, 0xe0, 0x83, 0xa2}, Metadata: types.Metadata{ ChunkHashes: checksums(expectChunks), }, @@ -151,7 +146,7 @@ func TestManager_Restore(t *testing.T) { {7, 8, 9}, } - chunks := snapshotItems(expectItems, newExtSnapshotter(10)) + chunks := snapshotItems(expectItems) // Restore errors on invalid format err = manager.Restore(types.Snapshot{ @@ -214,14 +209,6 @@ func TestManager_Restore(t *testing.T) { } assert.Equal(t, expectItems, target.items) - assert.Equal(t, 10, len(extSnapshotter.state)) - - // The snapshot is saved in local snapshot store - snapshots, err := store.List() - require.NoError(t, err) - snapshot := snapshots[0] - require.Equal(t, uint64(3), snapshot.Height) - require.Equal(t, types.CurrentFormat, snapshot.Format) // Starting a new restore should fail now, because the target already has contents. err = manager.Restore(types.Snapshot{ diff --git a/store/snapshots/stream.go b/store/snapshots/stream.go deleted file mode 100644 index e010f9224468..000000000000 --- a/store/snapshots/stream.go +++ /dev/null @@ -1,113 +0,0 @@ -package snapshots - -import ( - "bufio" - "compress/zlib" - "io" - - protoio "github.com/cosmos/gogoproto/io" - "github.com/cosmos/gogoproto/proto" - - "cosmossdk.io/errors" -) - -const ( - // Do not change chunk size without new snapshot format (must be uniform across nodes) - snapshotChunkSize = uint64(10e6) - snapshotBufferSize = int(snapshotChunkSize) - // Do not change compression level without new snapshot format (must be uniform across nodes) - snapshotCompressionLevel = 7 -) - -// StreamWriter set up a stream pipeline to serialize snapshot nodes: -// Exported Items -> delimited Protobuf -> zlib -> buffer -> chunkWriter -> chan io.ReadCloser -type StreamWriter struct { - chunkWriter *ChunkWriter - bufWriter *bufio.Writer - zWriter *zlib.Writer - protoWriter protoio.WriteCloser -} - -// NewStreamWriter set up a stream pipeline to serialize snapshot DB records. -func NewStreamWriter(ch chan<- io.ReadCloser) *StreamWriter { - chunkWriter := NewChunkWriter(ch, snapshotChunkSize) - bufWriter := bufio.NewWriterSize(chunkWriter, snapshotBufferSize) - zWriter, err := zlib.NewWriterLevel(bufWriter, snapshotCompressionLevel) - if err != nil { - chunkWriter.CloseWithError(errors.Wrap(err, "zlib failure")) - return nil - } - protoWriter := protoio.NewDelimitedWriter(zWriter) - return &StreamWriter{ - chunkWriter: chunkWriter, - bufWriter: bufWriter, - zWriter: zWriter, - protoWriter: protoWriter, - } -} - -// WriteMsg implements protoio.Write interface -func (sw *StreamWriter) WriteMsg(msg proto.Message) error { - return sw.protoWriter.WriteMsg(msg) -} - -// Close implements io.Closer interface -func (sw *StreamWriter) Close() error { - if err := sw.protoWriter.Close(); err != nil { - sw.chunkWriter.CloseWithError(err) - return err - } - if err := sw.bufWriter.Flush(); err != nil { - sw.chunkWriter.CloseWithError(err) - return err - } - return sw.chunkWriter.Close() -} - -// CloseWithError pass error to chunkWriter -func (sw *StreamWriter) CloseWithError(err error) { - sw.chunkWriter.CloseWithError(err) -} - -// StreamReader set up a restore stream pipeline -// chan io.ReadCloser -> chunkReader -> zlib -> delimited Protobuf -> ExportNode -type StreamReader struct { - chunkReader *ChunkReader - zReader io.ReadCloser - protoReader protoio.ReadCloser -} - -// NewStreamReader set up a restore stream pipeline. -func NewStreamReader(chunks <-chan io.ReadCloser) (*StreamReader, error) { - chunkReader := NewChunkReader(chunks) - zReader, err := zlib.NewReader(chunkReader) - if err != nil { - return nil, errors.Wrap(err, "zlib failure") - } - protoReader := protoio.NewDelimitedReader(zReader, snapshotMaxItemSize) - return &StreamReader{ - chunkReader: chunkReader, - zReader: zReader, - protoReader: protoReader, - }, nil -} - -// ReadMsg implements protoio.Reader interface -func (sr *StreamReader) ReadMsg(msg proto.Message) error { - return sr.protoReader.ReadMsg(msg) -} - -// Close implements io.Closer interface -func (sr *StreamReader) Close() error { - var err error - if err1 := sr.protoReader.Close(); err1 != nil { - err = err1 - } - if err2 := sr.zReader.Close(); err2 != nil { - err = err2 - } - if err3 := sr.chunkReader.Close(); err3 != nil { - err = err3 - } - return err -} diff --git a/store/snapshots/types/util.go b/store/snapshots/types/util.go deleted file mode 100644 index 861647088b7b..000000000000 --- a/store/snapshots/types/util.go +++ /dev/null @@ -1,16 +0,0 @@ -package types - -import ( - protoio "github.com/cosmos/gogoproto/io" -) - -// WriteExtensionPayload writes an extension payload for current extension snapshotter. -func WriteExtensionPayload(protoWriter protoio.Writer, payload []byte) error { - return protoWriter.WriteMsg(&SnapshotItem{ - Item: &SnapshotItem_ExtensionPayload{ - ExtensionPayload: &SnapshotExtensionPayload{ - Payload: payload, - }, - }, - }) -}