Skip to content
This repository has been archived by the owner on Oct 11, 2024. It is now read-only.

Commit

Permalink
Implement custom order filtering
Browse files Browse the repository at this point in the history
  • Loading branch information
albrow committed Jan 8, 2020
1 parent 32448ff commit 8d77bbe
Show file tree
Hide file tree
Showing 19 changed files with 933 additions and 616 deletions.
4 changes: 4 additions & 0 deletions browser/go/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"syscall/js"
"time"

"github.com/0xProject/0x-mesh/orderfilter"

"github.com/0xProject/0x-mesh/core"
"github.com/0xProject/0x-mesh/zeroex"
"github.com/ethereum/go-ethereum/event"
Expand Down Expand Up @@ -136,6 +138,8 @@ func convertConfig(jsConfig js.Value) (core.Config, error) {
if maxOrdersInStorage := jsConfig.Get("maxOrdersInStorage"); !isNullOrUndefined(maxOrdersInStorage) {
config.MaxOrdersInStorage = maxOrdersInStorage.Int()
}
// TODO(albrow): Add proper support for custom order filters here.
config.CustomOrderFilter = orderfilter.DefaultCustomOrderSchema

return config, nil
}
Expand Down
1 change: 1 addition & 0 deletions browser/ts/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ export interface Config {
// maximum expiration time for incoming orders and remove any orders with an
// expiration time too far in the future. Defaults to 100,000.
maxOrdersInStorage?: number;
// TODO(albrow): Add customOrderFilter here.
}

export interface ContractAddresses {
Expand Down
85 changes: 57 additions & 28 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/0xProject/0x-mesh/keys"
"github.com/0xProject/0x-mesh/loghooks"
"github.com/0xProject/0x-mesh/meshdb"
"github.com/0xProject/0x-mesh/orderfilter"
"github.com/0xProject/0x-mesh/p2p"
"github.com/0xProject/0x-mesh/rpc"
"github.com/0xProject/0x-mesh/zeroex"
Expand All @@ -39,7 +40,6 @@ import (
peerstore "github.com/libp2p/go-libp2p-peerstore"
ma "github.com/multiformats/go-multiaddr"
log "github.com/sirupsen/logrus"
"github.com/xeipuuv/gojsonschema"
)

const (
Expand Down Expand Up @@ -142,6 +142,13 @@ type Config struct {
// enforcing a limit on maximum expiration time for incoming orders and remove
// any orders with an expiration time too far in the future.
MaxOrdersInStorage int `envvar:"MAX_ORDERS_IN_STORAGE" default:"100000"`
// CustomOrderFilter is a JSON-schema which will be used for validating
// incoming orders. If provided, Mesh will only receive orders from other
// peers in the network with the same filter.
//
// TODO(albrow): Link to more documentation about JSON-schemas and how this
// filter works.
CustomOrderFilter string `envvar:"CUSTOM_ORDER_FILTER" default:"{}"`
}

type snapshotInfo struct {
Expand All @@ -159,8 +166,7 @@ type App struct {
blockWatcher *blockwatch.Watcher
orderWatcher *orderwatch.Watcher
orderValidator *ordervalidator.OrderValidator
orderJSONSchema *gojsonschema.Schema
meshMessageJSONSchema *gojsonschema.Schema
orderFilter *orderfilter.Filter
snapshotExpirationWatcher *expirationwatch.Watcher
muIdToSnapshotInfo sync.Mutex
idToSnapshotInfo map[string]snapshotInfo
Expand Down Expand Up @@ -292,17 +298,16 @@ func New(config Config) (*App, error) {
return nil, err
}

snapshotExpirationWatcher := expirationwatch.New()

orderJSONSchema, err := setupOrderSchemaValidator()
// Initialize the order filter
orderFilter, err := orderfilter.New(config.EthereumChainID, config.CustomOrderFilter)
if err != nil {
return nil, err
}
meshMessageJSONSchema, err := setupMeshMessageSchemaValidator()
if err != nil {
return nil, err
return nil, fmt.Errorf("invalid custom order filter: %s", err.Error())
}

// Initialize remaining fields.
snapshotExpirationWatcher := expirationwatch.New()
orderSelector := &orderSelector{
topic: orderFilter.Topic(),
nextOffset: 0,
db: meshDB,
}
Expand All @@ -316,8 +321,7 @@ func New(config Config) (*App, error) {
blockWatcher: blockWatcher,
orderWatcher: orderWatcher,
orderValidator: orderValidator,
orderJSONSchema: orderJSONSchema,
meshMessageJSONSchema: meshMessageJSONSchema,
orderFilter: orderFilter,
snapshotExpirationWatcher: snapshotExpirationWatcher,
idToSnapshotInfo: map[string]snapshotInfo{},
orderSelector: orderSelector,
Expand Down Expand Up @@ -345,8 +349,24 @@ func unquoteConfig(config Config) Config {
return config
}

func getPubSubTopic(chainID int) string {
return fmt.Sprintf("/0x-orders/network/%d/version/2", chainID)
func getPublishTopics(chainID int, customFilter *orderfilter.Filter) ([]string, error) {
defaultTopic, err := orderfilter.GetDefaultTopic(chainID)
if err != nil {
return nil, err
}
customTopic := customFilter.Topic()
if defaultTopic == customTopic {
// If we're just using the default order filter, we don't need to publish to
// multiple topics.
return []string{defaultTopic}, nil
} else {
// If we are using a custom order filter, publish to *both* the default
// topic and the custom topic. All orders that match the custom order filter
// must necessarily match the default filter. This also allows us to
// implement cross-topic forwarding in the future.
// See https://github.com/0xProject/0x-mesh/pull/563
return []string{defaultTopic, customTopic}, nil
}
}

func getRendezvous(chainID int) string {
Expand Down Expand Up @@ -394,6 +414,12 @@ func initMetadata(chainID int, meshDB *meshdb.MeshDB) (*meshdb.Metadata, error)
}

func (app *App) Start(ctx context.Context) error {
// Get the publish topics depending on our custom order filter.
publishTopics, err := getPublishTopics(app.config.EthereumChainID, app.orderFilter)
if err != nil {
return err
}

// Create a child context so that we can preemptively cancel if there is an
// error.
innerCtx, cancel := context.WithCancel(ctx)
Expand Down Expand Up @@ -491,16 +517,18 @@ func (app *App) Start(ctx context.Context) error {
bootstrapList = strings.Split(app.config.BootstrapList, ",")
}
nodeConfig := p2p.Config{
Topic: getPubSubTopic(app.config.EthereumChainID),
TCPPort: app.config.P2PTCPPort,
WebSocketsPort: app.config.P2PWebSocketsPort,
Insecure: false,
PrivateKey: app.privKey,
MessageHandler: app,
RendezvousString: getRendezvous(app.config.EthereumChainID),
UseBootstrapList: app.config.UseBootstrapList,
BootstrapList: bootstrapList,
DataDir: filepath.Join(app.config.DataDir, "p2p"),
SubscribeTopic: app.orderFilter.Topic(),
PublishTopics: publishTopics,
TCPPort: app.config.P2PTCPPort,
WebSocketsPort: app.config.P2PWebSocketsPort,
Insecure: false,
PrivateKey: app.privKey,
MessageHandler: app,
RendezvousString: getRendezvous(app.config.EthereumChainID),
UseBootstrapList: app.config.UseBootstrapList,
BootstrapList: bootstrapList,
DataDir: filepath.Join(app.config.DataDir, "p2p"),
CustomMessageValidator: app.orderFilter.ValidatePubSubMessage,
}
app.node, err = p2p.New(innerCtx, nodeConfig)
if err != nil {
Expand All @@ -515,6 +543,7 @@ func (app *App) Start(ctx context.Context) error {
addrs := app.node.Multiaddrs()
log.WithFields(map[string]interface{}{
"addresses": addrs,
"topic": app.orderFilter.Topic(),
}).Info("starting p2p node")

wg.Add(1)
Expand Down Expand Up @@ -711,7 +740,7 @@ func (app *App) AddOrders(ctx context.Context, signedOrdersRaw []*json.RawMessag
schemaValidOrders := []*zeroex.SignedOrder{}
for _, signedOrderRaw := range signedOrdersRaw {
signedOrderBytes := []byte(*signedOrderRaw)
result, err := app.schemaValidateOrder(signedOrderBytes)
result, err := app.orderFilter.ValidateOrderJSON(signedOrderBytes)
if err != nil {
signedOrder := &zeroex.SignedOrder{}
if err := signedOrder.UnmarshalJSON(signedOrderBytes); err != nil {
Expand Down Expand Up @@ -801,7 +830,7 @@ func (app *App) AddOrders(ctx context.Context, signedOrdersRaw []*json.RawMessag
func (app *App) shareOrder(order *zeroex.SignedOrder) error {
<-app.started

encoded, err := encoding.OrderToRawMessage(order)
encoded, err := encoding.OrderToRawMessage(app.orderFilter.Topic(), order)
if err != nil {
return err
}
Expand Down Expand Up @@ -847,7 +876,7 @@ func (app *App) GetStats() (*rpc.GetStatsResponse, error) {

response := &rpc.GetStatsResponse{
Version: version,
PubSubTopic: getPubSubTopic(app.config.EthereumChainID),
PubSubTopic: app.orderFilter.Topic(),
Rendezvous: getRendezvous(app.config.EthereumChainID),
PeerID: app.peerID.String(),
EthereumChainID: app.config.EthereumChainID,
Expand Down
25 changes: 2 additions & 23 deletions core/message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
var _ p2p.MessageHandler = &App{}

type orderSelector struct {
topic string
nextOffset int
db *meshdb.MeshDB
}
Expand Down Expand Up @@ -95,7 +96,7 @@ func (orderSelector *orderSelector) GetMessagesToShare(max int) ([][]byte, error
log.WithFields(map[string]interface{}{
"order": order,
}).Trace("selected order to share")
encoded, err := encoding.OrderToRawMessage(order.SignedOrder)
encoded, err := encoding.OrderToRawMessage(orderSelector.topic, order.SignedOrder)
if err != nil {
return nil, err
}
Expand All @@ -121,28 +122,6 @@ func (app *App) HandleMessages(ctx context.Context, messages []*p2p.Message) err
continue
}

result, err := app.schemaValidateMeshMessage(msg.Data)
if err != nil {
log.WithFields(map[string]interface{}{
"error": err,
"from": msg.From,
}).Trace("could not schema validate message")
app.handlePeerScoreEvent(msg.From, psInvalidMessage)
continue
}
if !result.Valid() {
formattedErrors := make([]string, len(result.Errors()))
for i, resultError := range result.Errors() {
formattedErrors[i] = resultError.String()
}
log.WithFields(map[string]interface{}{
"errors": formattedErrors,
"from": msg.From,
}).Trace("order schema validation failed for message")
app.handlePeerScoreEvent(msg.From, psInvalidMessage)
continue
}

order, err := encoding.RawMessageToOrder(msg.Data)
if err != nil {
log.WithFields(map[string]interface{}{
Expand Down
4 changes: 2 additions & 2 deletions core/message_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ func TestMessagesSharedSerial(t *testing.T) {
require.NoError(t, err)

selector := &orderSelector{
topic: "customTopic",
nextOffset: 0,
db: meshDB,
}
Expand Down Expand Up @@ -192,9 +193,8 @@ func verifyRoundRobinSharing(t *testing.T, selector *orderSelector, nextOffset i

// Calculate the orders that we expect to be shared
for i := 0; i < expectedOrdersLength; i++ {
encodedOrder, err := encoding.OrderToRawMessage(orderList[(nextOffset+i)%count].SignedOrder)
encodedOrder, err := encoding.OrderToRawMessage(selector.topic, orderList[(nextOffset+i)%count].SignedOrder)
require.NoError(t, err)

expectedOrders[i] = encodedOrder
}

Expand Down
97 changes: 16 additions & 81 deletions core/validation.go
Original file line number Diff line number Diff line change
@@ -1,91 +1,26 @@
package core

import (
"github.com/0xProject/0x-mesh/constants"
"github.com/0xProject/0x-mesh/p2p"
"github.com/xeipuuv/gojsonschema"
"github.com/0xProject/0x-mesh/zeroex"
)

// JSON-schema schemas
var (
addressSchemaLoader = gojsonschema.NewStringLoader(`{"id":"/addressSchema","type":"string","pattern":"^0x[0-9a-fA-F]{40}$"}`)
wholeNumberSchemaLoader = gojsonschema.NewStringLoader(`{"id":"/wholeNumberSchema","anyOf":[{"type":"string","pattern":"^\\d+$"},{"type":"integer"}]}`)
hexSchemaLoader = gojsonschema.NewStringLoader(`{"id":"/hexSchema","type":"string","pattern":"^0x(([0-9a-fA-F])*)?$"}`)
orderSchemaLoader = gojsonschema.NewStringLoader(`{"id":"/orderSchema","properties":{"makerAddress":{"$ref":"/addressSchema"},"takerAddress":{"$ref":"/addressSchema"},"makerFee":{"$ref":"/wholeNumberSchema"},"takerFee":{"$ref":"/wholeNumberSchema"},"senderAddress":{"$ref":"/addressSchema"},"makerAssetAmount":{"$ref":"/wholeNumberSchema"},"takerAssetAmount":{"$ref":"/wholeNumberSchema"},"makerAssetData":{"$ref":"/hexSchema"},"takerAssetData":{"$ref":"/hexSchema"},"makerFeeAssetData":{"$ref":"/hexSchema"},"takerFeeAssetData":{"$ref":"/hexSchema"},"salt":{"$ref":"/wholeNumberSchema"},"feeRecipientAddress":{"$ref":"/addressSchema"},"expirationTimeSeconds":{"$ref":"/wholeNumberSchema"},"exchangeAddress":{"$ref":"/addressSchema"},"chainId": {"type": "number"}},"required":["makerAddress","takerAddress","makerFee","takerFee","senderAddress","makerAssetAmount","takerAssetAmount","makerAssetData","takerAssetData","makerFeeAssetData","takerFeeAssetData","salt","feeRecipientAddress","expirationTimeSeconds","exchangeAddress","chainId"],"type":"object"}`)
signedOrderSchemaLoader = gojsonschema.NewStringLoader(`{"id":"/signedOrderSchema","allOf":[{"$ref":"/orderSchema"},{"properties":{"signature":{"$ref":"/hexSchema"}},"required":["signature"]}]}`)
meshMessageSchemaLoader = gojsonschema.NewStringLoader(`{"id":"/meshMessageSchema","properties":{"MessageType":{"type":"string"},"Order":{"$ref":"/signedOrderSchema"}},"required":["MessageType","Order"]}`)
)

func setupMeshMessageSchemaValidator() (*gojsonschema.Schema, error) {
sl := gojsonschema.NewSchemaLoader()
if err := sl.AddSchemas(addressSchemaLoader); err != nil {
return nil, err
}
if err := sl.AddSchemas(wholeNumberSchemaLoader); err != nil {
return nil, err
}
if err := sl.AddSchemas(hexSchemaLoader); err != nil {
return nil, err
}
if err := sl.AddSchemas(orderSchemaLoader); err != nil {
return nil, err
}
if err := sl.AddSchemas(signedOrderSchemaLoader); err != nil {
return nil, err
}
schema, err := sl.Compile(meshMessageSchemaLoader)
if err != nil {
return nil, err
}
return schema, nil
}

func setupOrderSchemaValidator() (*gojsonschema.Schema, error) {
sl := gojsonschema.NewSchemaLoader()
if err := sl.AddSchemas(addressSchemaLoader); err != nil {
return nil, err
}
if err := sl.AddSchemas(wholeNumberSchemaLoader); err != nil {
return nil, err
}
if err := sl.AddSchemas(hexSchemaLoader); err != nil {
return nil, err
}
if err := sl.AddSchemas(orderSchemaLoader); err != nil {
return nil, err
}
schema, err := sl.Compile(signedOrderSchemaLoader)
if err != nil {
return nil, err
}
return schema, nil
}

func (app *App) schemaValidateOrder(o []byte) (*gojsonschema.Result, error) {
orderLoader := gojsonschema.NewBytesLoader(o)

result, err := app.orderJSONSchema.Validate(orderLoader)
if err != nil {
return nil, err
}

return result, nil
}

func (app *App) schemaValidateMeshMessage(o []byte) (*gojsonschema.Result, error) {
messageLoader := gojsonschema.NewBytesLoader(o)

result, err := app.meshMessageJSONSchema.Validate(messageLoader)
if err != nil {
return nil, err
}

return result, nil
func validateMessageSize(message *p2p.Message) error {
// TODO(albrow): split up max order size and max message size.
// if len(message.Data) > constants.MaxOrderSizeInBytes {
// return errMaxSize
// }
return nil
}

func validateMessageSize(message *p2p.Message) error {
if len(message.Data) > constants.MaxOrderSizeInBytes {
return constants.ErrMaxMessageSize
}
func validateOrderSize(order *zeroex.SignedOrder) error {
// TODO(albrow): split up max order size and max message size.
// encoded, err := encodeOrder(order)
// if err != nil {
// return err
// }
// if len(encoded) > constants.MaxOrderSizeInBytes {
// return errMaxSize
// }
return nil
}
Loading

0 comments on commit 8d77bbe

Please sign in to comment.