This repository has been archived by the owner on Oct 11, 2024. It is now read-only.

test: Added more tests for ordersync #890

Merged · 11 commits · Jul 31, 2020
86 changes: 71 additions & 15 deletions core/core_test.go
@@ -6,6 +6,7 @@ import (
"context"
"flag"
"fmt"
"math/big"
"sync"
"testing"
"time"
@@ -94,10 +95,13 @@ func TestConfigChainIDAndRPCMatchDetection(t *testing.T) {
}

func newTestApp(t *testing.T, ctx context.Context) *App {
return newTestAppWithPrivateConfig(t, ctx, defaultPrivateConfig())
return newTestAppWithPrivateConfig(t, ctx, defaultOrderFilter, defaultPrivateConfig())
}

func newTestAppWithPrivateConfig(t *testing.T, ctx context.Context, pConfig privateConfig) *App {
func newTestAppWithPrivateConfig(t *testing.T, ctx context.Context, customOrderFilter string, pConfig privateConfig) *App {
if customOrderFilter == "" {
customOrderFilter = defaultOrderFilter
}
dataDir := "/tmp/test_node/" + uuid.New().String()
config := Config{
Verbosity: 2,
@@ -114,7 +118,7 @@ func newTestAppWithPrivateConfig(t *testing.T, ctx context.Context, pConfig priv
EthereumRPCMaxRequestsPer24HrUTC: 99999999999999,
EthereumRPCMaxRequestsPerSecond: 99999999999999,
MaxOrdersInStorage: 100000,
CustomOrderFilter: "{}",
CustomOrderFilter: customOrderFilter,
}
app, err := newWithPrivateConfig(ctx, config, pConfig)
require.NoError(t, err)
@@ -208,6 +212,37 @@ func TestOrderSync(t *testing.T) {
},
},
},
{
name: "makerAssetAmount orderfilter - matches all orders",
customOrderFilter: `{"properties":{"makerAssetAmount":{"pattern":"^1$","type":"string"}}}`,
orderOptionsForIndex: func(_ int) []orderopts.Option {
return []orderopts.Option{orderopts.MakerAssetAmount(big.NewInt(1))}
},
pConfig: privateConfig{
paginationSubprotocolPerPage: 10,
paginationSubprotocols: []ordersyncSubprotocolFactory{
NewFilteredPaginationSubprotocolV1,
NewFilteredPaginationSubprotocolV0,
},
},
},
{
name: "makerAssetAmount orderfilter - matches one order",
customOrderFilter: `{"properties":{"makerAssetAmount":{"pattern":"^1$","type":"string"}}}`,
orderOptionsForIndex: func(i int) []orderopts.Option {
if i == 0 {
return []orderopts.Option{orderopts.MakerAssetAmount(big.NewInt(1))}
}
return []orderopts.Option{}
},
pConfig: privateConfig{
paginationSubprotocolPerPage: 10,
paginationSubprotocols: []ordersyncSubprotocolFactory{
NewFilteredPaginationSubprotocolV1,
NewFilteredPaginationSubprotocolV0,
},
},
},
}
for i, testCase := range testCases {
testCaseName := fmt.Sprintf("%s (test case %d)", testCase.name, i)
@@ -216,10 +251,14 @@
}

type ordersyncTestCase struct {
name string
pConfig privateConfig
name string
customOrderFilter string
orderOptionsForIndex func(int) []orderopts.Option
pConfig privateConfig
}

const defaultOrderFilter = "{}"

func runOrdersyncTestCase(t *testing.T, testCase ordersyncTestCase) func(t *testing.T) {
return func(t *testing.T) {
teardownSubTest := setupSubTest(t)
@@ -230,20 +269,26 @@ func runOrdersyncTestCase(t *testing.T, testCase ordersyncTestCase) func(t *test
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
wg := &sync.WaitGroup{}
originalNode := newTestAppWithPrivateConfig(t, ctx, testCase.pConfig)
originalNode := newTestAppWithPrivateConfig(t, ctx, defaultOrderFilter, testCase.pConfig)
wg.Add(1)
go func() {
defer wg.Done()
if err := originalNode.Start(); err != nil && err != context.Canceled {
// context.Canceled is expected. For any other error, fail the test.
panic(err)
panic(fmt.Sprintf("%s %s", testCase.name, err))
}
}()

// Manually add some orders to originalNode.
orderOptions := scenario.OptionsForAll(orderopts.SetupMakerState(true))
orderOptionsForIndex := func(i int) []orderopts.Option {
orderOptions := []orderopts.Option{orderopts.SetupMakerState(true)}
if testCase.orderOptionsForIndex != nil {
return append(testCase.orderOptionsForIndex(i), orderOptions...)
}
return orderOptions
}
numOrders := testCase.pConfig.paginationSubprotocolPerPage*3 + 1
originalOrders := scenario.NewSignedTestOrdersBatch(t, numOrders, orderOptions)
originalOrders := scenario.NewSignedTestOrdersBatch(t, numOrders, orderOptionsForIndex)

// We have to wait for latest block to be processed by the Mesh node.
time.Sleep(blockProcessingWaitTime)
@@ -252,13 +297,13 @@ func runOrdersyncTestCase(t *testing.T, testCase ordersyncTestCase) func(t *test
require.NoError(t, err)
require.Empty(t, results.Rejected, "tried to add orders but some were invalid: \n%s\n", spew.Sdump(results))

newNode := newTestApp(t, ctx)
newNode := newTestAppWithPrivateConfig(t, ctx, testCase.customOrderFilter, defaultPrivateConfig())
wg.Add(1)
go func() {
defer wg.Done()
if err := newNode.Start(); err != nil && err != context.Canceled {
// context.Canceled is expected. For any other error, fail the test.
panic(err)
panic(fmt.Sprintf("%s %s", testCase.name, err))
}
}()
<-newNode.started
@@ -275,6 +320,17 @@ func runOrdersyncTestCase(t *testing.T, testCase ordersyncTestCase) func(t *test
})
require.NoError(t, err)

// Only the orders that satisfy the new node's orderfilter should
// be received during ordersync.
filteredOrders := []*zeroex.SignedOrder{}
for _, order := range originalOrders {
matches, err := newNode.orderFilter.MatchOrder(order)
require.NoError(t, err)
if matches {
filteredOrders = append(filteredOrders, order)
}
}

// Wait for newNode to get the orders via ordersync.
receivedAddedEvents := []*zeroex.OrderEvent{}
OrderEventLoop:
@@ -288,18 +344,18 @@ func runOrdersyncTestCase(t *testing.T, testCase ordersyncTestCase) func(t *test
receivedAddedEvents = append(receivedAddedEvents, orderEvent)
}
}
if len(receivedAddedEvents) >= len(originalOrders) {
if len(receivedAddedEvents) >= len(filteredOrders) {
break OrderEventLoop
}
}
}

// Test that the orders are actually in the database and are returned by
// GetOrders.
newNodeOrdersResp, err := newNode.GetOrders(len(originalOrders), common.Hash{})
newNodeOrdersResp, err := newNode.GetOrders(len(filteredOrders), common.Hash{})
require.NoError(t, err)
assert.Len(t, newNodeOrdersResp.OrdersInfos, len(originalOrders), "new node should have %d orders", len(originalOrders))
for _, expectedOrder := range originalOrders {
assert.Len(t, newNodeOrdersResp.OrdersInfos, len(filteredOrders), "new node should have %d orders", len(filteredOrders))
for _, expectedOrder := range filteredOrders {
orderHash, err := expectedOrder.ComputeOrderHash()
require.NoError(t, err)
expectedOrder.ResetHash()
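Note on the new test cases: the `customOrderFilter` strings are JSON Schemas, and an order is accepted during ordersync only if it validates against the new node's schema. With `paginationSubprotocolPerPage: 10`, `numOrders` works out to 31 (`10*3 + 1`); the "matches all orders" case gives every order a `makerAssetAmount` of 1, while the "matches one order" case expects only the single matching order to survive the filter. Here is a minimal sketch of the matching semantics, using the standalone `github.com/xeipuuv/gojsonschema` package as a stand-in for Mesh's internal `orderfilter` wrapper (an assumption for illustration, not the package's actual API):

```go
package main

import (
	"fmt"

	"github.com/xeipuuv/gojsonschema"
)

func main() {
	// The filter used by the new test cases: makerAssetAmount must be the
	// decimal string "1" exactly (JSON Schema "pattern" is a regex).
	schema := gojsonschema.NewStringLoader(
		`{"properties":{"makerAssetAmount":{"pattern":"^1$","type":"string"}}}`)

	for _, doc := range []string{
		`{"makerAssetAmount":"1"}`,  // accepted: "1" matches ^1$
		`{"makerAssetAmount":"42"}`, // rejected: "42" does not match ^1$
	} {
		result, err := gojsonschema.Validate(schema, gojsonschema.NewStringLoader(doc))
		if err != nil {
			panic(err)
		}
		fmt.Printf("%s -> valid: %v\n", doc, result.Valid())
	}
}
```

This is also why `runOrdersyncTestCase` filters `originalOrders` through `newNode.orderFilter.MatchOrder` before counting: the expected set is the filtered set, not the seeded set.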
7 changes: 0 additions & 7 deletions core/message_handler.go
@@ -15,13 +15,6 @@ import (
// Ensure that App implements p2p.MessageHandler.
var _ p2p.MessageHandler = &App{}

func min(a int, b int) int {
if a < b {
return a
}
return b
}

func (app *App) HandleMessages(ctx context.Context, messages []*p2p.Message) error {
// First we validate the messages and decode them into orders.
orders := []*zeroex.SignedOrder{}
97 changes: 53 additions & 44 deletions core/ordersync/ordersync.go
@@ -162,6 +162,8 @@ func New(ctx context.Context, node *p2p.Node, subprotocols []Subprotocol) *Servi
sids = append(sids, subp.Name())
supportedSubprotocols[subp.Name()] = subp
}
// TODO(jalextowle): We should ensure that there were no duplicates -- there
// is no reason to support this.
s := &Service{
ctx: ctx,
node: node,
@@ -223,51 +225,10 @@ func (s *Service) HandleStream(stream network.Stream) {
log.WithFields(log.Fields{
"requester": stream.Conn().RemotePeer().Pretty(),
}).Trace("received ordersync request")
if rawReq.Type != TypeRequest {
log.WithField("gotType", rawReq.Type).Warn("wrong type for Request")
s.handlePeerScoreEvent(requesterID, psInvalidMessage)
rawRes := s.handleRawRequest(rawReq, requesterID)
if rawRes == nil {
return
}
subprotocol, i, err := s.GetMatchingSubprotocol(rawReq)
if err != nil {
log.WithError(err).Warn("GetMatchingSubprotocol returned error")
s.handlePeerScoreEvent(requesterID, psSubprotocolNegotiationFailed)
return
}
if len(rawReq.Subprotocols) > 1 {
firstRequests := FirstRequestsForSubprotocols{}
err := json.Unmarshal(rawReq.Metadata, &firstRequests)

// NOTE(jalextowle): Older versions of Mesh did not include
// metadata in the first ordersync request. In order to handle
// this in a backwards compatible way, we simply avoid updating
// the request metadata if there was an error decoding the
// metadata from the request or if the length of the
// MetadataForSubprotocol is too small (or empty). This latter
// check also ensures that the array is long enough for us
// to access the i-th element.
if err == nil && len(firstRequests.MetadataForSubprotocol) > i {
rawReq.Metadata = firstRequests.MetadataForSubprotocol[i]
}
}
res, err := handleRequestWithSubprotocol(s.ctx, subprotocol, requesterID, rawReq)
if err != nil {
log.WithError(err).Warn("subprotocol returned error")
return
}
encodedMetadata, err := json.Marshal(res.Metadata)
if err != nil {
log.WithError(err).Error("could not encode raw metadata")
return
}
s.handlePeerScoreEvent(requesterID, psValidMessage)
rawRes := rawResponse{
Type: TypeResponse,
Subprotocol: subprotocol.Name(),
Orders: res.Orders,
Complete: res.Complete,
Metadata: encodedMetadata,
}
if err := json.NewEncoder(stream).Encode(rawRes); err != nil {
log.WithFields(log.Fields{
"error": err.Error(),
@@ -276,7 +237,7 @@ func (s *Service) HandleStream(stream network.Stream) {
s.handlePeerScoreEvent(requesterID, psUnexpectedDisconnect)
return
}
if res.Complete {
if rawRes.Complete {
return
}
}
@@ -441,6 +402,54 @@ func calculateDelayWithJitter(approxDelay time.Duration, jitterAmount float64) t
return approxDelay + time.Duration(delta)
}

func (s *Service) handleRawRequest(rawReq *rawRequest, requesterID peer.ID) *rawResponse {
if rawReq.Type != TypeRequest {
log.WithField("gotType", rawReq.Type).Warn("wrong type for Request")
s.handlePeerScoreEvent(requesterID, psInvalidMessage)
return nil
}
subprotocol, i, err := s.GetMatchingSubprotocol(rawReq)
if err != nil {
log.WithError(err).Warn("GetMatchingSubprotocol returned error")
s.handlePeerScoreEvent(requesterID, psSubprotocolNegotiationFailed)
return nil
}
if len(rawReq.Subprotocols) > 1 {
firstRequests := FirstRequestsForSubprotocols{}
err := json.Unmarshal(rawReq.Metadata, &firstRequests)

// NOTE(jalextowle): Older versions of Mesh did not include
// metadata in the first ordersync request. In order to handle
// this in a backwards compatible way, we simply avoid updating
// the request metadata if there was an error decoding the
// metadata from the request or if the length of the
// MetadataForSubprotocol is too small (or empty). This latter
// check also ensures that the array is long enough for us
// to access the i-th element.
if err == nil && len(firstRequests.MetadataForSubprotocol) > i {
rawReq.Metadata = firstRequests.MetadataForSubprotocol[i]
}
}
res, err := handleRequestWithSubprotocol(s.ctx, subprotocol, requesterID, rawReq)
if err != nil {
log.WithError(err).Warn("subprotocol returned error")
return nil
}
encodedMetadata, err := json.Marshal(res.Metadata)
if err != nil {
log.WithError(err).Error("could not encode raw metadata")
return nil
}
s.handlePeerScoreEvent(requesterID, psValidMessage)
return &rawResponse{
Type: TypeResponse,
Subprotocol: subprotocol.Name(),
Orders: res.Orders,
Complete: res.Complete,
Metadata: encodedMetadata,
}
}

func handleRequestWithSubprotocol(ctx context.Context, subprotocol Subprotocol, requesterID peer.ID, rawReq *rawRequest) (*Response, error) {
req, err := parseRequestWithSubprotocol(subprotocol, requesterID, rawReq)
if err != nil {
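Note on the refactor above: `HandleStream` now delegates to `handleRawRequest`, and the NOTE(jalextowle) comment describes the backwards-compatibility rule for first-request metadata. A minimal sketch of that decoding rule, with assumed type shapes (the real `rawRequest` and `FirstRequestsForSubprotocols` definitions live elsewhere in this package, and the JSON field name below is a guess):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Assumed shape: newer Mesh nodes send one metadata blob per advertised
// subprotocol in the first request; the json tag here is illustrative.
type FirstRequestsForSubprotocols struct {
	MetadataForSubprotocol []json.RawMessage `json:"metadata"`
}

// pickMetadata mirrors the NOTE: prefer the metadata for the i-th matched
// subprotocol, but fall back to the raw metadata when the peer is an older
// node (decode error) or the array is too short to hold an i-th element.
func pickMetadata(raw json.RawMessage, i int) json.RawMessage {
	var first FirstRequestsForSubprotocols
	if err := json.Unmarshal(raw, &first); err == nil && len(first.MetadataForSubprotocol) > i {
		return first.MetadataForSubprotocol[i]
	}
	return raw
}

func main() {
	newPeer := json.RawMessage(`{"metadata":[{"page":0},{"minOrderHash":"0x0"}]}`)
	oldPeer := json.RawMessage(`{"page":3}`) // pre-metadata Mesh node

	fmt.Printf("new peer, subprotocol 1: %s\n", pickMetadata(newPeer, 1)) // {"minOrderHash":"0x0"}
	fmt.Printf("old peer, subprotocol 0: %s\n", pickMetadata(oldPeer, 0)) // {"page":3}
}
```

One consequence of the extraction is visible at the end of `HandleStream`: the completeness check now reads `rawRes.Complete` rather than `res.Complete`, because `res` is no longer in scope there once the subprotocol handling lives in `handleRawRequest`.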