Skip to content
This repository has been archived by the owner on Oct 11, 2024. It is now read-only.

Commit

Permalink
Merge pull request #474 from 0xProject/feature/order-pinning
Browse files Browse the repository at this point in the history
Implement order pinning
  • Loading branch information
albrow authored Oct 28, 2019
2 parents 760c1cb + 0d9b950 commit 0b773a4
Show file tree
Hide file tree
Showing 16 changed files with 220 additions and 70 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ This changelog is a work in progress and may contain notes for versions which ha
### Features ✅

- Implemented a new strategy for limiting the amount of database storage used by Mesh and removing orders when the database is full. This strategy involves a dynamically adjusting maximum expiration time. When the database is full, Mesh will enforce a maximum expiration time for all incoming orders and remove any existing orders with an expiration time too far in the future. If conditions change and there is enough space in the database again, the max expiration time will slowly increase. This is a short term solution which solves the immediate issue of finite storage capacities and does a decent job of protecting against spam. We expect to improve and possibly replace it in the future. See [#450](https://github.com/0xProject/0x-mesh/pull/450) for more details.
- Added support for a new feature called "order pinning" ([#474](https://github.com/0xProject/0x-mesh/pull/474)). Pinned orders will not be affected by any DDoS prevention or incentive mechanisms (including the new dynamic max expiration time feature) and will always stay in storage until they are no longer fillable. By default, all orders which are submitted via either the JSON-RPC API or the `addOrdersAsync` function in the TypeScript bindings will be pinned.

### Bug fixes 🐞

Expand Down
6 changes: 3 additions & 3 deletions browser/go/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,15 +199,15 @@ func (cw *MeshWrapper) Start() error {
// AddOrders converts raw JavaScript orders into the appropriate type, calls
// core.App.AddOrders, converts the result into basic JavaScript types (string,
// int, etc.) and returns it.
func (cw *MeshWrapper) AddOrders(rawOrders js.Value) (js.Value, error) {
func (cw *MeshWrapper) AddOrders(rawOrders js.Value, pinned bool) (js.Value, error) {
// HACK(albrow): There is a more effecient way to do this, but for now,
// just use JSON to convert to the Go type.
encodedOrders := js.Global().Get("JSON").Call("stringify", rawOrders).String()
var rawMessages []*json.RawMessage
if err := json.Unmarshal([]byte(encodedOrders), &rawMessages); err != nil {
return js.Undefined(), err
}
results, err := cw.app.AddOrders(rawMessages)
results, err := cw.app.AddOrders(rawMessages, pinned)
if err != nil {
return js.Undefined(), err
}
Expand Down Expand Up @@ -242,7 +242,7 @@ func (cw *MeshWrapper) JSValue() js.Value {
// addOrdersAsync(orders: Array<SignedOrder>): Promise<ValidationResults>
"addOrdersAsync": js.FuncOf(func(this js.Value, args []js.Value) interface{} {
return wrapInPromise(func() (interface{}, error) {
return cw.AddOrders(args[0])
return cw.AddOrders(args[0], args[1].Bool())
})
}),
})
Expand Down
12 changes: 8 additions & 4 deletions browser/ts/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ interface MeshWrapper {
startAsync(): Promise<void>;
onError(handler: (err: Error) => void): void;
onOrderEvents(handler: (events: WrapperOrderEvent[]) => void): void;
addOrdersAsync(orders: WrapperSignedOrder[]): Promise<WrapperValidationResults>;
addOrdersAsync(orders: WrapperSignedOrder[], pinned: boolean): Promise<WrapperValidationResults>;
}

// The type for configuration exposed by MeshWrapper.
Expand Down Expand Up @@ -592,11 +592,15 @@ export class Mesh {
* the order; it will not be rejected for any invalid orders (check
* results.rejected instead).
*
* @param orders An array of orders to add.
* @param orders An array of orders to add.
* @param pinned Whether or not the orders should be pinned. Pinned
* orders will not be affected by any DDoS prevention or incentive
* mechanisms and will always stay in storage until they are no longer
* fillable.
* @returns Validation results for the given orders, indicating which orders
* were accepted and which were rejected.
*/
public async addOrdersAsync(orders: SignedOrder[]): Promise<ValidationResults> {
public async addOrdersAsync(orders: SignedOrder[], pinned: boolean = true): Promise<ValidationResults> {
await waitForLoadAsync();
if (this._wrapper === undefined) {
// If this is called after startAsync, this._wrapper is always
Expand All @@ -605,7 +609,7 @@ export class Mesh {
return Promise.reject(new Error('Mesh is still loading. Try again soon.'));
}
const meshOrders = orders.map(signedOrderToWrapperSignedOrder);
const meshResults = await this._wrapper.addOrdersAsync(meshOrders);
const meshResults = await this._wrapper.addOrdersAsync(meshOrders, pinned);
return wrapperValidationResultsToValidationResults(meshResults);
}
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/mesh/rpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ func (handler *rpcHandler) GetOrders(page, perPage int, snapshotID string) (*rpc
}

// AddOrders is called when an RPC client calls AddOrders.
func (handler *rpcHandler) AddOrders(signedOrdersRaw []*json.RawMessage) (*ordervalidator.ValidationResults, error) {
func (handler *rpcHandler) AddOrders(signedOrdersRaw []*json.RawMessage, opts rpc.AddOrdersOpts) (*ordervalidator.ValidationResults, error) {
log.WithField("count", len(signedOrdersRaw)).Debug("received AddOrders request via RPC")
validationResults, err := handler.app.AddOrders(signedOrdersRaw)
validationResults, err := handler.app.AddOrders(signedOrdersRaw, opts.Pinned)
if err != nil {
// We don't want to leak internal error details to the RPC client.
log.WithField("error", err.Error()).Error("internal error in AddOrders RPC call")
Expand Down
29 changes: 24 additions & 5 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -611,8 +611,11 @@ func (app *App) GetOrders(page, perPage int, snapshotID string) (*rpc.GetOrdersR
}

// AddOrders can be used to add orders to Mesh. It validates the given orders
// and if they are valid, will store and eventually broadcast the orders to peers.
func (app *App) AddOrders(signedOrdersRaw []*json.RawMessage) (*ordervalidator.ValidationResults, error) {
// and if they are valid, will store and eventually broadcast the orders to
// peers. If pinned is true, the orders will be marked as pinned, which means
// they will only be removed if they become unfillable and will not be removed
// due to having a high expiration time or any incentive mechanisms.
func (app *App) AddOrders(signedOrdersRaw []*json.RawMessage, pinned bool) (*ordervalidator.ValidationResults, error) {
allValidationResults := &ordervalidator.ValidationResults{
Accepted: []*ordervalidator.AcceptedOrderInfo{},
Rejected: []*ordervalidator.RejectedOrderInfo{},
Expand Down Expand Up @@ -686,12 +689,22 @@ func (app *App) AddOrders(signedOrdersRaw []*json.RawMessage) (*ordervalidator.V
allValidationResults.Rejected = append(allValidationResults.Rejected, orderInfo)
}

for _, acceptedOrderInfo := range allValidationResults.Accepted {
for i, acceptedOrderInfo := range allValidationResults.Accepted {
// Add the order to the OrderWatcher. This also saves the order in the
// database.
err = app.orderWatcher.Add(acceptedOrderInfo)
err = app.orderWatcher.Add(acceptedOrderInfo, pinned)
if err != nil {
return nil, err
if err == meshdb.ErrDBFilledWithPinnedOrders {
allValidationResults.Accepted = append(allValidationResults.Accepted[:i], allValidationResults.Accepted[i+1:]...)
allValidationResults.Rejected = append(allValidationResults.Rejected, &ordervalidator.RejectedOrderInfo{
OrderHash: acceptedOrderInfo.OrderHash,
SignedOrder: acceptedOrderInfo.SignedOrder,
Kind: ordervalidator.MeshError,
Status: ordervalidator.RODatabaseFullOfOrders,
})
} else {
return nil, err
}
}
// Share the order with our peers.
if err := app.shareOrder(acceptedOrderInfo.SignedOrder); err != nil {
Expand Down Expand Up @@ -737,6 +750,10 @@ func (app *App) GetStats() (*rpc.GetStatsResponse, error) {
if err != nil {
return nil, err
}
numPinnedOrders, err := app.db.CountPinnedOrders()
if err != nil {
return nil, err
}

response := &rpc.GetStatsResponse{
Version: version,
Expand All @@ -748,6 +765,7 @@ func (app *App) GetStats() (*rpc.GetStatsResponse, error) {
NumOrders: numOrders,
NumPeers: app.node.GetNumPeers(),
NumOrdersIncludingRemoved: numOrdersIncludingRemoved,
NumPinnedOrders: numPinnedOrders,
MaxExpirationTime: app.orderWatcher.MaxExpirationTime().String(),
}
return response, nil
Expand Down Expand Up @@ -776,6 +794,7 @@ func (app *App) periodicallyLogStats(ctx context.Context) {
"latestBlock": stats.LatestBlock,
"numOrders": stats.NumOrders,
"numOrdersIncludingRemoved": stats.NumOrdersIncludingRemoved,
"numPinnedOrders": stats.NumPinnedOrders,
"numPeers": stats.NumPeers,
"maxExpirationTime": stats.MaxExpirationTime,
}).Info("current stats")
Expand Down
12 changes: 11 additions & 1 deletion core/message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,17 @@ func (app *App) HandleMessages(messages []*p2p.Message) error {
"from": msg.From.String(),
}).Trace("all fields for new valid order received from peer")
// Add stores the message in the database.
if err := app.orderWatcher.Add(acceptedOrderInfo); err != nil {
if err := app.orderWatcher.Add(acceptedOrderInfo, false); err != nil {
if err == meshdb.ErrDBFilledWithPinnedOrders {
// If the database is full of pinned orders, log and then continue.
log.WithFields(map[string]interface{}{
"error": err.Error(),
"orderHash": acceptedOrderInfo.OrderHash.Hex(),
"from": msg.From.String(),
}).Error("could not store valid order because database is full")
continue
}
// For any other type of error, return it.
return err
}
app.handlePeerScoreEvent(msg.From, psOrderStored)
Expand Down
34 changes: 31 additions & 3 deletions meshdb/meshdb.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package meshdb

import (
"errors"
"fmt"
"math/big"
"time"
Expand All @@ -13,6 +14,8 @@ import (
log "github.com/sirupsen/logrus"
)

var ErrDBFilledWithPinnedOrders = errors.New("the database is full of pinned orders; no orders can be removed in order to make space")

// Order is the database representation a 0x order along with some relevant metadata
type Order struct {
Hash common.Hash
Expand All @@ -26,6 +29,9 @@ type Order struct {
// flag it for removal. After this order isn't updated for X time and has IsRemoved = true,
// the order can be permanently deleted.
IsRemoved bool
// IsPinned indicates whether or not the order is pinned. Pinned orders are
// not removed from the database unless they become unfillable.
IsPinned bool
}

// ID returns the Order's ID
Expand Down Expand Up @@ -155,7 +161,14 @@ func setupOrders(database *db.DB) (*OrdersCollection, error) {

expirationTimeIndex := col.AddIndex("expirationTime", func(m db.Model) []byte {
order := m.(*Order)
return uint256ToConstantLengthBytes(order.SignedOrder.ExpirationTimeSeconds)
expTimeString := uint256ToConstantLengthBytes(order.SignedOrder.ExpirationTimeSeconds)
// We separate pinned and non-pinned orders via a prefix that is either 0 or
// 1.
pinnedString := "0"
if order.IsPinned {
pinnedString = "1"
}
return []byte(fmt.Sprintf("%s|%s", pinnedString, expTimeString))
})

return &OrdersCollection{
Expand Down Expand Up @@ -421,8 +434,9 @@ func (m *MeshDB) TrimOrdersByExpirationTime(targetMaxOrders int) (newMaxExpirati
return constants.UnlimitedExpirationTime, nil, nil
}

// Find the orders which we need to remove.
filter := m.Orders.ExpirationTimeIndex.All()
// Find the orders which we need to remove. We use a prefix filter of "0|: so
// that we only remove non-pinned orders.
filter := m.Orders.ExpirationTimeIndex.PrefixFilter([]byte("0|"))
numOrdersToRemove := numOrders - targetMaxOrders
if err := m.Orders.NewQuery(filter).Reverse().Max(numOrdersToRemove).Run(&removedOrders); err != nil {
return nil, nil, err
Expand All @@ -438,6 +452,13 @@ func (m *MeshDB) TrimOrdersByExpirationTime(targetMaxOrders int) (newMaxExpirati
return nil, nil, err
}

// If we could not remove numOrdersToRemove orders than it means the database
// is full of pinned orders. We still remove as many orders as we can and then
// return an error.
if len(removedOrders) < numOrdersToRemove {
return nil, nil, ErrDBFilledWithPinnedOrders
}

// The new max expiration time is simply the minimum expiration time of the
// orders that were removed (i.e., the expiration time of the last order in
// the slice). We add a buffer of -1 just to make sure we don't exceed
Expand All @@ -448,3 +469,10 @@ func (m *MeshDB) TrimOrdersByExpirationTime(targetMaxOrders int) (newMaxExpirati
newMaxExpirationTime = newMaxExpirationTime.Sub(newMaxExpirationTime, big.NewInt(1))
return newMaxExpirationTime, removedOrders, nil
}

// CountPinnedOrders returns the number of pinned orders.
func (m *MeshDB) CountPinnedOrders() (int, error) {
// We use a prefix filter of "1|" so that we only count pinned orders.
filter := m.Orders.ExpirationTimeIndex.PrefixFilter([]byte("1|"))
return m.Orders.NewQuery(filter).Count()
}
103 changes: 79 additions & 24 deletions meshdb/meshdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func TestTrimOrdersByExpirationTime(t *testing.T) {

// Note: most of the fields in these orders are the same. For the purposes of
// this test, the only thing that matters is the Salt and ExpirationTime.
rawOrders := []*zeroex.Order{
rawUnpinnedOrders := []*zeroex.Order{
{
MakerAddress: makerAddress,
TakerAddress: constants.NullAddress,
Expand Down Expand Up @@ -211,8 +211,82 @@ func TestTrimOrdersByExpirationTime(t *testing.T) {
ExchangeAddress: contractAddresses.Exchange,
},
}
rawPinnedOrders := []*zeroex.Order{
{
MakerAddress: makerAddress,
TakerAddress: constants.NullAddress,
SenderAddress: constants.NullAddress,
FeeRecipientAddress: common.HexToAddress("0xa258b39954cef5cb142fd567a46cddb31a670124"),
TakerAssetData: common.Hex2Bytes("f47261b000000000000000000000000034d402f14d58e001d8efbe6585051bf9706aa064"),
MakerAssetData: common.Hex2Bytes("025717920000000000000000000000001dc4c1cefef38a777b15aa20260a54e584b16c480000000000000000000000000000000000000000000000000000000000000001"),
Salt: big.NewInt(0),
MakerFee: big.NewInt(0),
TakerFee: big.NewInt(0),
MakerAssetAmount: big.NewInt(3551808554499581700),
TakerAssetAmount: big.NewInt(1),
ExpirationTimeSeconds: big.NewInt(250),
ExchangeAddress: contractAddresses.Exchange,
},
{
MakerAddress: makerAddress,
TakerAddress: constants.NullAddress,
SenderAddress: constants.NullAddress,
FeeRecipientAddress: common.HexToAddress("0xa258b39954cef5cb142fd567a46cddb31a670124"),
TakerAssetData: common.Hex2Bytes("f47261b000000000000000000000000034d402f14d58e001d8efbe6585051bf9706aa064"),
MakerAssetData: common.Hex2Bytes("025717920000000000000000000000001dc4c1cefef38a777b15aa20260a54e584b16c480000000000000000000000000000000000000000000000000000000000000001"),
Salt: big.NewInt(1),
MakerFee: big.NewInt(0),
TakerFee: big.NewInt(0),
MakerAssetAmount: big.NewInt(3551808554499581700),
TakerAssetAmount: big.NewInt(1),
ExpirationTimeSeconds: big.NewInt(350),
ExchangeAddress: contractAddresses.Exchange,
},
}

insertRawOrders(t, meshDB, rawUnpinnedOrders, false)
pinnedOrders := insertRawOrders(t, meshDB, rawPinnedOrders, true)

// Call CalculateNewMaxExpirationTimeAndTrimDatabase and check the results.
targetMaxOrders := 4
gotExpirationTime, gotRemovedOrders, err := meshDB.TrimOrdersByExpirationTime(targetMaxOrders)
require.NoError(t, err)
assert.Equal(t, "199", gotExpirationTime.String(), "newMaxExpirationTime")
assert.Len(t, gotRemovedOrders, 2, "wrong number of orders removed")

// Check that the expiration time of each removed order is >= the new max.
for _, removedOrder := range gotRemovedOrders {
expirationTimeOfRemovedOrder := removedOrder.SignedOrder.ExpirationTimeSeconds
assert.True(t, expirationTimeOfRemovedOrder.Cmp(gotExpirationTime) != -1, "an order was removed with expiration time (%s) less than the new max (%s)", expirationTimeOfRemovedOrder, gotExpirationTime)
}
var remainingOrders []*Order
require.NoError(t, meshDB.Orders.FindAll(&remainingOrders))
assert.Len(t, remainingOrders, 4, "wrong number of orders remaining")

// Check that the expiration time of each remaining order is <= the new max.
for _, remainingOrder := range remainingOrders {
if !remainingOrder.IsPinned {
// Unpinned orders should not have an expiration time greater than the
// new max.
expirationTimeOfRemainingOrder := remainingOrder.SignedOrder.ExpirationTimeSeconds
newMaxPlusOne := big.NewInt(0).Add(gotExpirationTime, big.NewInt(1))
assert.True(t, expirationTimeOfRemainingOrder.Cmp(newMaxPlusOne) != 1, "a remaining order had an expiration time (%s) greater than the new max + 1 (%s)", expirationTimeOfRemainingOrder, newMaxPlusOne)
}
}

// Check that the pinned orders are still in the database.
for _, pinnedOrder := range pinnedOrders {
require.NoError(t, meshDB.Orders.FindByID(pinnedOrder.Hash.Bytes(), &Order{}))
}

// Trying to trim orders when the database is full of pinned orders should
// return an error.
_, _, err = meshDB.TrimOrdersByExpirationTime(1)
assert.EqualError(t, err, ErrDBFilledWithPinnedOrders.Error(), "expected ErrFilledWithPinnedOrders when targetMaxOrders is less than the number of pinned orders")
}

orders := make([]*Order, len(rawOrders))
func insertRawOrders(t *testing.T, meshDB *MeshDB, rawOrders []*zeroex.Order, isPinned bool) []*Order {
results := make([]*Order, len(rawOrders))
for i, order := range rawOrders {
// Sign, compute order hash, and insert.
signedOrder, err := zeroex.SignTestOrder(order)
Expand All @@ -226,29 +300,10 @@ func TestTrimOrdersByExpirationTime(t *testing.T) {
FillableTakerAssetAmount: big.NewInt(1),
LastUpdated: time.Now(),
IsRemoved: false,
IsPinned: isPinned,
}
orders[i] = order
results[i] = order
require.NoError(t, meshDB.Orders.Insert(order))
}

// Call CalculateNewMaxExpirationTimeAndTrimDatabase and check the results.
targetMaxOrders := 2
gotExpirationTime, gotRemovedOrders, err := meshDB.TrimOrdersByExpirationTime(targetMaxOrders)
require.NoError(t, err)
assert.Equal(t, "199", gotExpirationTime.String(), "newMaxExpirationTime")
assert.Len(t, gotRemovedOrders, 2, "wrong number of orders removed")
// Check that the expiration time of each removed order is >= the new max.
for _, removedOrder := range gotRemovedOrders {
expirationTimeOfRemovedOrder := removedOrder.SignedOrder.ExpirationTimeSeconds
assert.True(t, expirationTimeOfRemovedOrder.Cmp(gotExpirationTime) != -1, "an order was removed with expiration time (%s) less than the new max (%s)", expirationTimeOfRemovedOrder, gotExpirationTime)
}
var remainingOrders []*Order
require.NoError(t, meshDB.Orders.FindAll(&remainingOrders))
assert.Len(t, remainingOrders, 2, "wrong number of orders remaining")
// Check that the expiration time of each remaining order is <= the new max.
for _, removedOrder := range remainingOrders {
expirationTimeOfRemovedOrder := removedOrder.SignedOrder.ExpirationTimeSeconds
newMaxPlusOne := big.NewInt(0).Add(gotExpirationTime, big.NewInt(1))
assert.True(t, expirationTimeOfRemovedOrder.Cmp(newMaxPlusOne) != 1, "a remaining order had an expiration time (%s) greater than the new max + 1 (%s)", expirationTimeOfRemovedOrder, newMaxPlusOne)
}
return results
}
Loading

0 comments on commit 0b773a4

Please sign in to comment.