Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Problem: version mismatch happen occasionally #1759

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Open
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* [#1748](https://github.com/crypto-org-chain/cronos/pull/1748) Query with GetCFWithTS to compare both timestamp and key to avoid run fixdata multiple times.
* (versiondb) [#1751](https://github.com/crypto-org-chain/cronos/pull/1751) Add missing Destroy for read options to properly hold and release options reference.
* (versiondb) [#1758](https://github.com/crypto-org-chain/cronos/pull/1758) Avoid ReadOptions mutated by reference release in iterator.
* [#1759](https://github.com/crypto-org-chain/cronos/pull/1759) Fix version mismatch that happens occasionally.

### Improvements

Expand Down
40 changes: 25 additions & 15 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -959,9 +959,27 @@ func New(
panic(err)
}

// wire up the versiondb's `StreamingService` and `MultiStore`.
if cast.ToBool(appOpts.Get("versiondb.enable")) {
var err error
app.qms, err = app.setupVersionDB(homePath, keys, tkeys, memKeys, okeys)
if err != nil {
panic(err)
}
}

var qmsVersion int64
if app.qms != nil {
qmsVersion = app.qms.LatestVersion()
}

// RegisterUpgradeHandlers is used for registering any on-chain upgrades.
// Make sure it's called after `app.mm` and `app.configurator` are set.
app.RegisterUpgradeHandlers(app.appCodec)
storeLoaderOverritten := app.RegisterUpgradeHandlers(app.appCodec, qmsVersion)
if !storeLoaderOverritten {
// Register the default store loader
app.SetStoreLoader(MaxVersionStoreLoader(qmsVersion))
}

// add test gRPC service for testing gRPC queries in isolation
// testdata.RegisterQueryServer(app.GRPCQueryRouter(), testdata.QueryImpl{})
Expand Down Expand Up @@ -992,15 +1010,6 @@ func New(
app.MountMemoryStores(memKeys)
app.MountObjectStores(okeys)

// wire up the versiondb's `StreamingService` and `MultiStore`.
if cast.ToBool(appOpts.Get("versiondb.enable")) {
var err error
app.qms, err = app.setupVersionDB(homePath, keys, tkeys, memKeys, okeys)
if err != nil {
panic(err)
}
}

// initialize BaseApp
app.SetInitChainer(app.InitChainer)
app.SetPreBlocker(app.PreBlocker)
Expand Down Expand Up @@ -1045,12 +1054,13 @@ func New(
tmos.Exit(err.Error())
}

if app.qms != nil {
v1 := app.qms.LatestVersion()
v2 := app.LastBlockHeight()
if v1 > 0 && v1 < v2 {
if qmsVersion > 0 {
// this should not happen since we constrain the loaded iavl version to not exceed the versiondb version,
// still keep the check for safety.
iavlVersion := app.LastBlockHeight()
if qmsVersion < iavlVersion {
// try to prevent gap being created in versiondb
Comment on lines +1057 to 1062
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider more user-friendly handling of version mismatch.
Forcing an exit on mismatch provides immediate safety but may hinder operation in production environments. Logging the discrepancy and offering a guided resolution could improve the troubleshooting experience.

tmos.Exit(fmt.Sprintf("versiondb version %d lag behind iavl version %d", v1, v2))
tmos.Exit(fmt.Sprintf("versiondb version %d lag behind iavl version %d", qmsVersion, iavlVersion))
}
}

Expand Down
37 changes: 37 additions & 0 deletions app/storeloader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package app

import (
storetypes "cosmossdk.io/store/types"
upgradetypes "cosmossdk.io/x/upgrade/types"
"github.com/cosmos/cosmos-sdk/baseapp"
)

// MaxVersionStoreLoader will be used when there's versiondb to cap the loaded iavl version
func MaxVersionStoreLoader(version int64) baseapp.StoreLoader {
if version == 0 {
return baseapp.DefaultStoreLoader
}

return func(ms storetypes.CommitMultiStore) error {
return ms.LoadVersion(version)
}

Check warning on line 17 in app/storeloader.go

View check run for this annotation

Codecov / codecov/patch

app/storeloader.go#L15-L17

Added lines #L15 - L17 were not covered by tests
}

// MaxVersionUpgradeStoreLoader is used to prepare baseapp with a fixed StoreLoader
func MaxVersionUpgradeStoreLoader(version int64, upgradeHeight int64, storeUpgrades *storetypes.StoreUpgrades) baseapp.StoreLoader {
if version == 0 {
return upgradetypes.UpgradeStoreLoader(upgradeHeight, storeUpgrades)
}

Check warning on line 24 in app/storeloader.go

View check run for this annotation

Codecov / codecov/patch

app/storeloader.go#L21-L24

Added lines #L21 - L24 were not covered by tests

return func(ms storetypes.CommitMultiStore) error {
if upgradeHeight == ms.LastCommitID().Version+1 {
// Check if the current commit version and upgrade height matches
if len(storeUpgrades.Renamed) > 0 || len(storeUpgrades.Deleted) > 0 || len(storeUpgrades.Added) > 0 {
return ms.LoadLatestVersionAndUpgrade(storeUpgrades)
}

Check warning on line 31 in app/storeloader.go

View check run for this annotation

Codecov / codecov/patch

app/storeloader.go#L26-L31

Added lines #L26 - L31 were not covered by tests
}

// Otherwise load default store loader
return MaxVersionStoreLoader(version)(ms)

Check warning on line 35 in app/storeloader.go

View check run for this annotation

Codecov / codecov/patch

app/storeloader.go#L35

Added line #L35 was not covered by tests
}
}
9 changes: 7 additions & 2 deletions app/upgrades.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
evmtypes "github.com/evmos/ethermint/x/evm/types"
)

func (app *App) RegisterUpgradeHandlers(cdc codec.BinaryCodec) {
// RegisterUpgradeHandlers returns whether the store loader is overridden
func (app *App) RegisterUpgradeHandlers(cdc codec.BinaryCodec, maxVersion int64) bool {
planName := "v1.4"
app.UpgradeKeeper.SetUpgradeHandler(planName, func(ctx context.Context, plan upgradetypes.Plan, fromVM module.VersionMap) (module.VersionMap, error) {
m, err := app.ModuleManager.RunMigrations(ctx, app.configurator, fromVM)
Expand Down Expand Up @@ -54,14 +55,18 @@
}
if !app.UpgradeKeeper.IsSkipHeight(upgradeInfo.Height) {
if upgradeInfo.Name == planName {
app.SetStoreLoader(upgradetypes.UpgradeStoreLoader(upgradeInfo.Height, &storetypes.StoreUpgrades{
app.SetStoreLoader(MaxVersionUpgradeStoreLoader(maxVersion, upgradeInfo.Height, &storetypes.StoreUpgrades{

Check warning on line 58 in app/upgrades.go

View check run for this annotation

Codecov / codecov/patch

app/upgrades.go#L58

Added line #L58 was not covered by tests
Added: []string{
icahosttypes.StoreKey,
},
Deleted: []string{"icaauth"},
}))

return true

Check warning on line 65 in app/upgrades.go

View check run for this annotation

Codecov / codecov/patch

app/upgrades.go#L64-L65

Added lines #L64 - L65 were not covered by tests
}
}

return false
}

func UpdateExpeditedParams(ctx context.Context, gov govkeeper.Keeper) error {
Expand Down
1 change: 1 addition & 0 deletions integration_tests/shell.nix
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@ pkgs.mkShell {
shellHook = ''
mkdir ./coverage
export GOCOVERDIR=./coverage
export TMPDIR=/tmp
'';
}
10 changes: 8 additions & 2 deletions integration_tests/test_versiondb.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from pystarport import ports

from .network import Cronos
from .utils import ADDRS, send_transaction, wait_for_port
from .utils import ADDRS, send_transaction, w3_wait_for_new_blocks, wait_for_port


def test_versiondb_migration(cronos: Cronos):
Expand Down Expand Up @@ -37,6 +37,9 @@ def test_versiondb_migration(cronos: Cronos):
balance1 = w3.eth.get_balance(community)
block1 = w3.eth.block_number

# wait for a few blocks
w3_wait_for_new_blocks(w3, 5)

# stop the network first
print("stop all nodes")
print(cronos.supervisorctl("stop", "all"))
Expand All @@ -45,7 +48,10 @@ def test_versiondb_migration(cronos: Cronos):

changeset_dir = tempfile.mkdtemp(dir=cronos.base_dir)
print("dump to:", changeset_dir)
print(cli1.changeset_dump(changeset_dir))

# only restore to an intermediate version to test version mismatch behavior
print(cli1.changeset_dump(changeset_dir, end_version=block1 + 1))

snapshot_dir = tempfile.mkdtemp(dir=cronos.base_dir)
print("verify and save to snapshot:", snapshot_dir)
_, commit_info = cli0.changeset_verify(changeset_dir, save_snapshot=snapshot_dir)
Expand Down
49 changes: 41 additions & 8 deletions memiavl/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@
mtx sync.Mutex
// worker goroutine IdleTimeout = 5s
snapshotWriterPool *pond.WorkerPool

// reusable write batch
wbatch wal.Batch
}

type Options struct {
Expand Down Expand Up @@ -440,8 +443,13 @@
db.snapshotRewriteCancel = nil

if result.mtree == nil {
// background snapshot rewrite failed
return fmt.Errorf("background snapshot rewriting failed: %w", result.err)
if result.err != nil {
// background snapshot rewrite failed
return fmt.Errorf("background snapshot rewriting failed: %w", result.err)
}

Check warning on line 449 in memiavl/db.go

View check run for this annotation

Codecov / codecov/patch

memiavl/db.go#L446-L449

Added lines #L446 - L449 were not covered by tests

// background snapshot rewrite did not succeed, but there is no error to propagate; ignore it.

Check failure on line 451 in memiavl/db.go

View workflow job for this annotation

GitHub Actions / Run golangci-lint

`propogate` is a misspelling of `propagate` (misspell)
return nil

Check warning on line 452 in memiavl/db.go

View check run for this annotation

Codecov / codecov/patch

memiavl/db.go#L452

Added line #L452 was not covered by tests
}

// wait for potential pending wal writings to finish, to make sure we catch up to latest state.
Expand Down Expand Up @@ -556,11 +564,17 @@
// async wal writing
db.walChan <- &entry
} else {
bz, err := entry.data.Marshal()
lastIndex, err := db.wal.LastIndex()
if err != nil {
return 0, err
}
if err := db.wal.Write(entry.index, bz); err != nil {

db.wbatch.Clear()
if err := writeEntry(&db.wbatch, db.logger, lastIndex, &entry); err != nil {
return 0, err
}

Check warning on line 575 in memiavl/db.go

View check run for this annotation

Codecov / codecov/patch

memiavl/db.go#L574-L575

Added lines #L574 - L575 were not covered by tests

if err := db.wal.WriteBatch(&db.wbatch); err != nil {
return 0, err
}
}
Expand Down Expand Up @@ -591,13 +605,17 @@
break
}

lastIndex, err := db.wal.LastIndex()
if err != nil {
walQuit <- err
return
}

Check warning on line 612 in memiavl/db.go

View check run for this annotation

Codecov / codecov/patch

memiavl/db.go#L610-L612

Added lines #L610 - L612 were not covered by tests

for _, entry := range entries {
bz, err := entry.data.Marshal()
if err != nil {
if err := writeEntry(&batch, db.logger, lastIndex, entry); err != nil {
walQuit <- err
return
}
batch.Write(entry.index, bz)
}

if err := db.wal.WriteBatch(&batch); err != nil {
Expand Down Expand Up @@ -749,7 +767,8 @@

cloned.logger.Info("start rewriting snapshot", "version", cloned.Version())
if err := cloned.RewriteSnapshotWithContext(ctx); err != nil {
ch <- snapshotResult{err: err}
// log the error but don't stop the client; this can happen when loading an old version.
cloned.logger.Error("failed to rewrite snapshot", "err", err)

Check warning on line 771 in memiavl/db.go

View check run for this annotation

Codecov / codecov/patch

memiavl/db.go#L770-L771

Added lines #L770 - L771 were not covered by tests
return
}
cloned.logger.Info("finished rewriting snapshot", "version", cloned.Version())
Expand Down Expand Up @@ -1093,3 +1112,17 @@

return result
}

func writeEntry(batch *wal.Batch, logger Logger, lastIndex uint64, entry *walEntry) error {
bz, err := entry.data.Marshal()
if err != nil {
return err
}

Check warning on line 1120 in memiavl/db.go

View check run for this annotation

Codecov / codecov/patch

memiavl/db.go#L1119-L1120

Added lines #L1119 - L1120 were not covered by tests

if entry.index <= lastIndex {
logger.Error("commit old version idempotently", "lastIndex", lastIndex, "version", entry.index)
} else {
batch.Write(entry.index, bz)
}
return nil
}
71 changes: 71 additions & 0 deletions memiavl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import (
"encoding/hex"
"errors"
fmt "fmt"
"os"
"path/filepath"
"runtime/debug"
Expand Down Expand Up @@ -497,3 +498,73 @@
})
require.NoError(t, err)
}

// TestIdempotentWrite runs the idempotent-write scenario once with
// asyncCommit disabled and once with it enabled, each as its own subtest.
func TestIdempotentWrite(t *testing.T) {
	modes := [...]bool{false, true}
	for i := range modes {
		async := modes[i]
		name := fmt.Sprintf("asyncCommit=%v", async)
		t.Run(name, func(t *testing.T) {
			testIdempotentWrite(t, async)
		})
	}
}

func testIdempotentWrite(t *testing.T, asyncCommit bool) {
dir := t.TempDir()

asyncCommitBuffer := -1
if asyncCommit {
asyncCommitBuffer = 10
}

db, err := Load(dir, Options{
CreateIfMissing: true,
InitialStores: []string{"test1", "test2"},
AsyncCommitBuffer: asyncCommitBuffer,
})
require.NoError(t, err)

// generate some data into db
var changes [][]*NamedChangeSet
for i := 0; i < 10; i++ {
cs := []*NamedChangeSet{
{
Name: "test1",
Changeset: ChangeSet{Pairs: mockKVPairs("hello", fmt.Sprintf("world%d", i))},
},
{
Name: "test2",
Changeset: ChangeSet{Pairs: mockKVPairs("hello", fmt.Sprintf("world%d", i))},
},
}
changes = append(changes, cs)
}

for _, cs := range changes {
require.NoError(t, db.ApplyChangeSets(cs))
_, err := db.Commit()
require.NoError(t, err)
}

commitInfo := *db.LastCommitInfo()
require.NoError(t, db.Close())

// reload db from disk at an intermidiate version

Check failure on line 550 in memiavl/db_test.go

View workflow job for this annotation

GitHub Actions / Run golangci-lint

`intermidiate` is a misspelling of `intermediate` (misspell)
db, err = Load(dir, Options{TargetVersion: 5})
require.NoError(t, err)

// replay some random writes to reach same version
for i := 0; i < 5; i++ {
require.NoError(t, db.ApplyChangeSets(changes[i+5]))
_, err := db.Commit()
require.NoError(t, err)
}

// it should reach same result
require.Equal(t, commitInfo, *db.LastCommitInfo())

require.NoError(t, db.Close())

// reload db again, it should reach same result
db, err = Load(dir, Options{})
require.NoError(t, err)
require.Equal(t, commitInfo, *db.LastCommitInfo())
}
Loading