diff --git a/Makefile b/Makefile index 94ea62b5e1..e92f965247 100644 --- a/Makefile +++ b/Makefile @@ -74,6 +74,10 @@ ifeq (boltdb,$(findstring boltdb,$(COSMOS_BUILD_OPTIONS))) BUILD_TAGS += boltdb endif +ifeq (mdbx,$(findstring mdbx,$(COSMOS_BUILD_OPTIONS))) + BUILD_TAGS += mdbx +endif + ifeq (,$(findstring nostrip,$(COSMOS_BUILD_OPTIONS))) ldflags += -w -s endif diff --git a/app/app.go b/app/app.go index 7cd77b35e4..8d423a3a44 100644 --- a/app/app.go +++ b/app/app.go @@ -5,12 +5,15 @@ import ( "net/http" "os" "path/filepath" + "strings" "sync" + "github.com/crypto-org-chain/cronos/x/cronos" "github.com/crypto-org-chain/cronos/x/cronos/middleware" "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/codec/types" + "github.com/cosmos/cosmos-sdk/server" "github.com/gorilla/mux" "github.com/rakyll/statik/fs" "github.com/spf13/cast" @@ -122,7 +125,8 @@ import ( // this line is used by starport scaffolding # stargate/app/moduleImport cronosappclient "github.com/crypto-org-chain/cronos/client" - "github.com/crypto-org-chain/cronos/x/cronos" + "github.com/crypto-org-chain/cronos/versiondb" + "github.com/crypto-org-chain/cronos/versiondb/tmdb" cronosclient "github.com/crypto-org-chain/cronos/x/cronos/client" cronoskeeper "github.com/crypto-org-chain/cronos/x/cronos/keeper" evmhandlers "github.com/crypto-org-chain/cronos/x/cronos/keeper/evmhandlers" @@ -350,7 +354,8 @@ func New( // configure state listening capabilities using AppOptions // we are doing nothing with the returned streamingServices and waitGroup in this case // Only support file streamer right now. - if cast.ToString(appOpts.Get(cronosappclient.FlagStreamers)) == "file" { + streamers := cast.ToString(appOpts.Get(cronosappclient.FlagStreamers)) + if strings.Contains(streamers, "file") { streamingDir := filepath.Join(cast.ToString(appOpts.Get(flags.FlagHome)), "data", FileStreamerDirectory) if err := os.MkdirAll(streamingDir, os.ModePerm); err != nil { panic(err) @@ -361,7 +366,7 @@ func New( for _, storeKey := range keys { exposeStoreKeys = append(exposeStoreKeys, storeKey) } - service, err := file.NewStreamingService(streamingDir, "", exposeStoreKeys, appCodec) + service, err := file.NewStreamingService(streamingDir, "", exposeStoreKeys, appCodec, false) if err != nil { panic(err) } @@ -373,6 +378,40 @@ func New( } } + if strings.Contains(streamers, "versiondb") { + rootDir := cast.ToString(appOpts.Get(flags.FlagHome)) + dataDir := filepath.Join(rootDir, "data", "versiondb") + if err := os.MkdirAll(dataDir, os.ModePerm); err != nil { + panic(err) + } + backendType := server.GetAppDBBackend(appOpts) + plainDB, err := dbm.NewDB("plain", backendType, dataDir) + if err != nil { + panic(err) + } + historyDB, err := dbm.NewDB("history", backendType, dataDir) + if err != nil { + panic(err) + } + changesetDB, err := dbm.NewDB("changeset", backendType, dataDir) + if err != nil { + panic(err) + } + versionDB := tmdb.NewStore(plainDB, historyDB, changesetDB) + + // default to exposing all + exposeStoreKeys := make([]storetypes.StoreKey, 0, len(keys)) + for _, storeKey := range keys { + exposeStoreKeys = append(exposeStoreKeys, storeKey) + } + service := versiondb.NewStreamingService(versionDB, exposeStoreKeys) + bApp.SetStreamingService(service) + qms := versiondb.NewMultiStore(versionDB, exposeStoreKeys) + qms.MountTransientStores(tkeys) + qms.MountMemoryStores(memKeys) + bApp.SetQueryMultiStore(qms) + } + app := &App{ BaseApp: bApp, cdc: cdc, diff --git a/default.nix b/default.nix index fcbab2154a..846e83eea0 100644 --- a/default.nix +++ b/default.nix @@ -8,7 +8,7 @@ let version = "v0.9.0"; pname = "cronosd"; - tags = [ "ledger" "netgo" network ] + tags = [ "ledger" "netgo" network "mdbx" ] ++ lib.lists.optionals (rocksdb != null) [ "rocksdb" "rocksdb_build" ]; ldflags = lib.concatStringsSep "\n" ([ "-X github.com/cosmos/cosmos-sdk/version.Name=cronos" @@ -27,6 +27,7 @@ buildGoApplication rec { "!/app/" "!/cmd/" "!/client/" + "!/versiondb/" "!go.mod" "!go.sum" "!gomod2nix.toml" diff --git a/go.mod b/go.mod index 17476fe929..40fa2911d7 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.18 require ( cosmossdk.io/math v1.0.0-beta.3 + github.com/RoaringBitmap/roaring v1.2.1 github.com/armon/go-metrics v0.4.1 github.com/cosmos/cosmos-sdk v0.46.3 github.com/cosmos/ibc-go/v5 v5.0.0 @@ -45,10 +46,12 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/bgentry/go-netrc v0.0.0-20140422174119-9fd32a8b3d3d // indirect github.com/bgentry/speakeasy v0.1.0 // indirect + github.com/bits-and-blooms/bitset v1.2.0 // indirect github.com/btcsuite/btcd v0.22.1 // indirect github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 // indirect github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce // indirect + github.com/c2h5oh/datasize v0.0.0-20220606134207-859f65c6625b // indirect github.com/cenkalti/backoff/v4 v4.1.3 // indirect github.com/cespare/xxhash v1.1.0 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect @@ -69,9 +72,8 @@ require ( github.com/deckarep/golang-set v1.8.0 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect github.com/desertbit/timer v0.0.0-20180107155436-c41aec40b27f // indirect - github.com/dgraph-io/badger/v2 v2.2007.4 // indirect + github.com/dgraph-io/badger/v3 v3.2103.2 // indirect github.com/dgraph-io/ristretto v0.1.0 // indirect - github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 // indirect github.com/dlclark/regexp2 v1.4.1-0.20201116162257-a2a8dda75c91 // indirect github.com/dop251/goja v0.0.0-20220405120441-9037c2b61cbf // indirect github.com/dustin/go-humanize v1.0.0 // indirect @@ -91,7 +93,8 @@ require ( github.com/golang/glog v1.0.0 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/snappy v0.0.4 // indirect - github.com/google/btree v1.0.1 // indirect + github.com/google/btree v1.1.2 // indirect + github.com/google/flatbuffers v2.0.0+incompatible // indirect github.com/google/go-cmp v0.5.8 // indirect github.com/google/orderedcode v0.0.1 // indirect github.com/google/uuid v1.3.0 // indirect @@ -135,7 +138,9 @@ require ( github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/mitchellh/go-testing-interface v1.0.0 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect + github.com/mschoch/smat v0.2.0 // indirect github.com/mtibben/percent v0.2.1 // indirect + github.com/natefinch/atomic v1.0.1 // indirect github.com/olekukonko/tablewriter v0.0.5 // indirect github.com/pelletier/go-toml v1.9.5 // indirect github.com/pelletier/go-toml/v2 v2.0.5 // indirect @@ -165,6 +170,7 @@ require ( github.com/tendermint/go-amino v0.16.0 // indirect github.com/tklauser/go-sysconf v0.3.10 // indirect github.com/tklauser/numcpus v0.4.0 // indirect + github.com/torquem-ch/mdbx-go v0.26.0 // indirect github.com/tyler-smith/go-bip39 v1.1.0 // indirect github.com/ulikunitz/xz v0.5.8 // indirect github.com/zondax/hid v0.9.1-0.20220302062450-5552068d2266 // indirect @@ -189,7 +195,7 @@ require ( ) replace ( - github.com/cosmos/cosmos-sdk => github.com/cosmos/cosmos-sdk v0.46.2 + github.com/cosmos/cosmos-sdk => github.com/yihuang/cosmos-sdk v0.43.0-beta1.0.20221014023203-2c6b9d06b12d github.com/ethereum/go-ethereum => github.com/ethereum/go-ethereum v1.10.19 // Fix upstream GHSA-h395-qcrw-5vmq vulnerability. @@ -199,6 +205,11 @@ replace ( github.com/peggyjv/gravity-bridge/module/v2 => github.com/crypto-org-chain/gravity-bridge/module/v2 v2.0.1-0.20221027085649-2107c6bd6bc4 + // https://github.com/tendermint/tm-db/pull/297 + github.com/tendermint/tm-db => github.com/yihuang/tm-db v0.0.0-20221006023748-f6214ae9454d + + github.com/torquem-ch/mdbx-go => github.com/yihuang/mdbx-go v0.0.0-20221010042614-b72b4f091d88 + // TODO: remove after fixed https://github.com/cosmos/cosmos-sdk/issues/11364 github.com/zondax/hid => github.com/zondax/hid v0.9.0 ) diff --git a/gomod2nix.toml b/gomod2nix.toml index a05555ba67..b98b5ed14b 100644 --- a/gomod2nix.toml +++ b/gomod2nix.toml @@ -31,6 +31,9 @@ schema = 3 [mod."github.com/ChainSafe/go-schnorrkel"] version = "v0.0.0-20200405005733-88cbf1b4c40d" hash = "sha256-i8RXZemJGlSjBT35oPm0SawFiBoIU5Pkq5xp4n/rzCY=" + [mod."github.com/RoaringBitmap/roaring"] + version = "v1.2.1" + hash = "sha256-0/R956wrCW71eOE36CbxGJJRuQjKwvvIQ/D8QTn2A6w=" [mod."github.com/StackExchange/wmi"] version = "v1.2.1" hash = "sha256-1BoEeWAWyebH+1mMuyPhWZut8nWHb6r73MgcqlGuUEY=" @@ -58,6 +61,9 @@ schema = 3 [mod."github.com/bgentry/speakeasy"] version = "v0.1.0" hash = "sha256-Gt1vj6CFovLnO6wX5u2O4UfecY9V2J9WGw1ez4HMrgk=" + [mod."github.com/bits-and-blooms/bitset"] + version = "v1.2.0" + hash = "sha256-IxNmtELycM+XVzg4qBv04hAJUT3nSWuyP9R+8zc9LmU=" [mod."github.com/btcsuite/btcd"] version = "v0.22.1" hash = "sha256-hBU+roIELcmbW2Gz7eGZzL9qNA1bakq5wNxqCgs4TKc=" @@ -70,6 +76,9 @@ schema = 3 [mod."github.com/btcsuite/btcutil"] version = "v1.0.3-0.20201208143702-a53e38424cce" hash = "sha256-4kasJReFcj25JRHx9dJMct3yDkHqVoHGUx5cu45Msfo=" + [mod."github.com/c2h5oh/datasize"] + version = "v0.0.0-20220606134207-859f65c6625b" + hash = "sha256-1uH+D3w0Y/B3poXm545XGrT4S4c+msTbj7gKgu9pbPM=" [mod."github.com/cenkalti/backoff/v4"] version = "v4.1.3" hash = "sha256-u6MEDopHoTWAZoVvvXOKnAg++xre53YgQx0gmf6t2KU=" @@ -98,12 +107,15 @@ schema = 3 version = "v1.0.0-alpha7" hash = "sha256-2wCH+toTF2A6MfFjOa13muEH5oBCcxAhZEqirNOrBA0=" [mod."github.com/cosmos/cosmos-sdk"] - version = "v0.46.2" - hash = "sha256-Lgn4+Vd5PUUkfHc+lTdK2G6/nymZekFVTe1FxWRqh2w=" - replaced = "github.com/cosmos/cosmos-sdk" + version = "v0.43.0-beta1.0.20221014023203-2c6b9d06b12d" + hash = "sha256-t/QEOJgATr82KFXes+AzNlNL6w2tGHfiwZAE4PdO5GE=" + replaced = "github.com/yihuang/cosmos-sdk" [mod."github.com/cosmos/go-bip39"] version = "v1.0.0" hash = "sha256-Qm2aC2vaS8tjtMUbHmlBSagOSqbduEEDwc51qvQaBmA=" + [mod."github.com/cosmos/gogoproto"] + version = "v1.4.2" + hash = "sha256-hOY+mhPDYWcSYSdth2AW7IONdgicqQir0z/1XrXt9NY=" [mod."github.com/cosmos/gorocksdb"] version = "v1.2.0" hash = "sha256-209TcVuXc5s/TcOvNlaQ1HEJAUDTEK3nxPhs+d8TEcY=" @@ -137,15 +149,12 @@ schema = 3 [mod."github.com/desertbit/timer"] version = "v0.0.0-20180107155436-c41aec40b27f" hash = "sha256-abLOtEcomAqCWLphd2X6WkD/ED764w6sa6unox4BXss=" - [mod."github.com/dgraph-io/badger/v2"] - version = "v2.2007.4" - hash = "sha256-+KwqZJZpViv8S3TqUVvPXrFoMgWFyS3NoLsi4RR5fGk=" + [mod."github.com/dgraph-io/badger/v3"] + version = "v3.2103.2" + hash = "sha256-F6pvsaSKwXOl9RfnUQFqAl6xpCVu9+rthQgOxhKVk1g=" [mod."github.com/dgraph-io/ristretto"] version = "v0.1.0" hash = "sha256-01jneg1+1x8tTfUTBZ+6mHkQaqXVnPYxLJyJhJQcvt4=" - [mod."github.com/dgryski/go-farm"] - version = "v0.0.0-20200201041132-a6ae2369ad13" - hash = "sha256-aOMlPwFY36bLiiIx4HonbCYRAhagk5N6HAWN7Ygif+E=" [mod."github.com/dlclark/regexp2"] version = "v1.4.1-0.20201116162257-a2a8dda75c91" hash = "sha256-VNNMZIc7NkDg3DVLnqeJNM/KZqkkaZu2/HTLBL8X2xE=" @@ -164,6 +173,7 @@ schema = 3 [mod."github.com/ethereum/go-ethereum"] version = "v1.10.19" hash = "sha256-7FPnTGcCb8Xd1QVR+6PmGTaHdTY1mm/8osFTW1JLuG8=" + replaced = "github.com/ethereum/go-ethereum" [mod."github.com/evmos/ethermint"] version = "v0.6.1-0.20221003153722-491c3da7ebd7" hash = "sha256-vnfjk57gYa+F8nn0LByX/B1LV8PY2Jvm8vXV6be4ufc=" @@ -217,8 +227,11 @@ schema = 3 version = "v0.0.4" hash = "sha256-Umx+5xHAQCN/Gi4HbtMhnDCSPFAXSsjVbXd8n5LhjAA=" [mod."github.com/google/btree"] - version = "v1.0.1" - hash = "sha256-1PIeFGgUL4BK/StL/D12pg9bEQ5HfMT/fMLdus4pZTs=" + version = "v1.1.2" + hash = "sha256-K7V2obq3pLM71Mg0vhhHtZ+gtaubwXPQx3xcIyZDCjM=" + [mod."github.com/google/flatbuffers"] + version = "v2.0.0+incompatible" + hash = "sha256-4Db9FdOL60Da4H1+K4Qv02w4omxdsh3uzpmY1vtqHeA=" [mod."github.com/google/go-cmp"] version = "v0.5.8" hash = "sha256-8zkIo+Sr1NXMnj3PNmvjX2sZKnAKWXOFvmnX7D9bwxQ=" @@ -354,9 +367,15 @@ schema = 3 [mod."github.com/mitchellh/mapstructure"] version = "v1.5.0" hash = "sha256-ztVhGQXs67MF8UadVvG72G3ly0ypQW0IRDdOOkjYwoE=" + [mod."github.com/mschoch/smat"] + version = "v0.2.0" + hash = "sha256-DZvUJXjIcta3U+zxzgU3wpoGn/V4lpBY7Xme8aQUi+E=" [mod."github.com/mtibben/percent"] version = "v0.2.1" hash = "sha256-Zj1lpCP6mKQ0UUTMs2By4LC414ou+iJzKkK+eBHfEcc=" + [mod."github.com/natefinch/atomic"] + version = "v1.0.1" + hash = "sha256-fbOVHCwRNI8PFjC4o0YXpKZO0JU2aWTfH5c7WXXKMHg=" [mod."github.com/olekukonko/tablewriter"] version = "v0.0.5" hash = "sha256-/5i70IkH/qSW5KjGzv8aQNKh9tHoz98tqtL0K2DMFn4=" @@ -461,14 +480,19 @@ schema = 3 version = "v0.34.22" hash = "sha256-4p4cpyCWjBbNQUpYN2gDJvnyj+Pov9hw5uRjHrrO++Y=" [mod."github.com/tendermint/tm-db"] - version = "v0.6.7" - hash = "sha256-hl/3RrBrpkk2zA6dmrNlIYKs1/GfqegSscDSkA5Pjlo=" + version = "v0.0.0-20221006023748-f6214ae9454d" + hash = "sha256-mtTVR3f3A9CmcyJBXTeurQuHGiVA5hIzqlxskz1M1qk=" + replaced = "github.com/yihuang/tm-db" [mod."github.com/tklauser/go-sysconf"] version = "v0.3.10" hash = "sha256-Zf2NsgM9+HeM949vCce4HQtSbfUiFpeiQ716yKcFyx4=" [mod."github.com/tklauser/numcpus"] version = "v0.4.0" hash = "sha256-ndE82nOb3agubhEV7aRzEqqTlN4DPbKFHEm2+XZLn8k=" + [mod."github.com/torquem-ch/mdbx-go"] + version = "v0.0.0-20221010042614-b72b4f091d88" + hash = "sha256-HWsrhzSGoYgEUUziQPHEtQGwBQ/upg3/f1fY2NGiC0g=" + replaced = "github.com/yihuang/mdbx-go" [mod."github.com/tyler-smith/go-bip39"] version = "v1.1.0" hash = "sha256-3YhWBtSwRLGwm7vNwqumphZG3uLBW1vwT9QkQ8JuSjU=" @@ -478,6 +502,7 @@ schema = 3 [mod."github.com/zondax/hid"] version = "v0.9.0" hash = "sha256-PvXtxXo/3C+DS9ZeGBlr4zXbIpaYNtMqLzxYhusFXNY=" + replaced = "github.com/zondax/hid" [mod."go.etcd.io/bbolt"] version = "v1.3.6" hash = "sha256-DenVAmyN22xUiivk6fdJp4C9ZnUJXCMDUf8E0goRRV4=" diff --git a/integration_tests/configs/default.jsonnet b/integration_tests/configs/default.jsonnet index 12ca83f47d..23b183f7d5 100644 --- a/integration_tests/configs/default.jsonnet +++ b/integration_tests/configs/default.jsonnet @@ -2,7 +2,7 @@ dotenv: '../../scripts/.env', 'cronos_777-1': { cmd: 'cronosd', - 'start-flags': '--trace --streamers file', + 'start-flags': '--trace --streamers versiondb,file', config: { mempool: { version: 'v1', diff --git a/integration_tests/configs/pruned-node.jsonnet b/integration_tests/configs/pruned-node.jsonnet index 2a3edd147f..6e87b2288b 100644 --- a/integration_tests/configs/pruned-node.jsonnet +++ b/integration_tests/configs/pruned-node.jsonnet @@ -2,6 +2,8 @@ local config = import 'default.jsonnet'; config { 'cronos_777-1'+: { + // don't enable versiondb, since it don't do pruning right now + 'start-flags': '--trace --streamers file', 'app-config'+: { pruning: 'everything', 'state-sync'+: { diff --git a/integration_tests/configs/state_benchmark.jsonnet b/integration_tests/configs/state_benchmark.jsonnet new file mode 100644 index 0000000000..ba8dbf1741 --- /dev/null +++ b/integration_tests/configs/state_benchmark.jsonnet @@ -0,0 +1,36 @@ +local config = import 'default.jsonnet'; + +config { + 'cronos_777-1'+: { + 'start-flags': '--trace --streamers file,versiondb', + 'app-config'+: { + 'app-db-backend': 'rocksdb', + 'state-sync'+: { + 'snapshot-interval': 0, + }, + }, + validators: [ + super.validators[0], + super.validators[1] { + 'app-config'+: { + pruning: 'everything', + }, + }, + ] + super.validators[2:], + genesis+: { + consensus_params+: { + block+: { + max_gas: '163000000', + }, + }, + app_state+: { + feemarket+: { + params+: { + no_base_fee: true, + min_gas_multiplier: '0', + }, + }, + }, + }, + }, +} diff --git a/integration_tests/conftest.py b/integration_tests/conftest.py index 980d4a9627..a496fe7a12 100644 --- a/integration_tests/conftest.py +++ b/integration_tests/conftest.py @@ -13,6 +13,28 @@ def pytest_configure(config): config.addinivalue_line("markers", "slow: marks tests as slow") config.addinivalue_line("markers", "gravity: gravity bridge test cases") + config.addinivalue_line( + "markers", "benchmark: benchmarks, only run if '--run-benchmark' is passed" + ) + + +def pytest_addoption(parser): + parser.addoption( + "--run-benchmark", + action="store_true", + default=False, + help="include benchmark cases", + ) + + +def pytest_collection_modifyitems(config, items): + if config.getoption("--run-benchmark"): + # run benchmarks + return + skip = pytest.mark.skip(reason="need --run-benchmark option to run") + for item in items: + if "benchmark" in item.keywords: + item.add_marker(skip) @pytest.fixture(scope="session") diff --git a/integration_tests/contracts/contracts/BenchmarkStorage.sol b/integration_tests/contracts/contracts/BenchmarkStorage.sol new file mode 100644 index 0000000000..69dcfde451 --- /dev/null +++ b/integration_tests/contracts/contracts/BenchmarkStorage.sol @@ -0,0 +1,15 @@ +pragma solidity 0.8.10; + +contract BenchmarkStorage { + uint seed; + mapping(uint => uint) state; + function random(uint i) private view returns (uint) { + return uint(keccak256(abi.encodePacked(i, seed))); + } + function batch_set(uint _seed, uint n, uint range) public { + seed = _seed; + for (uint i=0; i< n; i++) { + state[random(i) % range] = random(i+i); + } + } +} diff --git a/integration_tests/pyproject.toml b/integration_tests/pyproject.toml index 65ca14c207..54fb3b3f60 100644 --- a/integration_tests/pyproject.toml +++ b/integration_tests/pyproject.toml @@ -5,7 +5,7 @@ description = "" authors = ["chain-dev "] [tool.poetry.dependencies] -python = "^3.8" +python = "^3.10" pytest = "^7.0.1" pytest-github-actions-annotate-failures = "^0.1.1" flake8 = "^4.0.1" @@ -28,6 +28,9 @@ jsonnet = "^0.18.0" eth-account = "^0.7.0" cprotobuf = "^0.1.11" pathspec = "^0.10.1" +rocksdb = { git = "https://github.com/HathorNetwork/python-rocksdb.git", branch = "master" } +click = "^8.1.2" +roaring64 = { git = "https://github.com/yihuang/python-roaring64.git", branch = "main" } [tool.poetry.dev-dependencies] diff --git a/integration_tests/test_benchmark_storage.py b/integration_tests/test_benchmark_storage.py new file mode 100644 index 0000000000..41668a94ac --- /dev/null +++ b/integration_tests/test_benchmark_storage.py @@ -0,0 +1,53 @@ +from concurrent.futures import ThreadPoolExecutor +from pathlib import Path + +import pytest +from web3 import Web3 + +from .network import setup_custom_cronos +from .utils import ( + ACCOUNTS, + CONTRACTS, + deploy_contract, + send_transaction, + w3_wait_for_block, +) + + +@pytest.fixture(scope="module") +def custom_cronos(tmp_path_factory): + path = tmp_path_factory.mktemp("benchmark") + yield from setup_custom_cronos( + path, 26200, Path(__file__).parent / "configs/state_benchmark.jsonnet" + ) + + +@pytest.mark.benchmark +def test_benchmark_storage(custom_cronos): + w3: Web3 = custom_cronos.w3 + w3_wait_for_block(w3, 1) + contract = deploy_contract(w3, CONTRACTS["BenchmarkStorage"]) + + n = 3000 + gas = 81500000 + iterations = 200 + parity = 100 + + def task(acct, acct_i): + for i in range(iterations): + seed = i * 10 + acct_i + tx = contract.functions.batch_set(seed, n, n * parity).buildTransaction( + {"from": acct.address, "gas": gas} + ) + print(send_transaction(w3, tx, acct.key)) + + accounts = [ + ACCOUNTS["validator"], + ACCOUNTS["community"], + ACCOUNTS["signer1"], + ACCOUNTS["signer2"], + ] + with ThreadPoolExecutor(len(accounts)) as exec: + tasks = [exec.submit(task, acct, i) for i, acct in enumerate(accounts)] + for t in tasks: + t.result() diff --git a/integration_tests/test_streamer.py b/integration_tests/test_streamer.py index 249aa346f2..f0128d1e29 100644 --- a/integration_tests/test_streamer.py +++ b/integration_tests/test_streamer.py @@ -13,40 +13,20 @@ class StoreKVPairs(ProtoEntity): value = Field("bytes", 4) -def decode_stream_file(data, body_cls=StoreKVPairs, header_cls=None, footer_cls=None): +def decode_stream_file(data, entry_cls=StoreKVPairs): """ - header, body*, footer + StoreKVPairs, StoreKVPairs, ... """ - header = footer = None - body = [] + items = [] offset = 0 - size, n = decode_primitive(data, "uint64") - offset += n - - # header - if header_cls is not None: - header = header_cls() - header.ParseFromString(data[offset : offset + size]) - offset += size - - while True: + while offset < len(data): size, n = decode_primitive(data[offset:], "uint64") offset += n - if offset + size == len(data): - # footer - if footer_cls is not None: - footer = footer_cls() - footer.ParseFromString(data[offset : offset + size]) - offset += size - break - else: - # body - if body_cls is not None: - item = body_cls() - item.ParseFromString(data[offset : offset + size]) - body.append(item) - offset += size - return header, body, footer + item = entry_cls() + item.ParseFromString(data[offset : offset + size]) + items.append(item) + offset += size + return items def test_streamers(cronos): @@ -55,23 +35,22 @@ def test_streamers(cronos): - try to parse the state change sets """ # inspect the first state change of the first tx in genesis - path = cronos.node_home(0) / "data/file_streamer/block-0-tx-0" - _, body, _ = decode_stream_file(open(path, "rb").read()) + # the InitChainer is committed together with the first block. + path = cronos.node_home(0) / "data/file_streamer/block-1-data" + items = decode_stream_file(open(path, "rb").read()) # creation of the validator account - assert body[0].store_key == "acc" - # the order in gen_txs is undeterministic, could be either one. - assert body[0].key in ( - b"\x01" + HexBytes(ADDRS["validator"]), - b"\x01" + HexBytes(ADDRS["validator2"]), - ) + assert items[0].store_key == "acc" + # the writes are sorted by key, find the minimal address + min_addr = min(ADDRS.values()) + assert items[0].key == b"\x01" + HexBytes(min_addr) if __name__ == "__main__": import binascii import sys - _, body, _ = decode_stream_file(open(sys.argv[1], "rb").read()) - for item in body: + items = decode_stream_file(open(sys.argv[1], "rb").read()) + for item in items: print( item.store_key, item.delete, diff --git a/integration_tests/utils.py b/integration_tests/utils.py index 462d472bfa..a714ff3b89 100644 --- a/integration_tests/utils.py +++ b/integration_tests/utils.py @@ -47,6 +47,7 @@ "TestBlackListERC20": "TestBlackListERC20.sol", "CroBridge": "CroBridge.sol", "CronosGravityCancellation": "CronosGravityCancellation.sol", + "BenchmarkStorage": "BenchmarkStorage.sol", } diff --git a/integration_tests/versiondb.py b/integration_tests/versiondb.py new file mode 100644 index 0000000000..40c0c672e0 --- /dev/null +++ b/integration_tests/versiondb.py @@ -0,0 +1,86 @@ +""" +cli utilities for versiondb +""" +import binascii +from pathlib import Path + +import click +import rocksdb +from cprotobuf import decode_primitive +from roaring64 import BitMap64 + + +def rocksdb_stats(path): + db = rocksdb.DB(str(path), rocksdb.Options()) + for field in ["rocksdb.stats", "rocksdb.sstables"]: + print(f"############# {field}") + print(db.get_property(field.encode()).decode()) + + # space amplification + it = db.iteritems() + it.seek_to_first() + count = 0 + size = 0 + for k, v in it: + count += 1 + size += len(k) + len(v) + # directory size + fsize = sum(f.stat().st_size for f in path.glob("**/*") if f.is_file()) + print( + f"space_amplification: {fsize / size:.2f}, kv pairs: {count}, " + f"data size: {size}, file size: {fsize}" + ) + + +@click.group() +def cli(): + pass + + +@cli.command() +@click.option("--dbpath", help="path of plain db") +def latest_version(dbpath): + db = rocksdb.DB(dbpath, rocksdb.Options()) + bz = db.get(b"s/latest") + # gogoproto std int64, the first byte is field tag + print(decode_primitive(bz[1:], "int64")[0]) + + +@cli.command() +@click.option("--dbpath", help="path of version db") +@click.option("--version", help="version of the value, optional") +@click.argument("store-key") +@click.argument("hex-key") +def get(dbpath, version, store_key, hex_key): + """ + get a value at version + """ + key = f"s/k:{store_key}/".decode() + binascii.unhexlify(hex_key) + plain_db = rocksdb.DB(dbpath + "plain.db", rocksdb.Options()) + if version is None: + v = plain_db.get(key) + else: + version = int(version) + print(binascii.hexlify(v)) + + history_db = rocksdb.DB(dbpath + "history.db", rocksdb.Options()) + bz = history_db.get(key) + bm = BitMap64.deserialize(bz) + + # seek in bitmap + bm.Rank(version) + + +@cli.command() +def sync(path): + pass + + +@cli.command() +@click.option("--dbpath", help="path of rocksdb") +def rocksdbstats(dbpath): + rocksdb_stats(Path(dbpath)) + + +if __name__ == "__main__": + cli() diff --git a/nix/testenv.nix b/nix/testenv.nix index e951eccb7f..0f84ddc388 100644 --- a/nix/testenv.nix +++ b/nix/testenv.nix @@ -1,4 +1,4 @@ -{ poetry2nix, lib, python310 }: +{ poetry2nix, lib, python310, rocksdb }: poetry2nix.mkPoetryEnv { projectDir = ../integration_tests; python = python310; @@ -14,6 +14,9 @@ poetry2nix.mkPoetryEnv { pytest-github-actions-annotate-failures = [ "setuptools" ]; flake8-black = [ "setuptools" ]; multiaddr = [ "setuptools" ]; + rocksdb = [ "setuptools" "cython" "pkgconfig" ]; + pyroaring = [ "setuptools" ]; + roaring64 = [ "poetry" ]; }; in lib.mapAttrs @@ -34,6 +37,11 @@ poetry2nix.mkPoetryEnv { substituteInPlace setup.py --replace "setup()" "setup(version=\"1.3\")" ''; }; + rocksdb = super.rocksdb.overridePythonAttrs ( + old: { + buildInputs = (old.buildInputs or [ ]) ++ [ rocksdb ]; + } + ); }) ]); } diff --git a/scripts/cronos-devnet.yaml b/scripts/cronos-devnet.yaml index 0b9b865e1a..ec2f3c9fe0 100644 --- a/scripts/cronos-devnet.yaml +++ b/scripts/cronos-devnet.yaml @@ -1,7 +1,7 @@ dotenv: .env cronos_777-1: cmd: cronosd - start-flags: "--trace" + start-flags: "--trace --streamers versiondb,file" app-config: minimum-gas-prices: 0basetcro index-events: diff --git a/versiondb/backend_test_utils.go b/versiondb/backend_test_utils.go new file mode 100644 index 0000000000..a5cff0cb00 --- /dev/null +++ b/versiondb/backend_test_utils.go @@ -0,0 +1,276 @@ +package versiondb + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/require" + dbm "github.com/tendermint/tm-db" + + "github.com/cosmos/cosmos-sdk/store/types" +) + +var ( + key1 = []byte("key1") + key2 = []byte("key2") + value1 = []byte("value1") + value2 = []byte("value2") + value3 = []byte("value3") + key1_subkey = []byte("key1/subkey") +) + +func SetupTestDB(t *testing.T, store VersionStore) { + changeSets := [][]types.StoreKVPair{ + { + {StoreKey: "evm", Key: []byte("delete-in-block2"), Value: []byte("1")}, + {StoreKey: "evm", Key: []byte("re-add-in-block3"), Value: []byte("1")}, + {StoreKey: "evm", Key: []byte("z-genesis-only"), Value: []byte("2")}, + {StoreKey: "evm", Key: []byte("modify-in-block2"), Value: []byte("1")}, + {StoreKey: "staking", Key: []byte("key1"), Value: []byte("value1")}, + {StoreKey: "staking", Key: []byte("key1/subkey"), Value: []byte("value1")}, + }, + { + {StoreKey: "evm", Key: []byte("re-add-in-block3"), Delete: true}, + {StoreKey: "evm", Key: []byte("add-in-block1"), Value: []byte("1")}, + {StoreKey: "staking", Key: []byte("key1"), Delete: true}, + }, + { + {StoreKey: "evm", Key: []byte("add-in-block2"), Value: []byte("1")}, + {StoreKey: "evm", Key: []byte("delete-in-block2"), Delete: true}, + {StoreKey: "evm", Key: []byte("modify-in-block2"), Value: []byte("2")}, + {StoreKey: "evm", Key: []byte("key2"), Delete: true}, + {StoreKey: "staking", Key: []byte("key1"), Value: []byte("value2")}, + }, + { + {StoreKey: "evm", Key: []byte("re-add-in-block3"), Value: []byte("2")}, + }, + { + {StoreKey: "evm", Key: []byte("re-add-in-block3"), Delete: true}, + }, + } + for i, changeSet := range changeSets { + require.NoError(t, store.PutAtVersion(int64(i), changeSet)) + } +} + +func Run(t *testing.T, storeCreator func() VersionStore) { + testBasics(t, storeCreator()) + testIterator(t, storeCreator()) + testHeightInFuture(t, storeCreator()) + + // test delete in genesis + store := storeCreator() + err := store.PutAtVersion(0, []types.StoreKVPair{ + {StoreKey: "evm", Key: []byte{1}, Delete: true}, + }) + require.Error(t, err) +} + +func testBasics(t *testing.T, store VersionStore) { + var v int64 + + SetupTestDB(t, store) + + value, err := store.GetAtVersion("evm", []byte("z-genesis-only"), nil) + require.NoError(t, err) + require.Equal(t, value, []byte("2")) + + v = 4 + ok, err := store.HasAtVersion("evm", []byte("z-genesis-only"), &v) + require.NoError(t, err) + require.True(t, ok) + value, err = store.GetAtVersion("evm", []byte("z-genesis-only"), &v) + require.NoError(t, err) + require.Equal(t, value, []byte("2")) + + value, err = store.GetAtVersion("evm", []byte("re-add-in-block3"), nil) + require.NoError(t, err) + require.Empty(t, value) + + ok, err = store.HasAtVersion("staking", key1, nil) + require.NoError(t, err) + require.True(t, ok) + + value, err = store.GetAtVersion("staking", key1, nil) + require.NoError(t, err) + require.Equal(t, value, []byte("value2")) + + v = 2 + value, err = store.GetAtVersion("staking", key1, &v) + require.NoError(t, err) + require.Equal(t, value, []byte("value2")) + + ok, err = store.HasAtVersion("staking", key1, &v) + require.NoError(t, err) + require.True(t, ok) + + v = 0 + value, err = store.GetAtVersion("staking", key1, &v) + require.NoError(t, err) + require.Equal(t, value, []byte("value1")) + + v = 1 + value, err = store.GetAtVersion("staking", key1, &v) + require.NoError(t, err) + require.Empty(t, value) + + ok, err = store.HasAtVersion("staking", key1, &v) + require.NoError(t, err) + require.False(t, ok) + + v = 0 + value, err = store.GetAtVersion("staking", key1, &v) + require.NoError(t, err) + require.Equal(t, value1, value) + value, err = store.GetAtVersion("staking", key1_subkey, &v) + require.NoError(t, err) + require.Equal(t, value1, value) +} + +type KVPair struct { + Key []byte + Value []byte +} + +func testIterator(t *testing.T, store VersionStore) { + SetupTestDB(t, store) + + expItems := [][]KVPair{ + { + KVPair{[]byte("delete-in-block2"), []byte("1")}, + KVPair{[]byte("modify-in-block2"), []byte("1")}, + KVPair{[]byte("re-add-in-block3"), []byte("1")}, + KVPair{[]byte("z-genesis-only"), []byte("2")}, + }, + { + KVPair{[]byte("add-in-block1"), []byte("1")}, + KVPair{[]byte("delete-in-block2"), []byte("1")}, + KVPair{[]byte("modify-in-block2"), []byte("1")}, + KVPair{[]byte("z-genesis-only"), []byte("2")}, + }, + { + KVPair{[]byte("add-in-block1"), []byte("1")}, + KVPair{[]byte("add-in-block2"), []byte("1")}, + KVPair{[]byte("modify-in-block2"), []byte("2")}, + KVPair{[]byte("z-genesis-only"), []byte("2")}, + }, + { + KVPair{[]byte("add-in-block1"), []byte("1")}, + KVPair{[]byte("add-in-block2"), []byte("1")}, + KVPair{[]byte("modify-in-block2"), []byte("2")}, + KVPair{[]byte("re-add-in-block3"), []byte("2")}, + KVPair{[]byte("z-genesis-only"), []byte("2")}, + }, + { + KVPair{[]byte("add-in-block1"), []byte("1")}, + KVPair{[]byte("add-in-block2"), []byte("1")}, + KVPair{[]byte("modify-in-block2"), []byte("2")}, + KVPair{[]byte("z-genesis-only"), []byte("2")}, + }, + } + for i, exp := range expItems { + t.Run(fmt.Sprintf("block-%d", i), func(t *testing.T) { + v := int64(i) + it, err := store.IteratorAtVersion("evm", nil, nil, &v) + require.NoError(t, err) + require.Equal(t, exp, consumeIterator(it)) + + it, err = store.ReverseIteratorAtVersion("evm", nil, nil, &v) + require.NoError(t, err) + actual := consumeIterator(it) + require.Equal(t, len(exp), len(actual)) + require.Equal(t, reversed(exp), actual) + }) + } + + it, err := store.IteratorAtVersion("evm", nil, nil, nil) + require.Equal(t, expItems[len(expItems)-1], consumeIterator(it)) + + it, err = store.ReverseIteratorAtVersion("evm", nil, nil, nil) + require.Equal(t, reversed(expItems[len(expItems)-1]), consumeIterator(it)) + + // with start parameter + v := int64(2) + it, err = store.IteratorAtVersion("evm", []byte("\xff"), nil, &v) + require.NoError(t, err) + require.Empty(t, consumeIterator(it)) + it, err = store.ReverseIteratorAtVersion("evm", nil, []byte("\x00"), &v) + require.NoError(t, err) + require.Empty(t, consumeIterator(it)) + + it, err = store.IteratorAtVersion("evm", []byte("modify-in-block2"), nil, &v) + require.NoError(t, err) + require.Equal(t, expItems[2][len(expItems[2])-2:], consumeIterator(it)) + + it, err = store.ReverseIteratorAtVersion("evm", nil, []byte("mp"), &v) + require.NoError(t, err) + require.Equal(t, + reversed(expItems[2][:len(expItems[2])-1]), + consumeIterator(it), + ) + + it, err = store.ReverseIteratorAtVersion("evm", nil, []byte("modify-in-block3"), &v) + require.NoError(t, err) + require.Equal(t, + reversed(expItems[2][:len(expItems[2])-1]), + consumeIterator(it), + ) + + // delete the last key, cover some edge cases + v = int64(len(expItems)) + err = store.PutAtVersion( + v, + []types.StoreKVPair{ + {StoreKey: "evm", Key: []byte("z-genesis-only"), Delete: true}, + }, + ) + require.NoError(t, err) + it, err = store.IteratorAtVersion("evm", nil, nil, &v) + require.NoError(t, err) + require.Equal(t, + expItems[v-1][:len(expItems[v-1])-1], + consumeIterator(it), + ) + v -= 1 + it, err = store.IteratorAtVersion("evm", nil, nil, &v) + require.NoError(t, err) + require.Equal(t, + expItems[v], + consumeIterator(it), + ) +} + +func testHeightInFuture(t *testing.T, store VersionStore) { + SetupTestDB(t, store) + + latest, err := store.GetLatestVersion() + require.NoError(t, err) + + v := latest + 1 + _, err = store.GetAtVersion("staking", key1, &v) + require.Error(t, err) + _, err = store.HasAtVersion("staking", key1, &v) + require.Error(t, err) + _, err = store.IteratorAtVersion("staking", nil, nil, &v) + require.Error(t, err) + _, err = store.ReverseIteratorAtVersion("staking", nil, nil, &v) + require.Error(t, err) +} + +func consumeIterator(it dbm.Iterator) []KVPair { + var result []KVPair + for ; it.Valid(); it.Next() { + result = append(result, KVPair{it.Key(), it.Value()}) + } + it.Close() + return result +} + +// reversed clone and reverse the slice +func reversed[S ~[]E, E any](s S) []E { + r := make([]E, len(s)) + for i, j := 0, len(s)-1; i <= j; i, j = i+1, j-1 { + r[i], r[j] = s[j], s[i] + } + return r +} diff --git a/versiondb/dbutils.go b/versiondb/dbutils.go new file mode 100644 index 0000000000..2334d0f2c1 --- /dev/null +++ b/versiondb/dbutils.go @@ -0,0 +1,84 @@ +package versiondb + +import ( + "encoding/binary" + "sort" + + "github.com/RoaringBitmap/roaring/roaring64" +) + +var ChunkLimit = uint64(1950) // threshold beyond which MDBX overflow pages appear: 4096 / 2 - (keySize + 8) + +// CutLeft - cut from bitmap `targetSize` bytes from left +// removing lft part from `bm` +// returns nil on zero cardinality +func CutLeft64(bm *roaring64.Bitmap, sizeLimit uint64) *roaring64.Bitmap { + if bm.GetCardinality() == 0 { + return nil + } + + sz := bm.GetSerializedSizeInBytes() + if sz <= sizeLimit { + lft := roaring64.New() + lft.AddRange(bm.Minimum(), bm.Maximum()+1) + lft.And(bm) + lft.RunOptimize() + bm.Clear() + return lft + } + + from := bm.Minimum() + minMax := bm.Maximum() - bm.Minimum() + to := sort.Search(int(minMax), func(i int) bool { // can be optimized to avoid "too small steps", but let's leave it for readability + lft := roaring64.New() // bitmap.Clear() method intentionally not used here, because then serialized size of bitmap getting bigger + lft.AddRange(from, from+uint64(i)+1) + lft.And(bm) + lft.RunOptimize() + return lft.GetSerializedSizeInBytes() > sizeLimit + }) + + lft := roaring64.New() + lft.AddRange(from, from+uint64(to)) // no +1 because sort.Search returns element which is just higher threshold - but we need lower + lft.And(bm) + bm.RemoveRange(from, from+uint64(to)) + lft.RunOptimize() + return lft +} + +func WalkChunks64(bm *roaring64.Bitmap, sizeLimit uint64, f func(chunk *roaring64.Bitmap, isLast bool) error) error { + for bm.GetCardinality() > 0 { + if err := f(CutLeft64(bm, sizeLimit), bm.GetCardinality() == 0); err != nil { + return err + } + } + return nil +} + +func WalkChunkWithKeys64(k []byte, m *roaring64.Bitmap, sizeLimit uint64, f func(chunkKey []byte, chunk *roaring64.Bitmap) error) error { + return WalkChunks64(m, sizeLimit, func(chunk *roaring64.Bitmap, isLast bool) error { + chunkKey := make([]byte, len(k)+8) + copy(chunkKey, k) + if isLast { + binary.BigEndian.PutUint64(chunkKey[len(k):], ^uint64(0)) + } else { + binary.BigEndian.PutUint64(chunkKey[len(k):], chunk.Maximum()) + } + return f(chunkKey, chunk) + }) +} + +// SeekInBitmap64 - returns value in bitmap which is >= n +func SeekInBitmap64(m *roaring64.Bitmap, n uint64) (found uint64, ok bool) { + if m == nil || m.IsEmpty() { + return 0, false + } + if n == 0 { + return m.Minimum(), true + } + searchRank := m.Rank(n - 1) + if searchRank >= m.GetCardinality() { + return 0, false + } + found, _ = m.Select(searchRank) + return found, true +} diff --git a/versiondb/multistore.go b/versiondb/multistore.go new file mode 100644 index 0000000000..4871558fd9 --- /dev/null +++ b/versiondb/multistore.go @@ -0,0 +1,135 @@ +package versiondb + +import ( + "io" + "sync" + + "github.com/cosmos/cosmos-sdk/store/cachemulti" + "github.com/cosmos/cosmos-sdk/store/mem" + "github.com/cosmos/cosmos-sdk/store/transient" + "github.com/cosmos/cosmos-sdk/store/types" + sdk "github.com/cosmos/cosmos-sdk/types" +) + +var _ sdk.MultiStore = (*MultiStore)(nil) + +type MultiStore struct { + versionDB VersionStore + storeKeys []types.StoreKey + + // transient or memory stores + transientStores map[types.StoreKey]types.KVStore + + traceWriter io.Writer + traceContext types.TraceContext + traceContextMutex sync.Mutex +} + +func NewMultiStore(versionDB VersionStore, storeKeys []types.StoreKey) *MultiStore { + return &MultiStore{versionDB: versionDB, storeKeys: storeKeys, transientStores: make(map[types.StoreKey]types.KVStore)} +} + +func (s *MultiStore) GetStoreType() types.StoreType { + return types.StoreTypeMulti +} + +func (s *MultiStore) cacheMultiStore(version *int64) sdk.CacheMultiStore { + stores := make(map[types.StoreKey]types.CacheWrapper, len(s.transientStores)+len(s.storeKeys)) + for k, v := range s.transientStores { + stores[k] = v + } + for _, k := range s.storeKeys { + stores[k] = NewKVStore(s.versionDB, k, version) + } + return cachemulti.NewStore(nil, stores, nil, s.traceWriter, s.getTracingContext()) +} + +func (s *MultiStore) CacheMultiStore() sdk.CacheMultiStore { + return s.cacheMultiStore(nil) +} + +func (s *MultiStore) CacheMultiStoreWithVersion(version int64) (sdk.CacheMultiStore, error) { + return s.cacheMultiStore(&version), nil +} + +// CacheWrap implements CacheWrapper/MultiStore/CommitStore. +func (s *MultiStore) CacheWrap() types.CacheWrap { + return s.CacheMultiStore().(types.CacheWrap) +} + +// CacheWrapWithTrace implements the CacheWrapper interface. +func (s *MultiStore) CacheWrapWithTrace(_ io.Writer, _ types.TraceContext) types.CacheWrap { + return s.CacheWrap() +} + +func (s *MultiStore) GetStore(storeKey types.StoreKey) sdk.Store { + return s.GetKVStore(storeKey) +} + +func (s *MultiStore) GetKVStore(storeKey types.StoreKey) sdk.KVStore { + store, ok := s.transientStores[storeKey] + if ok { + return store + } + return NewKVStore(s.versionDB, storeKey, nil) +} + +func (s *MultiStore) MountTransientStores(keys map[string]*types.TransientStoreKey) { + for _, key := range keys { + s.transientStores[key] = transient.NewStore() + } +} + +func (s *MultiStore) MountMemoryStores(keys map[string]*types.MemoryStoreKey) { + for _, key := range keys { + s.transientStores[key] = mem.NewStore() + } +} + +// SetTracer sets the tracer for the MultiStore that the underlying +// stores will utilize to trace operations. A MultiStore is returned. +func (s *MultiStore) SetTracer(w io.Writer) types.MultiStore { + s.traceWriter = w + return s +} + +// SetTracingContext updates the tracing context for the MultiStore by merging +// the given context with the existing context by key. Any existing keys will +// be overwritten. It is implied that the caller should update the context when +// necessary between tracing operations. It returns a modified MultiStore. +func (s *MultiStore) SetTracingContext(tc types.TraceContext) types.MultiStore { + s.traceContextMutex.Lock() + defer s.traceContextMutex.Unlock() + s.traceContext = s.traceContext.Merge(tc) + + return s +} + +func (s *MultiStore) getTracingContext() types.TraceContext { + s.traceContextMutex.Lock() + defer s.traceContextMutex.Unlock() + + if s.traceContext == nil { + return nil + } + + ctx := types.TraceContext{} + for k, v := range s.traceContext { + ctx[k] = v + } + + return ctx +} + +// TracingEnabled returns if tracing is enabled for the MultiStore. +func (s *MultiStore) TracingEnabled() bool { + return s.traceWriter != nil +} + +func (s *MultiStore) LatestVersion() int64 { + version, err := s.versionDB.GetLatestVersion() + if err != nil { + panic(err) + } + return version +} diff --git a/versiondb/store.go b/versiondb/store.go new file mode 100644 index 0000000000..906423fb9e --- /dev/null +++ b/versiondb/store.go @@ -0,0 +1,94 @@ +package versiondb + +import ( + "io" + "time" + + "github.com/cosmos/cosmos-sdk/store/cachekv" + "github.com/cosmos/cosmos-sdk/store/listenkv" + "github.com/cosmos/cosmos-sdk/store/tracekv" + "github.com/cosmos/cosmos-sdk/store/types" + "github.com/cosmos/cosmos-sdk/telemetry" +) + +var _ types.KVStore = (*Store)(nil) + +// Store Implements types.KVStore +type Store struct { + store VersionStore + storeKey types.StoreKey + version *int64 +} + +func NewKVStore(store VersionStore, storeKey types.StoreKey, version *int64) *Store { + return &Store{store, storeKey, version} +} + +// Implements Store. +func (st *Store) GetStoreType() types.StoreType { + // FIXME + return types.StoreTypeIAVL +} + +// Implements Store. +func (st *Store) CacheWrap() types.CacheWrap { + return cachekv.NewStore(st) +} + +// CacheWrapWithTrace implements the Store interface. +func (st *Store) CacheWrapWithTrace(w io.Writer, tc types.TraceContext) types.CacheWrap { + return cachekv.NewStore(tracekv.NewStore(st, w, tc)) +} + +// CacheWrapWithListeners implements the CacheWrapper interface. +func (st *Store) CacheWrapWithListeners(storeKey types.StoreKey, listeners []types.WriteListener) types.CacheWrap { + return cachekv.NewStore(listenkv.NewStore(st, storeKey, listeners)) +} + +// Implements types.KVStore. +func (st *Store) Get(key []byte) []byte { + defer telemetry.MeasureSince(time.Now(), "store", "iavl", "get") + value, err := st.store.GetAtVersion(st.storeKey.Name(), key, st.version) + if err != nil { + panic(err) + } + return value +} + +// Implements types.KVStore. +func (st *Store) Has(key []byte) (exists bool) { + defer telemetry.MeasureSince(time.Now(), "store", "iavl", "has") + has, err := st.store.HasAtVersion(st.storeKey.Name(), key, st.version) + if err != nil { + panic(err) + } + return has +} + +// Implements types.KVStore. +func (st *Store) Iterator(start, end []byte) types.Iterator { + itr, err := st.store.IteratorAtVersion(st.storeKey.Name(), start, end, st.version) + if err != nil { + panic(err) + } + return itr +} + +// Implements types.KVStore. +func (st *Store) ReverseIterator(start, end []byte) types.Iterator { + itr, err := st.store.ReverseIteratorAtVersion(st.storeKey.Name(), start, end, st.version) + if err != nil { + panic(err) + } + return itr +} + +// Implements types.KVStore. +func (st *Store) Set(key, value []byte) { + panic("write operation is not supported") +} + +// Implements types.KVStore. +func (st *Store) Delete(key []byte) { + panic("write operation is not supported") +} diff --git a/versiondb/streaming_service.go b/versiondb/streaming_service.go new file mode 100644 index 0000000000..e56aef0e18 --- /dev/null +++ b/versiondb/streaming_service.go @@ -0,0 +1,84 @@ +package versiondb + +import ( + "sort" + "strings" + "sync" + + abci "github.com/tendermint/tendermint/abci/types" + + "github.com/cosmos/cosmos-sdk/baseapp" + "github.com/cosmos/cosmos-sdk/store/types" + sdk "github.com/cosmos/cosmos-sdk/types" +) + +var _ baseapp.StreamingService = &StreamingService{} + +// StreamingService is a concrete implementation of StreamingService that accumulate the state changes in current block, +// writes the ordered changeset out to version storage. +type StreamingService struct { + listeners []*types.MemoryListener // the listeners that will be initialized with BaseApp + versionStore VersionStore + currentBlockNumber int64 // the current block number +} + +// NewStreamingService creates a new StreamingService for the provided writeDir, (optional) filePrefix, and storeKeys +func NewStreamingService(versionStore VersionStore, storeKeys []types.StoreKey) *StreamingService { + // sort by the storeKeys first + sort.SliceStable(storeKeys, func(i, j int) bool { + return strings.Compare(storeKeys[i].Name(), storeKeys[j].Name()) < 0 + }) + + listeners := make([]*types.MemoryListener, len(storeKeys)) + for i, key := range storeKeys { + listeners[i] = types.NewMemoryListener(key) + } + return &StreamingService{listeners, versionStore, 0} +} + +// Listeners satisfies the baseapp.StreamingService interface +func (fss *StreamingService) Listeners() map[types.StoreKey][]types.WriteListener { + listeners := make(map[types.StoreKey][]types.WriteListener, len(fss.listeners)) + for _, listener := range fss.listeners { + listeners[listener.StoreKey()] = []types.WriteListener{listener} + } + return listeners +} + +// ListenBeginBlock satisfies the baseapp.ABCIListener interface +// It sets the currentBlockNumber. +func (fss *StreamingService) ListenBeginBlock(ctx sdk.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error { + fss.currentBlockNumber = req.GetHeader().Height + return nil +} + +// ListenDeliverTx satisfies the baseapp.ABCIListener interface +func (fss *StreamingService) ListenDeliverTx(ctx sdk.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error { + return nil +} + +// ListenEndBlock satisfies the baseapp.ABCIListener interface +// It merge the state caches of all the listeners together, and write out to the versionStore. +func (fss *StreamingService) ListenEndBlock(ctx sdk.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error { + return nil +} + +func (fss *StreamingService) ListenCommit(ctx sdk.Context, res abci.ResponseCommit) error { + // concat the state caches + var changeSet []types.StoreKVPair + for _, listener := range fss.listeners { + changeSet = append(changeSet, listener.PopStateCache()...) + } + + return fss.versionStore.PutAtVersion(fss.currentBlockNumber, changeSet) +} + +// Stream satisfies the baseapp.StreamingService interface +func (fss *StreamingService) Stream(wg *sync.WaitGroup) error { + return nil +} + +// Close satisfies the io.Closer interface, which satisfies the baseapp.StreamingService interface +func (fss *StreamingService) Close() error { + return nil +} diff --git a/versiondb/sync.go b/versiondb/sync.go new file mode 100644 index 0000000000..4c89590e4c --- /dev/null +++ b/versiondb/sync.go @@ -0,0 +1,31 @@ +package versiondb + +import ( + "bufio" + "io" + + protoio "github.com/gogo/protobuf/io" + + "github.com/cosmos/cosmos-sdk/store/types" +) + +const maxItemSize = 64000000 // SDK has no key/value size limit, so we set an arbitrary limit + +// ReadFileStreamer parse a binary stream dumped by file streamer to changeset, +// which can be feeded to version store. +func ReadFileStreamer(input *bufio.Reader) ([]types.StoreKVPair, error) { + var changeSet []types.StoreKVPair + reader := protoio.NewDelimitedReader(input, maxItemSize) + for { + var msg types.StoreKVPair + err := reader.ReadMsg(&msg) + if err != nil { + if err == io.EOF { + break + } + return nil, err + } + changeSet = append(changeSet, msg) + } + return changeSet, nil +} diff --git a/versiondb/sync_test.go b/versiondb/sync_test.go new file mode 100644 index 0000000000..85498ee07f --- /dev/null +++ b/versiondb/sync_test.go @@ -0,0 +1,340 @@ +package versiondb + +import ( + "bufio" + "bytes" + "encoding/hex" + "strings" + "testing" + + "github.com/cosmos/cosmos-sdk/store/types" + "github.com/stretchr/testify/require" +) + +const data = ` +a2380aff030a20ff2dd95def59a0c5265f30768a1c6fbdab519c96fb98df +33d524a94575bac1e91292030a02080b120c63726f6e6f735f3737372d31 +1802220c08d9a39a9a0610b8d89eca022a480a202d8f46f61152696c6dc8 +0c6367c7ddc926dc3ed7ab23bd37fb7ca83eb00ba207122408011220df77 +0b6cb0a9694f550ae515d5fb0ebc08902e0fa47ceec816928e74fd8fee44 +322004943bc3709104b52f30f83ca3d30d8bf57551edd48797aef5cee50b +b532c15c3a20e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934c +a495991b7852b8554220f537a6e0561fa0edd12b30ec9b6479e659f6f1fa +1587e69556201bfaf4cf97404a20f537a6e0561fa0edd12b30ec9b6479e6 +59f6f1fa1587e69556201bfaf4cf97405220252fe7cf36dd1bb85dafc47a +08961df0cfd8c027defa5e01e958be121599db9d5a209048462db52bb809 +6c92f8733fcf26455a000a550d9f3748dddadcc2267917146220e3b0c442 +98fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b8556a20 +e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852 +b8557214ae7ccd8d599769209074b81d0c2f3f28624742b71a4612210a1d +0a142861b93c776d100695688250c5b1ba4d44ef2a791880a094a58d1d10 +0112210a1d0a14ae7ccd8d599769209074b81d0c2f3f28624742b71880a0 +94a58d1d100112ae100a630a0d636f696e5f726563656976656412360a08 +7265636569766572122a637263316d33683330776c767366386c6c727578 +7470756b64767379306b6d326b756d386c3061773437121a0a06616d6f75 +6e74121034313139343530383537377374616b650a5c0a08636f696e6261 +736512340a066d696e746572122a637263316d33683330776c767366386c +6c7275787470756b64767379306b6d326b756d386c3061773437121a0a06 +616d6f756e74121034313139343530383537377374616b650a5f0a0a636f +696e5f7370656e7412350a077370656e646572122a637263316d33683330 +776c767366386c6c7275787470756b64767379306b6d326b756d386c3061 +773437121a0a06616d6f756e74121034313139343530383537377374616b +650a630a0d636f696e5f726563656976656412360a087265636569766572 +122a637263313778706676616b6d32616d67393632796c73366638347a33 +6b656c6c3863356c3838656b6572121a0a06616d6f756e74121034313139 +343530383537377374616b650a95010a087472616e7366657212370a0972 +6563697069656e74122a637263313778706676616b6d32616d6739363279 +6c73366638347a336b656c6c3863356c3838656b657212340a0673656e64 +6572122a637263316d33683330776c767366386c6c7275787470756b6476 +7379306b6d326b756d386c3061773437121a0a06616d6f756e7412103431 +3139343530383537377374616b650a3f0a076d65737361676512340a0673 +656e646572122a637263316d33683330776c767366386c6c727578747075 +6b64767379306b6d326b756d386c30617734370aa2010a046d696e741224 +0a0c626f6e6465645f726174696f1214302e393939393939393739343032 +37343439353212210a09696e666c6174696f6e1214302e31323939393939 +3739373130313635333031123a0a11616e6e75616c5f70726f766973696f +6e7312253235393939393936343737353631363138382e37363031383234 +363033383333383838343312150a06616d6f756e74120b34313139343530 +383537370a5f0a0a636f696e5f7370656e7412350a077370656e64657212 +2a637263313778706676616b6d32616d67393632796c73366638347a336b +656c6c3863356c3838656b6572121a0a06616d6f756e7412103832333839 +3031393532307374616b650a630a0d636f696e5f72656365697665641236 +0a087265636569766572122a637263316a7636357333677271663676366a +6c33647034743663397439726b3939636438737037326d70121a0a06616d +6f756e74121038323338393031393532307374616b650a95010a08747261 +6e7366657212370a09726563697069656e74122a637263316a7636357333 +677271663676366a6c33647034743663397439726b393963643873703732 +6d7012340a0673656e646572122a637263313778706676616b6d32616d67 +393632796c73366638347a336b656c6c3863356c3838656b6572121a0a06 +616d6f756e74121038323338393031393532307374616b650a3f0a076d65 +737361676512340a0673656e646572122a637263313778706676616b6d32 +616d67393632796c73366638347a336b656c6c3863356c3838656b65720a +7f0a0f70726f706f7365725f726577617264122c0a06616d6f756e741222 +343131393435303937362e30303030303030303030303030303030303073 +74616b65123e0a0976616c696461746f72123163726376616c6f70657231 +326c756b75367578656868616b303270793472637a36357a753073776837 +776a36756c726c670a790a0a636f6d6d697373696f6e122b0a06616d6f75 +6e7412213431313934353039372e36303030303030303030303030303030 +30307374616b65123e0a0976616c696461746f72123163726376616c6f70 +657231326c756b75367578656868616b303270793472637a36357a753073 +776837776a36756c726c670a770a0772657761726473122c0a06616d6f75 +6e741222343131393435303937362e303030303030303030303030303030 +3030307374616b65123e0a0976616c696461746f72123163726376616c6f +70657231326c756b75367578656868616b303270793472637a36357a7530 +73776837776a36756c726c670a7a0a0a636f6d6d697373696f6e122c0a06 +616d6f756e741222333833313038393430372e3638303030303030303030 +303030303030307374616b65123e0a0976616c696461746f721231637263 +76616c6f70657231326c756b75367578656868616b303270793472637a36 +357a753073776837776a36756c726c670a780a0772657761726473122d0a +06616d6f756e74122333383331303839343037362e383030303030303030 +3030303030303030307374616b65123e0a0976616c696461746f72123163 +726376616c6f70657231326c756b75367578656868616b30327079347263 +7a36357a753073776837776a36756c726c670a7a0a0a636f6d6d69737369 +6f6e122c0a06616d6f756e741222333833313038393430372e3638303030 +303030303030303030303030307374616b65123e0a0976616c696461746f +72123163726376616c6f70657231387a367133386d68767473767972356d +616b38666a38733867346777376b6a6a7030653064680a780a0772657761 +726473122d0a06616d6f756e74122333383331303839343037362e383030 +3030303030303030303030303030307374616b65123e0a0976616c696461 +746f72123163726376616c6f70657231387a367133386d68767473767972 +356d616b38666a38733867346777376b6a6a7030653064680a250a0a6665 +655f6d61726b657412170a08626173655f666565120b3736353632353030 +3030301aa9100aa4040aa1040afa020aba020a2a2f636f736d6f732e7374 +616b696e672e763162657461312e4d736743726561746556616c69646174 +6f72128b020a070a056e6f646530123b0a12313030303030303030303030 +30303030303012123230303030303030303030303030303030301a113130 +3030303030303030303030303030301a0131222a63726331326c756b7536 +7578656868616b303270793472637a36357a753073776837776a73727730 +70702a3163726376616c6f70657231326c756b75367578656868616b3032 +70793472637a36357a753073776837776a36756c726c6732430a1d2f636f +736d6f732e63727970746f2e656432353531392e5075624b657912220a20 +9b86839433f4229b7b7b51554a25ed32549126eb80fe53497ae336a1e67b +66843a1c0a057374616b6512133130303030303030303030303030303030 +3030123b3439643130313933363334303730393664303337396438643863 +3833336438626633396334383866403139322e3136382e302e35343a3236 +363536125f0a570a4f0a282f65746865726d696e742e63727970746f2e76 +312e657468736563703235366b312e5075624b657912230a21026e710a62 +a342de0ed4d7c4532dcbcbbafbf19652ed67b237efab70e8b207efac1204 +0a020801120410c09a0c1a4119ffd1b342c44e183b8113561595186e1f72 +c7273ac4e9ccb04c2b8a4d31b3780eff63cb490102ed39f7482084b4ddb8 +4d672ee2607262a312724f0e5b2b79aa0012ff0b123612340a322f636f73 +6d6f732e7374616b696e672e763162657461312e4d736743726561746556 +616c696461746f72526573706f6e73651ae0055b7b226d73675f696e6465 +78223a302c226576656e7473223a5b7b2274797065223a22636f696e5f72 +65636569766564222c2261747472696275746573223a5b7b226b6579223a +227265636569766572222c2276616c7565223a22637263317479676d7333 +78686873337976343837706878336477346139356a6e3774376c6b393067 +6161227d2c7b226b6579223a22616d6f756e74222c2276616c7565223a22 +313030303030303030303030303030303030307374616b65227d5d7d2c7b +2274797065223a22636f696e5f7370656e74222c22617474726962757465 +73223a5b7b226b6579223a227370656e646572222c2276616c7565223a22 +63726331326c756b75367578656868616b303270793472637a36357a7530 +73776837776a737277307070227d2c7b226b6579223a22616d6f756e7422 +2c2276616c7565223a223130303030303030303030303030303030303073 +74616b65227d5d7d2c7b2274797065223a226372656174655f76616c6964 +61746f72222c2261747472696275746573223a5b7b226b6579223a227661 +6c696461746f72222c2276616c7565223a2263726376616c6f7065723132 +6c756b75367578656868616b303270793472637a36357a75307377683777 +6a36756c726c67227d2c7b226b6579223a22616d6f756e74222c2276616c +7565223a22313030303030303030303030303030303030307374616b6522 +7d5d7d2c7b2274797065223a226d657373616765222c2261747472696275 +746573223a5b7b226b6579223a22616374696f6e222c2276616c7565223a +222f636f736d6f732e7374616b696e672e763162657461312e4d73674372 +6561746556616c696461746f72227d2c7b226b6579223a226d6f64756c65 +222c2276616c7565223a227374616b696e67227d2c7b226b6579223a2273 +656e646572222c2276616c7565223a2263726331326c756b753675786568 +68616b303270793472637a36357a753073776837776a737277307070227d +5d7d5d7d5d28ffffffffffffffffff0130bca10d3a440a02747812050a03 +66656512370a096665655f7061796572122a63726331326c756b75367578 +656868616b303270793472637a36357a753073776837776a737277307070 +3a3d0a02747812370a076163635f736571122c63726331326c756b753675 +78656868616b303270793472637a36357a753073776837776a7372773070 +702f303a6b0a02747812650a097369676e6174757265125847662f527330 +4c455468673767524e57465a55596268397978796336784f6e4d73457772 +696b30787333674f2f32504c5351454337546e3353434345744e32345457 +6375346d427959714d53636b384f577974357167413d3a3f0a076d657373 +61676512340a06616374696f6e122a2f636f736d6f732e7374616b696e67 +2e763162657461312e4d736743726561746556616c696461746f723a670a +0a636f696e5f7370656e7412350a077370656e646572122a63726331326c +756b75367578656868616b303270793472637a36357a753073776837776a +73727730707012220a06616d6f756e741218313030303030303030303030 +303030303030307374616b653a6b0a0d636f696e5f726563656976656412 +360a087265636569766572122a637263317479676d733378686873337976 +343837706878336477346139356a6e3774376c6b393067616112220a0661 +6d6f756e741218313030303030303030303030303030303030307374616b +653a760a106372656174655f76616c696461746f72123e0a0976616c6964 +61746f72123163726376616c6f70657231326c756b75367578656868616b +303270793472637a36357a753073776837776a36756c726c6712220a0661 +6d6f756e741218313030303030303030303030303030303030307374616b +653a520a076d65737361676512110a066d6f64756c6512077374616b696e +6712340a0673656e646572122a63726331326c756b75367578656868616b +303270793472637a36357a753073776837776a7372773070701aa9100aa4 +040aa1040afa020aba020a2a2f636f736d6f732e7374616b696e672e7631 +62657461312e4d736743726561746556616c696461746f72128b020a070a +056e6f646531123b0a123130303030303030303030303030303030301212 +3230303030303030303030303030303030301a1131303030303030303030 +303030303030301a0131222a63726331387a367133386d68767473767972 +356d616b38666a38733867346777376b6a6a747367726e372a3163726376 +616c6f70657231387a367133386d68767473767972356d616b38666a3873 +3867346777376b6a6a70306530646832430a1d2f636f736d6f732e637279 +70746f2e656432353531392e5075624b657912220a2072b50cf0ed1863ff +c937af99b6ad779a2c223e59459eab7768bda7c2da6f836e3a1c0a057374 +616b65121331303030303030303030303030303030303030123b35396566 +333139663464383334396466626532613233653131356163353835356430 +303938613065403139322e3136382e302e35343a3236363536125f0a570a +4f0a282f65746865726d696e742e63727970746f2e76312e657468736563 +703235366b312e5075624b657912230a210242785a75074452d62a6ac222 +70ffb8fb01c9375d0ba72887ae800dc619315d1b12040a020801120410c0 +9a0c1a41e7019fd760970e02f8967aa0f9820c0b98de32d8e72601aa34fe +60df52356d19591eb2bd8516037d2c52c22170ca533abf72d50d4c7f770d +1d5e045df51ff89c0112ff0b123612340a322f636f736d6f732e7374616b +696e672e763162657461312e4d736743726561746556616c696461746f72 +526573706f6e73651ae0055b7b226d73675f696e646578223a302c226576 +656e7473223a5b7b2274797065223a22636f696e5f726563656976656422 +2c2261747472696275746573223a5b7b226b6579223a2272656365697665 +72222c2276616c7565223a22637263317479676d73337868687333797634 +3837706878336477346139356a6e3774376c6b3930676161227d2c7b226b +6579223a22616d6f756e74222c2276616c7565223a223130303030303030 +30303030303030303030307374616b65227d5d7d2c7b2274797065223a22 +636f696e5f7370656e74222c2261747472696275746573223a5b7b226b65 +79223a227370656e646572222c2276616c7565223a2263726331387a3671 +33386d68767473767972356d616b38666a38733867346777376b6a6a7473 +67726e37227d2c7b226b6579223a22616d6f756e74222c2276616c756522 +3a22313030303030303030303030303030303030307374616b65227d5d7d +2c7b2274797065223a226372656174655f76616c696461746f72222c2261 +747472696275746573223a5b7b226b6579223a2276616c696461746f7222 +2c2276616c7565223a2263726376616c6f70657231387a367133386d6876 +7473767972356d616b38666a38733867346777376b6a6a70306530646822 +7d2c7b226b6579223a22616d6f756e74222c2276616c7565223a22313030 +303030303030303030303030303030307374616b65227d5d7d2c7b227479 +7065223a226d657373616765222c2261747472696275746573223a5b7b22 +6b6579223a22616374696f6e222c2276616c7565223a222f636f736d6f73 +2e7374616b696e672e763162657461312e4d736743726561746556616c69 +6461746f72227d2c7b226b6579223a226d6f64756c65222c2276616c7565 +223a227374616b696e67227d2c7b226b6579223a2273656e646572222c22 +76616c7565223a2263726331387a367133386d68767473767972356d616b +38666a38733867346777376b6a6a747367726e37227d5d7d5d7d5d28ffff +ffffffffffffff0130fc8c0d3a440a02747812050a0366656512370a0966 +65655f7061796572122a63726331387a367133386d68767473767972356d +616b38666a38733867346777376b6a6a747367726e373a3d0a0274781237 +0a076163635f736571122c63726331387a367133386d6876747376797235 +6d616b38666a38733867346777376b6a6a747367726e372f303a6b0a0274 +7812650a097369676e61747572651258357747663132435844674c346c6e +71672b59494d43356a654d746a6e4a6747714e5035673331493162526c5a +48724b39685259446653785377694677796c4d3676334c564455782f6477 +30645867526439522f346e41453d3a3f0a076d65737361676512340a0661 +6374696f6e122a2f636f736d6f732e7374616b696e672e76316265746131 +2e4d736743726561746556616c696461746f723a670a0a636f696e5f7370 +656e7412350a077370656e646572122a63726331387a367133386d687674 +73767972356d616b38666a38733867346777376b6a6a747367726e371222 +0a06616d6f756e7412183130303030303030303030303030303030303073 +74616b653a6b0a0d636f696e5f726563656976656412360a087265636569 +766572122a637263317479676d7333786868733379763438377068783364 +77346139356a6e3774376c6b393067616112220a06616d6f756e74121831 +3030303030303030303030303030303030307374616b653a760a10637265 +6174655f76616c696461746f72123e0a0976616c696461746f7212316372 +6376616c6f70657231387a367133386d68767473767972356d616b38666a +38733867346777376b6a6a70306530646812220a06616d6f756e74121831 +3030303030303030303030303030303030307374616b653a520a076d6573 +7361676512110a066d6f64756c6512077374616b696e6712340a0673656e +646572122a63726331387a367133386d68767473767972356d616b38666a +38733867346777376b6a6a747367726e37220208022aec0212260a090880 +804010e0aeee26120e08a08d0612040880c60a188080401a090a07656432 +353531391a9a020a0b626c6f636b5f626c6f6f6d128a020a05626c6f6f6d +128002000000000000000000000000000000000000000000000000000000 +000000000000000000000000000000000000000000000000000000000000 +000000000000000000000000000000000000000000000000000000000000 +000000000000000000000000000000000000000000000000000000000000 +000000000000000000000000000000000000000000000000000000000000 +000000000000000000000000000000000000000000000000000000000000 +000000000000000000000000000000000000000000000000000000000000 +000000000000000000000000000000000000000000000000000000000000 +000000000000000000000000000000000000001a250a09626c6f636b5f67 +6173120b0a06686569676874120132120b0a06616d6f756e741201303222 +1220b995a75e975242574ff1730feaf1e9255743e86ff9d949b04e2906ad +fd10f824230a0462616e6b1a06007374616b652213323030303030303038 +32333839303139353230300a0462616e6b1a1b021493354845030274cd4b +f1686abd60ab28ec52e1a77374616b65220b383233383930313935323025 +0a0462616e6b10011a1b0214dc6f17bbec824fff8f86587966b2047db6ab +73677374616b65250a0462616e6b10011a1b0214f1829676db577682e944 +fc3493d451b67ff3e29f7374616b65270a0462616e6b1a1c037374616b65 +001493354845030274cd4bf1686abd60ab28ec52e1a7220100260a046261 +6e6b10011a1c037374616b650014dc6f17bbec824fff8f86587966b2047d +b6ab7367260a0462616e6b10011a1c037374616b650014f1829676db5776 +82e944fc3493d451b67ff3e29f3a0a0c646973747269627574696f6e1a01 +0022270a250a057374616b65121c31363437373830333930343030303030 +303030303030303030303030290a0c646973747269627574696f6e1a0101 +22160a14ae7ccd8d599769209074b81d0c2f3f28624742b7500a0c646973 +747269627574696f6e1a16021438b4089f7762e0c20e9bed8e991e074550 +ef5a5222280a260a057374616b65121d3338333130383934303736383030 +303030303030303030303030303030500a0c646973747269627574696f6e +1a16021457f96e6b86cdefdb3d412547816a82e3e0ebf9d222280a260a05 +7374616b65121d3432343330333435303532383030303030303030303030 +303030303030520a0c646973747269627574696f6e1a16061438b4089f77 +62e0c20e9bed8e991e074550ef5a52222a0a260a057374616b65121d3334 +343739383034363639313230303030303030303030303030303030100252 +0a0c646973747269627574696f6e1a16061457f96e6b86cdefdb3d412547 +816a82e3e0ebf9d2222a0a260a057374616b65121d333831383733313035 +343735323030303030303030303030303030303010024f0a0c6469737472 +69627574696f6e1a16071438b4089f7762e0c20e9bed8e991e074550ef5a +5222270a250a057374616b65121c33383331303839343037363830303030 +3030303030303030303030304f0a0c646973747269627574696f6e1a1607 +1457f96e6b86cdefdb3d412547816a82e3e0ebf9d222270a250a05737461 +6b65121c3432343330333435303532383030303030303030303030303030 +3030180a096665656d61726b65741a010122080000000000000000450a04 +6d696e741a0100223a0a1231323939393939373937313031363533303112 +243235393939393936343737353631363138383736303138323436303338 +333338383834332a0a06706172616d731a116665656d61726b65742f4261 +7365466565220d223736353632353030303030225b0a08736c617368696e +671a1601142861b93c776d100695688250c5b1ba4d44ef2a7922370a3163 +726376616c636f6e73313970736d6a307268643567716439746773666776 +7476643666347a7737326e656674666d6730180122005b0a08736c617368 +696e671a160114ae7ccd8d599769209074b81d0c2f3f28624742b722370a +3163726376616c636f6e7331346537766d7232656a61356a707972356871 +77736374656c397033797773346874333468797a18012200cd070a077374 +616b696e671a02503222bd070a92030a02080b120c63726f6e6f735f3737 +372d311802220c08d9a39a9a0610b8d89eca022a480a202d8f46f6115269 +6c6dc80c6367c7ddc926dc3ed7ab23bd37fb7ca83eb00ba2071224080112 +20df770b6cb0a9694f550ae515d5fb0ebc08902e0fa47ceec816928e74fd +8fee44322004943bc3709104b52f30f83ca3d30d8bf57551edd48797aef5 +cee50bb532c15c3a20e3b0c44298fc1c149afbf4c8996fb92427ae41e464 +9b934ca495991b7852b8554220f537a6e0561fa0edd12b30ec9b6479e659 +f6f1fa1587e69556201bfaf4cf97404a20f537a6e0561fa0edd12b30ec9b +6479e659f6f1fa1587e69556201bfaf4cf97405220252fe7cf36dd1bb85d +afc47a08961df0cfd8c027defa5e01e958be121599db9d5a209048462db5 +2bb8096c92f8733fcf26455a000a550d9f3748dddadcc2267917146220e3 +b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b8 +556a20e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca49599 +1b7852b8557214ae7ccd8d599769209074b81d0c2f3f28624742b7129102 +0a3163726376616c6f70657231326c756b75367578656868616b30327079 +3472637a36357a753073776837776a36756c726c6712430a1d2f636f736d +6f732e63727970746f2e656432353531392e5075624b657912220a209b86 +839433f4229b7b7b51554a25ed32549126eb80fe53497ae336a1e67b6684 +20032a133130303030303030303030303030303030303032253130303030 +303030303030303030303030303030303030303030303030303030303030 +30303a070a056e6f6465304a00524b0a3b0a123130303030303030303030 +3030303030303012123230303030303030303030303030303030301a1131 +30303030303030303030303030303030120c08d1a39a9a0610a89eb0a602 +5a01311291020a3163726376616c6f70657231387a367133386d68767473 +767972356d616b38666a38733867346777376b6a6a70306530646812430a +1d2f636f736d6f732e63727970746f2e656432353531392e5075624b6579 +12220a2072b50cf0ed1863ffc937af99b6ad779a2c223e59459eab7768bd +a7c2da6f836e20032a133130303030303030303030303030303030303032 +253130303030303030303030303030303030303030303030303030303030 +30303030303030303a070a056e6f6465314a00524b0a3b0a123130303030 +303030303030303030303030301212323030303030303030303030303030 +3030301a113130303030303030303030303030303030120c08d1a39a9a06 +10a89eb0a6025a0131 +` + +func TestReadFileStreamer(t *testing.T) { + buf, err := hex.DecodeString(strings.Replace(data, "\n", "", -1)) + require.NoError(t, err) + + changeSet, err := ReadFileStreamer(bufio.NewReader(bytes.NewReader(buf))) + require.NoError(t, err) + + require.Equal(t, 21, len(changeSet)) + expItem := types.StoreKVPair{StoreKey: "bank", Delete: false, Key: []uint8{0x0, 0x73, 0x74, 0x61, 0x6b, 0x65}, Value: []uint8{0x32, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x38, 0x32, 0x33, 0x38, 0x39, 0x30, 0x31, 0x39, 0x35, 0x32, 0x30}} + require.Equal(t, expItem, changeSet[0]) +} diff --git a/versiondb/tmdb/history.go b/versiondb/tmdb/history.go new file mode 100644 index 0000000000..0b0438b3ec --- /dev/null +++ b/versiondb/tmdb/history.go @@ -0,0 +1,67 @@ +package tmdb + +import ( + "bytes" + + "github.com/RoaringBitmap/roaring/roaring64" + + "github.com/crypto-org-chain/cronos/versiondb" + dbm "github.com/tendermint/tm-db" +) + +// GetHistoryIndex returns the history index bitmap. +func GetHistoryIndex(db dbm.DB, key []byte) (*roaring64.Bitmap, error) { + // try to seek the first chunk whose maximum is bigger or equal to the target height. + bz, err := db.Get(key) + if err != nil { + return nil, err + } + if len(bz) == 0 { + return nil, nil + } + m := roaring64.New() + _, err = m.ReadFrom(bytes.NewReader(bz)) + if err != nil { + return nil, err + } + return m, nil +} + +// SeekHistoryIndex locate the minimal version that changed the key and is larger than the target version, +// using the returned version can find the value for the target version in changeset table. +// If not found, return -1 +func SeekHistoryIndex(db dbm.DB, key []byte, version uint64) (int64, error) { + m, err := GetHistoryIndex(db, key) + if err != nil { + return -1, err + } + found, ok := versiondb.SeekInBitmap64(m, version+1) + if !ok { + return -1, nil + } + return int64(found), nil +} + +// WriteHistoryIndex set the block height to the history bitmap. +// it try to set to the last chunk, if the last chunk exceeds chunk limit, split it. +func WriteHistoryIndex(db dbm.DB, batch dbm.Batch, key []byte, height uint64) error { + bz, err := db.Get(key) + if err != nil { + return err + } + + m := roaring64.New() + if len(bz) > 0 { + _, err = m.ReadFrom(bytes.NewReader(bz)) + if err != nil { + return err + } + } + m.Add(height) + m.RunOptimize() + bz, err = m.ToBytes() + if err != nil { + return err + } + return batch.Set(key, bz) +} diff --git a/versiondb/tmdb/iterator.go b/versiondb/tmdb/iterator.go new file mode 100644 index 0000000000..bdbc9a6e6d --- /dev/null +++ b/versiondb/tmdb/iterator.go @@ -0,0 +1,183 @@ +package tmdb + +import ( + "bytes" + + "github.com/RoaringBitmap/roaring/roaring64" + "github.com/cosmos/cosmos-sdk/store/types" + "github.com/crypto-org-chain/cronos/versiondb" + dbm "github.com/tendermint/tm-db" +) + +type Iterator struct { + storeKey string + version int64 + + start, end []byte + + plain, history types.Iterator + changesetDB dbm.DB + + key, value []byte + + reverse bool + status int + err error +} + +var _ types.Iterator = (*Iterator)(nil) + +func NewIterator(storeKey string, version int64, plainDB, historyDB types.KVStore, changesetDB dbm.DB, start, end []byte, reverse bool) (types.Iterator, error) { + var plain, history types.Iterator + + if reverse { + plain = plainDB.ReverseIterator(start, end) + } else { + plain = plainDB.Iterator(start, end) + } + + if reverse { + history = historyDB.ReverseIterator(start, end) + } else { + history = historyDB.Iterator(start, end) + } + iter := &Iterator{ + storeKey: storeKey, version: version, + reverse: reverse, + start: start, end: end, + plain: plain, history: history, + changesetDB: changesetDB, + } + iter.err = iter.resolve() + return iter, nil +} + +// Domain implements types.Iterator. +func (iter *Iterator) Domain() ([]byte, []byte) { + return iter.start, iter.end +} + +func (iter *Iterator) Valid() bool { + return iter.err == nil && len(iter.key) > 0 +} + +func (iter *Iterator) Next() { + switch iter.status { + case -2: + return + case 0: + iter.plain.Next() + iter.history.Next() + case 1: + iter.history.Next() + case -1: + iter.plain.Next() + } + iter.err = iter.resolve() +} + +func (iter *Iterator) Key() []byte { + return iter.key +} + +func (iter *Iterator) Value() []byte { + return iter.value +} + +func (iter *Iterator) Close() error { + err1 := iter.plain.Close() + err2 := iter.history.Close() + if err1 != nil { + return err1 + } + if err2 != nil { + return err2 + } + return nil +} + +func (iter *Iterator) Error() error { + return iter.err +} + +func (iter *Iterator) getFromHistory(key []byte, bz []byte, getLatestValue func() []byte) ([]byte, error) { + m := roaring64.New() + _, err := m.ReadFrom(bytes.NewReader(bz)) + if err != nil { + return nil, err + } + found, ok := versiondb.SeekInBitmap64(m, uint64(iter.version)+1) + if !ok { + // not changed, use the latest one + return getLatestValue(), nil + } + changesetKey := ChangesetKey(found, prependStoreKey(iter.storeKey, key)) + return iter.changesetDB.Get(changesetKey) +} + +func (iter *Iterator) resolve() (err error) { + for { + var pkey, hkey []byte + if iter.plain.Valid() { + pkey = iter.plain.Key() + } + if iter.history.Valid() { + hkey = iter.history.Key() + } + + iter.status = compareKey(pkey, hkey, iter.reverse) + switch iter.status { + case -2: + // end of iteration + iter.key = nil + iter.value = nil + return nil + case 0: + // find the historial value, or fallback to latest one. + iter.key = hkey + iter.value, err = iter.getFromHistory(hkey, iter.history.Value(), func() []byte { + return iter.plain.Value() + }) + if len(iter.value) > 0 { + return + } + iter.plain.Next() + iter.history.Next() + case 1: + // plain state exhausted or history cursor lag behind + // the key is deleted in plain state, use the history state. + iter.key = hkey + iter.value, err = iter.getFromHistory(hkey, iter.history.Value(), func() []byte { + return nil + }) + if len(iter.value) > 0 { + return + } + iter.history.Next() + case -1: + // history state exhausted or plain cursor lag behind + // the key don't exist in history state, use the plain state value. + iter.key = pkey + iter.value = iter.plain.Value() + return + } + } +} + +// compareKey is similar to bytes.Compare, but it treat empty slice as biggest value. +func compareKey(k1, k2 []byte, reverse bool) int { + switch { + case len(k1) == 0 && len(k2) == 0: + return -2 + case len(k1) == 0: + return 1 + case len(k2) == 0: + return -1 + default: + result := bytes.Compare(k1, k2) + if reverse { + result = -result + } + return result + } +} diff --git a/versiondb/tmdb/store.go b/versiondb/tmdb/store.go new file mode 100644 index 0000000000..d86b2eddbe --- /dev/null +++ b/versiondb/tmdb/store.go @@ -0,0 +1,266 @@ +package tmdb + +import ( + "bytes" + "errors" + "fmt" + + "github.com/cosmos/cosmos-sdk/store/dbadapter" + "github.com/cosmos/cosmos-sdk/store/prefix" + "github.com/cosmos/cosmos-sdk/store/types" + sdk "github.com/cosmos/cosmos-sdk/types" + gogotypes "github.com/cosmos/gogoproto/types" + "github.com/crypto-org-chain/cronos/versiondb" + dbm "github.com/tendermint/tm-db" +) + +const latestVersionKey = "s/latest" + +var _ versiondb.VersionStore = (*Store)(nil) + +// Store implements `VersionStore`. +type Store struct { + // latest key-value pairs + plainDB dbm.DB + // history bitmap index of keys + historyDB dbm.DB + // changesets of each blocks + changesetDB dbm.DB +} + +func NewStore(plainDB, historyDB, changesetDB dbm.DB) *Store { + return &Store{plainDB, historyDB, changesetDB} +} + +// PutAtVersion implements VersionStore interface +// TODO reduce allocation within iterations. +func (s *Store) PutAtVersion(version int64, changeSet []types.StoreKVPair) error { + plainBatch := s.plainDB.NewBatch() + defer plainBatch.Close() + historyBatch := s.historyDB.NewBatch() + defer historyBatch.Close() + changesetBatch := s.changesetDB.NewBatch() + defer changesetBatch.Close() + + for _, pair := range changeSet { + key := prependStoreKey(pair.StoreKey, pair.Key) + + if version == 0 { + // genesis state is written into plain state directly + if pair.Delete { + return errors.New("can't delete at genesis") + } + if err := plainBatch.Set(key, pair.Value); err != nil { + return err + } + continue + } + + original, err := s.plainDB.Get(key) + if err != nil { + return err + } + if bytes.Equal(original, pair.Value) { + // do nothing if the value is not changed + continue + } + + // write history index + if err := WriteHistoryIndex(s.historyDB, historyBatch, key, uint64(version)); err != nil { + return err + } + + // write the old value to changeset + if len(original) > 0 { + changesetKey := append(sdk.Uint64ToBigEndian(uint64(version)), key...) + if err := changesetBatch.Set(changesetKey, original); err != nil { + return err + } + } + + // write the new value to plain state + if pair.Delete { + if err := plainBatch.Delete(key); err != nil { + return err + } + } else { + if err := plainBatch.Set(key, pair.Value); err != nil { + return err + } + } + } + + // write latest version to plain state + if err := s.setLatestVersion(plainBatch, version); err != nil { + return err + } + + if err := changesetBatch.WriteSync(); err != nil { + return err + } + if err := historyBatch.WriteSync(); err != nil { + return err + } + return plainBatch.WriteSync() +} + +// GetAtVersion implements VersionStore interface +func (s *Store) GetAtVersion(storeKey string, key []byte, version *int64) ([]byte, error) { + rawKey := prependStoreKey(storeKey, key) + if version == nil { + return s.plainDB.Get(rawKey) + } + + height := *version + + // optimize for latest version + latest, err := s.GetLatestVersion() + if err != nil { + return nil, err + } + if height > latest { + return nil, fmt.Errorf("height %d is in the future", height) + } + if latest == height { + return s.plainDB.Get(rawKey) + } + + found, err := SeekHistoryIndex(s.historyDB, rawKey, uint64(height)) + if err != nil { + return nil, err + } + if found < 0 { + // there's no change records found after the target version, query the latest state. + return s.plainDB.Get(rawKey) + } + // get from changeset + changesetKey := ChangesetKey(uint64(found), rawKey) + return s.changesetDB.Get(changesetKey) +} + +// HasAtVersion implements VersionStore interface +func (s *Store) HasAtVersion(storeKey string, key []byte, version *int64) (bool, error) { + rawKey := prependStoreKey(storeKey, key) + if version == nil { + return s.plainDB.Has(rawKey) + } + + height := *version + + // optimize for latest version + latest, err := s.GetLatestVersion() + if err != nil { + return false, err + } + if height > latest { + return false, fmt.Errorf("height %d is in the future", height) + } + if latest == height { + return s.plainDB.Has(rawKey) + } + + found, err := SeekHistoryIndex(s.historyDB, rawKey, uint64(height)) + if err != nil { + return false, err + } + if found < 0 { + // there's no change records after the target version, query the latest state. + return s.plainDB.Has(rawKey) + } + // get from changeset + changesetKey := ChangesetKey(uint64(found), rawKey) + return s.changesetDB.Has(changesetKey) +} + +// IteratorAtVersion implements VersionStore interface +func (s *Store) IteratorAtVersion(storeKey string, start, end []byte, version *int64) (types.Iterator, error) { + storePrefix := StoreKeyPrefix(storeKey) + prefixPlain := prefix.NewStore(dbadapter.Store{DB: s.plainDB}, storePrefix) + if version == nil { + return prefixPlain.Iterator(start, end), nil + } + + // optimize for latest version + height := *version + latest, err := s.GetLatestVersion() + if err != nil { + return nil, err + } + if height > latest { + return nil, fmt.Errorf("height %d is in the future", height) + } + if latest == height { + return prefixPlain.Iterator(start, end), nil + } + + prefixHistory := prefix.NewStore(dbadapter.Store{DB: s.historyDB}, storePrefix) + return NewIterator(storeKey, height, prefixPlain, prefixHistory, s.changesetDB, start, end, false) +} + +// ReverseIteratorAtVersion implements VersionStore interface +func (s *Store) ReverseIteratorAtVersion(storeKey string, start, end []byte, version *int64) (types.Iterator, error) { + storePrefix := StoreKeyPrefix(storeKey) + prefixPlain := prefix.NewStore(dbadapter.Store{DB: s.plainDB}, storePrefix) + if version == nil { + return prefixPlain.ReverseIterator(start, end), nil + } + + // optimize for latest version + height := *version + latest, err := s.GetLatestVersion() + if err != nil { + return nil, err + } + if height > latest { + return nil, fmt.Errorf("height %d is in the future", height) + } + if latest == height { + return prefixPlain.ReverseIterator(start, end), nil + } + + prefixHistory := prefix.NewStore(dbadapter.Store{DB: s.historyDB}, storePrefix) + return NewIterator(storeKey, height, prefixPlain, prefixHistory, s.changesetDB, start, end, true) +} + +// GetLatestVersion returns the latest version stored in plain state, +// it's committed after the changesets, so the data for this version is guaranteed to be persisted. +// returns -1 if the key don't exists. +func (s *Store) GetLatestVersion() (int64, error) { + bz, err := s.plainDB.Get([]byte(latestVersionKey)) + if err != nil { + return -1, err + } else if bz == nil { + return -1, nil + } + + var latestVersion int64 + + if err := gogotypes.StdInt64Unmarshal(&latestVersion, bz); err != nil { + return -1, err + } + + return latestVersion, nil +} + +func (s *Store) setLatestVersion(plainBatch dbm.Batch, version int64) error { + // write latest version to plain state + bz, err := gogotypes.StdInt64Marshal(version) + if err != nil { + return err + } + return plainBatch.Set([]byte(latestVersionKey), bz) +} + +// ChangesetKey build key changeset db +func ChangesetKey(version uint64, key []byte) []byte { + return append(sdk.Uint64ToBigEndian(version), key...) +} + +func StoreKeyPrefix(storeKey string) []byte { + return []byte("s/k:" + storeKey + "/") +} + +// prependStoreKey prepends storeKey to the key +func prependStoreKey(storeKey string, key []byte) []byte { + return append(StoreKeyPrefix(storeKey), key...) +} diff --git a/versiondb/tmdb/store_test.go b/versiondb/tmdb/store_test.go new file mode 100644 index 0000000000..24453198ff --- /dev/null +++ b/versiondb/tmdb/store_test.go @@ -0,0 +1,14 @@ +package tmdb + +import ( + "testing" + + "github.com/crypto-org-chain/cronos/versiondb" + dbm "github.com/tendermint/tm-db" +) + +func TestTMDB(t *testing.T) { + versiondb.Run(t, func() versiondb.VersionStore { + return NewStore(dbm.NewMemDB(), dbm.NewMemDB(), dbm.NewMemDB()) + }) +} diff --git a/versiondb/types.go b/versiondb/types.go new file mode 100644 index 0000000000..fab4a6a21d --- /dev/null +++ b/versiondb/types.go @@ -0,0 +1,21 @@ +package versiondb + +import ( + "github.com/cosmos/cosmos-sdk/store/types" +) + +// VersionStore is a versioned storage of a flat key-value pairs. +// it don't need to support merkle proof, so could be implemented in a much more efficient way. +// `nil` version means the latest version. +type VersionStore interface { + GetAtVersion(storeKey string, key []byte, version *int64) ([]byte, error) + HasAtVersion(storeKey string, key []byte, version *int64) (bool, error) + IteratorAtVersion(storeKey string, start, end []byte, version *int64) (types.Iterator, error) + ReverseIteratorAtVersion(storeKey string, start, end []byte, version *int64) (types.Iterator, error) + GetLatestVersion() (int64, error) + + // Persist the change set of a block, + // the `changeSet` should be ordered by (storeKey, key), + // the version should be latest version plus one. + PutAtVersion(version int64, changeSet []types.StoreKVPair) error +}