Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CT-647] construct the initial orderbook snapshot #1147

Merged
merged 3 commits into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions protocol/mocks/ClobKeeper.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions protocol/mocks/MemClob.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 29 additions & 2 deletions protocol/streaming/grpc/grpc_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/cosmos/gogoproto/proto"
ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types"
"github.com/dydxprotocol/v4-chain/protocol/lib"
"github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/types"
clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
)
Expand All @@ -22,8 +23,14 @@ type GrpcStreamingManagerImpl struct {

// OrderbookSubscription represents a active subscription to the orderbook updates stream.
type OrderbookSubscription struct {
// Initialize the subscription with orderbook snapshots.
initialize sync.Once

// Clob pair ids to subscribe to.
clobPairIds []uint32
srv clobtypes.Query_StreamOrderbookUpdatesServer

// Stream
srv clobtypes.Query_StreamOrderbookUpdatesServer
}

func NewGrpcStreamingManager() *GrpcStreamingManagerImpl {
Expand Down Expand Up @@ -68,6 +75,7 @@ func (sm *GrpcStreamingManagerImpl) Subscribe(
// sends messages to the subscribers.
func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates(
offchainUpdates *clobtypes.OffchainUpdates,
snapshot bool,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The update to the SendOrderbookUpdates method to include a snapshot boolean parameter aligns with the interface change and enhances the method's functionality. This allows the method to distinguish between regular updates and initial snapshot updates, which is essential for initializing new streams with the correct state. Ensure that all calls to this method are updated to include the new parameter.

) {
// Group updates by clob pair id.
updates := make(map[uint32]*clobtypes.OffchainUpdates)
Expand Down Expand Up @@ -100,7 +108,7 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates(
if err := subscription.srv.Send(
&clobtypes.StreamOrderbookUpdatesResponse{
Updates: updates,
Snapshot: false,
Snapshot: snapshot,
},
); err != nil {
idsToRemove = append(idsToRemove, id)
Expand All @@ -117,6 +125,25 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates(
}
}

// GetUninitializedClobPairIds returns the clob pair ids that have not been initialized.
func (sm *GrpcStreamingManagerImpl) GetUninitializedClobPairIds() []uint32 {
sm.Lock()
defer sm.Unlock()

clobPairIds := make(map[uint32]bool)
for _, subscription := range sm.orderbookSubscriptions {
subscription.initialize.Do(
func() {
for _, clobPairId := range subscription.clobPairIds {
clobPairIds[clobPairId] = true
}
},
)
}

return lib.GetSortedKeys[lib.Sortable[uint32]](clobPairIds)
}

// GetOffchainUpdatesV1 unmarshals messages in offchain updates to OffchainUpdateV1.
func GetOffchainUpdatesV1(offchainUpdates *clobtypes.OffchainUpdates) ([]ocutypes.OffChainUpdateV1, error) {
v1updates := make([]ocutypes.OffChainUpdateV1, 0)
Expand Down
5 changes: 5 additions & 0 deletions protocol/streaming/grpc/noop_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,10 @@ func (sm *NoopGrpcStreamingManager) Subscribe(

func (sm *NoopGrpcStreamingManager) SendOrderbookUpdates(
updates *clobtypes.OffchainUpdates,
snapshot bool,
) {
}

func (sm *NoopGrpcStreamingManager) GetUninitializedClobPairIds() []uint32 {
return []uint32{}
}
6 changes: 5 additions & 1 deletion protocol/streaming/grpc/types/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,9 @@ type GrpcStreamingManager interface {
) (
err error,
)
SendOrderbookUpdates(*clobtypes.OffchainUpdates)
GetUninitializedClobPairIds() []uint32
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding the GetUninitializedClobPairIds method to the GrpcStreamingManager interface is a significant change. It implies that all implementations of this interface must now provide a mechanism to retrieve IDs of uninitialized CLOB pairs. This is crucial for ensuring that new streams can be initialized with the correct state. Ensure that all existing implementations are updated accordingly to avoid compilation errors.

SendOrderbookUpdates(
offchainUpdates *clobtypes.OffchainUpdates,
snapshot bool,
)
Comment on lines +18 to +21
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The modification to the SendOrderbookUpdates method to include a snapshot boolean parameter is a thoughtful addition. It allows the method to differentiate between regular updates and snapshot updates, which is essential for initializing new streams with the current state of the orderbook. This change enhances the flexibility and functionality of the gRPC streaming service. However, ensure that all calls to this method throughout the codebase are updated to include the new parameter to maintain compatibility.

}
3 changes: 3 additions & 0 deletions protocol/x/clob/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,9 @@ func PrepareCheckState(
types.GetInternalOperationsQueueTextString(newLocalValidatorOperationsQueue),
)

// Initialize new GRPC streams with orderbook snapshots, if any.
keeper.InitializeNewGrpcStreams(ctx)

// Set per-orderbook gauges.
keeper.MemClob.SetMemclobGauges(ctx)
}
19 changes: 19 additions & 0 deletions protocol/x/clob/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,3 +219,22 @@ func (k Keeper) InitMemStore(ctx sdk.Context) {
func (k *Keeper) SetAnteHandler(anteHandler sdk.AnteHandler) {
k.antehandler = anteHandler
}

// InitializeNewGrpcStreams initializes new gRPC streams for all uninitialized clob pairs
// by sending the corresponding orderbook snapshots.
func (k Keeper) InitializeNewGrpcStreams(ctx sdk.Context) {
streamingManager := k.GetGrpcStreamingManager()
allUpdates := types.NewOffchainUpdates()

uninitializedClobPairIds := streamingManager.GetUninitializedClobPairIds()
for _, clobPairId := range uninitializedClobPairIds {
update := k.MemClob.GetOffchainUpdatesForOrderbookSnapshot(
ctx,
types.ClobPairId(clobPairId),
)

allUpdates.Append(update)
}

streamingManager.SendOrderbookUpdates(allUpdates, true)
}
2 changes: 1 addition & 1 deletion protocol/x/clob/keeper/orders.go
Original file line number Diff line number Diff line change
Expand Up @@ -1312,7 +1312,7 @@ func (k Keeper) SendOffchainMessages(
k.GetIndexerEventManager().SendOffchainData(update)
}

k.GetGrpcStreamingManager().SendOrderbookUpdates(offchainUpdates)
k.GetGrpcStreamingManager().SendOrderbookUpdates(offchainUpdates, false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The addition of the SendOrderbookUpdates call with the false boolean argument in the SendOffchainMessages function introduces a new behavior for sending off-chain orderbook updates. This change seems to be part of the enhancements for handling off-chain updates more efficiently or in a specific manner. However, it's crucial to ensure that the false argument aligns with the intended behavior, especially regarding how updates are processed or filtered before being sent. If the false value disables a specific processing step or filtering, confirm that this behavior is desired in all scenarios where SendOffchainMessages is called.

Consider verifying the impact of this change on the overall system, especially in scenarios where different behaviors for sending off-chain updates might be required. If the false argument significantly alters the behavior in a way that might not be universally applicable, it might be worth exploring the possibility of making this behavior configurable or ensuring that the function's callers are aware of the implications.

- k.GetGrpcStreamingManager().SendOrderbookUpdates(offchainUpdates, false)
+ // Ensure the boolean argument passed to SendOrderbookUpdates aligns with the intended behavior.
+ // If the `false` value has specific implications, consider making this behavior configurable or
+ // clearly documenting its effects to avoid unintended consequences.
+ k.GetGrpcStreamingManager().SendOrderbookUpdates(offchainUpdates, desiredBehavior)

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
k.GetGrpcStreamingManager().SendOrderbookUpdates(offchainUpdates, false)
// Ensure the boolean argument passed to SendOrderbookUpdates aligns with the intended behavior.
// If the `false` value has specific implications, consider making this behavior configurable or
// clearly documenting its effects to avoid unintended consequences.
k.GetGrpcStreamingManager().SendOrderbookUpdates(offchainUpdates, desiredBehavior)

}

// getFillQuoteQuantums returns the total fillAmount price in quote quantums based on the maker subticks.
Expand Down
86 changes: 86 additions & 0 deletions protocol/x/clob/memclob/memclob_grpc_streaming.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package memclob

import (
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates"
"github.com/dydxprotocol/v4-chain/protocol/lib"
"github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
)

// GetOffchainUpdatesForOrderbookSnapshot returns the offchain updates for the orderbook snapshot.
// This is used by the gRPC streaming server to send the orderbook snapshot to the client.
func (m *MemClobPriceTimePriority) GetOffchainUpdatesForOrderbookSnapshot(
ctx sdk.Context,
clobPairId types.ClobPairId,
) (offchainUpdates *types.OffchainUpdates) {
offchainUpdates = types.NewOffchainUpdates()

if orderbook, exists := m.openOrders.orderbooksMap[clobPairId]; exists {
// Generate the offchain updates for buy orders.
// Updates are sorted in descending order of price.
buyPriceLevels := lib.GetSortedKeys[lib.Sortable[types.Subticks]](orderbook.Bids)
for i := len(buyPriceLevels) - 1; i >= 0; i-- {
subticks := buyPriceLevels[i]
level := orderbook.Bids[subticks]

// For each price level, generate offchain updates for each order in the level.
level.LevelOrders.Front.Each(
func(order types.ClobOrder) {
offchainUpdates.Append(
m.GetOffchainUpdatesForOrder(ctx, order.Order),
)
},
)
}

// Generate the offchain updates for sell orders.
// Updates are sorted in ascending order of price.
sellPriceLevels := lib.GetSortedKeys[lib.Sortable[types.Subticks]](orderbook.Asks)
for i := 0; i < len(sellPriceLevels); i++ {
subticks := sellPriceLevels[i]
level := orderbook.Asks[subticks]

// For each price level, generate offchain updates for each order in the level.
level.LevelOrders.Front.Each(
func(order types.ClobOrder) {
offchainUpdates.Append(
m.GetOffchainUpdatesForOrder(ctx, order.Order),
)
},
)
}
}

return offchainUpdates
}
Comment on lines +10 to +55
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The GetOffchainUpdatesForOrderbookSnapshot method efficiently generates off-chain updates for both buy and sell orders in the orderbook. A few observations and suggestions:

  • Correctness & Performance: The method correctly checks for the existence of the orderbook for the given clobPairId and iterates over buy and sell orders to generate off-chain updates. The use of lib.GetSortedKeys to sort price levels is a good practice for ensuring the updates are in the correct order (descending for buys, ascending for sells).

  • Maintainability: The code is well-structured and follows a clear logic flow, making it easy to understand and maintain. However, consider extracting the logic for generating off-chain updates for buy and sell orders into separate private methods to reduce the method's length and improve readability.

  • Error Handling: Currently, the method does not handle or return any errors. Ensure that all external calls, especially those that might fail (e.g., m.GetOffchainUpdatesForOrder), are properly handled. Consider modifying the method signature to return an error as well and handle these gracefully.

  • Testing: Ensure comprehensive tests are written to cover various scenarios, including cases where the orderbook does not exist for the given clobPairId, and cases with different combinations of buy and sell orders.

Consider refactoring to improve readability and error handling.


// GetOffchainUpdatesForClobOrder returns the offchain updates for the clob order.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: returns a place order offchain message and a update order offchain message used to construct an order for the orderbook snapshot grpc stream.

func (m *MemClobPriceTimePriority) GetOffchainUpdatesForOrder(
ctx sdk.Context,
order types.Order,
) (offchainUpdates *types.OffchainUpdates) {
offchainUpdates = types.NewOffchainUpdates()
orderId := order.OrderId

// Generate a order place message.
if message, success := off_chain_updates.CreateOrderPlaceMessage(
ctx,
order,
); success {
offchainUpdates.AddPlaceMessage(orderId, message)
}

// Get the current fill amount of the order.
fillAmount := m.GetOrderFilledAmount(ctx, orderId)

// Generate an update message updating the total filled amount of order.
if message, success := off_chain_updates.CreateOrderUpdateMessage(
ctx,
orderId,
fillAmount,
); success {
offchainUpdates.AddUpdateMessage(orderId, message)
}

return offchainUpdates
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The GetOffchainUpdatesForOrder method is well-implemented, focusing on generating off-chain updates for a single order. Key points:

  • Correctness: The method correctly generates a place message and an update message for the order, using the off_chain_updates.CreateOrderPlaceMessage and off_chain_updates.CreateOrderUpdateMessage functions, respectively.

  • Error Handling & Robustness: Similar to the previous method, this method does not handle potential errors from external calls. It's crucial to handle these scenarios to avoid partial updates or inconsistencies in the off-chain updates.

  • Performance: The method is efficient for generating updates for a single order. However, if used in a loop for many orders, consider the overall performance impact, especially regarding the repeated calls to m.GetOrderFilledAmount.

Recommend adding error handling for external calls and evaluating the performance impact when used for multiple orders.

91 changes: 91 additions & 0 deletions protocol/x/clob/memclob/memclob_grpc_streaming_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package memclob

import (
"testing"

"github.com/dydxprotocol/v4-chain/protocol/mocks"
"github.com/dydxprotocol/v4-chain/protocol/testutil/constants"
sdktest "github.com/dydxprotocol/v4-chain/protocol/testutil/sdk"
"github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

func TestGetOffchainUpdatesForOrderbookSnapshot_Buy(t *testing.T) {
ctx, _, _ := sdktest.NewSdkContextWithMultistore()

clobKeeper := &mocks.MemClobKeeper{}
clobKeeper.On(
"GetOrderFillAmount",
mock.Anything,
mock.Anything,
).Return(false, satypes.BaseQuantums(0), uint32(0))

memclob := NewMemClobPriceTimePriority(false)
memclob.SetClobKeeper(clobKeeper)

memclob.CreateOrderbook(ctx, constants.ClobPair_Btc)

orders := []types.Order{
constants.Order_Alice_Num0_Id1_Clob0_Buy15_Price10_GTB18_PO,
constants.Order_Alice_Num0_Id0_Clob0_Buy10_Price10_GTB16,
constants.Order_Bob_Num0_Id12_Clob0_Buy5_Price40_GTB20,
}

for _, order := range orders {
memclob.mustAddOrderToOrderbook(ctx, order, false)
}

offchainUpdates := memclob.GetOffchainUpdatesForOrderbookSnapshot(
ctx,
constants.ClobPair_Btc.GetClobPairId(),
)

expected := types.NewOffchainUpdates()
// Buy orders are in descending order.
expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[2]))
expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[0]))
expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[1]))

require.Equal(t, expected, offchainUpdates)
}

func TestGetOffchainUpdatesForOrderbookSnapshot_Sell(t *testing.T) {
ctx, _, _ := sdktest.NewSdkContextWithMultistore()

clobKeeper := &mocks.MemClobKeeper{}
clobKeeper.On(
"GetOrderFillAmount",
mock.Anything,
mock.Anything,
).Return(false, satypes.BaseQuantums(0), uint32(0))

memclob := NewMemClobPriceTimePriority(false)
memclob.SetClobKeeper(clobKeeper)

memclob.CreateOrderbook(ctx, constants.ClobPair_Btc)

orders := []types.Order{
constants.Order_Bob_Num0_Id12_Clob0_Sell20_Price35_GTB32,
constants.Order_Alice_Num0_Id0_Clob0_Sell5_Price10_GTB20,
constants.Order_Alice_Num0_Id1_Clob0_Sell15_Price10_GTB18_PO,
}

for _, order := range orders {
memclob.mustAddOrderToOrderbook(ctx, order, false)
}

offchainUpdates := memclob.GetOffchainUpdatesForOrderbookSnapshot(
ctx,
constants.ClobPair_Btc.GetClobPairId(),
)

expected := types.NewOffchainUpdates()
// Sell orders are in ascending order.
expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[1]))
expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[2]))
expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[0]))

require.Equal(t, expected, offchainUpdates)
}
1 change: 1 addition & 0 deletions protocol/x/clob/types/clob_keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,5 @@ type ClobKeeper interface {
clobPair ClobPair,
) error
UpdateLiquidationsConfig(ctx sdk.Context, config LiquidationsConfig) error
InitializeNewGrpcStreams(ctx sdk.Context)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The addition of the InitializeNewGrpcStreams method to the ClobKeeper interface is a significant change. It implies that all implementations of this interface must now provide a mechanism to initialize new gRPC streams, likely for uninitialized CLOB pairs. This is crucial for ensuring that the system can dynamically handle new streams and maintain an up-to-date state. Ensure that all existing implementations are updated accordingly to avoid compilation errors.

}
4 changes: 4 additions & 0 deletions protocol/x/clob/types/memclob.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,4 +133,8 @@ type MemClob interface {
subaccountId satypes.SubaccountId,
perpetualId uint32,
)
GetOffchainUpdatesForOrderbookSnapshot(
ctx sdk.Context,
clobPairId ClobPairId,
) (offchainUpdates *OffchainUpdates)
}
Loading