diff --git a/protocol/mocks/ClobKeeper.go b/protocol/mocks/ClobKeeper.go index 4799fd2d86..94ac934d53 100644 --- a/protocol/mocks/ClobKeeper.go +++ b/protocol/mocks/ClobKeeper.go @@ -264,13 +264,13 @@ func (_m *ClobKeeper) GetIndexerEventManager() indexer_manager.IndexerEventManag return r0 } -// GetInsuranceFundBalance provides a mock function with given fields: ctx +// GetInsuranceFundBalance provides a mock function with given fields: ctx, perpetualId func (_m *ClobKeeper) GetInsuranceFundBalance(ctx types.Context, perpetualId uint32) *big.Int { - ret := _m.Called(ctx) + ret := _m.Called(ctx, perpetualId) var r0 *big.Int - if rf, ok := ret.Get(0).(func(types.Context) *big.Int); ok { - r0 = rf(ctx) + if rf, ok := ret.Get(0).(func(types.Context, uint32) *big.Int); ok { + r0 = rf(ctx, perpetualId) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(*big.Int) @@ -557,6 +557,11 @@ func (_m *ClobKeeper) InitializeEquityTierLimit(ctx types.Context, config clobty return r0 } +// InitializeNewGrpcStreams provides a mock function with given fields: ctx +func (_m *ClobKeeper) InitializeNewGrpcStreams(ctx types.Context) { + _m.Called(ctx) +} + // IsLiquidatable provides a mock function with given fields: ctx, subaccountId func (_m *ClobKeeper) IsLiquidatable(ctx types.Context, subaccountId subaccountstypes.SubaccountId) (bool, error) { ret := _m.Called(ctx, subaccountId) diff --git a/protocol/mocks/MemClob.go b/protocol/mocks/MemClob.go index 48da388ea0..fd027b818e 100644 --- a/protocol/mocks/MemClob.go +++ b/protocol/mocks/MemClob.go @@ -153,6 +153,22 @@ func (_m *MemClob) GetMidPrice(ctx types.Context, clobPairId clobtypes.ClobPairI return r0, r1, r2, r3 } +// GetOffchainUpdatesForOrderbookSnapshot provides a mock function with given fields: ctx, clobPairId +func (_m *MemClob) GetOffchainUpdatesForOrderbookSnapshot(ctx types.Context, clobPairId clobtypes.ClobPairId) *clobtypes.OffchainUpdates { + ret := _m.Called(ctx, clobPairId) + + var r0 *clobtypes.OffchainUpdates + if rf, ok := ret.Get(0).(func(types.Context, clobtypes.ClobPairId) *clobtypes.OffchainUpdates); ok { + r0 = rf(ctx, clobPairId) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*clobtypes.OffchainUpdates) + } + } + + return r0 +} + // GetOperationsRaw provides a mock function with given fields: ctx func (_m *MemClob) GetOperationsRaw(ctx types.Context) []clobtypes.OperationRaw { ret := _m.Called(ctx) diff --git a/protocol/streaming/grpc/grpc_streaming_manager.go b/protocol/streaming/grpc/grpc_streaming_manager.go index 5ab3fbbde8..95ae0a984e 100644 --- a/protocol/streaming/grpc/grpc_streaming_manager.go +++ b/protocol/streaming/grpc/grpc_streaming_manager.go @@ -5,6 +5,7 @@ import ( "github.com/cosmos/gogoproto/proto" ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types" + "github.com/dydxprotocol/v4-chain/protocol/lib" "github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/types" clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" ) @@ -22,8 +23,14 @@ type GrpcStreamingManagerImpl struct { // OrderbookSubscription represents a active subscription to the orderbook updates stream. type OrderbookSubscription struct { + // Initialize the subscription with orderbook snapshots. + initialize sync.Once + + // Clob pair ids to subscribe to. clobPairIds []uint32 - srv clobtypes.Query_StreamOrderbookUpdatesServer + + // Stream + srv clobtypes.Query_StreamOrderbookUpdatesServer } func NewGrpcStreamingManager() *GrpcStreamingManagerImpl { @@ -68,6 +75,7 @@ func (sm *GrpcStreamingManagerImpl) Subscribe( // sends messages to the subscribers. func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates( offchainUpdates *clobtypes.OffchainUpdates, + snapshot bool, ) { // Group updates by clob pair id. updates := make(map[uint32]*clobtypes.OffchainUpdates) @@ -100,7 +108,7 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates( if err := subscription.srv.Send( &clobtypes.StreamOrderbookUpdatesResponse{ Updates: updates, - Snapshot: false, + Snapshot: snapshot, }, ); err != nil { idsToRemove = append(idsToRemove, id) @@ -117,6 +125,25 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates( } } +// GetUninitializedClobPairIds returns the clob pair ids that have not been initialized. +func (sm *GrpcStreamingManagerImpl) GetUninitializedClobPairIds() []uint32 { + sm.Lock() + defer sm.Unlock() + + clobPairIds := make(map[uint32]bool) + for _, subscription := range sm.orderbookSubscriptions { + subscription.initialize.Do( + func() { + for _, clobPairId := range subscription.clobPairIds { + clobPairIds[clobPairId] = true + } + }, + ) + } + + return lib.GetSortedKeys[lib.Sortable[uint32]](clobPairIds) +} + // GetOffchainUpdatesV1 unmarshals messages in offchain updates to OffchainUpdateV1. func GetOffchainUpdatesV1(offchainUpdates *clobtypes.OffchainUpdates) ([]ocutypes.OffChainUpdateV1, error) { v1updates := make([]ocutypes.OffChainUpdateV1, 0) diff --git a/protocol/streaming/grpc/noop_streaming_manager.go b/protocol/streaming/grpc/noop_streaming_manager.go index 467b18864b..f8cc229772 100644 --- a/protocol/streaming/grpc/noop_streaming_manager.go +++ b/protocol/streaming/grpc/noop_streaming_manager.go @@ -28,5 +28,10 @@ func (sm *NoopGrpcStreamingManager) Subscribe( func (sm *NoopGrpcStreamingManager) SendOrderbookUpdates( updates *clobtypes.OffchainUpdates, + snapshot bool, ) { } + +func (sm *NoopGrpcStreamingManager) GetUninitializedClobPairIds() []uint32 { + return []uint32{} +} diff --git a/protocol/streaming/grpc/types/manager.go b/protocol/streaming/grpc/types/manager.go index 49882209a6..0027358e79 100644 --- a/protocol/streaming/grpc/types/manager.go +++ b/protocol/streaming/grpc/types/manager.go @@ -14,5 +14,9 @@ type GrpcStreamingManager interface { ) ( err error, ) - SendOrderbookUpdates(*clobtypes.OffchainUpdates) + GetUninitializedClobPairIds() []uint32 + SendOrderbookUpdates( + offchainUpdates *clobtypes.OffchainUpdates, + snapshot bool, + ) } diff --git a/protocol/x/clob/abci.go b/protocol/x/clob/abci.go index bc26300040..a2201e3035 100644 --- a/protocol/x/clob/abci.go +++ b/protocol/x/clob/abci.go @@ -245,6 +245,9 @@ func PrepareCheckState( types.GetInternalOperationsQueueTextString(newLocalValidatorOperationsQueue), ) + // Initialize new GRPC streams with orderbook snapshots, if any. + keeper.InitializeNewGrpcStreams(ctx) + // Set per-orderbook gauges. keeper.MemClob.SetMemclobGauges(ctx) } diff --git a/protocol/x/clob/keeper/keeper.go b/protocol/x/clob/keeper/keeper.go index 13af8452b5..370ef8d3ac 100644 --- a/protocol/x/clob/keeper/keeper.go +++ b/protocol/x/clob/keeper/keeper.go @@ -219,3 +219,22 @@ func (k Keeper) InitMemStore(ctx sdk.Context) { func (k *Keeper) SetAnteHandler(anteHandler sdk.AnteHandler) { k.antehandler = anteHandler } + +// InitializeNewGrpcStreams initializes new gRPC streams for all uninitialized clob pairs +// by sending the corresponding orderbook snapshots. +func (k Keeper) InitializeNewGrpcStreams(ctx sdk.Context) { + streamingManager := k.GetGrpcStreamingManager() + allUpdates := types.NewOffchainUpdates() + + uninitializedClobPairIds := streamingManager.GetUninitializedClobPairIds() + for _, clobPairId := range uninitializedClobPairIds { + update := k.MemClob.GetOffchainUpdatesForOrderbookSnapshot( + ctx, + types.ClobPairId(clobPairId), + ) + + allUpdates.Append(update) + } + + streamingManager.SendOrderbookUpdates(allUpdates, true) +} diff --git a/protocol/x/clob/keeper/orders.go b/protocol/x/clob/keeper/orders.go index 4df957e2db..db709c469d 100644 --- a/protocol/x/clob/keeper/orders.go +++ b/protocol/x/clob/keeper/orders.go @@ -1312,7 +1312,7 @@ func (k Keeper) SendOffchainMessages( k.GetIndexerEventManager().SendOffchainData(update) } - k.GetGrpcStreamingManager().SendOrderbookUpdates(offchainUpdates) + k.GetGrpcStreamingManager().SendOrderbookUpdates(offchainUpdates, false) } // getFillQuoteQuantums returns the total fillAmount price in quote quantums based on the maker subticks. diff --git a/protocol/x/clob/memclob/memclob_grpc_streaming.go b/protocol/x/clob/memclob/memclob_grpc_streaming.go new file mode 100644 index 0000000000..a1c86d4d0c --- /dev/null +++ b/protocol/x/clob/memclob/memclob_grpc_streaming.go @@ -0,0 +1,88 @@ +package memclob + +import ( + sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates" + "github.com/dydxprotocol/v4-chain/protocol/lib" + "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" +) + +// GetOffchainUpdatesForOrderbookSnapshot returns the offchain updates for the orderbook snapshot. +// This is used by the gRPC streaming server to send the orderbook snapshot to the client. +func (m *MemClobPriceTimePriority) GetOffchainUpdatesForOrderbookSnapshot( + ctx sdk.Context, + clobPairId types.ClobPairId, +) (offchainUpdates *types.OffchainUpdates) { + offchainUpdates = types.NewOffchainUpdates() + + if orderbook, exists := m.openOrders.orderbooksMap[clobPairId]; exists { + // Generate the offchain updates for buy orders. + // Updates are sorted in descending order of price. + buyPriceLevels := lib.GetSortedKeys[lib.Sortable[types.Subticks]](orderbook.Bids) + for i := len(buyPriceLevels) - 1; i >= 0; i-- { + subticks := buyPriceLevels[i] + level := orderbook.Bids[subticks] + + // For each price level, generate offchain updates for each order in the level. + level.LevelOrders.Front.Each( + func(order types.ClobOrder) { + offchainUpdates.Append( + m.GetOffchainUpdatesForOrder(ctx, order.Order), + ) + }, + ) + } + + // Generate the offchain updates for sell orders. + // Updates are sorted in ascending order of price. + sellPriceLevels := lib.GetSortedKeys[lib.Sortable[types.Subticks]](orderbook.Asks) + for i := 0; i < len(sellPriceLevels); i++ { + subticks := sellPriceLevels[i] + level := orderbook.Asks[subticks] + + // For each price level, generate offchain updates for each order in the level. + level.LevelOrders.Front.Each( + func(order types.ClobOrder) { + offchainUpdates.Append( + m.GetOffchainUpdatesForOrder(ctx, order.Order), + ) + }, + ) + } + } + + return offchainUpdates +} + +// GetOffchainUpdatesForOrder returns a place order offchain message and +// a update order offchain message used to construct an order for +// the orderbook snapshot grpc stream. +func (m *MemClobPriceTimePriority) GetOffchainUpdatesForOrder( + ctx sdk.Context, + order types.Order, +) (offchainUpdates *types.OffchainUpdates) { + offchainUpdates = types.NewOffchainUpdates() + orderId := order.OrderId + + // Generate a order place message. + if message, success := off_chain_updates.CreateOrderPlaceMessage( + ctx, + order, + ); success { + offchainUpdates.AddPlaceMessage(orderId, message) + } + + // Get the current fill amount of the order. + fillAmount := m.GetOrderFilledAmount(ctx, orderId) + + // Generate an update message updating the total filled amount of order. + if message, success := off_chain_updates.CreateOrderUpdateMessage( + ctx, + orderId, + fillAmount, + ); success { + offchainUpdates.AddUpdateMessage(orderId, message) + } + + return offchainUpdates +} diff --git a/protocol/x/clob/memclob/memclob_grpc_streaming_test.go b/protocol/x/clob/memclob/memclob_grpc_streaming_test.go new file mode 100644 index 0000000000..e5bd2bf3f2 --- /dev/null +++ b/protocol/x/clob/memclob/memclob_grpc_streaming_test.go @@ -0,0 +1,91 @@ +package memclob + +import ( + "testing" + + "github.com/dydxprotocol/v4-chain/protocol/mocks" + "github.com/dydxprotocol/v4-chain/protocol/testutil/constants" + sdktest "github.com/dydxprotocol/v4-chain/protocol/testutil/sdk" + "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" + satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +func TestGetOffchainUpdatesForOrderbookSnapshot_Buy(t *testing.T) { + ctx, _, _ := sdktest.NewSdkContextWithMultistore() + + clobKeeper := &mocks.MemClobKeeper{} + clobKeeper.On( + "GetOrderFillAmount", + mock.Anything, + mock.Anything, + ).Return(false, satypes.BaseQuantums(0), uint32(0)) + + memclob := NewMemClobPriceTimePriority(false) + memclob.SetClobKeeper(clobKeeper) + + memclob.CreateOrderbook(ctx, constants.ClobPair_Btc) + + orders := []types.Order{ + constants.Order_Alice_Num0_Id1_Clob0_Buy15_Price10_GTB18_PO, + constants.Order_Alice_Num0_Id0_Clob0_Buy10_Price10_GTB16, + constants.Order_Bob_Num0_Id12_Clob0_Buy5_Price40_GTB20, + } + + for _, order := range orders { + memclob.mustAddOrderToOrderbook(ctx, order, false) + } + + offchainUpdates := memclob.GetOffchainUpdatesForOrderbookSnapshot( + ctx, + constants.ClobPair_Btc.GetClobPairId(), + ) + + expected := types.NewOffchainUpdates() + // Buy orders are in descending order. + expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[2])) + expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[0])) + expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[1])) + + require.Equal(t, expected, offchainUpdates) +} + +func TestGetOffchainUpdatesForOrderbookSnapshot_Sell(t *testing.T) { + ctx, _, _ := sdktest.NewSdkContextWithMultistore() + + clobKeeper := &mocks.MemClobKeeper{} + clobKeeper.On( + "GetOrderFillAmount", + mock.Anything, + mock.Anything, + ).Return(false, satypes.BaseQuantums(0), uint32(0)) + + memclob := NewMemClobPriceTimePriority(false) + memclob.SetClobKeeper(clobKeeper) + + memclob.CreateOrderbook(ctx, constants.ClobPair_Btc) + + orders := []types.Order{ + constants.Order_Bob_Num0_Id12_Clob0_Sell20_Price35_GTB32, + constants.Order_Alice_Num0_Id0_Clob0_Sell5_Price10_GTB20, + constants.Order_Alice_Num0_Id1_Clob0_Sell15_Price10_GTB18_PO, + } + + for _, order := range orders { + memclob.mustAddOrderToOrderbook(ctx, order, false) + } + + offchainUpdates := memclob.GetOffchainUpdatesForOrderbookSnapshot( + ctx, + constants.ClobPair_Btc.GetClobPairId(), + ) + + expected := types.NewOffchainUpdates() + // Sell orders are in ascending order. + expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[1])) + expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[2])) + expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[0])) + + require.Equal(t, expected, offchainUpdates) +} diff --git a/protocol/x/clob/types/clob_keeper.go b/protocol/x/clob/types/clob_keeper.go index 02b64e7c25..795b3d3f61 100644 --- a/protocol/x/clob/types/clob_keeper.go +++ b/protocol/x/clob/types/clob_keeper.go @@ -131,4 +131,5 @@ type ClobKeeper interface { clobPair ClobPair, ) error UpdateLiquidationsConfig(ctx sdk.Context, config LiquidationsConfig) error + InitializeNewGrpcStreams(ctx sdk.Context) } diff --git a/protocol/x/clob/types/memclob.go b/protocol/x/clob/types/memclob.go index d80d5b1345..feb4ab6f49 100644 --- a/protocol/x/clob/types/memclob.go +++ b/protocol/x/clob/types/memclob.go @@ -133,4 +133,8 @@ type MemClob interface { subaccountId satypes.SubaccountId, perpetualId uint32, ) + GetOffchainUpdatesForOrderbookSnapshot( + ctx sdk.Context, + clobPairId ClobPairId, + ) (offchainUpdates *OffchainUpdates) }