Skip to content

Commit

Permalink
[CT-646] stream offchain updates through stream manager (#1138)
Browse files Browse the repository at this point in the history
* [CT-646] stream offchain updates through stream manager

* comments

* fix lint

* get rid of finished

* comments

* comments
  • Loading branch information
jayy04 committed Mar 15, 2024
1 parent 936ae97 commit ea73729
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 12 deletions.
9 changes: 4 additions & 5 deletions protocol/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -885,7 +885,9 @@ func New(
clobFlags := clobflags.GetClobFlagValuesFromOptions(appOpts)
logger.Info("Parsed CLOB flags", "Flags", clobFlags)

memClob := clobmodulememclob.NewMemClobPriceTimePriority(app.IndexerEventManager.Enabled())
memClob := clobmodulememclob.NewMemClobPriceTimePriority(
app.IndexerEventManager.Enabled() || app.GrpcStreamingManager.Enabled(),
)

app.ClobKeeper = clobmodulekeeper.NewKeeper(
appCodec,
Expand Down Expand Up @@ -1610,8 +1612,5 @@ func getGrpcStreamingManagerFromOptions(
logger log.Logger,
) (manager streamingtypes.GrpcStreamingManager) {
// TODO(CT-625): add command line flags for full node streaming.
if appFlags.NonValidatingFullNode {
return streaming.NewGrpcStreamingManager()
}
return streaming.NewNoopGrpcStreamingManager()
return streaming.NewGrpcStreamingManager()
}
102 changes: 98 additions & 4 deletions protocol/streaming/grpc/grpc_streaming_manager.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package grpc

import (
"sync"

"github.com/cosmos/gogoproto/proto"
ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types"
"github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/types"
clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
)
Expand All @@ -9,10 +13,23 @@ var _ types.GrpcStreamingManager = (*GrpcStreamingManagerImpl)(nil)

// GrpcStreamingManagerImpl is an implementation for managing gRPC streaming subscriptions.
type GrpcStreamingManagerImpl struct {
sync.Mutex

// orderbookSubscriptions maps subscription IDs to their respective orderbook subscriptions.
orderbookSubscriptions map[uint32]*OrderbookSubscription
nextSubscriptionId uint32
}

// OrderbookSubscription represents a active subscription to the orderbook updates stream.
type OrderbookSubscription struct {
clobPairIds []uint32
srv clobtypes.Query_StreamOrderbookUpdatesServer
}

func NewGrpcStreamingManager() *GrpcStreamingManagerImpl {
return &GrpcStreamingManagerImpl{}
return &GrpcStreamingManagerImpl{
orderbookSubscriptions: make(map[uint32]*OrderbookSubscription),
}
}

func (sm *GrpcStreamingManagerImpl) Enabled() bool {
Expand All @@ -24,15 +41,92 @@ func (sm *GrpcStreamingManagerImpl) Subscribe(
req clobtypes.StreamOrderbookUpdatesRequest,
srv clobtypes.Query_StreamOrderbookUpdatesServer,
) (
finished chan bool,
err error,
) {
return nil, nil
clobPairIds := req.GetClobPairId()

// Perform some basic validation on the request.
if len(clobPairIds) == 0 {
return clobtypes.ErrInvalidGrpcStreamingRequest
}

subscription := &OrderbookSubscription{
clobPairIds: clobPairIds,
srv: srv,
}

sm.Lock()
defer sm.Unlock()

sm.orderbookSubscriptions[sm.nextSubscriptionId] = subscription
sm.nextSubscriptionId++

return nil
}

// SendOrderbookUpdates groups updates by their clob pair ids and
// sends messages to the subscribers.
func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates(
updates *clobtypes.OffchainUpdates,
offchainUpdates *clobtypes.OffchainUpdates,
) {
// Group updates by clob pair id.
updates := make(map[uint32]*clobtypes.OffchainUpdates)
for _, message := range offchainUpdates.Messages {
clobPairId := message.OrderId.ClobPairId
if _, ok := updates[clobPairId]; !ok {
updates[clobPairId] = clobtypes.NewOffchainUpdates()
}
updates[clobPairId].Messages = append(updates[clobPairId].Messages, message)
}

// Unmarshal messages to v1 updates.
v1updates := make(map[uint32][]ocutypes.OffChainUpdateV1)
for clobPairId, update := range updates {
v1update, err := GetOffchainUpdatesV1(update)
if err != nil {
panic(err)
}
v1updates[clobPairId] = v1update
}

sm.Lock()
defer sm.Unlock()

// Send updates to subscribers.
idsToRemove := make([]uint32, 0)
for id, subscription := range sm.orderbookSubscriptions {
for _, clobPairId := range subscription.clobPairIds {
if updates, ok := v1updates[clobPairId]; ok {
if err := subscription.srv.Send(
&clobtypes.StreamOrderbookUpdatesResponse{
Updates: updates,
Snapshot: false,
},
); err != nil {
idsToRemove = append(idsToRemove, id)
break
}
}
}
}

// Clean up subscriptions that have been closed.
// If a Send update has failed for any clob pair id, the whole subscription will be removed.
for _, id := range idsToRemove {
delete(sm.orderbookSubscriptions, id)
}
}

// GetOffchainUpdatesV1 unmarshals messages in offchain updates to OffchainUpdateV1.
func GetOffchainUpdatesV1(offchainUpdates *clobtypes.OffchainUpdates) ([]ocutypes.OffChainUpdateV1, error) {
v1updates := make([]ocutypes.OffChainUpdateV1, 0)
for _, message := range offchainUpdates.Messages {
var update ocutypes.OffChainUpdateV1
err := proto.Unmarshal(message.Message.Value, &update)
if err != nil {
return nil, err
}
v1updates = append(v1updates, update)
}
return v1updates, nil
}
3 changes: 1 addition & 2 deletions protocol/streaming/grpc/noop_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,9 @@ func (sm *NoopGrpcStreamingManager) Subscribe(
req clobtypes.StreamOrderbookUpdatesRequest,
srv clobtypes.Query_StreamOrderbookUpdatesServer,
) (
finished chan bool,
err error,
) {
return nil, nil
return clobtypes.ErrGrpcStreamingManagerNotEnabled
}

func (sm *NoopGrpcStreamingManager) SendOrderbookUpdates(
Expand Down
1 change: 0 additions & 1 deletion protocol/streaming/grpc/types/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ type GrpcStreamingManager interface {
req clobtypes.StreamOrderbookUpdatesRequest,
srv clobtypes.Query_StreamOrderbookUpdatesServer,
) (
finished chan bool,
err error,
)
SendOrderbookUpdates(*clobtypes.OffchainUpdates)
Expand Down
8 changes: 8 additions & 0 deletions protocol/x/clob/keeper/grpc_stream_orderbook.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,13 @@ func (k Keeper) StreamOrderbookUpdates(
req *types.StreamOrderbookUpdatesRequest,
stream types.Query_StreamOrderbookUpdatesServer,
) error {
err := k.GetGrpcStreamingManager().Subscribe(*req, stream)
if err != nil {
return err
}

// Keep this scope alive because once this scope exits - the stream is closed
ctx := stream.Context()
<-ctx.Done()
return nil
}
2 changes: 2 additions & 0 deletions protocol/x/clob/keeper/orders.go
Original file line number Diff line number Diff line change
Expand Up @@ -1270,6 +1270,8 @@ func (k Keeper) SendOffchainMessages(
}
k.GetIndexerEventManager().SendOffchainData(update)
}

k.GetGrpcStreamingManager().SendOrderbookUpdates(offchainUpdates)
}

// getPessimisticCollateralCheckPrice returns the price in subticks we should use for collateralization checks.
Expand Down
12 changes: 12 additions & 0 deletions protocol/x/clob/types/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,4 +507,16 @@ var (
10001,
"Subaccount cannot open more orders due to equity tier limit.",
)

// GrpcStreamingManager errors.
ErrGrpcStreamingManagerNotEnabled = errorsmod.Register(
ModuleName,
11000,
"GrpcStreamingManager is not enabled",
)
ErrInvalidGrpcStreamingRequest = errorsmod.Register(
ModuleName,
11001,
"Invalid gRPC streaming request",
)
)

0 comments on commit ea73729

Please sign in to comment.