diff --git a/CHANGELOG.md b/CHANGELOG.md index 9e8eb2104..4e2052247 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ This changelog is a work in progress and may contain notes for versions which ha ### Features ✅ - Implemented a new strategy for limiting the amount of database storage used by Mesh and removing orders when the database is full. This strategy involves a dynamically adjusting maximum expiration time. When the database is full, Mesh will enforce a maximum expiration time for all incoming orders and remove any existing orders with an expiration time too far in the future. If conditions change and there is enough space in the database again, the max expiration time will slowly increase. This is a short term solution which solves the immediate issue of finite storage capacities and does a decent job of protecting against spam. We expect to improve and possibly replace it in the future. See [#450](https://github.com/0xProject/0x-mesh/pull/450) for more details. +- Added support for a new feature called "order pinning" ([#474](https://github.com/0xProject/0x-mesh/pull/474)). Pinned orders will not be affected by any DDoS prevention or incentive mechanisms (including the new dynamic max expiration time feature) and will always stay in storage until they are no longer fillable. By default, all orders which are submitted via either the JSON-RPC API or the `addOrdersAsync` function in the TypeScript bindings will be pinned. ### Bug fixes 🐞 diff --git a/browser/go/main.go b/browser/go/main.go index df446363b..8ee6c5970 100644 --- a/browser/go/main.go +++ b/browser/go/main.go @@ -199,7 +199,7 @@ func (cw *MeshWrapper) Start() error { // AddOrders converts raw JavaScript orders into the appropriate type, calls // core.App.AddOrders, converts the result into basic JavaScript types (string, // int, etc.) and returns it. -func (cw *MeshWrapper) AddOrders(rawOrders js.Value) (js.Value, error) { +func (cw *MeshWrapper) AddOrders(rawOrders js.Value, pinned bool) (js.Value, error) { // HACK(albrow): There is a more effecient way to do this, but for now, // just use JSON to convert to the Go type. encodedOrders := js.Global().Get("JSON").Call("stringify", rawOrders).String() @@ -207,7 +207,7 @@ func (cw *MeshWrapper) AddOrders(rawOrders js.Value) (js.Value, error) { if err := json.Unmarshal([]byte(encodedOrders), &rawMessages); err != nil { return js.Undefined(), err } - results, err := cw.app.AddOrders(rawMessages) + results, err := cw.app.AddOrders(rawMessages, pinned) if err != nil { return js.Undefined(), err } @@ -242,7 +242,7 @@ func (cw *MeshWrapper) JSValue() js.Value { // addOrdersAsync(orders: Array): Promise "addOrdersAsync": js.FuncOf(func(this js.Value, args []js.Value) interface{} { return wrapInPromise(func() (interface{}, error) { - return cw.AddOrders(args[0]) + return cw.AddOrders(args[0], args[1].Bool()) }) }), }) diff --git a/browser/ts/index.ts b/browser/ts/index.ts index 932d6e324..0abc26129 100644 --- a/browser/ts/index.ts +++ b/browser/ts/index.ts @@ -123,7 +123,7 @@ interface MeshWrapper { startAsync(): Promise; onError(handler: (err: Error) => void): void; onOrderEvents(handler: (events: WrapperOrderEvent[]) => void): void; - addOrdersAsync(orders: WrapperSignedOrder[]): Promise; + addOrdersAsync(orders: WrapperSignedOrder[], pinned: boolean): Promise; } // The type for configuration exposed by MeshWrapper. @@ -592,11 +592,15 @@ export class Mesh { * the order; it will not be rejected for any invalid orders (check * results.rejected instead). * - * @param orders An array of orders to add. + * @param orders An array of orders to add. + * @param pinned Whether or not the orders should be pinned. Pinned + * orders will not be affected by any DDoS prevention or incentive + * mechanisms and will always stay in storage until they are no longer + * fillable. * @returns Validation results for the given orders, indicating which orders * were accepted and which were rejected. */ - public async addOrdersAsync(orders: SignedOrder[]): Promise { + public async addOrdersAsync(orders: SignedOrder[], pinned: boolean = true): Promise { await waitForLoadAsync(); if (this._wrapper === undefined) { // If this is called after startAsync, this._wrapper is always @@ -605,7 +609,7 @@ export class Mesh { return Promise.reject(new Error('Mesh is still loading. Try again soon.')); } const meshOrders = orders.map(signedOrderToWrapperSignedOrder); - const meshResults = await this._wrapper.addOrdersAsync(meshOrders); + const meshResults = await this._wrapper.addOrdersAsync(meshOrders, pinned); return wrapperValidationResultsToValidationResults(meshResults); } } diff --git a/cmd/mesh/rpc_handler.go b/cmd/mesh/rpc_handler.go index f8929d217..ac562e5b0 100644 --- a/cmd/mesh/rpc_handler.go +++ b/cmd/mesh/rpc_handler.go @@ -71,9 +71,9 @@ func (handler *rpcHandler) GetOrders(page, perPage int, snapshotID string) (*rpc } // AddOrders is called when an RPC client calls AddOrders. -func (handler *rpcHandler) AddOrders(signedOrdersRaw []*json.RawMessage) (*ordervalidator.ValidationResults, error) { +func (handler *rpcHandler) AddOrders(signedOrdersRaw []*json.RawMessage, opts rpc.AddOrdersOpts) (*ordervalidator.ValidationResults, error) { log.WithField("count", len(signedOrdersRaw)).Debug("received AddOrders request via RPC") - validationResults, err := handler.app.AddOrders(signedOrdersRaw) + validationResults, err := handler.app.AddOrders(signedOrdersRaw, opts.Pinned) if err != nil { // We don't want to leak internal error details to the RPC client. log.WithField("error", err.Error()).Error("internal error in AddOrders RPC call") diff --git a/core/core.go b/core/core.go index 5e7f8450f..23553ea9a 100644 --- a/core/core.go +++ b/core/core.go @@ -611,8 +611,11 @@ func (app *App) GetOrders(page, perPage int, snapshotID string) (*rpc.GetOrdersR } // AddOrders can be used to add orders to Mesh. It validates the given orders -// and if they are valid, will store and eventually broadcast the orders to peers. -func (app *App) AddOrders(signedOrdersRaw []*json.RawMessage) (*ordervalidator.ValidationResults, error) { +// and if they are valid, will store and eventually broadcast the orders to +// peers. If pinned is true, the orders will be marked as pinned, which means +// they will only be removed if they become unfillable and will not be removed +// due to having a high expiration time or any incentive mechanisms. +func (app *App) AddOrders(signedOrdersRaw []*json.RawMessage, pinned bool) (*ordervalidator.ValidationResults, error) { allValidationResults := &ordervalidator.ValidationResults{ Accepted: []*ordervalidator.AcceptedOrderInfo{}, Rejected: []*ordervalidator.RejectedOrderInfo{}, @@ -686,12 +689,22 @@ func (app *App) AddOrders(signedOrdersRaw []*json.RawMessage) (*ordervalidator.V allValidationResults.Rejected = append(allValidationResults.Rejected, orderInfo) } - for _, acceptedOrderInfo := range allValidationResults.Accepted { + for i, acceptedOrderInfo := range allValidationResults.Accepted { // Add the order to the OrderWatcher. This also saves the order in the // database. - err = app.orderWatcher.Add(acceptedOrderInfo) + err = app.orderWatcher.Add(acceptedOrderInfo, pinned) if err != nil { - return nil, err + if err == meshdb.ErrDBFilledWithPinnedOrders { + allValidationResults.Accepted = append(allValidationResults.Accepted[:i], allValidationResults.Accepted[i+1:]...) + allValidationResults.Rejected = append(allValidationResults.Rejected, &ordervalidator.RejectedOrderInfo{ + OrderHash: acceptedOrderInfo.OrderHash, + SignedOrder: acceptedOrderInfo.SignedOrder, + Kind: ordervalidator.MeshError, + Status: ordervalidator.RODatabaseFullOfOrders, + }) + } else { + return nil, err + } } // Share the order with our peers. if err := app.shareOrder(acceptedOrderInfo.SignedOrder); err != nil { @@ -737,6 +750,10 @@ func (app *App) GetStats() (*rpc.GetStatsResponse, error) { if err != nil { return nil, err } + numPinnedOrders, err := app.db.CountPinnedOrders() + if err != nil { + return nil, err + } response := &rpc.GetStatsResponse{ Version: version, @@ -748,6 +765,7 @@ func (app *App) GetStats() (*rpc.GetStatsResponse, error) { NumOrders: numOrders, NumPeers: app.node.GetNumPeers(), NumOrdersIncludingRemoved: numOrdersIncludingRemoved, + NumPinnedOrders: numPinnedOrders, MaxExpirationTime: app.orderWatcher.MaxExpirationTime().String(), } return response, nil @@ -776,6 +794,7 @@ func (app *App) periodicallyLogStats(ctx context.Context) { "latestBlock": stats.LatestBlock, "numOrders": stats.NumOrders, "numOrdersIncludingRemoved": stats.NumOrdersIncludingRemoved, + "numPinnedOrders": stats.NumPinnedOrders, "numPeers": stats.NumPeers, "maxExpirationTime": stats.MaxExpirationTime, }).Info("current stats") diff --git a/core/message_handler.go b/core/message_handler.go index d3e628803..c354b00e4 100644 --- a/core/message_handler.go +++ b/core/message_handler.go @@ -182,7 +182,17 @@ func (app *App) HandleMessages(messages []*p2p.Message) error { "from": msg.From.String(), }).Trace("all fields for new valid order received from peer") // Add stores the message in the database. - if err := app.orderWatcher.Add(acceptedOrderInfo); err != nil { + if err := app.orderWatcher.Add(acceptedOrderInfo, false); err != nil { + if err == meshdb.ErrDBFilledWithPinnedOrders { + // If the database is full of pinned orders, log and then continue. + log.WithFields(map[string]interface{}{ + "error": err.Error(), + "orderHash": acceptedOrderInfo.OrderHash.Hex(), + "from": msg.From.String(), + }).Error("could not store valid order because database is full") + continue + } + // For any other type of error, return it. return err } app.handlePeerScoreEvent(msg.From, psOrderStored) diff --git a/meshdb/meshdb.go b/meshdb/meshdb.go index 7fb25bbe9..ec78d590f 100644 --- a/meshdb/meshdb.go +++ b/meshdb/meshdb.go @@ -1,6 +1,7 @@ package meshdb import ( + "errors" "fmt" "math/big" "time" @@ -13,6 +14,8 @@ import ( log "github.com/sirupsen/logrus" ) +var ErrDBFilledWithPinnedOrders = errors.New("the database is full of pinned orders; no orders can be removed in order to make space") + // Order is the database representation a 0x order along with some relevant metadata type Order struct { Hash common.Hash @@ -26,6 +29,9 @@ type Order struct { // flag it for removal. After this order isn't updated for X time and has IsRemoved = true, // the order can be permanently deleted. IsRemoved bool + // IsPinned indicates whether or not the order is pinned. Pinned orders are + // not removed from the database unless they become unfillable. + IsPinned bool } // ID returns the Order's ID @@ -155,7 +161,14 @@ func setupOrders(database *db.DB) (*OrdersCollection, error) { expirationTimeIndex := col.AddIndex("expirationTime", func(m db.Model) []byte { order := m.(*Order) - return uint256ToConstantLengthBytes(order.SignedOrder.ExpirationTimeSeconds) + expTimeString := uint256ToConstantLengthBytes(order.SignedOrder.ExpirationTimeSeconds) + // We separate pinned and non-pinned orders via a prefix that is either 0 or + // 1. + pinnedString := "0" + if order.IsPinned { + pinnedString = "1" + } + return []byte(fmt.Sprintf("%s|%s", pinnedString, expTimeString)) }) return &OrdersCollection{ @@ -421,8 +434,9 @@ func (m *MeshDB) TrimOrdersByExpirationTime(targetMaxOrders int) (newMaxExpirati return constants.UnlimitedExpirationTime, nil, nil } - // Find the orders which we need to remove. - filter := m.Orders.ExpirationTimeIndex.All() + // Find the orders which we need to remove. We use a prefix filter of "0|: so + // that we only remove non-pinned orders. + filter := m.Orders.ExpirationTimeIndex.PrefixFilter([]byte("0|")) numOrdersToRemove := numOrders - targetMaxOrders if err := m.Orders.NewQuery(filter).Reverse().Max(numOrdersToRemove).Run(&removedOrders); err != nil { return nil, nil, err @@ -438,6 +452,13 @@ func (m *MeshDB) TrimOrdersByExpirationTime(targetMaxOrders int) (newMaxExpirati return nil, nil, err } + // If we could not remove numOrdersToRemove orders than it means the database + // is full of pinned orders. We still remove as many orders as we can and then + // return an error. + if len(removedOrders) < numOrdersToRemove { + return nil, nil, ErrDBFilledWithPinnedOrders + } + // The new max expiration time is simply the minimum expiration time of the // orders that were removed (i.e., the expiration time of the last order in // the slice). We add a buffer of -1 just to make sure we don't exceed @@ -448,3 +469,10 @@ func (m *MeshDB) TrimOrdersByExpirationTime(targetMaxOrders int) (newMaxExpirati newMaxExpirationTime = newMaxExpirationTime.Sub(newMaxExpirationTime, big.NewInt(1)) return newMaxExpirationTime, removedOrders, nil } + +// CountPinnedOrders returns the number of pinned orders. +func (m *MeshDB) CountPinnedOrders() (int, error) { + // We use a prefix filter of "1|" so that we only count pinned orders. + filter := m.Orders.ExpirationTimeIndex.PrefixFilter([]byte("1|")) + return m.Orders.NewQuery(filter).Count() +} diff --git a/meshdb/meshdb_test.go b/meshdb/meshdb_test.go index 706186cec..c713da859 100644 --- a/meshdb/meshdb_test.go +++ b/meshdb/meshdb_test.go @@ -149,7 +149,7 @@ func TestTrimOrdersByExpirationTime(t *testing.T) { // Note: most of the fields in these orders are the same. For the purposes of // this test, the only thing that matters is the Salt and ExpirationTime. - rawOrders := []*zeroex.Order{ + rawUnpinnedOrders := []*zeroex.Order{ { MakerAddress: makerAddress, TakerAddress: constants.NullAddress, @@ -211,8 +211,82 @@ func TestTrimOrdersByExpirationTime(t *testing.T) { ExchangeAddress: contractAddresses.Exchange, }, } + rawPinnedOrders := []*zeroex.Order{ + { + MakerAddress: makerAddress, + TakerAddress: constants.NullAddress, + SenderAddress: constants.NullAddress, + FeeRecipientAddress: common.HexToAddress("0xa258b39954cef5cb142fd567a46cddb31a670124"), + TakerAssetData: common.Hex2Bytes("f47261b000000000000000000000000034d402f14d58e001d8efbe6585051bf9706aa064"), + MakerAssetData: common.Hex2Bytes("025717920000000000000000000000001dc4c1cefef38a777b15aa20260a54e584b16c480000000000000000000000000000000000000000000000000000000000000001"), + Salt: big.NewInt(0), + MakerFee: big.NewInt(0), + TakerFee: big.NewInt(0), + MakerAssetAmount: big.NewInt(3551808554499581700), + TakerAssetAmount: big.NewInt(1), + ExpirationTimeSeconds: big.NewInt(250), + ExchangeAddress: contractAddresses.Exchange, + }, + { + MakerAddress: makerAddress, + TakerAddress: constants.NullAddress, + SenderAddress: constants.NullAddress, + FeeRecipientAddress: common.HexToAddress("0xa258b39954cef5cb142fd567a46cddb31a670124"), + TakerAssetData: common.Hex2Bytes("f47261b000000000000000000000000034d402f14d58e001d8efbe6585051bf9706aa064"), + MakerAssetData: common.Hex2Bytes("025717920000000000000000000000001dc4c1cefef38a777b15aa20260a54e584b16c480000000000000000000000000000000000000000000000000000000000000001"), + Salt: big.NewInt(1), + MakerFee: big.NewInt(0), + TakerFee: big.NewInt(0), + MakerAssetAmount: big.NewInt(3551808554499581700), + TakerAssetAmount: big.NewInt(1), + ExpirationTimeSeconds: big.NewInt(350), + ExchangeAddress: contractAddresses.Exchange, + }, + } + + insertRawOrders(t, meshDB, rawUnpinnedOrders, false) + pinnedOrders := insertRawOrders(t, meshDB, rawPinnedOrders, true) + + // Call CalculateNewMaxExpirationTimeAndTrimDatabase and check the results. + targetMaxOrders := 4 + gotExpirationTime, gotRemovedOrders, err := meshDB.TrimOrdersByExpirationTime(targetMaxOrders) + require.NoError(t, err) + assert.Equal(t, "199", gotExpirationTime.String(), "newMaxExpirationTime") + assert.Len(t, gotRemovedOrders, 2, "wrong number of orders removed") + + // Check that the expiration time of each removed order is >= the new max. + for _, removedOrder := range gotRemovedOrders { + expirationTimeOfRemovedOrder := removedOrder.SignedOrder.ExpirationTimeSeconds + assert.True(t, expirationTimeOfRemovedOrder.Cmp(gotExpirationTime) != -1, "an order was removed with expiration time (%s) less than the new max (%s)", expirationTimeOfRemovedOrder, gotExpirationTime) + } + var remainingOrders []*Order + require.NoError(t, meshDB.Orders.FindAll(&remainingOrders)) + assert.Len(t, remainingOrders, 4, "wrong number of orders remaining") + + // Check that the expiration time of each remaining order is <= the new max. + for _, remainingOrder := range remainingOrders { + if !remainingOrder.IsPinned { + // Unpinned orders should not have an expiration time greater than the + // new max. + expirationTimeOfRemainingOrder := remainingOrder.SignedOrder.ExpirationTimeSeconds + newMaxPlusOne := big.NewInt(0).Add(gotExpirationTime, big.NewInt(1)) + assert.True(t, expirationTimeOfRemainingOrder.Cmp(newMaxPlusOne) != 1, "a remaining order had an expiration time (%s) greater than the new max + 1 (%s)", expirationTimeOfRemainingOrder, newMaxPlusOne) + } + } + + // Check that the pinned orders are still in the database. + for _, pinnedOrder := range pinnedOrders { + require.NoError(t, meshDB.Orders.FindByID(pinnedOrder.Hash.Bytes(), &Order{})) + } + + // Trying to trim orders when the database is full of pinned orders should + // return an error. + _, _, err = meshDB.TrimOrdersByExpirationTime(1) + assert.EqualError(t, err, ErrDBFilledWithPinnedOrders.Error(), "expected ErrFilledWithPinnedOrders when targetMaxOrders is less than the number of pinned orders") +} - orders := make([]*Order, len(rawOrders)) +func insertRawOrders(t *testing.T, meshDB *MeshDB, rawOrders []*zeroex.Order, isPinned bool) []*Order { + results := make([]*Order, len(rawOrders)) for i, order := range rawOrders { // Sign, compute order hash, and insert. signedOrder, err := zeroex.SignTestOrder(order) @@ -226,29 +300,10 @@ func TestTrimOrdersByExpirationTime(t *testing.T) { FillableTakerAssetAmount: big.NewInt(1), LastUpdated: time.Now(), IsRemoved: false, + IsPinned: isPinned, } - orders[i] = order + results[i] = order require.NoError(t, meshDB.Orders.Insert(order)) } - - // Call CalculateNewMaxExpirationTimeAndTrimDatabase and check the results. - targetMaxOrders := 2 - gotExpirationTime, gotRemovedOrders, err := meshDB.TrimOrdersByExpirationTime(targetMaxOrders) - require.NoError(t, err) - assert.Equal(t, "199", gotExpirationTime.String(), "newMaxExpirationTime") - assert.Len(t, gotRemovedOrders, 2, "wrong number of orders removed") - // Check that the expiration time of each removed order is >= the new max. - for _, removedOrder := range gotRemovedOrders { - expirationTimeOfRemovedOrder := removedOrder.SignedOrder.ExpirationTimeSeconds - assert.True(t, expirationTimeOfRemovedOrder.Cmp(gotExpirationTime) != -1, "an order was removed with expiration time (%s) less than the new max (%s)", expirationTimeOfRemovedOrder, gotExpirationTime) - } - var remainingOrders []*Order - require.NoError(t, meshDB.Orders.FindAll(&remainingOrders)) - assert.Len(t, remainingOrders, 2, "wrong number of orders remaining") - // Check that the expiration time of each remaining order is <= the new max. - for _, removedOrder := range remainingOrders { - expirationTimeOfRemovedOrder := removedOrder.SignedOrder.ExpirationTimeSeconds - newMaxPlusOne := big.NewInt(0).Add(gotExpirationTime, big.NewInt(1)) - assert.True(t, expirationTimeOfRemovedOrder.Cmp(newMaxPlusOne) != 1, "a remaining order had an expiration time (%s) greater than the new max + 1 (%s)", expirationTimeOfRemovedOrder, newMaxPlusOne) - } + return results } diff --git a/rpc/client.go b/rpc/client.go index 75a932849..1c0c9a397 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -29,10 +29,24 @@ func NewClient(addr string) (*Client, error) { }, nil } +// AddOrdersOpts is a set of options for the AddOrders RPC method. +type AddOrdersOpts struct { + // Pinned determines whether or not the added orders should be pinned. Pinned + // orders will not be affected by any DDoS prevention or incentive mechanisms + // and will always stay in storage until they are no longer fillable. Defaults + // to true. + Pinned bool `json:"pinned"` +} + // AddOrders adds orders to the 0x Mesh node and broadcasts them throughout the // 0x Mesh network. -func (c *Client) AddOrders(orders []*zeroex.SignedOrder) (*ordervalidator.ValidationResults, error) { +func (c *Client) AddOrders(orders []*zeroex.SignedOrder, opts ...AddOrdersOpts) (*ordervalidator.ValidationResults, error) { var validationResults ordervalidator.ValidationResults + if len(opts) > 0 { + if err := c.rpcClient.Call(&validationResults, "mesh_addOrders", orders, opts[0]); err != nil { + return nil, err + } + } if err := c.rpcClient.Call(&validationResults, "mesh_addOrders", orders); err != nil { return nil, err } @@ -85,6 +99,7 @@ type GetStatsResponse struct { NumPeers int `json:"numPeers"` NumOrders int `json:"numOrders"` NumOrdersIncludingRemoved int `json:"numOrdersIncludingRemoved"` + NumPinnedOrders int `json:"numPinnedOrders"` MaxExpirationTime string `json:"maxExpirationTime"` } diff --git a/rpc/client_server_test.go b/rpc/client_server_test.go index 33cb692db..eddde158d 100644 --- a/rpc/client_server_test.go +++ b/rpc/client_server_test.go @@ -27,18 +27,18 @@ import ( // dummyRPCHandler is used for testing purposes. It allows declaring handlers // for some requests or all of them, depending on testing needs. type dummyRPCHandler struct { - addOrdersHandler func(signedOrdersRaw []*json.RawMessage) (*ordervalidator.ValidationResults, error) + addOrdersHandler func(signedOrdersRaw []*json.RawMessage, opts AddOrdersOpts) (*ordervalidator.ValidationResults, error) getOrdersHandler func(page, perPage int, snapshotID string) (*GetOrdersResponse, error) addPeerHandler func(peerInfo peerstore.PeerInfo) error getStatsHandler func() (*GetStatsResponse, error) subscribeToOrdersHandler func(ctx context.Context) (*rpc.Subscription, error) } -func (d *dummyRPCHandler) AddOrders(signedOrdersRaw []*json.RawMessage) (*ordervalidator.ValidationResults, error) { +func (d *dummyRPCHandler) AddOrders(signedOrdersRaw []*json.RawMessage, opts AddOrdersOpts) (*ordervalidator.ValidationResults, error) { if d.addOrdersHandler == nil { return nil, errors.New("dummyRPCHandler: no handler set for AddOrder") } - return d.addOrdersHandler(signedOrdersRaw) + return d.addOrdersHandler(signedOrdersRaw, opts) } func (d *dummyRPCHandler) GetOrders(page, perPage int, snapshotID string) (*GetOrdersResponse, error) { @@ -125,7 +125,7 @@ func TestAddOrdersSuccess(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(1) rpcHandler := &dummyRPCHandler{ - addOrdersHandler: func(signedOrdersRaw []*json.RawMessage) (*ordervalidator.ValidationResults, error) { + addOrdersHandler: func(signedOrdersRaw []*json.RawMessage, opts AddOrdersOpts) (*ordervalidator.ValidationResults, error) { require.Len(t, signedOrdersRaw, 1) validationResponse := &ordervalidator.ValidationResults{} for _, signedOrderRaw := range signedOrdersRaw { diff --git a/rpc/clients/typescript/src/types.ts b/rpc/clients/typescript/src/types.ts index 35fb3c5c1..a57599f8a 100644 --- a/rpc/clients/typescript/src/types.ts +++ b/rpc/clients/typescript/src/types.ts @@ -419,5 +419,6 @@ export interface GetStatsResponse { numPeers: number; numOrders: number; numOrdersIncludingRemoved: number; + numPinnedOrders: number; maxExpirationTime: string; } diff --git a/rpc/clients/typescript/src/ws_client.ts b/rpc/clients/typescript/src/ws_client.ts index cf5a4febd..e4cc17a9a 100644 --- a/rpc/clients/typescript/src/ws_client.ts +++ b/rpc/clients/typescript/src/ws_client.ts @@ -35,7 +35,7 @@ import { StringifiedExchangeCancelUpToEvent, StringifiedExchangeFillEvent, StringifiedWethDepositEvent, -StringifiedWethWithdrawalEvent, + StringifiedWethWithdrawalEvent, ValidationResults, WSOpts, } from './types'; @@ -208,11 +208,11 @@ export class WSClient { } const contractEvent: ContractEvent = { blockHash: rawContractEvent.blockHash, - txHash: rawContractEvent.txHash, - txIndex: rawContractEvent.txIndex, - logIndex: rawContractEvent.logIndex, - isRemoved: rawContractEvent.isRemoved, - address: rawContractEvent.address, + txHash: rawContractEvent.txHash, + txIndex: rawContractEvent.txIndex, + logIndex: rawContractEvent.logIndex, + isRemoved: rawContractEvent.isRemoved, + address: rawContractEvent.address, kind, parameters, }; @@ -242,12 +242,17 @@ export class WSClient { /** * Adds an array of 0x signed orders to the Mesh node. * @param signedOrders signedOrders to add + * @param pinned Whether or not the orders should be pinned. Pinned + * orders will not be affected by any DDoS prevention or incentive + * mechanisms and will always stay in storage until they are no longer + * fillable. * @returns validation results */ - public async addOrdersAsync(signedOrders: SignedOrder[]): Promise { + public async addOrdersAsync(signedOrders: SignedOrder[], pinned: boolean = true): Promise { assert.isArray('signedOrders', signedOrders); const rawValidationResults: RawValidationResults = await (this._wsProvider as any).send('mesh_addOrders', [ signedOrders, + pinned, ]); const validationResults: ValidationResults = { accepted: WSClient._convertRawAcceptedOrderInfos(rawValidationResults.accepted), @@ -277,13 +282,11 @@ export class WSClient { let snapshotID = ''; // New snapshot let page = 0; - const getOrdersResponse: GetOrdersResponse = await this._wsProvider.send('mesh_getOrders', - [ - page, - perPage, - snapshotID, - ], - ); + const getOrdersResponse: GetOrdersResponse = await this._wsProvider.send('mesh_getOrders', [ + page, + perPage, + snapshotID, + ]); snapshotID = getOrdersResponse.snapshotID; let ordersInfos = getOrdersResponse.ordersInfos; @@ -291,8 +294,7 @@ export class WSClient { do { rawOrderInfos = [...rawOrderInfos, ...ordersInfos]; page++; - ordersInfos = (await this._wsProvider.send('mesh_getOrders', [page, perPage, snapshotID])) - .ordersInfos; + ordersInfos = (await this._wsProvider.send('mesh_getOrders', [page, perPage, snapshotID])).ordersInfos; } while (Object.keys(ordersInfos).length > 0); const orderInfos = WSClient._convertRawOrderInfos(rawOrderInfos); diff --git a/rpc/service.go b/rpc/service.go index daf71ee8b..1fec72c71 100644 --- a/rpc/service.go +++ b/rpc/service.go @@ -28,7 +28,7 @@ type rpcService struct { // RPCHandler is used to respond to incoming requests from the client. type RPCHandler interface { // AddOrders is called when the client sends an AddOrders request. - AddOrders(signedOrdersRaw []*json.RawMessage) (*ordervalidator.ValidationResults, error) + AddOrders(signedOrdersRaw []*json.RawMessage, opts AddOrdersOpts) (*ordervalidator.ValidationResults, error) // GetOrders is called when the clients sends a GetOrders request GetOrders(page, perPage int, snapshotID string) (*GetOrdersResponse, error) // AddPeer is called when the client sends an AddPeer request. @@ -117,9 +117,16 @@ func SetupHeartbeat(ctx context.Context) (*ethrpc.Subscription, error) { return rpcSub, nil } +var defaultAddOrdersOpts = AddOrdersOpts{ + Pinned: true, +} + // AddOrders calls rpcHandler.AddOrders and returns the validation results. -func (s *rpcService) AddOrders(signedOrdersRaw []*json.RawMessage) (*ordervalidator.ValidationResults, error) { - return s.rpcHandler.AddOrders(signedOrdersRaw) +func (s *rpcService) AddOrders(signedOrdersRaw []*json.RawMessage, opts *AddOrdersOpts) (*ordervalidator.ValidationResults, error) { + if opts == nil { + opts = &defaultAddOrdersOpts + } + return s.rpcHandler.AddOrders(signedOrdersRaw, *opts) } // GetOrders calls rpcHandler.GetOrders and returns the validation results. diff --git a/zeroex/ordervalidator/order_validator.go b/zeroex/ordervalidator/order_validator.go index 9749b8181..9bc6cde44 100644 --- a/zeroex/ordervalidator/order_validator.go +++ b/zeroex/ordervalidator/order_validator.go @@ -181,6 +181,10 @@ var ( Code: "SenderAddressNotAllowed", Message: "orders with a senderAddress are not currently supported", } + RODatabaseFullOfOrders = RejectedOrderStatus{ + Code: "DatabaseFullOfOrders", + Message: "database is full of pinned orders and no orders can be deleted to make space (consider increasing MAX_ORDERS_IN_STORAGE)", + } ) // ROInvalidSchemaCode is the RejectedOrderStatus emitted if an order doesn't conform to the order schema diff --git a/zeroex/orderwatch/order_watcher.go b/zeroex/orderwatch/order_watcher.go index 65b93ed30..98e49f15a 100644 --- a/zeroex/orderwatch/order_watcher.go +++ b/zeroex/orderwatch/order_watcher.go @@ -721,8 +721,11 @@ func (w *Watcher) permanentlyDeleteStaleRemovedOrders(ctx context.Context) error } // Add adds a 0x order to the DB and watches it for changes in fillability. It -// will no-op (and return nil) if the order has already been added. -func (w *Watcher) Add(orderInfo *ordervalidator.AcceptedOrderInfo) error { +// will no-op (and return nil) if the order has already been added. If pinned is +// true, the orders will be marked as pinned. Pinned orders will not be affected +// by any DDoS prevention or incentive mechanisms and will always stay in +// storage until they are no longer fillable. +func (w *Watcher) Add(orderInfo *ordervalidator.AcceptedOrderInfo, pinned bool) error { if err := w.decreaseMaxExpirationTimeIfNeeded(); err != nil { return err } @@ -738,7 +741,7 @@ func (w *Watcher) Add(orderInfo *ordervalidator.AcceptedOrderInfo) error { // Final expiration time check before inserting the order. We might have just // changed max expiration time above. - if orderInfo.SignedOrder.ExpirationTimeSeconds.Cmp(w.maxExpirationTime) == 1 { + if !pinned && orderInfo.SignedOrder.ExpirationTimeSeconds.Cmp(w.maxExpirationTime) == 1 { // HACK(albrow): This is technically not the ideal way to respond to this // situation, but it is a lot easier to implement for the time being. In the // future, we should return an error and then react to that error @@ -774,6 +777,7 @@ func (w *Watcher) Add(orderInfo *ordervalidator.AcceptedOrderInfo) error { LastUpdated: time.Now().UTC(), FillableTakerAssetAmount: orderInfo.FillableTakerAssetAmount, IsRemoved: false, + IsPinned: pinned, } err := txn.Insert(order) if err != nil { diff --git a/zeroex/orderwatch/order_watcher_test.go b/zeroex/orderwatch/order_watcher_test.go index 865130d79..a92f8cf09 100644 --- a/zeroex/orderwatch/order_watcher_test.go +++ b/zeroex/orderwatch/order_watcher_test.go @@ -745,7 +745,7 @@ func watchOrder(t *testing.T, orderWatcher *Watcher, signedOrder *zeroex.SignedO FillableTakerAssetAmount: signedOrder.TakerAssetAmount, IsNew: true, } - err = orderWatcher.Add(orderInfo) + err = orderWatcher.Add(orderInfo, false) require.NoError(t, err) }