From d50c3a191850a35289bbb648248bc21c9058ae32 Mon Sep 17 00:00:00 2001 From: Chenyao Yu <4844716+chenyaoy@users.noreply.github.com> Date: Wed, 19 Jun 2024 19:39:39 -0400 Subject: [PATCH] [CT-856] Add order replacement to fix vault causing orderbook flickering (#1602) --- protocol/app/app.go | 1 + protocol/indexer/events/stateful_order.go | 17 +++++++ protocol/mocks/ClobKeeper.go | 8 +-- protocol/testutil/keeper/vault.go | 1 + .../x/clob/keeper/msg_server_cancel_orders.go | 25 +++++----- .../x/clob/keeper/msg_server_place_order.go | 40 ++++++++------- protocol/x/clob/types/clob_keeper.go | 1 + protocol/x/vault/keeper/keeper.go | 36 ++++++++------ protocol/x/vault/keeper/orders.go | 39 +++++++++++++-- protocol/x/vault/keeper/orders_test.go | 49 +++++++++++++++++-- protocol/x/vault/types/expected_keepers.go | 1 + 11 files changed, 162 insertions(+), 56 deletions(-) diff --git a/protocol/app/app.go b/protocol/app/app.go index a7fc4cfd0a..50615e744f 100644 --- a/protocol/app/app.go +++ b/protocol/app/app.go @@ -1090,6 +1090,7 @@ func New( app.PerpetualsKeeper, app.PricesKeeper, app.SubaccountsKeeper, + app.IndexerEventManager, []string{ lib.GovModuleAddress.String(), delaymsgmoduletypes.ModuleAddress.String(), diff --git a/protocol/indexer/events/stateful_order.go b/protocol/indexer/events/stateful_order.go index f1a0d029db..d5cc0c537d 100644 --- a/protocol/indexer/events/stateful_order.go +++ b/protocol/indexer/events/stateful_order.go @@ -20,6 +20,23 @@ func NewLongTermOrderPlacementEvent( } } +func NewLongTermOrderReplacementEvent( + oldOrderId clobtypes.OrderId, + order clobtypes.Order, +) *StatefulOrderEventV1 { + oldIndexerOrderId := v1.OrderIdToIndexerOrderId(oldOrderId) + indexerOrder := v1.OrderToIndexerOrder(order) + orderReplace := StatefulOrderEventV1_LongTermOrderReplacementV1{ + OldOrderId: &oldIndexerOrderId, + Order: &indexerOrder, + } + return &StatefulOrderEventV1{ + Event: &StatefulOrderEventV1_OrderReplacement{ + OrderReplacement: &orderReplace, + }, + } +} + func NewStatefulOrderRemovalEvent( removedOrderId clobtypes.OrderId, reason sharedtypes.OrderRemovalReason, diff --git a/protocol/mocks/ClobKeeper.go b/protocol/mocks/ClobKeeper.go index 44e0e938f5..c45d843acb 100644 --- a/protocol/mocks/ClobKeeper.go +++ b/protocol/mocks/ClobKeeper.go @@ -626,16 +626,16 @@ func (_m *ClobKeeper) GetSubaccountMaxNotionalLiquidatable(ctx types.Context, su } // HandleMsgCancelOrder provides a mock function with given fields: ctx, msg -func (_m *ClobKeeper) HandleMsgCancelOrder(ctx types.Context, msg *clobtypes.MsgCancelOrder) error { - ret := _m.Called(ctx, msg) +func (_m *ClobKeeper) HandleMsgCancelOrder(ctx types.Context, msg *clobtypes.MsgCancelOrder, isInternalOrder bool) error { + ret := _m.Called(ctx, msg, isInternalOrder) if len(ret) == 0 { panic("no return value specified for HandleMsgCancelOrder") } var r0 error - if rf, ok := ret.Get(0).(func(types.Context, *clobtypes.MsgCancelOrder) error); ok { - r0 = rf(ctx, msg) + if rf, ok := ret.Get(0).(func(types.Context, *clobtypes.MsgCancelOrder, bool) error); ok { + r0 = rf(ctx, msg, isInternalOrder) } else { r0 = ret.Error(0) } diff --git a/protocol/testutil/keeper/vault.go b/protocol/testutil/keeper/vault.go index 73b18c8940..911980eedc 100644 --- a/protocol/testutil/keeper/vault.go +++ b/protocol/testutil/keeper/vault.go @@ -57,6 +57,7 @@ func createVaultKeeper( &mocks.PerpetualsKeeper{}, &mocks.PricesKeeper{}, &mocks.SubaccountsKeeper{}, + &mocks.IndexerEventManager{}, []string{ lib.GovModuleAddress.String(), delaymsgtypes.ModuleAddress.String(), diff --git a/protocol/x/clob/keeper/msg_server_cancel_orders.go b/protocol/x/clob/keeper/msg_server_cancel_orders.go index b5165d4bc1..02610fd93d 100644 --- a/protocol/x/clob/keeper/msg_server_cancel_orders.go +++ b/protocol/x/clob/keeper/msg_server_cancel_orders.go @@ -23,7 +23,7 @@ func (k msgServer) CancelOrder( ) (resp *types.MsgCancelOrderResponse, err error) { ctx := lib.UnwrapSDKContext(goCtx, types.ModuleName) - if err := k.Keeper.HandleMsgCancelOrder(ctx, msg); err != nil { + if err := k.Keeper.HandleMsgCancelOrder(ctx, msg, false); err != nil { return nil, err } @@ -37,6 +37,7 @@ func (k msgServer) CancelOrder( func (k Keeper) HandleMsgCancelOrder( ctx sdk.Context, msg *types.MsgCancelOrder, + isInternalOrder bool, ) (err error) { lib.AssertDeliverTxMode(ctx) @@ -110,17 +111,19 @@ func (k Keeper) HandleMsgCancelOrder( k.MustSetProcessProposerMatchesEvents(ctx, processProposerMatchesEvents) // 4. Add the relevant on-chain Indexer event for the cancellation. - k.GetIndexerEventManager().AddTxnEvent( - ctx, - indexerevents.SubtypeStatefulOrder, - indexerevents.StatefulOrderEventVersion, - indexer_manager.GetBytes( - indexerevents.NewStatefulOrderRemovalEvent( - msg.OrderId, - indexershared.OrderRemovalReason_ORDER_REMOVAL_REASON_USER_CANCELED, + if !isInternalOrder { // vault order indexer event logic is handled elsewhere + k.GetIndexerEventManager().AddTxnEvent( + ctx, + indexerevents.SubtypeStatefulOrder, + indexerevents.StatefulOrderEventVersion, + indexer_manager.GetBytes( + indexerevents.NewStatefulOrderRemovalEvent( + msg.OrderId, + indexershared.OrderRemovalReason_ORDER_REMOVAL_REASON_USER_CANCELED, + ), ), - ), - ) + ) + } return nil } diff --git a/protocol/x/clob/keeper/msg_server_place_order.go b/protocol/x/clob/keeper/msg_server_place_order.go index 5db309df39..d3801d5ad3 100644 --- a/protocol/x/clob/keeper/msg_server_place_order.go +++ b/protocol/x/clob/keeper/msg_server_place_order.go @@ -114,31 +114,35 @@ func (k Keeper) HandleMsgPlaceOrder( // 4. Emit the new order placement indexer event. if order.IsConditionalOrder() { - k.GetIndexerEventManager().AddTxnEvent( - ctx, - indexerevents.SubtypeStatefulOrder, - indexerevents.StatefulOrderEventVersion, - indexer_manager.GetBytes( - indexerevents.NewConditionalOrderPlacementEvent( - order, + if !isInternalOrder { // vault order indexer event logic is handled elsewhere + k.GetIndexerEventManager().AddTxnEvent( + ctx, + indexerevents.SubtypeStatefulOrder, + indexerevents.StatefulOrderEventVersion, + indexer_manager.GetBytes( + indexerevents.NewConditionalOrderPlacementEvent( + order, + ), ), - ), - ) + ) + } processProposerMatchesEvents.PlacedConditionalOrderIds = append( processProposerMatchesEvents.PlacedConditionalOrderIds, order.OrderId, ) } else { - k.GetIndexerEventManager().AddTxnEvent( - ctx, - indexerevents.SubtypeStatefulOrder, - indexerevents.StatefulOrderEventVersion, - indexer_manager.GetBytes( - indexerevents.NewLongTermOrderPlacementEvent( - order, + if !isInternalOrder { // vault order indexer event logic is handled elsewhere + k.GetIndexerEventManager().AddTxnEvent( + ctx, + indexerevents.SubtypeStatefulOrder, + indexerevents.StatefulOrderEventVersion, + indexer_manager.GetBytes( + indexerevents.NewLongTermOrderPlacementEvent( + order, + ), ), - ), - ) + ) + } processProposerMatchesEvents.PlacedLongTermOrderIds = append( processProposerMatchesEvents.PlacedLongTermOrderIds, order.OrderId, diff --git a/protocol/x/clob/types/clob_keeper.go b/protocol/x/clob/types/clob_keeper.go index 3993d6036c..e60150cf50 100644 --- a/protocol/x/clob/types/clob_keeper.go +++ b/protocol/x/clob/types/clob_keeper.go @@ -46,6 +46,7 @@ type ClobKeeper interface { HandleMsgCancelOrder( ctx sdk.Context, msg *MsgCancelOrder, + isInternalOrder bool, ) (err error) HandleMsgPlaceOrder( ctx sdk.Context, diff --git a/protocol/x/vault/keeper/keeper.go b/protocol/x/vault/keeper/keeper.go index 9fe75f116f..ff33edc5c2 100644 --- a/protocol/x/vault/keeper/keeper.go +++ b/protocol/x/vault/keeper/keeper.go @@ -7,19 +7,21 @@ import ( storetypes "cosmossdk.io/store/types" "github.com/cosmos/cosmos-sdk/codec" sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/dydxprotocol/v4-chain/protocol/indexer/indexer_manager" "github.com/dydxprotocol/v4-chain/protocol/lib" "github.com/dydxprotocol/v4-chain/protocol/x/vault/types" ) type ( Keeper struct { - cdc codec.BinaryCodec - storeKey storetypes.StoreKey - clobKeeper types.ClobKeeper - perpetualsKeeper types.PerpetualsKeeper - pricesKeeper types.PricesKeeper - subaccountsKeeper types.SubaccountsKeeper - authorities map[string]struct{} + cdc codec.BinaryCodec + storeKey storetypes.StoreKey + clobKeeper types.ClobKeeper + perpetualsKeeper types.PerpetualsKeeper + pricesKeeper types.PricesKeeper + subaccountsKeeper types.SubaccountsKeeper + indexerEventManager indexer_manager.IndexerEventManager + authorities map[string]struct{} } ) @@ -30,19 +32,25 @@ func NewKeeper( perpetualsKeeper types.PerpetualsKeeper, pricesKeeper types.PricesKeeper, subaccountsKeeper types.SubaccountsKeeper, + indexerEventManager indexer_manager.IndexerEventManager, authorities []string, ) *Keeper { return &Keeper{ - cdc: cdc, - storeKey: storeKey, - clobKeeper: clobKeeper, - perpetualsKeeper: perpetualsKeeper, - pricesKeeper: pricesKeeper, - subaccountsKeeper: subaccountsKeeper, - authorities: lib.UniqueSliceToSet(authorities), + cdc: cdc, + storeKey: storeKey, + clobKeeper: clobKeeper, + perpetualsKeeper: perpetualsKeeper, + pricesKeeper: pricesKeeper, + subaccountsKeeper: subaccountsKeeper, + indexerEventManager: indexerEventManager, + authorities: lib.UniqueSliceToSet(authorities), } } +func (k Keeper) GetIndexerEventManager() indexer_manager.IndexerEventManager { + return k.indexerEventManager +} + func (k Keeper) HasAuthority(authority string) bool { _, ok := k.authorities[authority] return ok diff --git a/protocol/x/vault/keeper/orders.go b/protocol/x/vault/keeper/orders.go index 5d8054dcc5..51cadf2b94 100644 --- a/protocol/x/vault/keeper/orders.go +++ b/protocol/x/vault/keeper/orders.go @@ -6,6 +6,8 @@ import ( errorsmod "cosmossdk.io/errors" sdk "github.com/cosmos/cosmos-sdk/types" + indexerevents "github.com/dydxprotocol/v4-chain/protocol/indexer/events" + "github.com/dydxprotocol/v4-chain/protocol/indexer/indexer_manager" "github.com/dydxprotocol/v4-chain/protocol/lib" "github.com/dydxprotocol/v4-chain/protocol/lib/log" "github.com/dydxprotocol/v4-chain/protocol/lib/metrics" @@ -24,6 +26,7 @@ func (k Keeper) RefreshAllVaultOrders(ctx sdk.Context) { defer totalSharesIterator.Close() for ; totalSharesIterator.Valid(); totalSharesIterator.Next() { vaultId, err := types.GetVaultIdFromStateKey(totalSharesIterator.Key()) + if err != nil { log.ErrorLogWithError(ctx, "Failed to get vault ID from state key", err) continue @@ -84,7 +87,7 @@ func (k Keeper) RefreshVaultClobOrders(ctx sdk.Context, vaultId types.VaultId) ( err := k.clobKeeper.HandleMsgCancelOrder(ctx, clobtypes.NewMsgCancelOrderStateful( order.OrderId, uint32(ctx.BlockTime().Unix())+orderExpirationSeconds, - )) + ), true) if err != nil { log.ErrorLogWithError(ctx, "Failed to cancel order", err, "order", order, "vaultId", vaultId) } @@ -94,24 +97,52 @@ func (k Keeper) RefreshVaultClobOrders(ctx sdk.Context, vaultId types.VaultId) ( ) } } - // Place new CLOB orders. ordersToPlace, err := k.GetVaultClobOrders(ctx, vaultId) if err != nil { log.ErrorLogWithError(ctx, "Failed to get vault clob orders to place", err, "vaultId", vaultId) return err } - for _, order := range ordersToPlace { + + for i, order := range ordersToPlace { err := k.PlaceVaultClobOrder(ctx, order) if err != nil { log.ErrorLogWithError(ctx, "Failed to place order", err, "order", order, "vaultId", vaultId) } + vaultId.IncrCounterWithLabels( metrics.VaultPlaceOrder, metrics.GetLabelForBoolValue(metrics.Success, err == nil), ) - } + // Send indexer messages. We expect ordersToCancel and ordersToPlace to have the same length + // and the order to place at each index to be a replacement of the order to cancel at the same index. + replacedOrder := ordersToCancel[i] + if replacedOrder == nil { + k.GetIndexerEventManager().AddTxnEvent( + ctx, + indexerevents.SubtypeStatefulOrder, + indexerevents.StatefulOrderEventVersion, + indexer_manager.GetBytes( + indexerevents.NewLongTermOrderPlacementEvent( + *order, + ), + ), + ) + } else { + k.GetIndexerEventManager().AddTxnEvent( + ctx, + indexerevents.SubtypeStatefulOrder, + indexerevents.StatefulOrderEventVersion, + indexer_manager.GetBytes( + indexerevents.NewLongTermOrderReplacementEvent( + replacedOrder.OrderId, + *order, + ), + ), + ) + } + } return nil } diff --git a/protocol/x/vault/keeper/orders_test.go b/protocol/x/vault/keeper/orders_test.go index 371288ef70..6f0fad2db4 100644 --- a/protocol/x/vault/keeper/orders_test.go +++ b/protocol/x/vault/keeper/orders_test.go @@ -7,6 +7,10 @@ import ( "github.com/cometbft/cometbft/types" "github.com/dydxprotocol/v4-chain/protocol/dtypes" + "github.com/dydxprotocol/v4-chain/protocol/indexer" + indexerevents "github.com/dydxprotocol/v4-chain/protocol/indexer/events" + "github.com/dydxprotocol/v4-chain/protocol/indexer/indexer_manager" + "github.com/dydxprotocol/v4-chain/protocol/indexer/msgsender" testapp "github.com/dydxprotocol/v4-chain/protocol/testutil/app" "github.com/dydxprotocol/v4-chain/protocol/testutil/constants" assettypes "github.com/dydxprotocol/v4-chain/protocol/x/assets/types" @@ -105,11 +109,16 @@ func TestRefreshAllVaultOrders(t *testing.T) { activationThresholdQuoteQuantums: big.NewInt(123_456_789), }, } - for name, tc := range tests { t.Run(name, func(t *testing.T) { + // Enable testapp's indexer event manager + msgSender := msgsender.NewIndexerMessageSenderInMemoryCollector() + appOpts := map[string]interface{}{ + indexer.MsgSenderInstanceForTest: msgSender, + } + // Initialize tApp and ctx (in deliverTx mode). - tApp := testapp.NewTestAppBuilder(t).WithGenesisDocFn(func() (genesis types.GenesisDoc) { + tApp := testapp.NewTestAppBuilder(t).WithAppOptions(appOpts).WithGenesisDocFn(func() (genesis types.GenesisDoc) { genesis = testapp.DefaultGenesis() // Initialize each vault with quote quantums to be able to place orders. testapp.UpdateGenesisDocWithAppStateForModule( @@ -160,6 +169,7 @@ func TestRefreshAllVaultOrders(t *testing.T) { // Simulate vault orders placed in last block. numPreviousOrders := 0 + previousOrders := make(map[vaulttypes.VaultId][]*clobtypes.Order) for i, vaultId := range tc.vaultIds { if tc.totalShares[i].Sign() > 0 && tc.assetQuantums[i].Cmp(tc.activationThresholdQuoteQuantums) >= 0 { orders, err := tApp.App.VaultKeeper.GetVaultClobOrders( @@ -171,6 +181,7 @@ func TestRefreshAllVaultOrders(t *testing.T) { err := tApp.App.VaultKeeper.PlaceVaultClobOrder(ctx, order) require.NoError(t, err) } + previousOrders[vaultId] = orders numPreviousOrders += len(orders) } } @@ -183,13 +194,34 @@ func TestRefreshAllVaultOrders(t *testing.T) { // cancelled and orders from this block have been placed. numExpectedOrders := 0 allExpectedOrderIds := make(map[clobtypes.OrderId]bool) - for i, vaultId := range tc.vaultIds { - if tc.totalShares[i].Sign() > 0 && tc.assetQuantums[i].Cmp(tc.activationThresholdQuoteQuantums) >= 0 { + expectedIndexerEvents := make([]indexer_manager.IndexerTendermintEvent, 0) + indexerEventIndex := 0 + for vault_index, vaultId := range tc.vaultIds { + if tc.totalShares[vault_index].Sign() > 0 && + tc.assetQuantums[vault_index].Cmp(tc.activationThresholdQuoteQuantums) >= 0 { expectedOrders, err := tApp.App.VaultKeeper.GetVaultClobOrders(ctx, vaultId) require.NoError(t, err) numExpectedOrders += len(expectedOrders) - for _, order := range expectedOrders { + ordersToCancel := previousOrders[vaultId] + for i, order := range expectedOrders { allExpectedOrderIds[order.OrderId] = true + orderToCancel := ordersToCancel[i] + event := indexer_manager.IndexerTendermintEvent{ + Subtype: indexerevents.SubtypeStatefulOrder, + OrderingWithinBlock: &indexer_manager.IndexerTendermintEvent_TransactionIndex{ + TransactionIndex: 0, + }, + EventIndex: uint32(indexerEventIndex), + Version: indexerevents.StatefulOrderEventVersion, + DataBytes: indexer_manager.GetBytes( + indexerevents.NewLongTermOrderReplacementEvent( + orderToCancel.OrderId, + *order, + ), + ), + } + indexerEventIndex += 1 + expectedIndexerEvents = append(expectedIndexerEvents, event) } } } @@ -198,6 +230,13 @@ func TestRefreshAllVaultOrders(t *testing.T) { for _, order := range allStatefulOrders { require.True(t, allExpectedOrderIds[order.OrderId]) } + + // test that the indexer events emitted are as expected + block := tApp.App.VaultKeeper.GetIndexerEventManager().ProduceBlock(ctx) + require.Len(t, block.Events, numExpectedOrders) + for i, event := range block.Events { + require.Equal(t, expectedIndexerEvents[i], *event) + } }) } } diff --git a/protocol/x/vault/types/expected_keepers.go b/protocol/x/vault/types/expected_keepers.go index ae2d63a451..8411769bf4 100644 --- a/protocol/x/vault/types/expected_keepers.go +++ b/protocol/x/vault/types/expected_keepers.go @@ -22,6 +22,7 @@ type ClobKeeper interface { HandleMsgCancelOrder( ctx sdk.Context, msg *clobtypes.MsgCancelOrder, + isInternalOrder bool, ) (err error) HandleMsgPlaceOrder( ctx sdk.Context,