Skip to content

Commit

Permalink
Add support for warp off chain messages (#1028)
Browse files Browse the repository at this point in the history
Co-authored-by: Ceyhun Onur <ceyhun.onur@avalabs.org>
  • Loading branch information
aaronbuchwald and ceyonur authored Dec 27, 2023
1 parent 468534b commit 9566457
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 47 deletions.
7 changes: 7 additions & 0 deletions plugin/evm/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/ava-labs/subnet-evm/core/txpool"
"github.com/ava-labs/subnet-evm/eth"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/spf13/cast"
)

Expand Down Expand Up @@ -208,6 +209,12 @@ type Config struct {
// * 0: means no limit
// * N: means N block limit [HEAD-N+1, HEAD] and delete extra indexes
TxLookupLimit uint64 `json:"tx-lookup-limit"`

// WarpOffChainMessages encodes off-chain messages (unrelated to any on-chain event ie. block or AddressedCall)
// that the node should be willing to sign.
// Note: only supports AddressedCall payloads as defined here:
// https://github.com/ava-labs/avalanchego/tree/7623ffd4be915a5185c9ed5e11fa9be15a6e1f00/vms/platformvm/warp/payload#addressedcall
WarpOffChainMessages []hexutil.Bytes `json:"warp-off-chain-messages"`
}

// EthAPIs returns an array of strings representing the Eth APIs that should be enabled
Expand Down
11 changes: 9 additions & 2 deletions plugin/evm/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,8 +465,15 @@ func (vm *VM) Initialize(
vm.Network = peer.NewNetwork(p2pNetwork, appSender, vm.networkCodec, message.CrossChainCodec, chainCtx.NodeID, vm.config.MaxOutboundActiveRequests, vm.config.MaxOutboundActiveCrossChainRequests)
vm.client = peer.NewNetworkClient(vm.Network)

// initialize warp backend
vm.warpBackend = warp.NewBackend(vm.ctx.NetworkID, vm.ctx.ChainID, vm.ctx.WarpSigner, vm, vm.warpDB, warpSignatureCacheSize)
// Initialize warp backend
offchainWarpMessages := make([][]byte, len(vm.config.WarpOffChainMessages))
for i, hexMsg := range vm.config.WarpOffChainMessages {
offchainWarpMessages[i] = []byte(hexMsg)
}
vm.warpBackend, err = warp.NewBackend(vm.ctx.NetworkID, vm.ctx.ChainID, vm.ctx.WarpSigner, vm, vm.warpDB, warpSignatureCacheSize, offchainWarpMessages)
if err != nil {
return err
}

// clear warpdb on initialization if config enabled
if vm.config.PruneWarpDB {
Expand Down
73 changes: 54 additions & 19 deletions warp/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package warp

import (
"context"
"errors"
"fmt"

"github.com/ava-labs/avalanchego/cache"
Expand All @@ -19,7 +20,10 @@ import (
"github.com/ethereum/go-ethereum/log"
)

var _ Backend = &backend{}
var (
_ Backend = &backend{}
errParsingOffChainMessage = errors.New("failed to parse off-chain message")
)

const batchSize = ethdb.IdealBatchSize

Expand Down Expand Up @@ -48,28 +52,56 @@ type Backend interface {

// backend implements Backend, keeps track of warp messages, and generates message signatures.
type backend struct {
networkID uint32
sourceChainID ids.ID
db database.Database
warpSigner avalancheWarp.Signer
blockClient BlockClient
messageSignatureCache *cache.LRU[ids.ID, [bls.SignatureLen]byte]
blockSignatureCache *cache.LRU[ids.ID, [bls.SignatureLen]byte]
messageCache *cache.LRU[ids.ID, *avalancheWarp.UnsignedMessage]
networkID uint32
sourceChainID ids.ID
db database.Database
warpSigner avalancheWarp.Signer
blockClient BlockClient
messageSignatureCache *cache.LRU[ids.ID, [bls.SignatureLen]byte]
blockSignatureCache *cache.LRU[ids.ID, [bls.SignatureLen]byte]
messageCache *cache.LRU[ids.ID, *avalancheWarp.UnsignedMessage]
offchainAddressedCallMsgs map[ids.ID]*avalancheWarp.UnsignedMessage
}

// NewBackend creates a new Backend, and initializes the signature cache and message tracking database.
func NewBackend(networkID uint32, sourceChainID ids.ID, warpSigner avalancheWarp.Signer, blockClient BlockClient, db database.Database, cacheSize int) Backend {
return &backend{
networkID: networkID,
sourceChainID: sourceChainID,
db: db,
warpSigner: warpSigner,
blockClient: blockClient,
messageSignatureCache: &cache.LRU[ids.ID, [bls.SignatureLen]byte]{Size: cacheSize},
blockSignatureCache: &cache.LRU[ids.ID, [bls.SignatureLen]byte]{Size: cacheSize},
messageCache: &cache.LRU[ids.ID, *avalancheWarp.UnsignedMessage]{Size: cacheSize},
func NewBackend(
networkID uint32,
sourceChainID ids.ID,
warpSigner avalancheWarp.Signer,
blockClient BlockClient,
db database.Database,
cacheSize int,
offchainMessages [][]byte,
) (Backend, error) {
b := &backend{
networkID: networkID,
sourceChainID: sourceChainID,
db: db,
warpSigner: warpSigner,
blockClient: blockClient,
messageSignatureCache: &cache.LRU[ids.ID, [bls.SignatureLen]byte]{Size: cacheSize},
blockSignatureCache: &cache.LRU[ids.ID, [bls.SignatureLen]byte]{Size: cacheSize},
messageCache: &cache.LRU[ids.ID, *avalancheWarp.UnsignedMessage]{Size: cacheSize},
offchainAddressedCallMsgs: make(map[ids.ID]*avalancheWarp.UnsignedMessage),
}
return b, b.initOffChainMessages(offchainMessages)
}

func (b *backend) initOffChainMessages(offchainMessages [][]byte) error {
for i, offchainMsg := range offchainMessages {
unsignedMsg, err := avalancheWarp.ParseUnsignedMessage(offchainMsg)
if err != nil {
return fmt.Errorf("%w at index %d: %w", errParsingOffChainMessage, i, err)
}

_, err = payload.ParseAddressedCall(unsignedMsg.Payload)
if err != nil {
return fmt.Errorf("%w at index %d as AddressedCall: %w", errParsingOffChainMessage, i, err)
}
b.offchainAddressedCallMsgs[unsignedMsg.ID()] = unsignedMsg
}

return nil
}

func (b *backend) Clear() error {
Expand Down Expand Up @@ -160,6 +192,9 @@ func (b *backend) GetMessage(messageID ids.ID) (*avalancheWarp.UnsignedMessage,
if message, ok := b.messageCache.Get(messageID); ok {
return message, nil
}
if message, ok := b.offchainAddressedCallMsgs[messageID]; ok {
return message, nil
}

unsignedMessageBytes, err := b.db.Get(messageID[:])
if err != nil {
Expand Down
103 changes: 81 additions & 22 deletions warp/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/ava-labs/avalanchego/snow/consensus/snowman"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/snow/engine/snowman/block"
"github.com/ava-labs/avalanchego/utils"
"github.com/ava-labs/avalanchego/utils/crypto/bls"
"github.com/ava-labs/avalanchego/utils/hashing"
avalancheWarp "github.com/ava-labs/avalanchego/vms/platformvm/warp"
Expand All @@ -22,18 +23,32 @@ import (
)

var (
networkID uint32 = 54321
sourceChainID = ids.GenerateTestID()
testPayload = []byte("test")
networkID uint32 = 54321
sourceChainID = ids.GenerateTestID()
testSourceAddress = utils.RandomBytes(20)
testPayload = []byte("test")
testUnsignedMessage *avalancheWarp.UnsignedMessage
)

func init() {
testAddressedCallPayload, err := payload.NewAddressedCall(testSourceAddress, testPayload)
if err != nil {
panic(err)
}
testUnsignedMessage, err = avalancheWarp.NewUnsignedMessage(networkID, sourceChainID, testAddressedCallPayload.Bytes())
if err != nil {
panic(err)
}
}

func TestClearDB(t *testing.T) {
db := memdb.New()

sk, err := bls.NewSecretKey()
require.NoError(t, err)
warpSigner := avalancheWarp.NewSigner(sk, networkID, sourceChainID)
backendIntf := NewBackend(networkID, sourceChainID, warpSigner, nil, db, 500)
backendIntf, err := NewBackend(networkID, sourceChainID, warpSigner, nil, db, 500, nil)
require.NoError(t, err)
backend, ok := backendIntf.(*backend)
require.True(t, ok)

Expand Down Expand Up @@ -76,20 +91,19 @@ func TestAddAndGetValidMessage(t *testing.T) {
sk, err := bls.NewSecretKey()
require.NoError(t, err)
warpSigner := avalancheWarp.NewSigner(sk, networkID, sourceChainID)
backend := NewBackend(networkID, sourceChainID, warpSigner, nil, db, 500)

// Create a new unsigned message and add it to the warp backend.
unsignedMsg, err := avalancheWarp.NewUnsignedMessage(networkID, sourceChainID, testPayload)
backend, err := NewBackend(networkID, sourceChainID, warpSigner, nil, db, 500, nil)
require.NoError(t, err)
err = backend.AddMessage(unsignedMsg)

// Add testUnsignedMessage to the warp backend
err = backend.AddMessage(testUnsignedMessage)
require.NoError(t, err)

// Verify that a signature is returned successfully, and compare to expected signature.
messageID := unsignedMsg.ID()
messageID := testUnsignedMessage.ID()
signature, err := backend.GetMessageSignature(messageID)
require.NoError(t, err)

expectedSig, err := warpSigner.Sign(unsignedMsg)
expectedSig, err := warpSigner.Sign(testUnsignedMessage)
require.NoError(t, err)
require.Equal(t, expectedSig, signature[:])
}
Expand All @@ -100,12 +114,11 @@ func TestAddAndGetUnknownMessage(t *testing.T) {
sk, err := bls.NewSecretKey()
require.NoError(t, err)
warpSigner := avalancheWarp.NewSigner(sk, networkID, sourceChainID)
backend := NewBackend(networkID, sourceChainID, warpSigner, nil, db, 500)
unsignedMsg, err := avalancheWarp.NewUnsignedMessage(networkID, sourceChainID, testPayload)
backend, err := NewBackend(networkID, sourceChainID, warpSigner, nil, db, 500, nil)
require.NoError(t, err)

// Try getting a signature for a message that was not added.
messageID := unsignedMsg.ID()
messageID := testUnsignedMessage.ID()
_, err = backend.GetMessageSignature(messageID)
require.Error(t, err)
}
Expand Down Expand Up @@ -133,7 +146,8 @@ func TestGetBlockSignature(t *testing.T) {
sk, err := bls.NewSecretKey()
require.NoError(err)
warpSigner := avalancheWarp.NewSigner(sk, networkID, sourceChainID)
backend := NewBackend(networkID, sourceChainID, warpSigner, testVM, db, 500)
backend, err := NewBackend(networkID, sourceChainID, warpSigner, testVM, db, 500, nil)
require.NoError(err)

blockHashPayload, err := payload.NewHash(blkID)
require.NoError(err)
Expand All @@ -158,20 +172,65 @@ func TestZeroSizedCache(t *testing.T) {
warpSigner := avalancheWarp.NewSigner(sk, networkID, sourceChainID)

// Verify zero sized cache works normally, because the lru cache will be initialized to size 1 for any size parameter <= 0.
backend := NewBackend(networkID, sourceChainID, warpSigner, nil, db, 0)

// Create a new unsigned message and add it to the warp backend.
unsignedMsg, err := avalancheWarp.NewUnsignedMessage(networkID, sourceChainID, testPayload)
backend, err := NewBackend(networkID, sourceChainID, warpSigner, nil, db, 0, nil)
require.NoError(t, err)
err = backend.AddMessage(unsignedMsg)

// Add testUnsignedMessage to the warp backend
err = backend.AddMessage(testUnsignedMessage)
require.NoError(t, err)

// Verify that a signature is returned successfully, and compare to expected signature.
messageID := unsignedMsg.ID()
messageID := testUnsignedMessage.ID()
signature, err := backend.GetMessageSignature(messageID)
require.NoError(t, err)

expectedSig, err := warpSigner.Sign(unsignedMsg)
expectedSig, err := warpSigner.Sign(testUnsignedMessage)
require.NoError(t, err)
require.Equal(t, expectedSig, signature[:])
}

func TestOffChainMessages(t *testing.T) {
type test struct {
offchainMessages [][]byte
check func(require *require.Assertions, b Backend)
err error
}
sk, err := bls.NewSecretKey()
require.NoError(t, err)
warpSigner := avalancheWarp.NewSigner(sk, networkID, sourceChainID)

for name, test := range map[string]test{
"no offchain messages": {},
"single off-chain message": {
offchainMessages: [][]byte{
testUnsignedMessage.Bytes(),
},
check: func(require *require.Assertions, b Backend) {
msg, err := b.GetMessage(testUnsignedMessage.ID())
require.NoError(err)
require.Equal(testUnsignedMessage.Bytes(), msg.Bytes())

signature, err := b.GetMessageSignature(testUnsignedMessage.ID())
require.NoError(err)
expectedSignatureBytes, err := warpSigner.Sign(msg)
require.NoError(err)
require.Equal(expectedSignatureBytes, signature[:])
},
},
"invalid message": {
offchainMessages: [][]byte{{1, 2, 3}},
err: errParsingOffChainMessage,
},
} {
t.Run(name, func(t *testing.T) {
require := require.New(t)
db := memdb.New()

backend, err := NewBackend(networkID, sourceChainID, warpSigner, nil, db, 0, test.offchainMessages)
require.ErrorIs(err, test.err)
if test.check != nil {
test.check(require, backend)
}
})
}
}
34 changes: 30 additions & 4 deletions warp/handlers/signature_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/ava-labs/avalanchego/snow/engine/snowman/block"
"github.com/ava-labs/avalanchego/utils/crypto/bls"
avalancheWarp "github.com/ava-labs/avalanchego/vms/platformvm/warp"
"github.com/ava-labs/avalanchego/vms/platformvm/warp/payload"
"github.com/ava-labs/subnet-evm/plugin/evm/message"
"github.com/ava-labs/subnet-evm/utils"
"github.com/ava-labs/subnet-evm/warp"
Expand All @@ -27,17 +28,25 @@ func TestMessageSignatureHandler(t *testing.T) {
snowCtx := utils.TestSnowContext()
blsSecretKey, err := bls.NewSecretKey()
require.NoError(t, err)

warpSigner := avalancheWarp.NewSigner(blsSecretKey, snowCtx.NetworkID, snowCtx.ChainID)
backend := warp.NewBackend(snowCtx.NetworkID, snowCtx.ChainID, warpSigner, &block.TestVM{TestVM: common.TestVM{T: t}}, database, 100)

msg, err := avalancheWarp.NewUnsignedMessage(snowCtx.NetworkID, snowCtx.ChainID, []byte("test"))
addressedPayload, err := payload.NewAddressedCall([]byte{1, 2, 3}, []byte{1, 2, 3})
require.NoError(t, err)
offchainMessage, err := avalancheWarp.NewUnsignedMessage(snowCtx.NetworkID, snowCtx.ChainID, addressedPayload.Bytes())
require.NoError(t, err)

backend, err := warp.NewBackend(snowCtx.NetworkID, snowCtx.ChainID, warpSigner, &block.TestVM{TestVM: common.TestVM{T: t}}, database, 100, [][]byte{offchainMessage.Bytes()})
require.NoError(t, err)

msg, err := avalancheWarp.NewUnsignedMessage(snowCtx.NetworkID, snowCtx.ChainID, []byte("test"))
require.NoError(t, err)
messageID := msg.ID()
require.NoError(t, backend.AddMessage(msg))
signature, err := backend.GetMessageSignature(messageID)
require.NoError(t, err)
offchainSignature, err := backend.GetMessageSignature(offchainMessage.ID())
require.NoError(t, err)

unknownMessageID := ids.GenerateTestID()

emptySignature := [bls.SignatureLen]byte{}
Expand All @@ -61,6 +70,21 @@ func TestMessageSignatureHandler(t *testing.T) {
require.EqualValues(t, 0, stats.blockSignatureMiss.Count())
},
},
"offchain message": {
setup: func() (request message.MessageSignatureRequest, expectedResponse []byte) {
return message.MessageSignatureRequest{
MessageID: offchainMessage.ID(),
}, offchainSignature[:]
},
verifyStats: func(t *testing.T, stats *handlerStats) {
require.EqualValues(t, 1, stats.messageSignatureRequest.Count())
require.EqualValues(t, 1, stats.messageSignatureHit.Count())
require.EqualValues(t, 0, stats.messageSignatureMiss.Count())
require.EqualValues(t, 0, stats.blockSignatureRequest.Count())
require.EqualValues(t, 0, stats.blockSignatureHit.Count())
require.EqualValues(t, 0, stats.blockSignatureMiss.Count())
},
},
"unknown message": {
setup: func() (request message.MessageSignatureRequest, expectedResponse []byte) {
return message.MessageSignatureRequest{
Expand Down Expand Up @@ -125,14 +149,16 @@ func TestBlockSignatureHandler(t *testing.T) {
return nil, errors.New("invalid blockID")
},
}
backend := warp.NewBackend(
backend, err := warp.NewBackend(
snowCtx.NetworkID,
snowCtx.ChainID,
warpSigner,
testVM,
database,
100,
nil,
)
require.NoError(t, err)

signature, err := backend.GetBlockSignature(blkID)
require.NoError(t, err)
Expand Down

0 comments on commit 9566457

Please sign in to comment.