Skip to content
This repository has been archived by the owner on Oct 11, 2024. It is now read-only.

Implement custom order filtering #630

Merged
merged 15 commits into from
Jan 15, 2020
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions browser/go/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/0xProject/0x-mesh/core"
"github.com/0xProject/0x-mesh/orderfilter"
"github.com/0xProject/0x-mesh/zeroex"
"github.com/ethereum/go-ethereum/event"
)
Expand Down Expand Up @@ -91,6 +92,7 @@ func convertConfig(jsConfig js.Value) (core.Config, error) {
EthereumRPCMaxRequestsPerSecond: 30,
EnableEthereumRPCRateLimiting: true,
MaxOrdersInStorage: 100000,
CustomOrderFilter: orderfilter.DefaultCustomOrderSchema,
}

// Required config options
Expand Down Expand Up @@ -136,6 +138,9 @@ func convertConfig(jsConfig js.Value) (core.Config, error) {
if maxOrdersInStorage := jsConfig.Get("maxOrdersInStorage"); !isNullOrUndefined(maxOrdersInStorage) {
config.MaxOrdersInStorage = maxOrdersInStorage.Int()
}
if customOrderFilter := jsConfig.Get("customOrderFilter"); !isNullOrUndefined(customOrderFilter) {
config.CustomOrderFilter = customOrderFilter.String()
}

return config, nil
}
Expand Down
78 changes: 77 additions & 1 deletion browser/ts/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,59 @@ BrowserFS.configure(
// The interval (in milliseconds) to check whether Wasm is done loading.
const wasmLoadCheckIntervalMs = 100;

/**
* An interface for JSON schema types, which are used for custom order filters.
*/
export interface JsonSchema {
id?: string;
$schema?: string;
$ref?: string;
title?: string;
description?: string;
multipleOf?: number;
maximum?: number;
exclusiveMaximum?: boolean;
minimum?: number;
exclusiveMinimum?: boolean;
maxLength?: number;
minLength?: number;
pattern?: string | RegExp;
additionalItems?: boolean | JsonSchema;
items?: JsonSchema | JsonSchema[];
maxItems?: number;
minItems?: number;
uniqueItems?: boolean;
maxProperties?: number;
minProperties?: number;
required?: string[];
additionalProperties?: boolean | JsonSchema;
definitions?: {
[name: string]: JsonSchema;
};
properties?: {
[name: string]: JsonSchema;
};
patternProperties?: {
[name: string]: JsonSchema;
};
dependencies?: {
[name: string]: JsonSchema | string[];
};
enum?: any[];
// NOTE(albrow): This interface type is based on
// https://github.com/tdegrunt/jsonschema/blob/9cb2cf847a33abb76b694c6ed4d8d12ef2037201/lib/index.d.ts#L50
// but modified to include the 'const' field from the JSON Schema
// specification draft 6 (https://json-schema.org/understanding-json-schema/reference/generic.html#constant-values)
// See also: https://github.com/tdegrunt/jsonschema/issues/271
const?: any;
type?: string | string[];
format?: string;
allOf?: JsonSchema[];
anyOf?: JsonSchema[];
oneOf?: JsonSchema[];
not?: JsonSchema;
}

// Note(albrow): This is currently copied over from core/core.go. We need to keep
// both definitions in sync, so if you change one you must also change the
// other.
Expand Down Expand Up @@ -126,6 +179,26 @@ export interface Config {
// maximum expiration time for incoming orders and remove any orders with an
// expiration time too far in the future. Defaults to 100,000.
maxOrdersInStorage?: number;
// A a JSON Schema object which will be used for validating incoming orders.
// If provided, Mesh will only receive orders from other peers in the
// network with the same filter.
//
// Here is an example filter which will only allow orders with a specific
// makerAssetData:
//
// {
// properties: {
// makerAssetData: {
// const: "0xf47261b0000000000000000000000000871dd7c2b4b25e1aa18728e9d5f2af4c4e431f5c"
// }
// }
// }
fabioberger marked this conversation as resolved.
Show resolved Hide resolved
//
// Note that you only need to include the requirements for your specific
// application in the filter. The default requirements for a valid order (e.g.
// all the required fields) are automatically included. For more information
// on JSON Schemas, see https://json-schema.org/
customOrderFilter?: JsonSchema;
}

export interface ContractAddresses {
Expand Down Expand Up @@ -177,8 +250,9 @@ interface WrapperConfig {
ethereumRPCMaxRequestsPer24HrUTC?: number;
ethereumRPCMaxRequestsPerSecond?: number;
enableEthereumRPCRateLimiting?: boolean;
customContractAddresses?: string; // json-encoded instead of Object.
customContractAddresses?: string; // json-encoded string instead of Object.
maxOrdersInStorage?: number;
customOrderFilter?: string; // json-encoded string instead of Object
}

// The type for signed orders exposed by MeshWrapper. Unlike other types, the
Expand Down Expand Up @@ -678,10 +752,12 @@ function configToWrapperConfig(config: Config): WrapperConfig {
const bootstrapList = config.bootstrapList == null ? undefined : config.bootstrapList.join(',');
const customContractAddresses =
config.customContractAddresses == null ? undefined : JSON.stringify(config.customContractAddresses);
const customOrderFilter = config.customOrderFilter == null ? undefined : JSON.stringify(config.customOrderFilter);
return {
...config,
bootstrapList,
customContractAddresses,
customOrderFilter,
};
}

Expand Down
98 changes: 70 additions & 28 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/0xProject/0x-mesh/keys"
"github.com/0xProject/0x-mesh/loghooks"
"github.com/0xProject/0x-mesh/meshdb"
"github.com/0xProject/0x-mesh/orderfilter"
"github.com/0xProject/0x-mesh/p2p"
"github.com/0xProject/0x-mesh/rpc"
"github.com/0xProject/0x-mesh/zeroex"
Expand All @@ -39,7 +40,6 @@ import (
peerstore "github.com/libp2p/go-libp2p-peerstore"
ma "github.com/multiformats/go-multiaddr"
log "github.com/sirupsen/logrus"
"github.com/xeipuuv/gojsonschema"
)

const (
Expand Down Expand Up @@ -143,6 +143,26 @@ type Config struct {
// enforcing a limit on maximum expiration time for incoming orders and remove
// any orders with an expiration time too far in the future.
MaxOrdersInStorage int `envvar:"MAX_ORDERS_IN_STORAGE" default:"100000"`
// CustomOrderFilter is a stringified JSON Schema which will be used for
// validating incoming orders. If provided, Mesh will only receive orders from
// other peers in the network with the same filter.
//
// Here is an example filter which will only allow orders with a specific
// makerAssetData:
//
// {
// "properties": {
// "makerAssetData": {
// "const": "0xf47261b0000000000000000000000000871dd7c2b4b25e1aa18728e9d5f2af4c4e431f5c"
// }
// }
// }
//
// Note that you only need to include the requirements for your specific
// application in the filter. The default requirements for a valid order (e.g.
// all the required fields) are automatically included. For more information
// on JSON Schemas, see https://json-schema.org/
CustomOrderFilter string `envvar:"CUSTOM_ORDER_FILTER" default:"{}"`
}

type snapshotInfo struct {
Expand All @@ -160,8 +180,7 @@ type App struct {
blockWatcher *blockwatch.Watcher
orderWatcher *orderwatch.Watcher
orderValidator *ordervalidator.OrderValidator
orderJSONSchema *gojsonschema.Schema
meshMessageJSONSchema *gojsonschema.Schema
orderFilter *orderfilter.Filter
snapshotExpirationWatcher *expirationwatch.Watcher
muIdToSnapshotInfo sync.Mutex
idToSnapshotInfo map[string]snapshotInfo
Expand Down Expand Up @@ -293,17 +312,16 @@ func New(config Config) (*App, error) {
return nil, err
}

snapshotExpirationWatcher := expirationwatch.New()

orderJSONSchema, err := setupOrderSchemaValidator()
// Initialize the order filter
orderFilter, err := orderfilter.New(config.EthereumChainID, config.CustomOrderFilter)
if err != nil {
return nil, err
}
meshMessageJSONSchema, err := setupMeshMessageSchemaValidator()
if err != nil {
return nil, err
return nil, fmt.Errorf("invalid custom order filter: %s", err.Error())
}

// Initialize remaining fields.
snapshotExpirationWatcher := expirationwatch.New()
orderSelector := &orderSelector{
topic: orderFilter.Topic(),
nextOffset: 0,
db: meshDB,
}
Expand All @@ -317,8 +335,7 @@ func New(config Config) (*App, error) {
blockWatcher: blockWatcher,
orderWatcher: orderWatcher,
orderValidator: orderValidator,
orderJSONSchema: orderJSONSchema,
meshMessageJSONSchema: meshMessageJSONSchema,
orderFilter: orderFilter,
snapshotExpirationWatcher: snapshotExpirationWatcher,
idToSnapshotInfo: map[string]snapshotInfo{},
orderSelector: orderSelector,
Expand Down Expand Up @@ -346,8 +363,24 @@ func unquoteConfig(config Config) Config {
return config
}

func getPubSubTopic(chainID int) string {
return fmt.Sprintf("/0x-orders/network/%d/version/2", chainID)
func getPublishTopics(chainID int, customFilter *orderfilter.Filter) ([]string, error) {
defaultTopic, err := orderfilter.GetDefaultTopic(chainID)
if err != nil {
return nil, err
}
customTopic := customFilter.Topic()
if defaultTopic == customTopic {
// If we're just using the default order filter, we don't need to publish to
// multiple topics.
return []string{defaultTopic}, nil
} else {
// If we are using a custom order filter, publish to *both* the default
// topic and the custom topic. All orders that match the custom order filter
// must necessarily match the default filter. This also allows us to
// implement cross-topic forwarding in the future.
// See https://github.com/0xProject/0x-mesh/pull/563
return []string{defaultTopic, customTopic}, nil
}
}

func getRendezvous(chainID int) string {
Expand Down Expand Up @@ -395,6 +428,12 @@ func initMetadata(chainID int, meshDB *meshdb.MeshDB) (*meshdb.Metadata, error)
}

func (app *App) Start(ctx context.Context) error {
// Get the publish topics depending on our custom order filter.
publishTopics, err := getPublishTopics(app.config.EthereumChainID, app.orderFilter)
if err != nil {
return err
}

// Create a child context so that we can preemptively cancel if there is an
// error.
innerCtx, cancel := context.WithCancel(ctx)
Expand Down Expand Up @@ -495,16 +534,18 @@ func (app *App) Start(ctx context.Context) error {
bootstrapList = strings.Split(app.config.BootstrapList, ",")
}
nodeConfig := p2p.Config{
Topic: getPubSubTopic(app.config.EthereumChainID),
TCPPort: app.config.P2PTCPPort,
WebSocketsPort: app.config.P2PWebSocketsPort,
Insecure: false,
PrivateKey: app.privKey,
MessageHandler: app,
RendezvousString: getRendezvous(app.config.EthereumChainID),
UseBootstrapList: app.config.UseBootstrapList,
BootstrapList: bootstrapList,
DataDir: filepath.Join(app.config.DataDir, "p2p"),
SubscribeTopic: app.orderFilter.Topic(),
PublishTopics: publishTopics,
TCPPort: app.config.P2PTCPPort,
WebSocketsPort: app.config.P2PWebSocketsPort,
Insecure: false,
PrivateKey: app.privKey,
MessageHandler: app,
RendezvousString: getRendezvous(app.config.EthereumChainID),
UseBootstrapList: app.config.UseBootstrapList,
BootstrapList: bootstrapList,
DataDir: filepath.Join(app.config.DataDir, "p2p"),
CustomMessageValidator: app.orderFilter.ValidatePubSubMessage,
}
app.node, err = p2p.New(innerCtx, nodeConfig)
if err != nil {
Expand All @@ -519,6 +560,7 @@ func (app *App) Start(ctx context.Context) error {
addrs := app.node.Multiaddrs()
log.WithFields(map[string]interface{}{
"addresses": addrs,
"topic": app.orderFilter.Topic(),
}).Info("starting p2p node")

wg.Add(1)
Expand Down Expand Up @@ -715,7 +757,7 @@ func (app *App) AddOrders(ctx context.Context, signedOrdersRaw []*json.RawMessag
schemaValidOrders := []*zeroex.SignedOrder{}
for _, signedOrderRaw := range signedOrdersRaw {
signedOrderBytes := []byte(*signedOrderRaw)
result, err := app.schemaValidateOrder(signedOrderBytes)
result, err := app.orderFilter.ValidateOrderJSON(signedOrderBytes)
if err != nil {
signedOrder := &zeroex.SignedOrder{}
if err := signedOrder.UnmarshalJSON(signedOrderBytes); err != nil {
Expand Down Expand Up @@ -805,7 +847,7 @@ func (app *App) AddOrders(ctx context.Context, signedOrdersRaw []*json.RawMessag
func (app *App) shareOrder(order *zeroex.SignedOrder) error {
<-app.started

encoded, err := encoding.OrderToRawMessage(order)
encoded, err := encoding.OrderToRawMessage(app.orderFilter.Topic(), order)
if err != nil {
return err
}
Expand Down Expand Up @@ -851,7 +893,7 @@ func (app *App) GetStats() (*rpc.GetStatsResponse, error) {

response := &rpc.GetStatsResponse{
Version: version,
PubSubTopic: getPubSubTopic(app.config.EthereumChainID),
PubSubTopic: app.orderFilter.Topic(),
Rendezvous: getRendezvous(app.config.EthereumChainID),
PeerID: app.peerID.String(),
EthereumChainID: app.config.EthereumChainID,
Expand Down
32 changes: 9 additions & 23 deletions core/message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
var _ p2p.MessageHandler = &App{}

type orderSelector struct {
topic string
nextOffset int
db *meshdb.MeshDB
}
Expand Down Expand Up @@ -95,7 +96,7 @@ func (orderSelector *orderSelector) GetMessagesToShare(max int) ([][]byte, error
log.WithFields(map[string]interface{}{
"order": order,
}).Trace("selected order to share")
encoded, err := encoding.OrderToRawMessage(order.SignedOrder)
encoded, err := encoding.OrderToRawMessage(orderSelector.topic, order.SignedOrder)
if err != nil {
return nil, err
}
Expand All @@ -121,28 +122,6 @@ func (app *App) HandleMessages(ctx context.Context, messages []*p2p.Message) err
continue
}

result, err := app.schemaValidateMeshMessage(msg.Data)
if err != nil {
log.WithFields(map[string]interface{}{
"error": err,
"from": msg.From,
}).Trace("could not schema validate message")
app.handlePeerScoreEvent(msg.From, psInvalidMessage)
continue
}
if !result.Valid() {
formattedErrors := make([]string, len(result.Errors()))
for i, resultError := range result.Errors() {
formattedErrors[i] = resultError.String()
}
log.WithFields(map[string]interface{}{
"errors": formattedErrors,
"from": msg.From,
}).Trace("order schema validation failed for message")
app.handlePeerScoreEvent(msg.From, psInvalidMessage)
continue
}

order, err := encoding.RawMessageToOrder(msg.Data)
if err != nil {
log.WithFields(map[string]interface{}{
Expand Down Expand Up @@ -213,3 +192,10 @@ func (app *App) HandleMessages(ctx context.Context, messages []*p2p.Message) err
}
return nil
}

func validateMessageSize(message *p2p.Message) error {
if len(message.Data) > constants.MaxMessageSizeInBytes {
return constants.ErrMaxMessageSize
}
return nil
}
Loading