Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CT-647] construct the initial orderbook snapshot #1147

Merged
merged 3 commits into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions protocol/mocks/ClobKeeper.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions protocol/mocks/MemClob.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 29 additions & 2 deletions protocol/streaming/grpc/grpc_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/cosmos/gogoproto/proto"
ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types"
"github.com/dydxprotocol/v4-chain/protocol/lib"
"github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/types"
clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
)
Expand All @@ -22,8 +23,14 @@ type GrpcStreamingManagerImpl struct {

// OrderbookSubscription represents a active subscription to the orderbook updates stream.
type OrderbookSubscription struct {
// Initialize the subscription with orderbook snapshots.
initialize sync.Once

// Clob pair ids to subscribe to.
clobPairIds []uint32
srv clobtypes.Query_StreamOrderbookUpdatesServer

// Stream
srv clobtypes.Query_StreamOrderbookUpdatesServer
}

func NewGrpcStreamingManager() *GrpcStreamingManagerImpl {
Expand Down Expand Up @@ -68,6 +75,7 @@ func (sm *GrpcStreamingManagerImpl) Subscribe(
// sends messages to the subscribers.
func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates(
offchainUpdates *clobtypes.OffchainUpdates,
snapshot bool,
) {
// Group updates by clob pair id.
updates := make(map[uint32]*clobtypes.OffchainUpdates)
Expand Down Expand Up @@ -100,7 +108,7 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates(
if err := subscription.srv.Send(
&clobtypes.StreamOrderbookUpdatesResponse{
Updates: updates,
Snapshot: false,
Snapshot: snapshot,
},
); err != nil {
idsToRemove = append(idsToRemove, id)
Expand All @@ -117,6 +125,25 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates(
}
}

// GetUninitializedClobPairIds returns the clob pair ids that have not been initialized.
func (sm *GrpcStreamingManagerImpl) GetUninitializedClobPairIds() []uint32 {
sm.Lock()
defer sm.Unlock()

clobPairIds := make(map[uint32]bool)
for _, subscription := range sm.orderbookSubscriptions {
subscription.initialize.Do(
func() {
for _, clobPairId := range subscription.clobPairIds {
clobPairIds[clobPairId] = true
}
},
)
}

return lib.GetSortedKeys[lib.Sortable[uint32]](clobPairIds)
}

// GetOffchainUpdatesV1 unmarshals messages in offchain updates to OffchainUpdateV1.
func GetOffchainUpdatesV1(offchainUpdates *clobtypes.OffchainUpdates) ([]ocutypes.OffChainUpdateV1, error) {
v1updates := make([]ocutypes.OffChainUpdateV1, 0)
Expand Down
5 changes: 5 additions & 0 deletions protocol/streaming/grpc/noop_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,10 @@ func (sm *NoopGrpcStreamingManager) Subscribe(

func (sm *NoopGrpcStreamingManager) SendOrderbookUpdates(
updates *clobtypes.OffchainUpdates,
snapshot bool,
) {
}

func (sm *NoopGrpcStreamingManager) GetUninitializedClobPairIds() []uint32 {
return []uint32{}
}
6 changes: 5 additions & 1 deletion protocol/streaming/grpc/types/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,9 @@ type GrpcStreamingManager interface {
) (
err error,
)
SendOrderbookUpdates(*clobtypes.OffchainUpdates)
GetUninitializedClobPairIds() []uint32
SendOrderbookUpdates(
offchainUpdates *clobtypes.OffchainUpdates,
snapshot bool,
)
}
3 changes: 3 additions & 0 deletions protocol/x/clob/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,9 @@ func PrepareCheckState(
types.GetInternalOperationsQueueTextString(newLocalValidatorOperationsQueue),
)

// Initialize new GRPC streams with orderbook snapshots, if any.
keeper.InitializeNewGrpcStreams(ctx)

// Set per-orderbook gauges.
keeper.MemClob.SetMemclobGauges(ctx)
}
19 changes: 19 additions & 0 deletions protocol/x/clob/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,3 +219,22 @@ func (k Keeper) InitMemStore(ctx sdk.Context) {
func (k *Keeper) SetAnteHandler(anteHandler sdk.AnteHandler) {
k.antehandler = anteHandler
}

// InitializeNewGrpcStreams initializes new gRPC streams for all uninitialized clob pairs
// by sending the corresponding orderbook snapshots.
func (k Keeper) InitializeNewGrpcStreams(ctx sdk.Context) {
streamingManager := k.GetGrpcStreamingManager()
allUpdates := types.NewOffchainUpdates()

uninitializedClobPairIds := streamingManager.GetUninitializedClobPairIds()
for _, clobPairId := range uninitializedClobPairIds {
update := k.MemClob.GetOffchainUpdatesForOrderbookSnapshot(
ctx,
types.ClobPairId(clobPairId),
)

allUpdates.Append(update)
}

streamingManager.SendOrderbookUpdates(allUpdates, true)
}
2 changes: 1 addition & 1 deletion protocol/x/clob/keeper/orders.go
Original file line number Diff line number Diff line change
Expand Up @@ -1312,7 +1312,7 @@ func (k Keeper) SendOffchainMessages(
k.GetIndexerEventManager().SendOffchainData(update)
}

k.GetGrpcStreamingManager().SendOrderbookUpdates(offchainUpdates)
k.GetGrpcStreamingManager().SendOrderbookUpdates(offchainUpdates, false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The addition of the SendOrderbookUpdates call with the false boolean argument in the SendOffchainMessages function introduces a new behavior for sending off-chain orderbook updates. This change seems to be part of the enhancements for handling off-chain updates more efficiently or in a specific manner. However, it's crucial to ensure that the false argument aligns with the intended behavior, especially regarding how updates are processed or filtered before being sent. If the false value disables a specific processing step or filtering, confirm that this behavior is desired in all scenarios where SendOffchainMessages is called.

Consider verifying the impact of this change on the overall system, especially in scenarios where different behaviors for sending off-chain updates might be required. If the false argument significantly alters the behavior in a way that might not be universally applicable, it might be worth exploring the possibility of making this behavior configurable or ensuring that the function's callers are aware of the implications.

- k.GetGrpcStreamingManager().SendOrderbookUpdates(offchainUpdates, false)
+ // Ensure the boolean argument passed to SendOrderbookUpdates aligns with the intended behavior.
+ // If the `false` value has specific implications, consider making this behavior configurable or
+ // clearly documenting its effects to avoid unintended consequences.
+ k.GetGrpcStreamingManager().SendOrderbookUpdates(offchainUpdates, desiredBehavior)

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
k.GetGrpcStreamingManager().SendOrderbookUpdates(offchainUpdates, false)
// Ensure the boolean argument passed to SendOrderbookUpdates aligns with the intended behavior.
// If the `false` value has specific implications, consider making this behavior configurable or
// clearly documenting its effects to avoid unintended consequences.
k.GetGrpcStreamingManager().SendOrderbookUpdates(offchainUpdates, desiredBehavior)

}

// getFillQuoteQuantums returns the total fillAmount price in quote quantums based on the maker subticks.
Expand Down
88 changes: 88 additions & 0 deletions protocol/x/clob/memclob/memclob_grpc_streaming.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package memclob

import (
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates"
"github.com/dydxprotocol/v4-chain/protocol/lib"
"github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
)

// GetOffchainUpdatesForOrderbookSnapshot returns the offchain updates for the orderbook snapshot.
// This is used by the gRPC streaming server to send the orderbook snapshot to the client.
func (m *MemClobPriceTimePriority) GetOffchainUpdatesForOrderbookSnapshot(
ctx sdk.Context,
clobPairId types.ClobPairId,
) (offchainUpdates *types.OffchainUpdates) {
offchainUpdates = types.NewOffchainUpdates()

if orderbook, exists := m.openOrders.orderbooksMap[clobPairId]; exists {
// Generate the offchain updates for buy orders.
// Updates are sorted in descending order of price.
buyPriceLevels := lib.GetSortedKeys[lib.Sortable[types.Subticks]](orderbook.Bids)
for i := len(buyPriceLevels) - 1; i >= 0; i-- {
subticks := buyPriceLevels[i]
level := orderbook.Bids[subticks]

// For each price level, generate offchain updates for each order in the level.
level.LevelOrders.Front.Each(
func(order types.ClobOrder) {
offchainUpdates.Append(
m.GetOffchainUpdatesForOrder(ctx, order.Order),
)
},
)
}

// Generate the offchain updates for sell orders.
// Updates are sorted in ascending order of price.
sellPriceLevels := lib.GetSortedKeys[lib.Sortable[types.Subticks]](orderbook.Asks)
for i := 0; i < len(sellPriceLevels); i++ {
subticks := sellPriceLevels[i]
level := orderbook.Asks[subticks]

// For each price level, generate offchain updates for each order in the level.
level.LevelOrders.Front.Each(
func(order types.ClobOrder) {
offchainUpdates.Append(
m.GetOffchainUpdatesForOrder(ctx, order.Order),
)
},
)
}
}

return offchainUpdates
}
Comment on lines +10 to +55
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The GetOffchainUpdatesForOrderbookSnapshot method efficiently generates off-chain updates for both buy and sell orders in the orderbook. A few observations and suggestions:

  • Correctness & Performance: The method correctly checks for the existence of the orderbook for the given clobPairId and iterates over buy and sell orders to generate off-chain updates. The use of lib.GetSortedKeys to sort price levels is a good practice for ensuring the updates are in the correct order (descending for buys, ascending for sells).

  • Maintainability: The code is well-structured and follows a clear logic flow, making it easy to understand and maintain. However, consider extracting the logic for generating off-chain updates for buy and sell orders into separate private methods to reduce the method's length and improve readability.

  • Error Handling: Currently, the method does not handle or return any errors. Ensure that all external calls, especially those that might fail (e.g., m.GetOffchainUpdatesForOrder), are properly handled. Consider modifying the method signature to return an error as well and handle these gracefully.

  • Testing: Ensure comprehensive tests are written to cover various scenarios, including cases where the orderbook does not exist for the given clobPairId, and cases with different combinations of buy and sell orders.

Consider refactoring to improve readability and error handling.


// GetOffchainUpdatesForOrder returns a place order offchain message and
// a update order offchain message used to construct an order for
// the orderbook snapshot grpc stream.
func (m *MemClobPriceTimePriority) GetOffchainUpdatesForOrder(
ctx sdk.Context,
order types.Order,
) (offchainUpdates *types.OffchainUpdates) {
offchainUpdates = types.NewOffchainUpdates()
orderId := order.OrderId

// Generate a order place message.
if message, success := off_chain_updates.CreateOrderPlaceMessage(
ctx,
order,
); success {
offchainUpdates.AddPlaceMessage(orderId, message)
}

// Get the current fill amount of the order.
fillAmount := m.GetOrderFilledAmount(ctx, orderId)

// Generate an update message updating the total filled amount of order.
if message, success := off_chain_updates.CreateOrderUpdateMessage(
ctx,
orderId,
fillAmount,
); success {
offchainUpdates.AddUpdateMessage(orderId, message)
}

return offchainUpdates
}
91 changes: 91 additions & 0 deletions protocol/x/clob/memclob/memclob_grpc_streaming_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package memclob

import (
"testing"

"github.com/dydxprotocol/v4-chain/protocol/mocks"
"github.com/dydxprotocol/v4-chain/protocol/testutil/constants"
sdktest "github.com/dydxprotocol/v4-chain/protocol/testutil/sdk"
"github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

func TestGetOffchainUpdatesForOrderbookSnapshot_Buy(t *testing.T) {
ctx, _, _ := sdktest.NewSdkContextWithMultistore()

clobKeeper := &mocks.MemClobKeeper{}
clobKeeper.On(
"GetOrderFillAmount",
mock.Anything,
mock.Anything,
).Return(false, satypes.BaseQuantums(0), uint32(0))

memclob := NewMemClobPriceTimePriority(false)
memclob.SetClobKeeper(clobKeeper)

memclob.CreateOrderbook(ctx, constants.ClobPair_Btc)

orders := []types.Order{
constants.Order_Alice_Num0_Id1_Clob0_Buy15_Price10_GTB18_PO,
constants.Order_Alice_Num0_Id0_Clob0_Buy10_Price10_GTB16,
constants.Order_Bob_Num0_Id12_Clob0_Buy5_Price40_GTB20,
}

for _, order := range orders {
memclob.mustAddOrderToOrderbook(ctx, order, false)
}

offchainUpdates := memclob.GetOffchainUpdatesForOrderbookSnapshot(
ctx,
constants.ClobPair_Btc.GetClobPairId(),
)

expected := types.NewOffchainUpdates()
// Buy orders are in descending order.
expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[2]))
expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[0]))
expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[1]))

require.Equal(t, expected, offchainUpdates)
}

func TestGetOffchainUpdatesForOrderbookSnapshot_Sell(t *testing.T) {
ctx, _, _ := sdktest.NewSdkContextWithMultistore()

clobKeeper := &mocks.MemClobKeeper{}
clobKeeper.On(
"GetOrderFillAmount",
mock.Anything,
mock.Anything,
).Return(false, satypes.BaseQuantums(0), uint32(0))

memclob := NewMemClobPriceTimePriority(false)
memclob.SetClobKeeper(clobKeeper)

memclob.CreateOrderbook(ctx, constants.ClobPair_Btc)

orders := []types.Order{
constants.Order_Bob_Num0_Id12_Clob0_Sell20_Price35_GTB32,
constants.Order_Alice_Num0_Id0_Clob0_Sell5_Price10_GTB20,
constants.Order_Alice_Num0_Id1_Clob0_Sell15_Price10_GTB18_PO,
}

for _, order := range orders {
memclob.mustAddOrderToOrderbook(ctx, order, false)
}

offchainUpdates := memclob.GetOffchainUpdatesForOrderbookSnapshot(
ctx,
constants.ClobPair_Btc.GetClobPairId(),
)

expected := types.NewOffchainUpdates()
// Sell orders are in ascending order.
expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[1]))
expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[2]))
expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[0]))

require.Equal(t, expected, offchainUpdates)
}
1 change: 1 addition & 0 deletions protocol/x/clob/types/clob_keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,5 @@ type ClobKeeper interface {
clobPair ClobPair,
) error
UpdateLiquidationsConfig(ctx sdk.Context, config LiquidationsConfig) error
InitializeNewGrpcStreams(ctx sdk.Context)
}
4 changes: 4 additions & 0 deletions protocol/x/clob/types/memclob.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,4 +133,8 @@ type MemClob interface {
subaccountId satypes.SubaccountId,
perpetualId uint32,
)
GetOffchainUpdatesForOrderbookSnapshot(
ctx sdk.Context,
clobPairId ClobPairId,
) (offchainUpdates *OffchainUpdates)
}
Loading