diff --git a/store/v2/commitment/iavl/tree.go b/store/v2/commitment/iavl/tree.go index ea02eecedb62..5de6a4c86842 100644 --- a/store/v2/commitment/iavl/tree.go +++ b/store/v2/commitment/iavl/tree.go @@ -8,11 +8,15 @@ import ( "cosmossdk.io/core/log" corestore "cosmossdk.io/core/store" + "cosmossdk.io/store/v2" "cosmossdk.io/store/v2/commitment" dbm "cosmossdk.io/store/v2/db" ) -var _ commitment.Tree = (*IavlTree)(nil) +var ( + _ commitment.Tree = (*IavlTree)(nil) + _ store.PausablePruner = (*IavlTree)(nil) +) // IavlTree is a wrapper around iavl.MutableTree. type IavlTree struct { @@ -21,7 +25,7 @@ type IavlTree struct { // NewIavlTree creates a new IavlTree instance. func NewIavlTree(db corestore.KVStoreWithBatch, logger log.Logger, cfg *Config) *IavlTree { - tree := iavl.NewMutableTree(dbm.NewWrapper(db), cfg.CacheSize, cfg.SkipFastStorageUpgrade, logger) + tree := iavl.NewMutableTree(dbm.NewWrapper(db), cfg.CacheSize, cfg.SkipFastStorageUpgrade, logger, iavl.AsyncPruningOption(true)) return &IavlTree{ tree: tree, } @@ -98,6 +102,15 @@ func (t *IavlTree) Prune(version uint64) error { return t.tree.DeleteVersionsTo(int64(version)) } +// PausePruning pauses the pruning process. +func (t *IavlTree) PausePruning(pause bool) { + if pause { + t.tree.SetCommitting() + } else { + t.tree.UnsetCommitting() + } +} + // Export exports the tree exporter at the given version. func (t *IavlTree) Export(version uint64) (commitment.Exporter, error) { tree, err := t.tree.GetImmutable(int64(version)) diff --git a/store/v2/commitment/iavl/tree_test.go b/store/v2/commitment/iavl/tree_test.go index 6fdd950c85cc..1ef49980dd2a 100644 --- a/store/v2/commitment/iavl/tree_test.go +++ b/store/v2/commitment/iavl/tree_test.go @@ -2,27 +2,27 @@ package iavl import ( "testing" + "time" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "cosmossdk.io/core/log" corestore "cosmossdk.io/core/store" - "cosmossdk.io/store/v2" "cosmossdk.io/store/v2/commitment" dbm "cosmossdk.io/store/v2/db" ) func TestCommitterSuite(t *testing.T) { s := &commitment.CommitStoreTestSuite{ - NewStore: func(db corestore.KVStoreWithBatch, storeKeys []string, pruneOpts *store.PruneOptions, logger log.Logger) (*commitment.CommitStore, error) { + NewStore: func(db corestore.KVStoreWithBatch, storeKeys []string, logger log.Logger) (*commitment.CommitStore, error) { multiTrees := make(map[string]commitment.Tree) cfg := DefaultConfig() for _, storeKey := range storeKeys { prefixDB := dbm.NewPrefixDB(db, []byte(storeKey)) multiTrees[storeKey] = NewIavlTree(prefixDB, logger, cfg) } - return commitment.NewCommitStore(multiTrees, db, pruneOpts, logger) + return commitment.NewCommitStore(multiTrees, db, logger) }, } @@ -100,8 +100,14 @@ func TestIavlTree(t *testing.T) { err = tree.Prune(1) require.NoError(t, err) require.Equal(t, uint64(3), tree.GetLatestVersion()) - err = tree.LoadVersion(1) - require.Error(t, err) + // async pruning check + checkErr := func() bool { + if _, err := tree.tree.LoadVersion(1); err != nil { + return true + } + return false + } + require.Eventually(t, checkErr, 2*time.Second, 100*time.Millisecond) // load version 2 err = tree.LoadVersion(2) diff --git a/store/v2/commitment/store.go b/store/v2/commitment/store.go index a18cbc49e315..a24487160b57 100644 --- a/store/v2/commitment/store.go +++ b/store/v2/commitment/store.go @@ -28,6 +28,7 @@ const ( var ( _ store.Committer = (*CommitStore)(nil) _ snapshots.CommitSnapshotter = (*CommitStore)(nil) + _ store.PausablePruner = (*CommitStore)(nil) ) // CommitStore is a wrapper around multiple Tree objects mapped by a unique store @@ -39,26 +40,18 @@ type CommitStore struct { logger log.Logger db corestore.KVStoreWithBatch multiTrees map[string]Tree - - // pruneOptions is the pruning configuration. - pruneOptions *store.PruneOptions // TODO are there no default prune options? } // NewCommitStore creates a new CommitStore instance. -func NewCommitStore(trees map[string]Tree, db corestore.KVStoreWithBatch, pruneOpts *store.PruneOptions, logger log.Logger) (*CommitStore, error) { - if pruneOpts == nil { - pruneOpts = store.DefaultPruneOptions() - } - +func NewCommitStore(trees map[string]Tree, db corestore.KVStoreWithBatch, logger log.Logger) (*CommitStore, error) { return &CommitStore{ - logger: logger, - db: db, - multiTrees: trees, - pruneOptions: pruneOpts, + logger: logger, + db: db, + multiTrees: trees, }, nil } -func (c *CommitStore) WriteBatch(cs *corestore.Changeset) error { +func (c *CommitStore) WriteChangeset(cs *corestore.Changeset) error { for _, pairs := range cs.Changes { key := conv.UnsafeBytesToStr(pairs.Actor) @@ -237,13 +230,6 @@ func (c *CommitStore) Commit(version uint64) (*proof.CommitInfo, error) { return nil, err } - // Prune the old versions. - if prune, pruneVersion := c.pruneOptions.ShouldPrune(version); prune { - if err := c.Prune(pruneVersion); err != nil { - c.logger.Info("failed to prune SC", "prune_version", pruneVersion, "err", err) - } - } - return cInfo, nil } @@ -297,6 +283,7 @@ func (c *CommitStore) Get(storeKey []byte, version uint64, key []byte) ([]byte, return bz, nil } +// Prune implements store.Pruner. func (c *CommitStore) Prune(version uint64) (ferr error) { // prune the metadata batch := c.db.NewBatch() @@ -322,6 +309,15 @@ func (c *CommitStore) Prune(version uint64) (ferr error) { return ferr } +// PausePruning implements store.PausablePruner. +func (c *CommitStore) PausePruning(pause bool) { + for _, tree := range c.multiTrees { + if pruner, ok := tree.(store.PausablePruner); ok { + pruner.PausePruning(pause) + } + } +} + // Snapshot implements snapshotstypes.CommitSnapshotter. func (c *CommitStore) Snapshot(version uint64, protoWriter protoio.Writer) error { if version == 0 { diff --git a/store/v2/commitment/store_test_suite.go b/store/v2/commitment/store_test_suite.go index aa35a65dfa52..686327d2f6d9 100644 --- a/store/v2/commitment/store_test_suite.go +++ b/store/v2/commitment/store_test_suite.go @@ -25,12 +25,12 @@ const ( type CommitStoreTestSuite struct { suite.Suite - NewStore func(db corestore.KVStoreWithBatch, storeKeys []string, pruneOpts *store.PruneOptions, logger log.Logger) (*CommitStore, error) + NewStore func(db corestore.KVStoreWithBatch, storeKeys []string, logger log.Logger) (*CommitStore, error) } func (s *CommitStoreTestSuite) TestStore_Snapshotter() { storeKeys := []string{storeKey1, storeKey2} - commitStore, err := s.NewStore(dbm.NewMemDB(), storeKeys, nil, log.NewNopLogger()) + commitStore, err := s.NewStore(dbm.NewMemDB(), storeKeys, log.NewNopLogger()) s.Require().NoError(err) latestVersion := uint64(10) @@ -45,7 +45,7 @@ func (s *CommitStoreTestSuite) TestStore_Snapshotter() { kvPairs[storeKey] = append(kvPairs[storeKey], corestore.KVPair{Key: key, Value: value}) } } - s.Require().NoError(commitStore.WriteBatch(corestore.NewChangesetWithPairs(kvPairs))) + s.Require().NoError(commitStore.WriteChangeset(corestore.NewChangesetWithPairs(kvPairs))) _, err = commitStore.Commit(i) s.Require().NoError(err) @@ -64,7 +64,7 @@ func (s *CommitStoreTestSuite) TestStore_Snapshotter() { }, } - targetStore, err := s.NewStore(dbm.NewMemDB(), storeKeys, nil, log.NewNopLogger()) + targetStore, err := s.NewStore(dbm.NewMemDB(), storeKeys, log.NewNopLogger()) s.Require().NoError(err) chunks := make(chan io.ReadCloser, kvCount*int(latestVersion)) @@ -129,7 +129,7 @@ func (s *CommitStoreTestSuite) TestStore_Pruning() { KeepRecent: 10, Interval: 5, } - commitStore, err := s.NewStore(dbm.NewMemDB(), storeKeys, pruneOpts, log.NewNopLogger()) + commitStore, err := s.NewStore(dbm.NewMemDB(), storeKeys, log.NewNopLogger()) s.Require().NoError(err) latestVersion := uint64(100) @@ -144,10 +144,15 @@ func (s *CommitStoreTestSuite) TestStore_Pruning() { kvPairs[storeKey] = append(kvPairs[storeKey], corestore.KVPair{Key: key, Value: value}) } } - s.Require().NoError(commitStore.WriteBatch(corestore.NewChangesetWithPairs(kvPairs))) + s.Require().NoError(commitStore.WriteChangeset(corestore.NewChangesetWithPairs(kvPairs))) _, err = commitStore.Commit(i) s.Require().NoError(err) + + if prune, pruneVersion := pruneOpts.ShouldPrune(i); prune { + s.Require().NoError(commitStore.Prune(pruneVersion)) + } + } pruneVersion := latestVersion - pruneOpts.KeepRecent - 1 diff --git a/store/v2/database.go b/store/v2/database.go index 52e7dd848ed5..a0466de18dea 100644 --- a/store/v2/database.go +++ b/store/v2/database.go @@ -20,11 +20,6 @@ type VersionedDatabase interface { ApplyChangeset(version uint64, cs *corestore.Changeset) error - // Prune attempts to prune all versions up to and including the provided - // version argument. The operation should be idempotent. An error should be - // returned upon failure. - Prune(version uint64) error - // Close releases associated resources. It should NOT be idempotent. It must // only be called once and any call after may panic. io.Closer @@ -32,8 +27,8 @@ type VersionedDatabase interface { // Committer defines an API for committing state. type Committer interface { - // WriteBatch writes a batch of key-value pairs to the tree. - WriteBatch(cs *corestore.Changeset) error + // WriteChangeset writes the changeset to the commitment state. + WriteChangeset(cs *corestore.Changeset) error // WorkingCommitInfo returns the CommitInfo for the working tree. WorkingCommitInfo(version uint64) *proof.CommitInfo @@ -62,11 +57,6 @@ type Committer interface { // GetCommitInfo returns the CommitInfo for the given version. GetCommitInfo(version uint64) (*proof.CommitInfo, error) - // Prune attempts to prune all versions up to and including the provided - // version argument. The operation should be idempotent. An error should be - // returned upon failure. - Prune(version uint64) error - // Close releases associated resources. It should NOT be idempotent. It must // only be called once and any call after may panic. io.Closer diff --git a/store/v2/go.mod b/store/v2/go.mod index 14c6ffdc5d91..2cb69d47b978 100644 --- a/store/v2/go.mod +++ b/store/v2/go.mod @@ -8,7 +8,7 @@ require ( cosmossdk.io/log v1.3.1 github.com/cockroachdb/pebble v1.1.0 github.com/cosmos/gogoproto v1.4.12 - github.com/cosmos/iavl v1.1.4 + github.com/cosmos/iavl v1.2.0 github.com/cosmos/ics23/go v0.10.0 github.com/google/btree v1.1.2 github.com/hashicorp/go-metrics v0.5.3 diff --git a/store/v2/go.sum b/store/v2/go.sum index 511f01825944..b14adf821a85 100644 --- a/store/v2/go.sum +++ b/store/v2/go.sum @@ -36,8 +36,8 @@ github.com/cosmos/cosmos-db v1.0.2 h1:hwMjozuY1OlJs/uh6vddqnk9j7VamLv+0DBlbEXbAK github.com/cosmos/cosmos-db v1.0.2/go.mod h1:Z8IXcFJ9PqKK6BIsVOB3QXtkKoqUOp1vRvPT39kOXEA= github.com/cosmos/gogoproto v1.4.12 h1:vB6Lbe/rtnYGjQuFxkPiPYiCybqFT8QvLipDZP8JpFE= github.com/cosmos/gogoproto v1.4.12/go.mod h1:LnZob1bXRdUoqMMtwYlcR3wjiElmlC+FkjaZRv1/eLY= -github.com/cosmos/iavl v1.1.4 h1:Z0cVVjeQqOUp78/nWt/uhQy83vYluWlAMGQ4zbH9G34= -github.com/cosmos/iavl v1.1.4/go.mod h1:vCYmRQUJU1wwj0oRD3wMEtOM9sJNDP+GFMaXmIxZ/rU= +github.com/cosmos/iavl v1.2.0 h1:kVxTmjTh4k0Dh1VNL046v6BXqKziqMDzxo93oh3kOfM= +github.com/cosmos/iavl v1.2.0/go.mod h1:HidWWLVAtODJqFD6Hbne2Y0q3SdxByJepHUOeoH4LiI= github.com/cosmos/ics23/go v0.10.0 h1:iXqLLgp2Lp+EdpIuwXTYIQU+AiHj9mOC2X9ab++bZDM= github.com/cosmos/ics23/go v0.10.0/go.mod h1:ZfJSmng/TBNTBkFemHHHj5YY7VAU/MBU980F4VU1NG0= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= diff --git a/store/v2/migration/manager.go b/store/v2/migration/manager.go index 4a719c06f438..a55524821a01 100644 --- a/store/v2/migration/manager.go +++ b/store/v2/migration/manager.go @@ -233,7 +233,7 @@ func (m *Manager) Sync() error { return fmt.Errorf("failed to unmarshal changeset: %w", err) } if m.stateCommitment != nil { - if err := m.stateCommitment.WriteBatch(cs); err != nil { + if err := m.stateCommitment.WriteChangeset(cs); err != nil { return fmt.Errorf("failed to write changeset to commitment: %w", err) } if _, err := m.stateCommitment.Commit(version); err != nil { diff --git a/store/v2/migration/manager_test.go b/store/v2/migration/manager_test.go index 419e26bbbf43..dea337f703b5 100644 --- a/store/v2/migration/manager_test.go +++ b/store/v2/migration/manager_test.go @@ -28,7 +28,7 @@ func setupMigrationManager(t *testing.T, noCommitStore bool) (*Manager, *commitm multiTrees[storeKey] = iavl.NewIavlTree(prefixDB, log.NewNopLogger(), iavl.DefaultConfig()) } - commitStore, err := commitment.NewCommitStore(multiTrees, db, nil, log.NewNopLogger()) + commitStore, err := commitment.NewCommitStore(multiTrees, db, log.NewNopLogger()) require.NoError(t, err) snapshotsStore, err := snapshots.NewStore(t.TempDir()) @@ -38,7 +38,7 @@ func setupMigrationManager(t *testing.T, noCommitStore bool) (*Manager, *commitm storageDB, err := pebbledb.New(t.TempDir()) require.NoError(t, err) - newStorageStore := storage.NewStorageStore(storageDB, nil, log.NewNopLogger()) // for store/v2 + newStorageStore := storage.NewStorageStore(storageDB, log.NewNopLogger()) // for store/v2 db1 := dbm.NewMemDB() multiTrees1 := make(map[string]commitment.Tree) @@ -47,7 +47,7 @@ func setupMigrationManager(t *testing.T, noCommitStore bool) (*Manager, *commitm multiTrees1[storeKey] = iavl.NewIavlTree(prefixDB, log.NewNopLogger(), iavl.DefaultConfig()) } - newCommitStore, err := commitment.NewCommitStore(multiTrees1, db1, nil, log.NewNopLogger()) // for store/v2 + newCommitStore, err := commitment.NewCommitStore(multiTrees1, db1, log.NewNopLogger()) // for store/v2 require.NoError(t, err) if noCommitStore { newCommitStore = nil @@ -71,7 +71,7 @@ func TestMigrateState(t *testing.T) { cs.Add([]byte(storeKey), []byte(fmt.Sprintf("key-%d-%d", version, i)), []byte(fmt.Sprintf("value-%d-%d", version, i)), false) } } - require.NoError(t, orgCommitStore.WriteBatch(cs)) + require.NoError(t, orgCommitStore.WriteChangeset(cs)) _, err := orgCommitStore.Commit(version) require.NoError(t, err) } diff --git a/store/v2/pruning/README.md b/store/v2/pruning/README.md new file mode 100644 index 000000000000..31a4086eb7e9 --- /dev/null +++ b/store/v2/pruning/README.md @@ -0,0 +1,52 @@ +# Pruning Manager + +The `pruning` package defines the `PruningManager` struct which is responsible for +pruning the state storage (SS) and the state commitment (SC) based on the current +height of the chain. The `PruneOptions` struct defines the configuration for pruning +and is passed to the `PruningManager` during initialization. + +## Prune Options + +The `PruneOptions` struct includes the following fields: + +* `KeepRecent` (uint64): The number of recent heights to keep in the state. +* `Interval` (uint64): The interval of how often to prune the state. 0 means no pruning. + +## Pausable Pruner + +The `PausablePruner` interface defines the `PausePruning` method, which is used to pause +the pruning process. The `PruningManager` will check if the pruner is a `PausablePruner` +and call the `PausePruning` method before and after `Commit` to pause and resume pruning. +This is useful for when the pruning process is asynchronous and needs to be paused during +a commit to prevent parallel writes. + +## Pruning Flow + +```mermaid +sequenceDiagram + autonumber + + participant A as RootStore + participant B as PruningManager + participant C as CommitmentStore + participant D as StorageStore + + loop Commit + A->>B: SignalCommit(true, height) + alt SC is PausablePruner + B->>C: PausePruning(true) + else SS is PausablePruner + B->>D: PausePruing(true) + end + A->>C: Commit Changeset + A->>D: Write Changeset + A->>B: SignalCommit(false, height) + alt SC is PausablePruner + B->>C: PausePruning(false) + else SS is PausablePruner + B->>D: PausePruing(false) + end + B->>C: Prune(height) + B->>D: Prune(height) + end +``` diff --git a/store/v2/pruning/manager.go b/store/v2/pruning/manager.go new file mode 100644 index 000000000000..5e3c0d0d0223 --- /dev/null +++ b/store/v2/pruning/manager.go @@ -0,0 +1,69 @@ +package pruning + +import "cosmossdk.io/store/v2" + +// Manager is a struct that manages the pruning of old versions of the SC and SS. +type Manager struct { + // scPruner is the pruner for the SC. + scPruner store.Pruner + // scPruningOptions are the pruning options for the SC. + scPruningOptions *store.PruneOptions + // ssPruner is the pruner for the SS. + ssPruner store.Pruner + // ssPruningOptions are the pruning options for the SS. + ssPruningOptions *store.PruneOptions +} + +// NewManager creates a new Pruning Manager. +func NewManager(scPruner, ssPruner store.Pruner, scPruningOptions, ssPruningOptions *store.PruneOptions) *Manager { + return &Manager{ + scPruner: scPruner, + scPruningOptions: scPruningOptions, + ssPruner: ssPruner, + ssPruningOptions: ssPruningOptions, + } +} + +// Prune prunes the SC and SS to the provided version. +// +// NOTE: It can be called outside of the store manually. +func (m *Manager) Prune(version uint64) error { + // Prune the SC. + if m.scPruningOptions != nil { + if prune, pruneTo := m.scPruningOptions.ShouldPrune(version); prune { + if err := m.scPruner.Prune(pruneTo); err != nil { + return err + } + } + } + + // Prune the SS. + if m.ssPruningOptions != nil { + if prune, pruneTo := m.ssPruningOptions.ShouldPrune(version); prune { + if err := m.ssPruner.Prune(pruneTo); err != nil { + return err + } + } + } + + return nil +} + +// SignalCommit signals to the manager that a commit has started or finished. +// It is used to trigger the pruning of the SC and SS. +// It pauses or resumes the pruning of the SC and SS if the pruner implements +// the PausablePruner interface. +func (m *Manager) SignalCommit(start bool, version uint64) error { + if scPausablePruner, ok := m.scPruner.(store.PausablePruner); ok { + scPausablePruner.PausePruning(start) + } + if ssPausablePruner, ok := m.ssPruner.(store.PausablePruner); ok { + ssPausablePruner.PausePruning(start) + } + + if !start { + return m.Prune(version) + } + + return nil +} diff --git a/store/v2/pruning/manager_test.go b/store/v2/pruning/manager_test.go new file mode 100644 index 000000000000..a8010c10bfc4 --- /dev/null +++ b/store/v2/pruning/manager_test.go @@ -0,0 +1,172 @@ +package pruning + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + + corestore "cosmossdk.io/core/store" + "cosmossdk.io/log" + "cosmossdk.io/store/v2" + "cosmossdk.io/store/v2/commitment" + "cosmossdk.io/store/v2/commitment/iavl" + dbm "cosmossdk.io/store/v2/db" + "cosmossdk.io/store/v2/storage" + "cosmossdk.io/store/v2/storage/sqlite" +) + +var storeKeys = []string{"store1", "store2", "store3"} + +type PruningManagerTestSuite struct { + suite.Suite + + manager *Manager + sc *commitment.CommitStore + ss *storage.StorageStore +} + +func TestPruningManagerTestSuite(t *testing.T) { + suite.Run(t, &PruningManagerTestSuite{}) +} + +func (s *PruningManagerTestSuite) SetupTest() { + nopLog := log.NewNopLogger() + var err error + + mdb := dbm.NewMemDB() + multiTrees := make(map[string]commitment.Tree) + for _, storeKey := range storeKeys { + prefixDB := dbm.NewPrefixDB(mdb, []byte(storeKey)) + multiTrees[storeKey] = iavl.NewIavlTree(prefixDB, nopLog, iavl.DefaultConfig()) + } + s.sc, err = commitment.NewCommitStore(multiTrees, mdb, nopLog) + s.Require().NoError(err) + + sqliteDB, err := sqlite.New(s.T().TempDir()) + s.Require().NoError(err) + s.ss = storage.NewStorageStore(sqliteDB, nopLog) + scPruneOptions := &store.PruneOptions{ + KeepRecent: 0, + Interval: 1, + } // prune all + ssPruneOptions := &store.PruneOptions{ + KeepRecent: 5, + Interval: 10, + } // prune some + s.manager = NewManager(s.sc, s.ss, scPruneOptions, ssPruneOptions) +} + +func (s *PruningManagerTestSuite) TestPrune() { + // commit changesets with pruning + toVersion := uint64(100) + keyCount := 10 + for version := uint64(1); version <= toVersion; version++ { + cs := corestore.NewChangeset() + for _, storeKey := range storeKeys { + for i := 0; i < keyCount; i++ { + cs.Add([]byte(storeKey), []byte(fmt.Sprintf("key-%d-%d", version, i)), []byte(fmt.Sprintf("value-%d-%d", version, i)), false) + } + } + s.Require().NoError(s.sc.WriteChangeset(cs)) + _, err := s.sc.Commit(version) + s.Require().NoError(err) + + s.Require().NoError(s.ss.ApplyChangeset(version, cs)) + + s.Require().NoError(s.manager.Prune(version)) + } + + // wait for the pruning to finish in the commitment store + checkSCPrune := func() bool { + count := 0 + for _, storeKey := range storeKeys { + _, err := s.sc.GetProof([]byte(storeKey), toVersion-1, []byte(fmt.Sprintf("key-%d-%d", toVersion-1, 0))) + if err != nil { + count++ + } + } + + return count == len(storeKeys) + } + s.Require().Eventually(checkSCPrune, 10*time.Second, 1*time.Second) + + // check the storage store + _, pruneVersion := s.manager.ssPruningOptions.ShouldPrune(toVersion) + for version := uint64(1); version <= toVersion; version++ { + for _, storeKey := range storeKeys { + for i := 0; i < keyCount; i++ { + key := []byte(fmt.Sprintf("key-%d-%d", version, i)) + value, err := s.ss.Get([]byte(storeKey), version, key) + if version <= pruneVersion { + s.Require().Nil(value) + s.Require().Error(err) + } else { + s.Require().NoError(err) + s.Require().Equal([]byte(fmt.Sprintf("value-%d-%d", version, i)), value) + } + } + } + } +} + +func TestPruneOptions(t *testing.T) { + testCases := []struct { + name string + options *store.PruneOptions + version uint64 + pruning bool + pruneVersion uint64 + }{ + { + name: "no pruning", + options: &store.PruneOptions{ + KeepRecent: 100, + Interval: 0, + }, + version: 100, + pruning: false, + pruneVersion: 0, + }, + { + name: "prune all", + options: &store.PruneOptions{ + KeepRecent: 0, + Interval: 1, + }, + version: 19, + pruning: true, + pruneVersion: 18, + }, + { + name: "prune none", + options: &store.PruneOptions{ + KeepRecent: 100, + Interval: 10, + }, + version: 19, + pruning: false, + pruneVersion: 0, + }, + { + name: "prune some", + options: &store.PruneOptions{ + KeepRecent: 10, + Interval: 50, + }, + version: 100, + pruning: true, + pruneVersion: 89, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + pruning, pruneVersion := tc.options.ShouldPrune(tc.version) + require.Equal(t, tc.pruning, pruning) + require.Equal(t, tc.pruneVersion, pruneVersion) + }) + } +} diff --git a/store/v2/root/factory.go b/store/v2/root/factory.go index 6263cfd592fe..ec81208f0109 100644 --- a/store/v2/root/factory.go +++ b/store/v2/root/factory.go @@ -12,6 +12,7 @@ import ( "cosmossdk.io/store/v2/commitment/mem" "cosmossdk.io/store/v2/db" "cosmossdk.io/store/v2/internal" + "cosmossdk.io/store/v2/pruning" "cosmossdk.io/store/v2/storage" "cosmossdk.io/store/v2/storage/pebbledb" "cosmossdk.io/store/v2/storage/sqlite" @@ -31,14 +32,15 @@ const ( ) type FactoryOptions struct { - Logger log.Logger - RootDir string - SSType SSType - SCType SCType - PruneOptions *store.PruneOptions - IavlConfig *iavl.Config - StoreKeys []string - SCRawDB corestore.KVStoreWithBatch + Logger log.Logger + RootDir string + SSType SSType + SCType SCType + SSPruneOptions *store.PruneOptions + SCPruneOptions *store.PruneOptions + IavlConfig *iavl.Config + StoreKeys []string + SCRawDB corestore.KVStoreWithBatch } // CreateRootStore is a convenience function to create a root store based on the @@ -48,8 +50,8 @@ type FactoryOptions struct { func CreateRootStore(opts *FactoryOptions) (store.RootStore, error) { var ( ssDb storage.Database - ss store.VersionedDatabase - sc store.Committer + ss *storage.StorageStore + sc *commitment.CommitStore err error ensureDir = func(dir string) error { if err := os.MkdirAll(dir, 0x0755); err != nil { @@ -71,7 +73,7 @@ func CreateRootStore(opts *FactoryOptions) (store.RootStore, error) { if err = ensureDir(dir); err != nil { return nil, err } - ssDb, err = pebbledb.New(fmt.Sprintf("%s/data/ss/pebble", opts.RootDir)) + ssDb, err = pebbledb.New(dir) case SSTypeRocks: // TODO: rocksdb requires build tags so is not supported here by default return nil, fmt.Errorf("rocksdb not supported") @@ -79,7 +81,7 @@ func CreateRootStore(opts *FactoryOptions) (store.RootStore, error) { if err != nil { return nil, err } - ss = storage.NewStorageStore(ssDb, opts.PruneOptions, opts.Logger) + ss = storage.NewStorageStore(ssDb, opts.Logger) trees := make(map[string]commitment.Tree) for _, key := range opts.StoreKeys { @@ -93,12 +95,13 @@ func CreateRootStore(opts *FactoryOptions) (store.RootStore, error) { return nil, fmt.Errorf("iavl v2 not supported") } } - sc, err = commitment.NewCommitStore(trees, opts.SCRawDB, opts.PruneOptions, opts.Logger) } - + sc, err = commitment.NewCommitStore(trees, opts.SCRawDB, opts.Logger) if err != nil { return nil, err } - return New(opts.Logger, ss, sc, nil, nil) + pm := pruning.NewManager(sc, ss, opts.SCPruneOptions, opts.SSPruneOptions) + + return New(opts.Logger, ss, sc, pm, nil, nil) } diff --git a/store/v2/root/migrate_test.go b/store/v2/root/migrate_test.go index 3af0d86c6aff..fb22ae5ded17 100644 --- a/store/v2/root/migrate_test.go +++ b/store/v2/root/migrate_test.go @@ -15,6 +15,7 @@ import ( "cosmossdk.io/store/v2/commitment/iavl" dbm "cosmossdk.io/store/v2/db" "cosmossdk.io/store/v2/migration" + "cosmossdk.io/store/v2/pruning" "cosmossdk.io/store/v2/snapshots" "cosmossdk.io/store/v2/storage" "cosmossdk.io/store/v2/storage/sqlite" @@ -42,7 +43,7 @@ func (s *MigrateStoreTestSuite) SetupTest() { prefixDB := dbm.NewPrefixDB(mdb, []byte(storeKey)) multiTrees[storeKey] = iavl.NewIavlTree(prefixDB, nopLog, iavl.DefaultConfig()) } - orgSC, err := commitment.NewCommitStore(multiTrees, mdb, nil, testLog) + orgSC, err := commitment.NewCommitStore(multiTrees, mdb, testLog) s.Require().NoError(err) // apply changeset against the original store @@ -55,7 +56,7 @@ func (s *MigrateStoreTestSuite) SetupTest() { cs.Add([]byte(storeKey), []byte(fmt.Sprintf("key-%d-%d", version, i)), []byte(fmt.Sprintf("value-%d-%d", version, i)), false) } } - s.Require().NoError(orgSC.WriteBatch(cs)) + s.Require().NoError(orgSC.WriteChangeset(cs)) _, err = orgSC.Commit(version) s.Require().NoError(err) } @@ -63,22 +64,23 @@ func (s *MigrateStoreTestSuite) SetupTest() { // create a new storage and commitment stores sqliteDB, err := sqlite.New(s.T().TempDir()) s.Require().NoError(err) - ss := storage.NewStorageStore(sqliteDB, nil, testLog) + ss := storage.NewStorageStore(sqliteDB, testLog) multiTrees1 := make(map[string]commitment.Tree) for _, storeKey := range storeKeys { multiTrees1[storeKey] = iavl.NewIavlTree(dbm.NewMemDB(), nopLog, iavl.DefaultConfig()) } - sc, err := commitment.NewCommitStore(multiTrees1, dbm.NewMemDB(), nil, testLog) + sc, err := commitment.NewCommitStore(multiTrees1, dbm.NewMemDB(), testLog) s.Require().NoError(err) snapshotsStore, err := snapshots.NewStore(s.T().TempDir()) s.Require().NoError(err) snapshotManager := snapshots.NewManager(snapshotsStore, snapshots.NewSnapshotOptions(1500, 2), orgSC, nil, nil, testLog) migrationManager := migration.NewManager(dbm.NewMemDB(), snapshotManager, ss, sc, testLog) + pm := pruning.NewManager(sc, ss, nil, nil) // assume no storage store, simulate the migration process - s.rootStore, err = New(testLog, ss, orgSC, migrationManager, nil) + s.rootStore, err = New(testLog, ss, orgSC, pm, migrationManager, nil) s.Require().NoError(err) } @@ -143,10 +145,6 @@ func (s *MigrateStoreTestSuite) TestMigrateState() { } } - // prune the old versions - err = s.rootStore.Prune(latestVersion - 1) - s.Require().NoError(err) - // apply changeset against the migrated store for version := latestVersion + 1; version <= latestVersion+10; version++ { cs := corestore.NewChangeset() diff --git a/store/v2/root/store.go b/store/v2/root/store.go index 9a4bd9fa9588..8c77cd0d7570 100644 --- a/store/v2/root/store.go +++ b/store/v2/root/store.go @@ -16,6 +16,7 @@ import ( "cosmossdk.io/store/v2/metrics" "cosmossdk.io/store/v2/migration" "cosmossdk.io/store/v2/proof" + "cosmossdk.io/store/v2/pruning" ) var _ store.RootStore = (*Store)(nil) @@ -43,6 +44,9 @@ type Store struct { // telemetry reflects a telemetry agent responsible for emitting metrics (if any) telemetry metrics.StoreMetrics + // pruningManager reflects the pruning manager used to prune state of the SS and SC backends + pruningManager *pruning.Manager + // Migration related fields // migrationManager reflects the migration manager used to migrate state from v1 to v2 migrationManager *migration.Manager @@ -62,6 +66,7 @@ func New( logger log.Logger, ss store.VersionedDatabase, sc store.Committer, + pm *pruning.Manager, mm *migration.Manager, m metrics.StoreMetrics, ) (store.RootStore, error) { @@ -70,6 +75,7 @@ func New( initialVersion: 1, stateStorage: ss, stateCommitment: sc, + pruningManager: pm, migrationManager: mm, telemetry: m, isMigrating: mm != nil, @@ -272,6 +278,13 @@ func (s *Store) Commit(cs *corestore.Changeset) ([]byte, error) { s.logger.Debug("commit header and version mismatch", "header_height", s.commitHeader.Height, "version", version) } + // signal to the pruning manager that a new version is about to be committed + // this may be required if the SS and SC backends implementation have the + // background pruning process which must be paused during the commit + if err := s.pruningManager.SignalCommit(true, version); err != nil { + s.logger.Error("failed to signal commit to pruning manager", "err", err) + } + eg := new(errgroup.Group) // if we're migrating, we don't want to commit to the state storage to avoid @@ -300,6 +313,11 @@ func (s *Store) Commit(cs *corestore.Changeset) ([]byte, error) { return nil, err } + // signal to the pruning manager that the commit is done + if err := s.pruningManager.SignalCommit(false, version); err != nil { + s.logger.Error("failed to signal commit done to pruning manager", "err", err) + } + if s.commitHeader != nil { s.lastCommitInfo.Timestamp = s.commitHeader.Time } @@ -307,24 +325,6 @@ func (s *Store) Commit(cs *corestore.Changeset) ([]byte, error) { return s.lastCommitInfo.Hash(), nil } -// Prune prunes the root store to the provided version. -func (s *Store) Prune(version uint64) error { - if s.telemetry != nil { - now := time.Now() - defer s.telemetry.MeasureSince(now, "root_store", "prune") - } - - if err := s.stateStorage.Prune(version); err != nil { - return fmt.Errorf("failed to prune SS store: %w", err) - } - - if err := s.stateCommitment.Prune(version); err != nil { - return fmt.Errorf("failed to prune SC store: %w", err) - } - - return nil -} - // startMigration starts a migration process to migrate the RootStore/v1 to the // SS and SC backends of store/v2 and initializes the channels. // It runs in a separate goroutine and replaces the current RootStore with the @@ -384,7 +384,7 @@ func (s *Store) writeSC(cs *corestore.Changeset) error { } } - if err := s.stateCommitment.WriteBatch(cs); err != nil { + if err := s.stateCommitment.WriteChangeset(cs); err != nil { return fmt.Errorf("failed to write batch to SC store: %w", err) } diff --git a/store/v2/root/store_test.go b/store/v2/root/store_test.go index 2bbf404f9d80..a0766c249561 100644 --- a/store/v2/root/store_test.go +++ b/store/v2/root/store_test.go @@ -13,6 +13,7 @@ import ( "cosmossdk.io/store/v2/commitment" "cosmossdk.io/store/v2/commitment/iavl" dbm "cosmossdk.io/store/v2/db" + "cosmossdk.io/store/v2/pruning" "cosmossdk.io/store/v2/storage" "cosmossdk.io/store/v2/storage/sqlite" ) @@ -44,15 +45,16 @@ func (s *RootStoreTestSuite) SetupTest() { sqliteDB, err := sqlite.New(s.T().TempDir()) s.Require().NoError(err) - ss := storage.NewStorageStore(sqliteDB, nil, noopLog) + ss := storage.NewStorageStore(sqliteDB, noopLog) tree := iavl.NewIavlTree(dbm.NewMemDB(), noopLog, iavl.DefaultConfig()) tree2 := iavl.NewIavlTree(dbm.NewMemDB(), noopLog, iavl.DefaultConfig()) tree3 := iavl.NewIavlTree(dbm.NewMemDB(), noopLog, iavl.DefaultConfig()) - sc, err := commitment.NewCommitStore(map[string]commitment.Tree{testStoreKey: tree, testStoreKey2: tree2, testStoreKey3: tree3}, dbm.NewMemDB(), nil, noopLog) + sc, err := commitment.NewCommitStore(map[string]commitment.Tree{testStoreKey: tree, testStoreKey2: tree2, testStoreKey3: tree3}, dbm.NewMemDB(), noopLog) s.Require().NoError(err) - rs, err := New(noopLog, ss, sc, nil, nil) + pm := pruning.NewManager(sc, ss, nil, nil) + rs, err := New(noopLog, ss, sc, pm, nil, nil) s.Require().NoError(err) s.rootStore = rs @@ -112,7 +114,7 @@ func (s *RootStoreTestSuite) TestGetFallback() { cs := corestore.NewChangeset() cs.Add(testStoreKeyBytes, []byte("foo"), []byte("bar"), false) - err := sc.WriteBatch(cs) + err := sc.WriteChangeset(cs) s.Require().NoError(err) ci := sc.WorkingCommitInfo(1) diff --git a/store/v2/storage/pebbledb/db_test.go b/store/v2/storage/pebbledb/db_test.go index a5aaf6ffd98a..5857c7f511f7 100644 --- a/store/v2/storage/pebbledb/db_test.go +++ b/store/v2/storage/pebbledb/db_test.go @@ -6,13 +6,12 @@ import ( "github.com/stretchr/testify/suite" "cosmossdk.io/core/log" - "cosmossdk.io/store/v2" "cosmossdk.io/store/v2/storage" ) func TestStorageTestSuite(t *testing.T) { s := &storage.StorageTestSuite{ - NewDB: func(dir string) (store.VersionedDatabase, error) { + NewDB: func(dir string) (*storage.StorageStore, error) { db, err := New(dir) if err == nil && db != nil { // We set sync=false just to speed up CI tests. Operators should take @@ -20,7 +19,7 @@ func TestStorageTestSuite(t *testing.T) { db.SetSync(false) } - return storage.NewStorageStore(db, nil, log.NewNopLogger()), err + return storage.NewStorageStore(db, log.NewNopLogger()), err }, EmptyBatchSize: 12, } diff --git a/store/v2/storage/rocksdb/db_test.go b/store/v2/storage/rocksdb/db_test.go index 000b83e3da42..df98c6d316d4 100644 --- a/store/v2/storage/rocksdb/db_test.go +++ b/store/v2/storage/rocksdb/db_test.go @@ -11,7 +11,6 @@ import ( "github.com/stretchr/testify/suite" "cosmossdk.io/core/log" - "cosmossdk.io/store/v2" "cosmossdk.io/store/v2/storage" ) @@ -19,9 +18,9 @@ var storeKey1 = []byte("store1") func TestStorageTestSuite(t *testing.T) { s := &storage.StorageTestSuite{ - NewDB: func(dir string) (store.VersionedDatabase, error) { + NewDB: func(dir string) (*storage.StorageStore, error) { db, err := New(dir) - return storage.NewStorageStore(db, nil, log.NewNopLogger()), err + return storage.NewStorageStore(db, log.NewNopLogger()), err }, EmptyBatchSize: 12, } diff --git a/store/v2/storage/sqlite/db_test.go b/store/v2/storage/sqlite/db_test.go index c9944add1a36..62a8b220648e 100644 --- a/store/v2/storage/sqlite/db_test.go +++ b/store/v2/storage/sqlite/db_test.go @@ -9,7 +9,6 @@ import ( "github.com/stretchr/testify/suite" "cosmossdk.io/core/log" - "cosmossdk.io/store/v2" "cosmossdk.io/store/v2/storage" ) @@ -17,9 +16,9 @@ var storeKey1 = []byte("store1") func TestStorageTestSuite(t *testing.T) { s := &storage.StorageTestSuite{ - NewDB: func(dir string) (store.VersionedDatabase, error) { + NewDB: func(dir string) (*storage.StorageStore, error) { db, err := New(dir) - return storage.NewStorageStore(db, nil, log.NewNopLogger()), err + return storage.NewStorageStore(db, log.NewNopLogger()), err }, EmptyBatchSize: 0, } diff --git a/store/v2/storage/storage_bench_test.go b/store/v2/storage/storage_bench_test.go index a172f4ae69d6..3084f6f87a29 100644 --- a/store/v2/storage/storage_bench_test.go +++ b/store/v2/storage/storage_bench_test.go @@ -27,7 +27,7 @@ var ( backends = map[string]func(dataDir string) (store.VersionedDatabase, error){ "rocksdb_versiondb_opts": func(dataDir string) (store.VersionedDatabase, error) { db, err := rocksdb.New(dataDir) - return storage.NewStorageStore(db, nil, log.NewNopLogger()), err + return storage.NewStorageStore(db, log.NewNopLogger()), err }, "pebbledb_default_opts": func(dataDir string) (store.VersionedDatabase, error) { db, err := pebbledb.New(dataDir) @@ -35,11 +35,11 @@ var ( db.SetSync(false) } - return storage.NewStorageStore(db, nil, log.NewNopLogger()), err + return storage.NewStorageStore(db, log.NewNopLogger()), err }, "btree_sqlite": func(dataDir string) (store.VersionedDatabase, error) { db, err := sqlite.New(dataDir) - return storage.NewStorageStore(db, nil, log.NewNopLogger()), err + return storage.NewStorageStore(db, log.NewNopLogger()), err }, } rng = rand.New(rand.NewSource(567320)) diff --git a/store/v2/storage/storage_test_suite.go b/store/v2/storage/storage_test_suite.go index b7d748386e2b..dc3b5f3e81af 100644 --- a/store/v2/storage/storage_test_suite.go +++ b/store/v2/storage/storage_test_suite.go @@ -22,7 +22,7 @@ var storeKey1Bytes = []byte(storeKey1) type StorageTestSuite struct { suite.Suite - NewDB func(dir string) (store.VersionedDatabase, error) + NewDB func(dir string) (*StorageStore, error) EmptyBatchSize int SkipTests []string } diff --git a/store/v2/storage/store.go b/store/v2/storage/store.go index e8469f92eabb..25381ee18582 100644 --- a/store/v2/storage/store.go +++ b/store/v2/storage/store.go @@ -17,27 +17,20 @@ const ( var ( _ store.VersionedDatabase = (*StorageStore)(nil) _ snapshots.StorageSnapshotter = (*StorageStore)(nil) + _ store.Pruner = (*StorageStore)(nil) ) // StorageStore is a wrapper around the store.VersionedDatabase interface. type StorageStore struct { logger log.Logger db Database - - // pruneOptions defines the pruning configuration. - pruneOptions *store.PruneOptions } // NewStorageStore returns a reference to a new StorageStore. -func NewStorageStore(db Database, pruneOpts *store.PruneOptions, logger log.Logger) *StorageStore { - if pruneOpts == nil { - pruneOpts = store.DefaultPruneOptions() - } - +func NewStorageStore(db Database, logger log.Logger) *StorageStore { return &StorageStore{ - logger: logger, - db: db, - pruneOptions: pruneOpts, + logger: logger, + db: db, } } @@ -76,12 +69,6 @@ func (ss *StorageStore) ApplyChangeset(version uint64, cs *corestore.Changeset) return err } - if prune, pruneVersion := ss.pruneOptions.ShouldPrune(version); prune { - if err := ss.Prune(pruneVersion); err != nil { - ss.logger.Info("failed to prune SS", "prune_version", pruneVersion, "err", err) - } - } - return nil } diff --git a/store/v2/store.go b/store/v2/store.go index e7c557dd533b..e331a153e0af 100644 --- a/store/v2/store.go +++ b/store/v2/store.go @@ -59,10 +59,6 @@ type RootStore interface { // LastCommitID returns a CommitID pertaining to the last commitment. LastCommitID() (proof.CommitID, error) - // Prune prunes the RootStore to the provided version. It is used to remove - // old versions of the RootStore by the CLI. - Prune(version uint64) error - // SetMetrics sets the telemetry handler on the RootStore. SetMetrics(m metrics.Metrics) @@ -83,6 +79,22 @@ type UpgradeableRootStore interface { LoadVersionAndUpgrade(version uint64, upgrades *corestore.StoreUpgrades) error } +// Pruner defines the interface for pruning old versions of the store or database. +type Pruner interface { + // Prune prunes the store to the provided version. + Prune(version uint64) error +} + +// PausablePruner extends the Pruner interface to include the API for pausing +// the pruning process. +type PausablePruner interface { + Pruner + + // PausePruning pauses or resumes the pruning process to avoid the parallel writes + // while committing the state. + PausePruning(pause bool) +} + // QueryResult defines the response type to performing a query on a RootStore. type QueryResult struct { Key []byte