Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Implement read VM pool #546

Merged
merged 16 commits into from
Oct 6, 2021
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ require (
github.com/tendermint/tendermint v0.34.12
github.com/tendermint/tm-db v0.6.4
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2 // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c
google.golang.org/grpc v1.38.0
google.golang.org/grpc v1.40.0
gopkg.in/yaml.v2 v2.4.0
)

Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -956,6 +956,7 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
44 changes: 31 additions & 13 deletions x/wasm/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ import (

// config default values
const (
DefaultContractQueryGasLimit = uint64(3000000)
DefaultContractDebugMode = false
DefaultContractMemoryCacheSize = uint32(500)
DefaultContractQueryGasLimit = uint64(3000000)
DefaultContractDebugMode = false
DefaultWriteVMMemoryCacheSize = uint32(500)
DefaultReadVMMemoryCacheSize = uint32(300)
DefaultNumReadVM = uint32(1)
)

// DBDir used to store wasm data to
Expand All @@ -26,25 +28,35 @@ type Config struct {
// The flag to specify whether print contract logs or not
ContractDebugMode bool `mapstructure:"contract-debug-mode"`

// The WASM VM memory cache size in MiB not bytes
ContractMemoryCacheSize uint32 `mapstructure:"contract-memory-cache-size"`
// The write WASM VM memory cache size in MiB not bytes
WriteVMMemoryCacheSize uint32 `mapstructure:"write-vm-memory-cache-size"`

// The read WASM VM memory cache size in MiB not bytes
ReadVMMemoryCacheSize uint32 `mapstructure:"read-vm-memory-cache-size"`

// The number of read WASM VMs
NumReadVMs uint32 `mapstructure:"num-read-vms"`
}

// DefaultConfig returns the default settings for WasmConfig
func DefaultConfig() *Config {
return &Config{
ContractQueryGasLimit: DefaultContractQueryGasLimit,
ContractDebugMode: DefaultContractDebugMode,
ContractMemoryCacheSize: DefaultContractMemoryCacheSize,
ContractQueryGasLimit: DefaultContractQueryGasLimit,
ContractDebugMode: DefaultContractDebugMode,
WriteVMMemoryCacheSize: DefaultWriteVMMemoryCacheSize,
ReadVMMemoryCacheSize: DefaultReadVMMemoryCacheSize,
NumReadVMs: DefaultNumReadVM,
}
}

// GetConfig load config values from the app options
func GetConfig(appOpts servertypes.AppOptions) *Config {
return &Config{
ContractQueryGasLimit: cast.ToUint64(appOpts.Get("wasm.contract-query-gas-limit")),
ContractDebugMode: cast.ToBool(appOpts.Get("wasm.contract-debug-mode")),
ContractMemoryCacheSize: cast.ToUint32(appOpts.Get("wasm.contract-memory-cache-size")),
ContractQueryGasLimit: cast.ToUint64(appOpts.Get("wasm.contract-query-gas-limit")),
ContractDebugMode: cast.ToBool(appOpts.Get("wasm.contract-debug-mode")),
WriteVMMemoryCacheSize: cast.ToUint32(appOpts.Get("wasm.write-vm-memory-cache-size")),
ReadVMMemoryCacheSize: cast.ToUint32(appOpts.Get("wasm.read-vm-memory-cache-size")),
NumReadVMs: cast.ToUint32(appOpts.Get("wasm.num-read-vms")),
}
}

Expand All @@ -59,6 +71,12 @@ contract-query-gas-limit = "{{ .WASMConfig.ContractQueryGasLimit }}"
# The flag to specify whether print contract logs or not
contract-debug-mode = "{{ .WASMConfig.ContractDebugMode }}"

# The WASM VM memory cache size in MiB not bytes
contract-memory-cache-size = "{{ .WASMConfig.ContractMemoryCacheSize }}"
# The write WASM VM memory cache size in MiB not bytes
write-vm-memory-cache-size = "{{ .WASMConfig.WriteVMMemoryCacheSize }}"

# The read WASM VM memory cache size in MiB not bytes
read-vm-memory-cache-size = "{{ .WASMConfig.WriteVMMemoryCacheSize }}"

# The number of read WASM VMs
num-read-vms = "{{ .WASMConfig.NumReadVMs }}"
`
15 changes: 11 additions & 4 deletions x/wasm/keeper/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ func (k Keeper) StoreCode(ctx sdk.Context, creator sdk.AccAddress, wasmCode []by

// MigrateCode uploads and compiles a WASM contract bytecode for the existing code id.
// After columbus-5 update, all contract code will be removed from the store
// due to in-compatibility between CosmWasm@v0.10.x and CosmWasm@v0.14.x
// The migration only can be executed by once after columbus-5 update.
// due to in-compatibility between CosmWasm@v0.10.x and CosmWasm@v0.16.x
// The migration can be executed by once after columbus-5 update.
// TODO - remove after columbus-5 update
func (k Keeper) MigrateCode(ctx sdk.Context, codeID uint64, creator sdk.AccAddress, wasmCode []byte) error {
codeInfo, err := k.GetCodeInfo(ctx, codeID)
Expand Down Expand Up @@ -420,7 +420,7 @@ func (k Keeper) queryToStore(ctx sdk.Context, contractAddress sdk.AccAddress, ke
return prefixStore.Get(key)
}

func (k Keeper) queryToContract(ctx sdk.Context, contractAddress sdk.AccAddress, queryMsg []byte) ([]byte, error) {
func (k Keeper) queryToContract(ctx sdk.Context, contractAddress sdk.AccAddress, queryMsg []byte, wasmVMs ...types.WasmerEngine) ([]byte, error) {
defer telemetry.MeasureSince(time.Now(), "wasm", "contract", "query-smart")
ctx.GasMeter().ConsumeGas(types.InstantiateContractCosts(len(queryMsg)), "Loading CosmWasm module: query")

Expand All @@ -430,7 +430,14 @@ func (k Keeper) queryToContract(ctx sdk.Context, contractAddress sdk.AccAddress,
}

env := types.NewEnv(ctx, contractAddress)
queryResult, gasUsed, err := k.wasmVM.Query(

// when the vm is given, use that given vm
wasmVM := k.wasmVM
if len(wasmVMs) != 0 {
wasmVM = wasmVMs[0]
}

queryResult, gasUsed, err := wasmVM.Query(
codeInfo.CodeHash,
env,
queryMsg,
Expand Down
73 changes: 55 additions & 18 deletions x/wasm/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"encoding/binary"
"fmt"
"path/filepath"
"sync"

"github.com/tendermint/tendermint/libs/log"
"golang.org/x/sync/semaphore"

"github.com/cosmos/cosmos-sdk/codec"
"github.com/cosmos/cosmos-sdk/store/prefix"
Expand All @@ -32,7 +34,11 @@ type Keeper struct {
serviceRouter types.MsgServiceRouter
queryRouter types.GRPCQueryRouter

wasmVM types.WasmerEngine
wasmVM types.WasmerEngine
wasmReadVMPool []types.WasmerEngine
wasmReadVMSemaphore *semaphore.Weighted
wasmReadVMMutex *sync.Mutex

querier types.Querier
msgParser types.MsgParser

Expand All @@ -53,35 +59,66 @@ func NewKeeper(
supportedFeatures string,
homePath string,
wasmConfig *config.Config) Keeper {
wasmVM, err := wasmvm.NewVM(

// set KeyTable if it has not already been set
if !paramspace.HasKeyTable() {
paramspace = paramspace.WithKeyTable(types.ParamKeyTable())
}

writeWasmVM, err := wasmvm.NewVM(
filepath.Join(homePath, config.DBDir),
supportedFeatures,
types.ContractMemoryLimit,
wasmConfig.ContractDebugMode,
wasmConfig.ContractMemoryCacheSize,
wasmConfig.WriteVMMemoryCacheSize,
)

if err != nil {
panic(err)
}

// set KeyTable if it has not already been set
if !paramspace.HasKeyTable() {
paramspace = paramspace.WithKeyTable(types.ParamKeyTable())
// prevent zero read vm
if wasmConfig.NumReadVMs == 0 {
wasmConfig.NumReadVMs = config.DefaultNumReadVM
}

// prevent zero read vm cache
if wasmConfig.ReadVMMemoryCacheSize == 0 {
wasmConfig.ReadVMMemoryCacheSize = config.DefaultReadVMMemoryCacheSize
}

numReadVms := wasmConfig.NumReadVMs
wasmReadVMPool := make([]types.WasmerEngine, numReadVms)
for i := uint32(0); i < numReadVms; i++ {
wasmReadVMPool[i], err = wasmvm.NewVM(
filepath.Join(homePath, config.DBDir),
supportedFeatures,
types.ContractMemoryLimit,
wasmConfig.ContractDebugMode,
wasmConfig.ReadVMMemoryCacheSize,
)

if err != nil {
panic(err)
}
}

return Keeper{
storeKey: storeKey,
cdc: cdc,
paramSpace: paramspace,
wasmVM: wasmVM,
accountKeeper: accountKeeper,
bankKeeper: bankKeeper,
treasuryKeeper: treasuryKeeper,
serviceRouter: serviceRouter,
queryRouter: queryRouter,
wasmConfig: wasmConfig,
msgParser: types.NewWasmMsgParser(),
querier: types.NewWasmQuerier(),
storeKey: storeKey,
cdc: cdc,
paramSpace: paramspace,
wasmVM: writeWasmVM,
wasmReadVMPool: wasmReadVMPool,
wasmReadVMSemaphore: semaphore.NewWeighted(int64(numReadVms)),
wasmReadVMMutex: &sync.Mutex{},
accountKeeper: accountKeeper,
bankKeeper: bankKeeper,
treasuryKeeper: treasuryKeeper,
serviceRouter: serviceRouter,
queryRouter: queryRouter,
wasmConfig: wasmConfig,
msgParser: types.NewWasmMsgParser(),
querier: types.NewWasmQuerier(),
}
}

Expand Down
9 changes: 8 additions & 1 deletion x/wasm/keeper/legacy_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,15 @@ func queryContractStore(ctx sdk.Context, req abci.RequestQuery, k Keeper, legacy
return nil, sdkerrors.Wrap(sdkerrors.ErrJSONUnmarshal, err.Error())
}

wasmVM, err := k.getWasmVM(sdk.WrapSDKContext(ctx))
if err != nil {
return nil, sdkerrors.Wrap(types.ErrContractQueryFailed, err.Error())
}

// recover from out-of-gas panic
defer func() {
k.putWasmVM(wasmVM)

if r := recover(); r != nil {
switch rType := r.(type) {
case sdk.ErrorOutOfGas:
Expand All @@ -140,7 +147,7 @@ func queryContractStore(ctx sdk.Context, req abci.RequestQuery, k Keeper, legacy
}
}()

bz, err = k.queryToContract(ctx, params.ContractAddress, params.Msg)
bz, err = k.queryToContract(ctx, params.ContractAddress, params.Msg, wasmVM)

return
}
Expand Down
55 changes: 55 additions & 0 deletions x/wasm/keeper/legacy_querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"sync"
"testing"

sdk "github.com/cosmos/cosmos-sdk/types"
Expand Down Expand Up @@ -94,3 +95,57 @@ func TestLegacyParams(t *testing.T) {
require.NoError(t, err)
require.Equal(t, input.WasmKeeper.GetParams(input.Ctx), params)
}

func TestLegacyMultipleGoroutines(t *testing.T) {
input := CreateTestInput(t)
ctx, accKeeper, bankKeeper, keeper := input.Ctx, input.AccKeeper, input.BankKeeper, input.WasmKeeper

deposit := sdk.NewCoins(sdk.NewInt64Coin("denom", 100000))
topUp := sdk.NewCoins(sdk.NewInt64Coin("denom", 5000))
creator := createFakeFundedAccount(ctx, accKeeper, bankKeeper, deposit.Add(deposit...))
anyAddr := createFakeFundedAccount(ctx, accKeeper, bankKeeper, topUp)

wasmCode, err := ioutil.ReadFile("./testdata/hackatom.wasm")
require.NoError(t, err)

contractID, err := keeper.StoreCode(ctx, creator, wasmCode)
require.NoError(t, err)

_, _, bob := keyPubAddr()
initMsg := HackatomExampleInitMsg{
Verifier: anyAddr,
Beneficiary: bob,
}
initMsgBz, err := json.Marshal(initMsg)
require.NoError(t, err)

addr, _, err := keeper.InstantiateContract(ctx, contractID, creator, sdk.AccAddress{}, initMsgBz, deposit)
require.NoError(t, err)

contractModel := []types.Model{
{Key: []byte("foo"), Value: []byte(`"bar"`)},
{Key: []byte{0x0, 0x1}, Value: []byte(`{"count":8}`)},
}

keeper.SetContractStore(ctx, addr, contractModel)

querier := NewLegacyQuerier(keeper, input.Cdc)

wg := &sync.WaitGroup{}
testCases := 100
wg.Add(testCases)
for n := 0; n < testCases; n++ {
go func() {
// query contract []byte(`{"verifier":{}}`)
bz, err := input.Cdc.MarshalJSON(types.NewQueryContractParams(addr, []byte(`{"verifier":{}}`)))
require.NoError(t, err)

res, err := querier(ctx, []string{types.QueryContractStore}, abci.RequestQuery{Data: []byte(bz)})
require.NoError(t, err)
require.Equal(t, fmt.Sprintf(`{"verifier":"%s"}`, anyAddr.String()), string(res))

wg.Done()
}()
}
wg.Wait()
}
31 changes: 31 additions & 0 deletions x/wasm/keeper/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package keeper

import (
"context"

"github.com/terra-money/core/x/wasm/types"
)

var n = 0

func (k Keeper) getWasmVM(ctx context.Context) (types.WasmerEngine, error) {
err := k.wasmReadVMSemaphore.Acquire(ctx, 1)
if err != nil {
return nil, err
}

k.wasmReadVMMutex.Lock()
wasmVM := k.wasmReadVMPool[0]
k.wasmReadVMPool = k.wasmReadVMPool[1:]
k.wasmReadVMMutex.Unlock()

return wasmVM, nil
}

func (k Keeper) putWasmVM(wasmVM types.WasmerEngine) {
k.wasmReadVMMutex.Lock()
k.wasmReadVMPool = append(k.wasmReadVMPool, wasmVM)
k.wasmReadVMMutex.Unlock()

k.wasmReadVMSemaphore.Release(1)
}
9 changes: 8 additions & 1 deletion x/wasm/keeper/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,15 @@ func (q querier) ContractStore(c context.Context, req *types.QueryContractStoreR
return nil, status.Error(codes.InvalidArgument, err.Error())
}

wasmVM, err := q.getWasmVM(c)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

// recover from out-of-gas panic
defer func() {
q.putWasmVM(wasmVM)

if r := recover(); r != nil {
switch rType := r.(type) {
// TODO: Use ErrOutOfGas instead of ErrorOutOfGas which would allow us
Expand All @@ -114,7 +121,7 @@ func (q querier) ContractStore(c context.Context, req *types.QueryContractStoreR
}()

var bz []byte
bz, err = q.queryToContract(ctx, contractAddr, req.QueryMsg)
bz, err = q.queryToContract(ctx, contractAddr, req.QueryMsg, wasmVM)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
Expand Down
Loading