diff --git a/protocol/app/app.go b/protocol/app/app.go index 608f12cb8f..0d61c32c2b 100644 --- a/protocol/app/app.go +++ b/protocol/app/app.go @@ -885,7 +885,9 @@ func New( clobFlags := clobflags.GetClobFlagValuesFromOptions(appOpts) logger.Info("Parsed CLOB flags", "Flags", clobFlags) - memClob := clobmodulememclob.NewMemClobPriceTimePriority(app.IndexerEventManager.Enabled()) + memClob := clobmodulememclob.NewMemClobPriceTimePriority( + app.IndexerEventManager.Enabled() || app.GrpcStreamingManager.Enabled(), + ) app.ClobKeeper = clobmodulekeeper.NewKeeper( appCodec, @@ -1610,8 +1612,5 @@ func getGrpcStreamingManagerFromOptions( logger log.Logger, ) (manager streamingtypes.GrpcStreamingManager) { // TODO(CT-625): add command line flags for full node streaming. - if appFlags.NonValidatingFullNode { - return streaming.NewGrpcStreamingManager() - } - return streaming.NewNoopGrpcStreamingManager() + return streaming.NewGrpcStreamingManager() } diff --git a/protocol/streaming/grpc/grpc_streaming_manager.go b/protocol/streaming/grpc/grpc_streaming_manager.go index 6b9250145f..5ab3fbbde8 100644 --- a/protocol/streaming/grpc/grpc_streaming_manager.go +++ b/protocol/streaming/grpc/grpc_streaming_manager.go @@ -1,6 +1,10 @@ package grpc import ( + "sync" + + "github.com/cosmos/gogoproto/proto" + ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types" "github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/types" clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" ) @@ -9,10 +13,23 @@ var _ types.GrpcStreamingManager = (*GrpcStreamingManagerImpl)(nil) // GrpcStreamingManagerImpl is an implementation for managing gRPC streaming subscriptions. type GrpcStreamingManagerImpl struct { + sync.Mutex + + // orderbookSubscriptions maps subscription IDs to their respective orderbook subscriptions. + orderbookSubscriptions map[uint32]*OrderbookSubscription + nextSubscriptionId uint32 +} + +// OrderbookSubscription represents a active subscription to the orderbook updates stream. +type OrderbookSubscription struct { + clobPairIds []uint32 + srv clobtypes.Query_StreamOrderbookUpdatesServer } func NewGrpcStreamingManager() *GrpcStreamingManagerImpl { - return &GrpcStreamingManagerImpl{} + return &GrpcStreamingManagerImpl{ + orderbookSubscriptions: make(map[uint32]*OrderbookSubscription), + } } func (sm *GrpcStreamingManagerImpl) Enabled() bool { @@ -24,15 +41,92 @@ func (sm *GrpcStreamingManagerImpl) Subscribe( req clobtypes.StreamOrderbookUpdatesRequest, srv clobtypes.Query_StreamOrderbookUpdatesServer, ) ( - finished chan bool, err error, ) { - return nil, nil + clobPairIds := req.GetClobPairId() + + // Perform some basic validation on the request. + if len(clobPairIds) == 0 { + return clobtypes.ErrInvalidGrpcStreamingRequest + } + + subscription := &OrderbookSubscription{ + clobPairIds: clobPairIds, + srv: srv, + } + + sm.Lock() + defer sm.Unlock() + + sm.orderbookSubscriptions[sm.nextSubscriptionId] = subscription + sm.nextSubscriptionId++ + + return nil } // SendOrderbookUpdates groups updates by their clob pair ids and // sends messages to the subscribers. func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates( - updates *clobtypes.OffchainUpdates, + offchainUpdates *clobtypes.OffchainUpdates, ) { + // Group updates by clob pair id. + updates := make(map[uint32]*clobtypes.OffchainUpdates) + for _, message := range offchainUpdates.Messages { + clobPairId := message.OrderId.ClobPairId + if _, ok := updates[clobPairId]; !ok { + updates[clobPairId] = clobtypes.NewOffchainUpdates() + } + updates[clobPairId].Messages = append(updates[clobPairId].Messages, message) + } + + // Unmarshal messages to v1 updates. + v1updates := make(map[uint32][]ocutypes.OffChainUpdateV1) + for clobPairId, update := range updates { + v1update, err := GetOffchainUpdatesV1(update) + if err != nil { + panic(err) + } + v1updates[clobPairId] = v1update + } + + sm.Lock() + defer sm.Unlock() + + // Send updates to subscribers. + idsToRemove := make([]uint32, 0) + for id, subscription := range sm.orderbookSubscriptions { + for _, clobPairId := range subscription.clobPairIds { + if updates, ok := v1updates[clobPairId]; ok { + if err := subscription.srv.Send( + &clobtypes.StreamOrderbookUpdatesResponse{ + Updates: updates, + Snapshot: false, + }, + ); err != nil { + idsToRemove = append(idsToRemove, id) + break + } + } + } + } + + // Clean up subscriptions that have been closed. + // If a Send update has failed for any clob pair id, the whole subscription will be removed. + for _, id := range idsToRemove { + delete(sm.orderbookSubscriptions, id) + } +} + +// GetOffchainUpdatesV1 unmarshals messages in offchain updates to OffchainUpdateV1. +func GetOffchainUpdatesV1(offchainUpdates *clobtypes.OffchainUpdates) ([]ocutypes.OffChainUpdateV1, error) { + v1updates := make([]ocutypes.OffChainUpdateV1, 0) + for _, message := range offchainUpdates.Messages { + var update ocutypes.OffChainUpdateV1 + err := proto.Unmarshal(message.Message.Value, &update) + if err != nil { + return nil, err + } + v1updates = append(v1updates, update) + } + return v1updates, nil } diff --git a/protocol/streaming/grpc/noop_streaming_manager.go b/protocol/streaming/grpc/noop_streaming_manager.go index b4670ba66f..467b18864b 100644 --- a/protocol/streaming/grpc/noop_streaming_manager.go +++ b/protocol/streaming/grpc/noop_streaming_manager.go @@ -21,10 +21,9 @@ func (sm *NoopGrpcStreamingManager) Subscribe( req clobtypes.StreamOrderbookUpdatesRequest, srv clobtypes.Query_StreamOrderbookUpdatesServer, ) ( - finished chan bool, err error, ) { - return nil, nil + return clobtypes.ErrGrpcStreamingManagerNotEnabled } func (sm *NoopGrpcStreamingManager) SendOrderbookUpdates( diff --git a/protocol/streaming/grpc/types/manager.go b/protocol/streaming/grpc/types/manager.go index 7bf043c41d..49882209a6 100644 --- a/protocol/streaming/grpc/types/manager.go +++ b/protocol/streaming/grpc/types/manager.go @@ -12,7 +12,6 @@ type GrpcStreamingManager interface { req clobtypes.StreamOrderbookUpdatesRequest, srv clobtypes.Query_StreamOrderbookUpdatesServer, ) ( - finished chan bool, err error, ) SendOrderbookUpdates(*clobtypes.OffchainUpdates) diff --git a/protocol/x/clob/keeper/grpc_stream_orderbook.go b/protocol/x/clob/keeper/grpc_stream_orderbook.go index 05e65a8690..710a6ceec6 100644 --- a/protocol/x/clob/keeper/grpc_stream_orderbook.go +++ b/protocol/x/clob/keeper/grpc_stream_orderbook.go @@ -8,5 +8,13 @@ func (k Keeper) StreamOrderbookUpdates( req *types.StreamOrderbookUpdatesRequest, stream types.Query_StreamOrderbookUpdatesServer, ) error { + err := k.GetGrpcStreamingManager().Subscribe(*req, stream) + if err != nil { + return err + } + + // Keep this scope alive because once this scope exits - the stream is closed + ctx := stream.Context() + <-ctx.Done() return nil } diff --git a/protocol/x/clob/keeper/orders.go b/protocol/x/clob/keeper/orders.go index 5046c51ff2..2aeb559083 100644 --- a/protocol/x/clob/keeper/orders.go +++ b/protocol/x/clob/keeper/orders.go @@ -1270,6 +1270,8 @@ func (k Keeper) SendOffchainMessages( } k.GetIndexerEventManager().SendOffchainData(update) } + + k.GetGrpcStreamingManager().SendOrderbookUpdates(offchainUpdates) } // getPessimisticCollateralCheckPrice returns the price in subticks we should use for collateralization checks. diff --git a/protocol/x/clob/types/errors.go b/protocol/x/clob/types/errors.go index a9396f1026..edfd850e22 100644 --- a/protocol/x/clob/types/errors.go +++ b/protocol/x/clob/types/errors.go @@ -507,4 +507,16 @@ var ( 10001, "Subaccount cannot open more orders due to equity tier limit.", ) + + // GrpcStreamingManager errors. + ErrGrpcStreamingManagerNotEnabled = errorsmod.Register( + ModuleName, + 11000, + "GrpcStreamingManager is not enabled", + ) + ErrInvalidGrpcStreamingRequest = errorsmod.Register( + ModuleName, + 11001, + "Invalid gRPC streaming request", + ) )