Skip to content

Commit

Permalink
[CT-700] separate indexer and grpc streaming events
Browse files Browse the repository at this point in the history
  • Loading branch information
jayy04 committed Mar 20, 2024
1 parent 9888453 commit 4b1cca9
Show file tree
Hide file tree
Showing 11 changed files with 124 additions and 22 deletions.
5 changes: 5 additions & 0 deletions protocol/mocks/ClobKeeper.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions protocol/mocks/MemClobKeeper.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 11 additions & 9 deletions protocol/streaming/grpc/grpc_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,19 +103,21 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates(
// Send updates to subscribers.
idsToRemove := make([]uint32, 0)
for id, subscription := range sm.orderbookSubscriptions {
updatesToSend := make([]ocutypes.OffChainUpdateV1, 0)
for _, clobPairId := range subscription.clobPairIds {
if updates, ok := v1updates[clobPairId]; ok {
if err := subscription.srv.Send(
&clobtypes.StreamOrderbookUpdatesResponse{
Updates: updates,
Snapshot: snapshot,
},
); err != nil {
idsToRemove = append(idsToRemove, id)
break
}
updatesToSend = append(updatesToSend, updates...)
}
}

if err := subscription.srv.Send(
&clobtypes.StreamOrderbookUpdatesResponse{
Updates: updatesToSend,
Snapshot: snapshot,
},
); err != nil {
idsToRemove = append(idsToRemove, id)
}
}

// Clean up subscriptions that have been closed.
Expand Down
6 changes: 6 additions & 0 deletions protocol/testutil/memclob/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,3 +511,9 @@ func (f *FakeMemClobKeeper) ValidateSubaccountEquityTierLimitForStatefulOrder(
func (f *FakeMemClobKeeper) Logger(ctx sdk.Context) log.Logger {
return ctx.Logger()
}

func (f *FakeMemClobKeeper) SendOrderbookUpdates(
offchainUpdates *types.OffchainUpdates,
snapshot bool,
) {
}
10 changes: 9 additions & 1 deletion protocol/x/clob/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,5 +233,13 @@ func (k Keeper) InitializeNewGrpcStreams(ctx sdk.Context) {
allUpdates.Append(update)
}

streamingManager.SendOrderbookUpdates(allUpdates, true)
k.SendOrderbookUpdates(allUpdates, true)
}

// SendOrderbookUpdates sends the offchain updates to the gRPC streaming manager.
func (k Keeper) SendOrderbookUpdates(
offchainUpdates *types.OffchainUpdates,
snapshot bool,
) {
k.GetGrpcStreamingManager().SendOrderbookUpdates(offchainUpdates, snapshot)
}
2 changes: 0 additions & 2 deletions protocol/x/clob/keeper/orders.go
Original file line number Diff line number Diff line change
Expand Up @@ -1310,8 +1310,6 @@ func (k Keeper) SendOffchainMessages(
}
k.GetIndexerEventManager().SendOffchainData(update)
}

k.GetGrpcStreamingManager().SendOrderbookUpdates(offchainUpdates, false)
}

// getFillQuoteQuantums returns the total fillAmount price in quote quantums based on the maker subticks.
Expand Down
27 changes: 27 additions & 0 deletions protocol/x/clob/memclob/memclob.go
Original file line number Diff line number Diff line change
Expand Up @@ -1216,6 +1216,18 @@ func (m *MemClobPriceTimePriority) PurgeInvalidMemclobState(
m.RemoveOrderIfFilled(ctx, orderId)
}

// For order matches in the last block, send an orderbook update
// to the grpc streams.
// Note that fully filled orders are removed from the orderbook in `RemoveOrderIfFilled` above.
allUpdates := types.NewOffchainUpdates()
for _, orderId := range filledOrderIds {
if m.openOrders.hasOrder(ctx, orderId) {
orderbookUpdate := m.GetOrderbookUpdatesForOrderUpdate(ctx, orderId)
allUpdates.Append(orderbookUpdate)
}
}
m.clobKeeper.SendOrderbookUpdates(allUpdates, false)

// Remove all canceled stateful order IDs from the memclob if they exist.
// If the slice has non-stateful order IDs or contains duplicates, panic.
if lib.ContainsDuplicates(canceledStatefulOrderIds) {
Expand Down Expand Up @@ -1510,6 +1522,10 @@ func (m *MemClobPriceTimePriority) mustAddOrderToOrderbook(
}

m.openOrders.mustAddOrderToOrderbook(ctx, newOrder, forceToFrontOfLevel)

// Send an orderbook update to grpc streams.
orderbookUpdate := m.GetOrderbookUpdatesForOrderPlacement(ctx, newOrder)
m.clobKeeper.SendOrderbookUpdates(orderbookUpdate, false)
}

// mustPerformTakerOrderMatching performs matching using the provided taker order while the order
Expand Down Expand Up @@ -1944,6 +1960,10 @@ func (m *MemClobPriceTimePriority) mustRemoveOrder(
!m.operationsToPropose.IsOrderPlacementInOperationsQueue(order) {
m.operationsToPropose.RemoveShortTermOrderTxBytes(order)
}

// Send an orderbook update to grpc streams.
orderbookUpdate := m.GetOrderbookUpdatesForOrderRemoval(ctx, order.OrderId)
m.clobKeeper.SendOrderbookUpdates(orderbookUpdate, false)
}

// mustUpdateOrderbookStateWithMatchedMakerOrder updates the orderbook with a matched maker order.
Expand All @@ -1969,6 +1989,13 @@ func (m *MemClobPriceTimePriority) mustUpdateOrderbookStateWithMatchedMakerOrder
m.mustRemoveOrder(ctx, makerOrderId)
}

// If the order was fully filled, an orderbook update for removal was already sent in `mustRemoveOrder`.
// If the order was partially filled, send an orderbook update for the order's new total filled amount.
if newTotalFilledAmount < makerOrderBaseQuantums {
orderbookUpdate := m.GetOrderbookUpdatesForOrderUpdate(ctx, makerOrder.OrderId)
m.clobKeeper.SendOrderbookUpdates(orderbookUpdate, false)
}

if m.generateOffchainUpdates {
// Send an off-chain update message to the indexer to update the total filled size of the maker
// order.
Expand Down
48 changes: 44 additions & 4 deletions protocol/x/clob/memclob/memclob_grpc_streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package memclob
import (
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates"
ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types"
indexersharedtypes "github.com/dydxprotocol/v4-chain/protocol/indexer/shared/types"
"github.com/dydxprotocol/v4-chain/protocol/lib"
"github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
)
Expand All @@ -27,7 +29,7 @@ func (m *MemClobPriceTimePriority) GetOffchainUpdatesForOrderbookSnapshot(
level.LevelOrders.Front.Each(
func(order types.ClobOrder) {
offchainUpdates.Append(
m.GetOffchainUpdatesForOrder(ctx, order.Order),
m.GetOrderbookUpdatesForOrderPlacement(ctx, order.Order),
)
},
)
Expand All @@ -44,7 +46,7 @@ func (m *MemClobPriceTimePriority) GetOffchainUpdatesForOrderbookSnapshot(
level.LevelOrders.Front.Each(
func(order types.ClobOrder) {
offchainUpdates.Append(
m.GetOffchainUpdatesForOrder(ctx, order.Order),
m.GetOrderbookUpdatesForOrderPlacement(ctx, order.Order),
)
},
)
Expand All @@ -54,10 +56,10 @@ func (m *MemClobPriceTimePriority) GetOffchainUpdatesForOrderbookSnapshot(
return offchainUpdates
}

// GetOffchainUpdatesForOrder returns a place order offchain message and
// GetOrderbookUpdatesForOrderPlacement returns a place order offchain message and
// a update order offchain message used to construct an order for
// the orderbook snapshot grpc stream.
func (m *MemClobPriceTimePriority) GetOffchainUpdatesForOrder(
func (m *MemClobPriceTimePriority) GetOrderbookUpdatesForOrderPlacement(
ctx sdk.Context,
order types.Order,
) (offchainUpdates *types.OffchainUpdates) {
Expand Down Expand Up @@ -86,3 +88,41 @@ func (m *MemClobPriceTimePriority) GetOffchainUpdatesForOrder(

return offchainUpdates
}

// GetOrderbookUpdatesForOrderRemoval returns a place order offchain message and
func (m *MemClobPriceTimePriority) GetOrderbookUpdatesForOrderRemoval(
ctx sdk.Context,
orderId types.OrderId,
) (offchainUpdates *types.OffchainUpdates) {
offchainUpdates = types.NewOffchainUpdates()
if message, success := off_chain_updates.CreateOrderRemoveMessageWithReason(
ctx,
orderId,
indexersharedtypes.OrderRemovalReason_ORDER_REMOVAL_REASON_UNSPECIFIED,
ocutypes.OrderRemoveV1_ORDER_REMOVAL_STATUS_BEST_EFFORT_CANCELED,
); success {
offchainUpdates.AddRemoveMessage(orderId, message)
}
return offchainUpdates
}

// GetOrderbookUpdatesForOrderRemoval returns a place order offchain message and
func (m *MemClobPriceTimePriority) GetOrderbookUpdatesForOrderUpdate(
ctx sdk.Context,
orderId types.OrderId,
) (offchainUpdates *types.OffchainUpdates) {
offchainUpdates = types.NewOffchainUpdates()

// Get the current fill amount of the order.
fillAmount := m.GetOrderFilledAmount(ctx, orderId)

// Generate an update message updating the total filled amount of order.
if message, success := off_chain_updates.CreateOrderUpdateMessage(
ctx,
orderId,
fillAmount,
); success {
offchainUpdates.AddUpdateMessage(orderId, message)
}
return offchainUpdates
}
14 changes: 8 additions & 6 deletions protocol/x/clob/memclob/memclob_grpc_streaming_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ func TestGetOffchainUpdatesForOrderbookSnapshot_Buy(t *testing.T) {
mock.Anything,
mock.Anything,
).Return(false, satypes.BaseQuantums(0), uint32(0))
clobKeeper.On("SendOrderbookUpdates", mock.Anything, mock.Anything).Return()

memclob := NewMemClobPriceTimePriority(false)
memclob.SetClobKeeper(clobKeeper)
Expand All @@ -44,9 +45,9 @@ func TestGetOffchainUpdatesForOrderbookSnapshot_Buy(t *testing.T) {

expected := types.NewOffchainUpdates()
// Buy orders are in descending order.
expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[2]))
expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[0]))
expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[1]))
expected.Append(memclob.GetOrderbookUpdatesForOrderPlacement(ctx, orders[2]))
expected.Append(memclob.GetOrderbookUpdatesForOrderPlacement(ctx, orders[0]))
expected.Append(memclob.GetOrderbookUpdatesForOrderPlacement(ctx, orders[1]))

require.Equal(t, expected, offchainUpdates)
}
Expand All @@ -60,6 +61,7 @@ func TestGetOffchainUpdatesForOrderbookSnapshot_Sell(t *testing.T) {
mock.Anything,
mock.Anything,
).Return(false, satypes.BaseQuantums(0), uint32(0))
clobKeeper.On("SendOrderbookUpdates", mock.Anything, mock.Anything).Return()

memclob := NewMemClobPriceTimePriority(false)
memclob.SetClobKeeper(clobKeeper)
Expand All @@ -83,9 +85,9 @@ func TestGetOffchainUpdatesForOrderbookSnapshot_Sell(t *testing.T) {

expected := types.NewOffchainUpdates()
// Sell orders are in ascending order.
expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[1]))
expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[2]))
expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[0]))
expected.Append(memclob.GetOrderbookUpdatesForOrderPlacement(ctx, orders[1]))
expected.Append(memclob.GetOrderbookUpdatesForOrderPlacement(ctx, orders[2]))
expected.Append(memclob.GetOrderbookUpdatesForOrderPlacement(ctx, orders[0]))

require.Equal(t, expected, offchainUpdates)
}
5 changes: 5 additions & 0 deletions protocol/x/clob/types/clob_keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,5 +144,10 @@ type ClobKeeper interface {
clobPair ClobPair,
) error
UpdateLiquidationsConfig(ctx sdk.Context, config LiquidationsConfig) error
// Gprc streaming
InitializeNewGrpcStreams(ctx sdk.Context)
SendOrderbookUpdates(
offchainUpdates *OffchainUpdates,
snapshot bool,
)
}
4 changes: 4 additions & 0 deletions protocol/x/clob/types/mem_clob_keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,4 +115,8 @@ type MemClobKeeper interface {
Logger(
ctx sdk.Context,
) log.Logger
SendOrderbookUpdates(
offchainUpdates *OffchainUpdates,
snapshot bool,
)
}

0 comments on commit 4b1cca9

Please sign in to comment.