Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for warp off chain messages #1028

Merged
merged 2 commits into from
Dec 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions plugin/evm/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/ava-labs/subnet-evm/core/txpool"
"github.com/ava-labs/subnet-evm/eth"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/spf13/cast"
)

Expand Down Expand Up @@ -208,6 +209,12 @@ type Config struct {
// * 0: means no limit
// * N: means N block limit [HEAD-N+1, HEAD] and delete extra indexes
TxLookupLimit uint64 `json:"tx-lookup-limit"`

// WarpOffChainMessages encodes off-chain messages (unrelated to any on-chain event ie. block or AddressedCall)
// that the node should be willing to sign.
// Note: only supports AddressedCall payloads as defined here:
// https://github.com/ava-labs/avalanchego/tree/7623ffd4be915a5185c9ed5e11fa9be15a6e1f00/vms/platformvm/warp/payload#addressedcall
WarpOffChainMessages []hexutil.Bytes `json:"warp-off-chain-messages"`
}

// EthAPIs returns an array of strings representing the Eth APIs that should be enabled
Expand Down
11 changes: 9 additions & 2 deletions plugin/evm/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,8 +465,15 @@ func (vm *VM) Initialize(
vm.Network = peer.NewNetwork(p2pNetwork, appSender, vm.networkCodec, message.CrossChainCodec, chainCtx.NodeID, vm.config.MaxOutboundActiveRequests, vm.config.MaxOutboundActiveCrossChainRequests)
vm.client = peer.NewNetworkClient(vm.Network)

// initialize warp backend
vm.warpBackend = warp.NewBackend(vm.ctx.NetworkID, vm.ctx.ChainID, vm.ctx.WarpSigner, vm, vm.warpDB, warpSignatureCacheSize)
// Initialize warp backend
offchainWarpMessages := make([][]byte, len(vm.config.WarpOffChainMessages))
for i, hexMsg := range vm.config.WarpOffChainMessages {
offchainWarpMessages[i] = []byte(hexMsg)
}
vm.warpBackend, err = warp.NewBackend(vm.ctx.NetworkID, vm.ctx.ChainID, vm.ctx.WarpSigner, vm, vm.warpDB, warpSignatureCacheSize, offchainWarpMessages)
if err != nil {
return err
}

// clear warpdb on initialization if config enabled
if vm.config.PruneWarpDB {
Expand Down
73 changes: 54 additions & 19 deletions warp/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package warp

import (
"context"
"errors"
"fmt"

"github.com/ava-labs/avalanchego/cache"
Expand All @@ -19,7 +20,10 @@ import (
"github.com/ethereum/go-ethereum/log"
)

var _ Backend = &backend{}
var (
_ Backend = &backend{}
errParsingOffChainMessage = errors.New("failed to parse off-chain message")
)

const batchSize = ethdb.IdealBatchSize

Expand Down Expand Up @@ -48,28 +52,56 @@ type Backend interface {

// backend implements Backend, keeps track of warp messages, and generates message signatures.
type backend struct {
networkID uint32
sourceChainID ids.ID
db database.Database
warpSigner avalancheWarp.Signer
blockClient BlockClient
messageSignatureCache *cache.LRU[ids.ID, [bls.SignatureLen]byte]
blockSignatureCache *cache.LRU[ids.ID, [bls.SignatureLen]byte]
messageCache *cache.LRU[ids.ID, *avalancheWarp.UnsignedMessage]
networkID uint32
sourceChainID ids.ID
db database.Database
warpSigner avalancheWarp.Signer
blockClient BlockClient
messageSignatureCache *cache.LRU[ids.ID, [bls.SignatureLen]byte]
blockSignatureCache *cache.LRU[ids.ID, [bls.SignatureLen]byte]
messageCache *cache.LRU[ids.ID, *avalancheWarp.UnsignedMessage]
offchainAddressedCallMsgs map[ids.ID]*avalancheWarp.UnsignedMessage
}

// NewBackend creates a new Backend, and initializes the signature cache and message tracking database.
func NewBackend(networkID uint32, sourceChainID ids.ID, warpSigner avalancheWarp.Signer, blockClient BlockClient, db database.Database, cacheSize int) Backend {
return &backend{
networkID: networkID,
sourceChainID: sourceChainID,
db: db,
warpSigner: warpSigner,
blockClient: blockClient,
messageSignatureCache: &cache.LRU[ids.ID, [bls.SignatureLen]byte]{Size: cacheSize},
blockSignatureCache: &cache.LRU[ids.ID, [bls.SignatureLen]byte]{Size: cacheSize},
messageCache: &cache.LRU[ids.ID, *avalancheWarp.UnsignedMessage]{Size: cacheSize},
func NewBackend(
networkID uint32,
sourceChainID ids.ID,
warpSigner avalancheWarp.Signer,
blockClient BlockClient,
db database.Database,
cacheSize int,
offchainMessages [][]byte,
) (Backend, error) {
b := &backend{
networkID: networkID,
sourceChainID: sourceChainID,
db: db,
warpSigner: warpSigner,
blockClient: blockClient,
messageSignatureCache: &cache.LRU[ids.ID, [bls.SignatureLen]byte]{Size: cacheSize},
blockSignatureCache: &cache.LRU[ids.ID, [bls.SignatureLen]byte]{Size: cacheSize},
messageCache: &cache.LRU[ids.ID, *avalancheWarp.UnsignedMessage]{Size: cacheSize},
offchainAddressedCallMsgs: make(map[ids.ID]*avalancheWarp.UnsignedMessage),
}
return b, b.initOffChainMessages(offchainMessages)
}

func (b *backend) initOffChainMessages(offchainMessages [][]byte) error {
for i, offchainMsg := range offchainMessages {
unsignedMsg, err := avalancheWarp.ParseUnsignedMessage(offchainMsg)
if err != nil {
return fmt.Errorf("%w at index %d: %w", errParsingOffChainMessage, i, err)
}

_, err = payload.ParseAddressedCall(unsignedMsg.Payload)
if err != nil {
return fmt.Errorf("%w at index %d as AddressedCall: %w", errParsingOffChainMessage, i, err)
}
b.offchainAddressedCallMsgs[unsignedMsg.ID()] = unsignedMsg
}

return nil
}

func (b *backend) Clear() error {
Expand Down Expand Up @@ -160,6 +192,9 @@ func (b *backend) GetMessage(messageID ids.ID) (*avalancheWarp.UnsignedMessage,
if message, ok := b.messageCache.Get(messageID); ok {
return message, nil
}
if message, ok := b.offchainAddressedCallMsgs[messageID]; ok {
return message, nil
}

unsignedMessageBytes, err := b.db.Get(messageID[:])
if err != nil {
Expand Down
103 changes: 81 additions & 22 deletions warp/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/ava-labs/avalanchego/snow/consensus/snowman"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/snow/engine/snowman/block"
"github.com/ava-labs/avalanchego/utils"
"github.com/ava-labs/avalanchego/utils/crypto/bls"
"github.com/ava-labs/avalanchego/utils/hashing"
avalancheWarp "github.com/ava-labs/avalanchego/vms/platformvm/warp"
Expand All @@ -22,18 +23,32 @@ import (
)

var (
networkID uint32 = 54321
sourceChainID = ids.GenerateTestID()
testPayload = []byte("test")
networkID uint32 = 54321
sourceChainID = ids.GenerateTestID()
testSourceAddress = utils.RandomBytes(20)
testPayload = []byte("test")
testUnsignedMessage *avalancheWarp.UnsignedMessage
)

func init() {
testAddressedCallPayload, err := payload.NewAddressedCall(testSourceAddress, testPayload)
if err != nil {
panic(err)
}
testUnsignedMessage, err = avalancheWarp.NewUnsignedMessage(networkID, sourceChainID, testAddressedCallPayload.Bytes())
if err != nil {
panic(err)
}
}

func TestClearDB(t *testing.T) {
db := memdb.New()

sk, err := bls.NewSecretKey()
require.NoError(t, err)
warpSigner := avalancheWarp.NewSigner(sk, networkID, sourceChainID)
backendIntf := NewBackend(networkID, sourceChainID, warpSigner, nil, db, 500)
backendIntf, err := NewBackend(networkID, sourceChainID, warpSigner, nil, db, 500, nil)
require.NoError(t, err)
backend, ok := backendIntf.(*backend)
require.True(t, ok)

Expand Down Expand Up @@ -76,20 +91,19 @@ func TestAddAndGetValidMessage(t *testing.T) {
sk, err := bls.NewSecretKey()
require.NoError(t, err)
warpSigner := avalancheWarp.NewSigner(sk, networkID, sourceChainID)
backend := NewBackend(networkID, sourceChainID, warpSigner, nil, db, 500)

// Create a new unsigned message and add it to the warp backend.
unsignedMsg, err := avalancheWarp.NewUnsignedMessage(networkID, sourceChainID, testPayload)
backend, err := NewBackend(networkID, sourceChainID, warpSigner, nil, db, 500, nil)
require.NoError(t, err)
err = backend.AddMessage(unsignedMsg)

// Add testUnsignedMessage to the warp backend
err = backend.AddMessage(testUnsignedMessage)
require.NoError(t, err)

// Verify that a signature is returned successfully, and compare to expected signature.
messageID := unsignedMsg.ID()
messageID := testUnsignedMessage.ID()
signature, err := backend.GetMessageSignature(messageID)
require.NoError(t, err)

expectedSig, err := warpSigner.Sign(unsignedMsg)
expectedSig, err := warpSigner.Sign(testUnsignedMessage)
require.NoError(t, err)
require.Equal(t, expectedSig, signature[:])
}
Expand All @@ -100,12 +114,11 @@ func TestAddAndGetUnknownMessage(t *testing.T) {
sk, err := bls.NewSecretKey()
require.NoError(t, err)
warpSigner := avalancheWarp.NewSigner(sk, networkID, sourceChainID)
backend := NewBackend(networkID, sourceChainID, warpSigner, nil, db, 500)
unsignedMsg, err := avalancheWarp.NewUnsignedMessage(networkID, sourceChainID, testPayload)
backend, err := NewBackend(networkID, sourceChainID, warpSigner, nil, db, 500, nil)
require.NoError(t, err)

// Try getting a signature for a message that was not added.
messageID := unsignedMsg.ID()
messageID := testUnsignedMessage.ID()
_, err = backend.GetMessageSignature(messageID)
require.Error(t, err)
}
Expand Down Expand Up @@ -133,7 +146,8 @@ func TestGetBlockSignature(t *testing.T) {
sk, err := bls.NewSecretKey()
require.NoError(err)
warpSigner := avalancheWarp.NewSigner(sk, networkID, sourceChainID)
backend := NewBackend(networkID, sourceChainID, warpSigner, testVM, db, 500)
backend, err := NewBackend(networkID, sourceChainID, warpSigner, testVM, db, 500, nil)
require.NoError(err)

blockHashPayload, err := payload.NewHash(blkID)
require.NoError(err)
Expand All @@ -158,20 +172,65 @@ func TestZeroSizedCache(t *testing.T) {
warpSigner := avalancheWarp.NewSigner(sk, networkID, sourceChainID)

// Verify zero sized cache works normally, because the lru cache will be initialized to size 1 for any size parameter <= 0.
backend := NewBackend(networkID, sourceChainID, warpSigner, nil, db, 0)

// Create a new unsigned message and add it to the warp backend.
unsignedMsg, err := avalancheWarp.NewUnsignedMessage(networkID, sourceChainID, testPayload)
backend, err := NewBackend(networkID, sourceChainID, warpSigner, nil, db, 0, nil)
require.NoError(t, err)
err = backend.AddMessage(unsignedMsg)

// Add testUnsignedMessage to the warp backend
err = backend.AddMessage(testUnsignedMessage)
require.NoError(t, err)

// Verify that a signature is returned successfully, and compare to expected signature.
messageID := unsignedMsg.ID()
messageID := testUnsignedMessage.ID()
signature, err := backend.GetMessageSignature(messageID)
require.NoError(t, err)

expectedSig, err := warpSigner.Sign(unsignedMsg)
expectedSig, err := warpSigner.Sign(testUnsignedMessage)
require.NoError(t, err)
require.Equal(t, expectedSig, signature[:])
}

func TestOffChainMessages(t *testing.T) {
type test struct {
offchainMessages [][]byte
check func(require *require.Assertions, b Backend)
err error
}
sk, err := bls.NewSecretKey()
require.NoError(t, err)
warpSigner := avalancheWarp.NewSigner(sk, networkID, sourceChainID)

for name, test := range map[string]test{
"no offchain messages": {},
"single off-chain message": {
offchainMessages: [][]byte{
testUnsignedMessage.Bytes(),
},
check: func(require *require.Assertions, b Backend) {
msg, err := b.GetMessage(testUnsignedMessage.ID())
require.NoError(err)
require.Equal(testUnsignedMessage.Bytes(), msg.Bytes())

signature, err := b.GetMessageSignature(testUnsignedMessage.ID())
require.NoError(err)
expectedSignatureBytes, err := warpSigner.Sign(msg)
require.NoError(err)
require.Equal(expectedSignatureBytes, signature[:])
},
},
"invalid message": {
offchainMessages: [][]byte{{1, 2, 3}},
err: errParsingOffChainMessage,
},
} {
t.Run(name, func(t *testing.T) {
require := require.New(t)
db := memdb.New()

backend, err := NewBackend(networkID, sourceChainID, warpSigner, nil, db, 0, test.offchainMessages)
require.ErrorIs(err, test.err)
if test.check != nil {
test.check(require, backend)
}
})
}
}
34 changes: 30 additions & 4 deletions warp/handlers/signature_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/ava-labs/avalanchego/snow/engine/snowman/block"
"github.com/ava-labs/avalanchego/utils/crypto/bls"
avalancheWarp "github.com/ava-labs/avalanchego/vms/platformvm/warp"
"github.com/ava-labs/avalanchego/vms/platformvm/warp/payload"
"github.com/ava-labs/subnet-evm/plugin/evm/message"
"github.com/ava-labs/subnet-evm/utils"
"github.com/ava-labs/subnet-evm/warp"
Expand All @@ -27,17 +28,25 @@ func TestMessageSignatureHandler(t *testing.T) {
snowCtx := utils.TestSnowContext()
blsSecretKey, err := bls.NewSecretKey()
require.NoError(t, err)

warpSigner := avalancheWarp.NewSigner(blsSecretKey, snowCtx.NetworkID, snowCtx.ChainID)
backend := warp.NewBackend(snowCtx.NetworkID, snowCtx.ChainID, warpSigner, &block.TestVM{TestVM: common.TestVM{T: t}}, database, 100)

msg, err := avalancheWarp.NewUnsignedMessage(snowCtx.NetworkID, snowCtx.ChainID, []byte("test"))
addressedPayload, err := payload.NewAddressedCall([]byte{1, 2, 3}, []byte{1, 2, 3})
require.NoError(t, err)
offchainMessage, err := avalancheWarp.NewUnsignedMessage(snowCtx.NetworkID, snowCtx.ChainID, addressedPayload.Bytes())
require.NoError(t, err)

backend, err := warp.NewBackend(snowCtx.NetworkID, snowCtx.ChainID, warpSigner, &block.TestVM{TestVM: common.TestVM{T: t}}, database, 100, [][]byte{offchainMessage.Bytes()})
require.NoError(t, err)

msg, err := avalancheWarp.NewUnsignedMessage(snowCtx.NetworkID, snowCtx.ChainID, []byte("test"))
require.NoError(t, err)
messageID := msg.ID()
require.NoError(t, backend.AddMessage(msg))
signature, err := backend.GetMessageSignature(messageID)
require.NoError(t, err)
offchainSignature, err := backend.GetMessageSignature(offchainMessage.ID())
require.NoError(t, err)

unknownMessageID := ids.GenerateTestID()

emptySignature := [bls.SignatureLen]byte{}
Expand All @@ -61,6 +70,21 @@ func TestMessageSignatureHandler(t *testing.T) {
require.EqualValues(t, 0, stats.blockSignatureMiss.Count())
},
},
"offchain message": {
setup: func() (request message.MessageSignatureRequest, expectedResponse []byte) {
return message.MessageSignatureRequest{
MessageID: offchainMessage.ID(),
}, offchainSignature[:]
},
verifyStats: func(t *testing.T, stats *handlerStats) {
require.EqualValues(t, 1, stats.messageSignatureRequest.Count())
require.EqualValues(t, 1, stats.messageSignatureHit.Count())
require.EqualValues(t, 0, stats.messageSignatureMiss.Count())
require.EqualValues(t, 0, stats.blockSignatureRequest.Count())
require.EqualValues(t, 0, stats.blockSignatureHit.Count())
require.EqualValues(t, 0, stats.blockSignatureMiss.Count())
},
},
"unknown message": {
setup: func() (request message.MessageSignatureRequest, expectedResponse []byte) {
return message.MessageSignatureRequest{
Expand Down Expand Up @@ -125,14 +149,16 @@ func TestBlockSignatureHandler(t *testing.T) {
return nil, errors.New("invalid blockID")
},
}
backend := warp.NewBackend(
backend, err := warp.NewBackend(
snowCtx.NetworkID,
snowCtx.ChainID,
warpSigner,
testVM,
database,
100,
nil,
)
require.NoError(t, err)

signature, err := backend.GetBlockSignature(blkID)
require.NoError(t, err)
Expand Down
Loading