diff --git a/protocol/mocks/ClobKeeper.go b/protocol/mocks/ClobKeeper.go index fc5a177f70..1b2fd17fa4 100644 --- a/protocol/mocks/ClobKeeper.go +++ b/protocol/mocks/ClobKeeper.go @@ -1130,6 +1130,11 @@ func (_m *ClobKeeper) RemoveOrderFillAmount(ctx types.Context, orderId clobtypes _m.Called(ctx, orderId) } +// SendOrderbookUpdates provides a mock function with given fields: offchainUpdates, snapshot +func (_m *ClobKeeper) SendOrderbookUpdates(offchainUpdates *clobtypes.OffchainUpdates, snapshot bool) { + _m.Called(offchainUpdates, snapshot) +} + // SetLongTermOrderPlacement provides a mock function with given fields: ctx, order, blockHeight func (_m *ClobKeeper) SetLongTermOrderPlacement(ctx types.Context, order clobtypes.Order, blockHeight uint32) { _m.Called(ctx, order, blockHeight) diff --git a/protocol/mocks/MemClobKeeper.go b/protocol/mocks/MemClobKeeper.go index 641f107d19..e9b6d3456a 100644 --- a/protocol/mocks/MemClobKeeper.go +++ b/protocol/mocks/MemClobKeeper.go @@ -434,6 +434,11 @@ func (_m *MemClobKeeper) ReplayPlaceOrder(ctx types.Context, msg *clobtypes.MsgP return r0, r1, r2, r3 } +// SendOrderbookUpdates provides a mock function with given fields: offchainUpdates, snapshot +func (_m *MemClobKeeper) SendOrderbookUpdates(offchainUpdates *clobtypes.OffchainUpdates, snapshot bool) { + _m.Called(offchainUpdates, snapshot) +} + // SetLongTermOrderPlacement provides a mock function with given fields: ctx, order, blockHeight func (_m *MemClobKeeper) SetLongTermOrderPlacement(ctx types.Context, order clobtypes.Order, blockHeight uint32) { _m.Called(ctx, order, blockHeight) diff --git a/protocol/streaming/grpc/grpc_streaming_manager.go b/protocol/streaming/grpc/grpc_streaming_manager.go index 95ae0a984e..8db6daca6b 100644 --- a/protocol/streaming/grpc/grpc_streaming_manager.go +++ b/protocol/streaming/grpc/grpc_streaming_manager.go @@ -103,19 +103,21 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates( // Send updates to subscribers. idsToRemove := make([]uint32, 0) for id, subscription := range sm.orderbookSubscriptions { + updatesToSend := make([]ocutypes.OffChainUpdateV1, 0) for _, clobPairId := range subscription.clobPairIds { if updates, ok := v1updates[clobPairId]; ok { - if err := subscription.srv.Send( - &clobtypes.StreamOrderbookUpdatesResponse{ - Updates: updates, - Snapshot: snapshot, - }, - ); err != nil { - idsToRemove = append(idsToRemove, id) - break - } + updatesToSend = append(updatesToSend, updates...) } } + + if err := subscription.srv.Send( + &clobtypes.StreamOrderbookUpdatesResponse{ + Updates: updatesToSend, + Snapshot: snapshot, + }, + ); err != nil { + idsToRemove = append(idsToRemove, id) + } } // Clean up subscriptions that have been closed. diff --git a/protocol/testutil/memclob/keeper.go b/protocol/testutil/memclob/keeper.go index 83b368608f..6ba8856285 100644 --- a/protocol/testutil/memclob/keeper.go +++ b/protocol/testutil/memclob/keeper.go @@ -511,3 +511,9 @@ func (f *FakeMemClobKeeper) ValidateSubaccountEquityTierLimitForStatefulOrder( func (f *FakeMemClobKeeper) Logger(ctx sdk.Context) log.Logger { return ctx.Logger() } + +func (f *FakeMemClobKeeper) SendOrderbookUpdates( + offchainUpdates *types.OffchainUpdates, + snapshot bool, +) { +} diff --git a/protocol/x/clob/keeper/keeper.go b/protocol/x/clob/keeper/keeper.go index 300cc2a066..612c59fadd 100644 --- a/protocol/x/clob/keeper/keeper.go +++ b/protocol/x/clob/keeper/keeper.go @@ -233,5 +233,13 @@ func (k Keeper) InitializeNewGrpcStreams(ctx sdk.Context) { allUpdates.Append(update) } - streamingManager.SendOrderbookUpdates(allUpdates, true) + k.SendOrderbookUpdates(allUpdates, true) +} + +// SendOrderbookUpdates sends the offchain updates to the gRPC streaming manager. +func (k Keeper) SendOrderbookUpdates( + offchainUpdates *types.OffchainUpdates, + snapshot bool, +) { + k.GetGrpcStreamingManager().SendOrderbookUpdates(offchainUpdates, snapshot) } diff --git a/protocol/x/clob/keeper/orders.go b/protocol/x/clob/keeper/orders.go index f65aff2fce..a2c7773557 100644 --- a/protocol/x/clob/keeper/orders.go +++ b/protocol/x/clob/keeper/orders.go @@ -1310,8 +1310,6 @@ func (k Keeper) SendOffchainMessages( } k.GetIndexerEventManager().SendOffchainData(update) } - - k.GetGrpcStreamingManager().SendOrderbookUpdates(offchainUpdates, false) } // getFillQuoteQuantums returns the total fillAmount price in quote quantums based on the maker subticks. diff --git a/protocol/x/clob/memclob/memclob.go b/protocol/x/clob/memclob/memclob.go index 8f17780a85..7cbcc4b471 100644 --- a/protocol/x/clob/memclob/memclob.go +++ b/protocol/x/clob/memclob/memclob.go @@ -1216,6 +1216,18 @@ func (m *MemClobPriceTimePriority) PurgeInvalidMemclobState( m.RemoveOrderIfFilled(ctx, orderId) } + // For order matches in the last block, send an orderbook update + // to the grpc streams. + // Note that fully filled orders are removed from the orderbook in `RemoveOrderIfFilled` above. + allUpdates := types.NewOffchainUpdates() + for _, orderId := range filledOrderIds { + if m.openOrders.hasOrder(ctx, orderId) { + orderbookUpdate := m.GetOrderbookUpdatesForOrderUpdate(ctx, orderId) + allUpdates.Append(orderbookUpdate) + } + } + m.clobKeeper.SendOrderbookUpdates(allUpdates, false) + // Remove all canceled stateful order IDs from the memclob if they exist. // If the slice has non-stateful order IDs or contains duplicates, panic. if lib.ContainsDuplicates(canceledStatefulOrderIds) { @@ -1510,6 +1522,10 @@ func (m *MemClobPriceTimePriority) mustAddOrderToOrderbook( } m.openOrders.mustAddOrderToOrderbook(ctx, newOrder, forceToFrontOfLevel) + + // Send an orderbook update to grpc streams. + orderbookUpdate := m.GetOrderbookUpdatesForOrderPlacement(ctx, newOrder) + m.clobKeeper.SendOrderbookUpdates(orderbookUpdate, false) } // mustPerformTakerOrderMatching performs matching using the provided taker order while the order @@ -1944,6 +1960,10 @@ func (m *MemClobPriceTimePriority) mustRemoveOrder( !m.operationsToPropose.IsOrderPlacementInOperationsQueue(order) { m.operationsToPropose.RemoveShortTermOrderTxBytes(order) } + + // Send an orderbook update to grpc streams. + orderbookUpdate := m.GetOrderbookUpdatesForOrderRemoval(ctx, order.OrderId) + m.clobKeeper.SendOrderbookUpdates(orderbookUpdate, false) } // mustUpdateOrderbookStateWithMatchedMakerOrder updates the orderbook with a matched maker order. @@ -1969,6 +1989,13 @@ func (m *MemClobPriceTimePriority) mustUpdateOrderbookStateWithMatchedMakerOrder m.mustRemoveOrder(ctx, makerOrderId) } + // If the order was fully filled, an orderbook update for removal was already sent in `mustRemoveOrder`. + // If the order was partially filled, send an orderbook update for the order's new total filled amount. + if newTotalFilledAmount < makerOrderBaseQuantums { + orderbookUpdate := m.GetOrderbookUpdatesForOrderUpdate(ctx, makerOrder.OrderId) + m.clobKeeper.SendOrderbookUpdates(orderbookUpdate, false) + } + if m.generateOffchainUpdates { // Send an off-chain update message to the indexer to update the total filled size of the maker // order. diff --git a/protocol/x/clob/memclob/memclob_grpc_streaming.go b/protocol/x/clob/memclob/memclob_grpc_streaming.go index a1c86d4d0c..7eee3594b7 100644 --- a/protocol/x/clob/memclob/memclob_grpc_streaming.go +++ b/protocol/x/clob/memclob/memclob_grpc_streaming.go @@ -3,6 +3,8 @@ package memclob import ( sdk "github.com/cosmos/cosmos-sdk/types" "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates" + ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types" + indexersharedtypes "github.com/dydxprotocol/v4-chain/protocol/indexer/shared/types" "github.com/dydxprotocol/v4-chain/protocol/lib" "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" ) @@ -27,7 +29,7 @@ func (m *MemClobPriceTimePriority) GetOffchainUpdatesForOrderbookSnapshot( level.LevelOrders.Front.Each( func(order types.ClobOrder) { offchainUpdates.Append( - m.GetOffchainUpdatesForOrder(ctx, order.Order), + m.GetOrderbookUpdatesForOrderPlacement(ctx, order.Order), ) }, ) @@ -44,7 +46,7 @@ func (m *MemClobPriceTimePriority) GetOffchainUpdatesForOrderbookSnapshot( level.LevelOrders.Front.Each( func(order types.ClobOrder) { offchainUpdates.Append( - m.GetOffchainUpdatesForOrder(ctx, order.Order), + m.GetOrderbookUpdatesForOrderPlacement(ctx, order.Order), ) }, ) @@ -54,10 +56,10 @@ func (m *MemClobPriceTimePriority) GetOffchainUpdatesForOrderbookSnapshot( return offchainUpdates } -// GetOffchainUpdatesForOrder returns a place order offchain message and +// GetOrderbookUpdatesForOrderPlacement returns a place order offchain message and // a update order offchain message used to construct an order for // the orderbook snapshot grpc stream. -func (m *MemClobPriceTimePriority) GetOffchainUpdatesForOrder( +func (m *MemClobPriceTimePriority) GetOrderbookUpdatesForOrderPlacement( ctx sdk.Context, order types.Order, ) (offchainUpdates *types.OffchainUpdates) { @@ -86,3 +88,41 @@ func (m *MemClobPriceTimePriority) GetOffchainUpdatesForOrder( return offchainUpdates } + +// GetOrderbookUpdatesForOrderRemoval returns a place order offchain message and +func (m *MemClobPriceTimePriority) GetOrderbookUpdatesForOrderRemoval( + ctx sdk.Context, + orderId types.OrderId, +) (offchainUpdates *types.OffchainUpdates) { + offchainUpdates = types.NewOffchainUpdates() + if message, success := off_chain_updates.CreateOrderRemoveMessageWithReason( + ctx, + orderId, + indexersharedtypes.OrderRemovalReason_ORDER_REMOVAL_REASON_UNSPECIFIED, + ocutypes.OrderRemoveV1_ORDER_REMOVAL_STATUS_BEST_EFFORT_CANCELED, + ); success { + offchainUpdates.AddRemoveMessage(orderId, message) + } + return offchainUpdates +} + +// GetOrderbookUpdatesForOrderRemoval returns a place order offchain message and +func (m *MemClobPriceTimePriority) GetOrderbookUpdatesForOrderUpdate( + ctx sdk.Context, + orderId types.OrderId, +) (offchainUpdates *types.OffchainUpdates) { + offchainUpdates = types.NewOffchainUpdates() + + // Get the current fill amount of the order. + fillAmount := m.GetOrderFilledAmount(ctx, orderId) + + // Generate an update message updating the total filled amount of order. + if message, success := off_chain_updates.CreateOrderUpdateMessage( + ctx, + orderId, + fillAmount, + ); success { + offchainUpdates.AddUpdateMessage(orderId, message) + } + return offchainUpdates +} diff --git a/protocol/x/clob/memclob/memclob_grpc_streaming_test.go b/protocol/x/clob/memclob/memclob_grpc_streaming_test.go index e5bd2bf3f2..5a865b25f9 100644 --- a/protocol/x/clob/memclob/memclob_grpc_streaming_test.go +++ b/protocol/x/clob/memclob/memclob_grpc_streaming_test.go @@ -21,6 +21,7 @@ func TestGetOffchainUpdatesForOrderbookSnapshot_Buy(t *testing.T) { mock.Anything, mock.Anything, ).Return(false, satypes.BaseQuantums(0), uint32(0)) + clobKeeper.On("SendOrderbookUpdates", mock.Anything, mock.Anything).Return() memclob := NewMemClobPriceTimePriority(false) memclob.SetClobKeeper(clobKeeper) @@ -44,9 +45,9 @@ func TestGetOffchainUpdatesForOrderbookSnapshot_Buy(t *testing.T) { expected := types.NewOffchainUpdates() // Buy orders are in descending order. - expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[2])) - expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[0])) - expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[1])) + expected.Append(memclob.GetOrderbookUpdatesForOrderPlacement(ctx, orders[2])) + expected.Append(memclob.GetOrderbookUpdatesForOrderPlacement(ctx, orders[0])) + expected.Append(memclob.GetOrderbookUpdatesForOrderPlacement(ctx, orders[1])) require.Equal(t, expected, offchainUpdates) } @@ -60,6 +61,7 @@ func TestGetOffchainUpdatesForOrderbookSnapshot_Sell(t *testing.T) { mock.Anything, mock.Anything, ).Return(false, satypes.BaseQuantums(0), uint32(0)) + clobKeeper.On("SendOrderbookUpdates", mock.Anything, mock.Anything).Return() memclob := NewMemClobPriceTimePriority(false) memclob.SetClobKeeper(clobKeeper) @@ -83,9 +85,9 @@ func TestGetOffchainUpdatesForOrderbookSnapshot_Sell(t *testing.T) { expected := types.NewOffchainUpdates() // Sell orders are in ascending order. - expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[1])) - expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[2])) - expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[0])) + expected.Append(memclob.GetOrderbookUpdatesForOrderPlacement(ctx, orders[1])) + expected.Append(memclob.GetOrderbookUpdatesForOrderPlacement(ctx, orders[2])) + expected.Append(memclob.GetOrderbookUpdatesForOrderPlacement(ctx, orders[0])) require.Equal(t, expected, offchainUpdates) } diff --git a/protocol/x/clob/types/clob_keeper.go b/protocol/x/clob/types/clob_keeper.go index 474d799b03..d96b046ad5 100644 --- a/protocol/x/clob/types/clob_keeper.go +++ b/protocol/x/clob/types/clob_keeper.go @@ -144,5 +144,10 @@ type ClobKeeper interface { clobPair ClobPair, ) error UpdateLiquidationsConfig(ctx sdk.Context, config LiquidationsConfig) error + // Gprc streaming InitializeNewGrpcStreams(ctx sdk.Context) + SendOrderbookUpdates( + offchainUpdates *OffchainUpdates, + snapshot bool, + ) } diff --git a/protocol/x/clob/types/mem_clob_keeper.go b/protocol/x/clob/types/mem_clob_keeper.go index 298e701cb9..7ca02f2569 100644 --- a/protocol/x/clob/types/mem_clob_keeper.go +++ b/protocol/x/clob/types/mem_clob_keeper.go @@ -115,4 +115,8 @@ type MemClobKeeper interface { Logger( ctx sdk.Context, ) log.Logger + SendOrderbookUpdates( + offchainUpdates *OffchainUpdates, + snapshot bool, + ) }