Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CT-1326] send price updates after block is finalized #2611

Merged
merged 2 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions protocol/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -1010,6 +1010,7 @@ func New(
},
app.RevShareKeeper,
&app.MarketMapKeeper,
app.FullNodeStreamingManager,
)
pricesModule := pricesmodule.NewAppModule(
appCodec,
Expand Down
1 change: 1 addition & 0 deletions protocol/lib/metrics/metric_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ const (
GrpcSendOrderbookUpdatesLatency = "grpc_send_orderbook_updates_latency"
GrpcSendOrderbookSnapshotLatency = "grpc_send_orderbook_snapshot_latency"
GrpcSendSubaccountUpdateCount = "grpc_send_subaccount_update_count"
GrpcSendPriceUpdateCount = "grpc_send_price_update_count"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

The metric is not being incremented when sending price updates

The GrpcSendPriceUpdateCount metric is defined but not being used in the codebase. The SendPriceUpdate method in FullNodeStreamingManagerImpl should increment this metric similar to how GrpcSendSubaccountUpdateCount is incremented in other streaming operations.

  • protocol/streaming/full_node_streaming_manager.go: Add metric increment in SendPriceUpdate method
🔗 Analysis chain

LGTM! The metric key follows established guidelines.

The new metric constant GrpcSendPriceUpdateCount is well-placed in the gRPC metrics section and follows the naming conventions. Its purpose aligns with the PR's objective of sending price updates after block finalization.

Let's verify the metric's usage in the codebase:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check if the new metric is properly utilized in the price update functionality
# Expected: The metric should be used in the streaming manager's price update logic

# Search for metric usage
rg "GrpcSendPriceUpdateCount" --type go

# Search for related price update functionality
rg "SendPriceUpdate.*metrics\." --type go

Length of output: 199


Script:

#!/bin/bash
# Let's broaden our search to find price update-related functionality and metrics
# Search for price update-related code
rg "PriceUpdate" --type go -B 2 -A 2

# Search for any metrics increment in gRPC-related files
fd "grpc" --type f --extension go --exec rg -l "metrics\.(Inc|Add)"

Length of output: 116379


Script:

#!/bin/bash
# Let's search for the usage of the new metric in gRPC-related code
rg "GrpcSendPriceUpdateCount" --type go -B 2 -A 2

# Search for price update streaming functionality in gRPC server code
fd "grpc" --type f --extension go --exec rg -l "SendPriceUpdate"

# Search for metrics usage in streaming manager
rg "metrics\." --type go -B 2 -A 2 ./protocol/streaming/

Length of output: 9331


Script:

#!/bin/bash
# Let's search for the price update streaming implementation in the streaming manager
rg "SendPriceUpdate" --type go -B 3 -A 3

# Search for price update related code in the streaming manager
rg "PriceUpdate" --type go -B 3 -A 3 ./protocol/streaming/

Length of output: 17108

GrpcSendOrderbookFillsLatency = "grpc_send_orderbook_fills_latency"
GrpcAddUpdateToBufferCount = "grpc_add_update_to_buffer_count"
GrpcAddToSubscriptionChannelCount = "grpc_add_to_subscription_channel_count"
Expand Down
95 changes: 92 additions & 3 deletions protocol/streaming/full_node_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
err error,
) {
// Perform some basic validation on the request.
if len(clobPairIds) == 0 && len(subaccountIds) == 0 {
if len(clobPairIds) == 0 && len(subaccountIds) == 0 && len(marketIds) == 0 {
return types.ErrInvalidStreamingRequest
}

Expand Down Expand Up @@ -493,6 +493,33 @@ func (sm *FullNodeStreamingManagerImpl) SendSubaccountUpdate(
)
}

// SendPriceUpdates sends price updates to the subscribers.
func (sm *FullNodeStreamingManagerImpl) SendPriceUpdate(
ctx sdk.Context,
priceUpdate pricestypes.StreamPriceUpdate,
) {
if !lib.IsDeliverTxMode(ctx) {
// If not `DeliverTx`, return since there is no optimistic price updates.
return
}

metrics.IncrCounter(
metrics.GrpcSendSubaccountUpdateCount,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update metric name

1,
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Incorrect metrics counter used in SendPriceUpdate

In the SendPriceUpdate method, the metrics counter GrpcSendSubaccountUpdateCount is incremented, which is intended for subaccount updates. A dedicated metrics counter for price updates should be used instead.

Apply this diff to fix the metrics counter:

metrics.IncrCounter(
-	metrics.GrpcSendSubaccountUpdateCount,
+	metrics.GrpcSendPriceUpdateCount,
	1,
)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if !lib.IsDeliverTxMode(ctx) {
// If not `DeliverTx`, return since there is no optimistic price updates.
return
}
metrics.IncrCounter(
metrics.GrpcSendSubaccountUpdateCount,
1,
)
if !lib.IsDeliverTxMode(ctx) {
// If not `DeliverTx`, return since there is no optimistic price updates.
return
}
metrics.IncrCounter(
metrics.GrpcSendPriceUpdateCount,
1,
)


// If `DeliverTx`, updates should be staged to be streamed after consensus finalizes on a block.
stagedEvent := clobtypes.StagedFinalizeBlockEvent{
Event: &clobtypes.StagedFinalizeBlockEvent_PriceUpdate{
PriceUpdate: &priceUpdate,
},
}
sm.finalizeBlockStager.StageFinalizeBlockEvent(
ctx,
&stagedEvent,
)
}

// Retrieve all events staged during `FinalizeBlock`.
func (sm *FullNodeStreamingManagerImpl) GetStagedFinalizeBlockEvents(
ctx sdk.Context,
Expand Down Expand Up @@ -545,6 +572,14 @@ func (sm *FullNodeStreamingManagerImpl) TracksSubaccountId(subaccountId satypes.
return exists
}

// TracksMarketId checks if a market id is being tracked by the streaming manager.
func (sm *FullNodeStreamingManagerImpl) TracksMarketId(marketId uint32) bool {
sm.Lock()
defer sm.Unlock()
_, exists := sm.marketIdToSubscriptionIdMapping[marketId]
return exists
}

func getStreamUpdatesFromOffchainUpdates(
v1updates []ocutypes.OffChainUpdateV1,
blockHeight uint32,
Expand Down Expand Up @@ -773,6 +808,31 @@ func getStreamUpdatesForSubaccountUpdates(
return streamUpdates, subaccountIds
}

func getStreamUpdatesForPriceUpdates(
priceUpdates []pricestypes.StreamPriceUpdate,
blockHeight uint32,
execMode sdk.ExecMode,
) (
streamUpdates []clobtypes.StreamUpdate,
marketIds []uint32,
) {
// Group subaccount updates by subaccount id.
streamUpdates = make([]clobtypes.StreamUpdate, 0)
marketIds = make([]uint32, 0)
for _, priceUpdate := range priceUpdates {
streamUpdate := clobtypes.StreamUpdate{
UpdateMessage: &clobtypes.StreamUpdate_PriceUpdate{
PriceUpdate: &priceUpdate,
},
BlockHeight: blockHeight,
ExecMode: uint32(execMode),
}
streamUpdates = append(streamUpdates, streamUpdate)
marketIds = append(marketIds, priceUpdate.MarketId)
}
return streamUpdates, marketIds
}

// AddOrderUpdatesToCache adds a series of updates to the full node streaming cache.
// Clob pair ids are the clob pair id each update is relevant to.
func (sm *FullNodeStreamingManagerImpl) AddOrderUpdatesToCache(
Expand Down Expand Up @@ -976,6 +1036,23 @@ func (sm *FullNodeStreamingManagerImpl) cacheStreamUpdatesBySubaccountWithLock(
}
}

// cacheStreamUpdatesByMarketIdWithLock adds stream updates to cache,
// and store corresponding market ids.
// This method requires the lock and assumes that the lock has already been
// acquired by the caller.
func (sm *FullNodeStreamingManagerImpl) cacheStreamUpdatesByMarketIdWithLock(
streamUpdates []clobtypes.StreamUpdate,
marketIds []uint32,
) {
sm.streamUpdateCache = append(sm.streamUpdateCache, streamUpdates...)
for _, marketId := range marketIds {
sm.streamUpdateSubscriptionCache = append(
sm.streamUpdateSubscriptionCache,
sm.marketIdToSubscriptionIdMapping[marketId],
)
}
}

Copy link
Contributor

@coderabbitai coderabbitai bot Nov 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Potential misalignment between stream updates and subscription IDs

In cacheStreamUpdatesByMarketIdWithLock, there may be a misalignment between streamUpdates and marketIds. This can lead to incorrect mapping of updates to subscription IDs if the lengths of streamUpdates and marketIds differ or if they are not aligned.

To ensure correct mapping, each update should correspond to its related market ID:

func (sm *FullNodeStreamingManagerImpl) cacheStreamUpdatesByMarketIdWithLock(
	streamUpdates []clobtypes.StreamUpdate,
	marketIds []uint32,
) {
+	if len(streamUpdates) != len(marketIds) {
+		sm.logger.Error("Mismatch between stream updates and market IDs lengths")
+		return
+	}
	for i, update := range streamUpdates {
		sm.streamUpdateCache = append(sm.streamUpdateCache, update)
		marketId := marketIds[i]
		sm.streamUpdateSubscriptionCache = append(
			sm.streamUpdateSubscriptionCache,
			sm.marketIdToSubscriptionIdMapping[marketId],
		)
	}
}

This ensures that each update is matched with the correct market ID and its associated subscriptions.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// cacheStreamUpdatesByMarketIdWithLock adds stream updates to cache,
// and store corresponding market ids.
// This method requires the lock and assumes that the lock has already been
// acquired by the caller.
func (sm *FullNodeStreamingManagerImpl) cacheStreamUpdatesByMarketIdWithLock(
streamUpdates []clobtypes.StreamUpdate,
marketIds []uint32,
) {
sm.streamUpdateCache = append(sm.streamUpdateCache, streamUpdates...)
for _, marketId := range marketIds {
sm.streamUpdateSubscriptionCache = append(
sm.streamUpdateSubscriptionCache,
sm.marketIdToSubscriptionIdMapping[marketId],
)
}
}
// cacheStreamUpdatesByMarketIdWithLock adds stream updates to cache,
// and store corresponding market ids.
// This method requires the lock and assumes that the lock has already been
// acquired by the caller.
func (sm *FullNodeStreamingManagerImpl) cacheStreamUpdatesByMarketIdWithLock(
streamUpdates []clobtypes.StreamUpdate,
marketIds []uint32,
) {
if len(streamUpdates) != len(marketIds) {
sm.logger.Error("Mismatch between stream updates and market IDs lengths")
return
}
for i, update := range streamUpdates {
sm.streamUpdateCache = append(sm.streamUpdateCache, update)
marketId := marketIds[i]
sm.streamUpdateSubscriptionCache = append(
sm.streamUpdateSubscriptionCache,
sm.marketIdToSubscriptionIdMapping[marketId],
)
}
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems helpful as a sanity check?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@teddyding, yes, adding this check is helpful as a sanity check to ensure that the lengths of streamUpdates and marketIds are aligned. This can prevent incorrect mapping of updates to subscription IDs if they differ or are not aligned. Here's the updated function with the check:

func (sm *FullNodeStreamingManagerImpl) cacheStreamUpdatesByMarketIdWithLock(
	streamUpdates []clobtypes.StreamUpdate,
	marketIds []uint32,
) {
	if len(streamUpdates) != len(marketIds) {
		sm.logger.Error("Mismatch between stream updates and market IDs lengths")
		return
	}
	for i, update := range streamUpdates {
		sm.streamUpdateCache = append(sm.streamUpdateCache, update)
		marketId := marketIds[i]
		sm.streamUpdateSubscriptionCache = append(
			sm.streamUpdateSubscriptionCache,
			sm.marketIdToSubscriptionIdMapping[marketId],
		)
	}
}

This ensures that each update is correctly associated with its corresponding market ID and subscriptions.

// Grpc Streaming logic after consensus agrees on a block.
// - Stream all events staged during `FinalizeBlock`.
// - Stream orderbook updates to sync fills in local ops queue.
Expand All @@ -989,7 +1066,8 @@ func (sm *FullNodeStreamingManagerImpl) StreamBatchUpdatesAfterFinalizeBlock(

finalizedFills,
finalizedSubaccountUpdates,
finalizedOrderbookUpdates := sm.getStagedEventsFromFinalizeBlock(ctx)
finalizedOrderbookUpdates,
finalizedPriceUpdates := sm.getStagedEventsFromFinalizeBlock(ctx)

sm.Lock()
defer sm.Unlock()
Expand Down Expand Up @@ -1032,6 +1110,14 @@ func (sm *FullNodeStreamingManagerImpl) StreamBatchUpdatesAfterFinalizeBlock(
)
sm.cacheStreamUpdatesBySubaccountWithLock(subaccountStreamUpdates, subaccountIds)

// Finally, cache updates for finalized subaccount updates
priceStreamUpdates, marketIds := getStreamUpdatesForPriceUpdates(
finalizedPriceUpdates,
lib.MustConvertIntegerToUint32(ctx.BlockHeight()),
ctx.ExecMode(),
)
sm.cacheStreamUpdatesByMarketIdWithLock(priceStreamUpdates, marketIds)

// Emit all stream updates in a single batch.
// Note we still have the lock, which is released right before function returns.
sm.FlushStreamUpdatesWithLock()
Expand All @@ -1045,6 +1131,7 @@ func (sm *FullNodeStreamingManagerImpl) getStagedEventsFromFinalizeBlock(
finalizedFills []clobtypes.StreamOrderbookFill,
finalizedSubaccountUpdates []satypes.StreamSubaccountUpdate,
finalizedOrderbookUpdates []clobtypes.StreamOrderbookUpdate,
finalizedPriceUpdates []pricestypes.StreamPriceUpdate,
) {
// Get onchain stream events stored in transient store.
stagedEvents := sm.GetStagedFinalizeBlockEvents(ctx)
Expand All @@ -1062,6 +1149,8 @@ func (sm *FullNodeStreamingManagerImpl) getStagedEventsFromFinalizeBlock(
finalizedSubaccountUpdates = append(finalizedSubaccountUpdates, *event.SubaccountUpdate)
case *clobtypes.StagedFinalizeBlockEvent_OrderbookUpdate:
finalizedOrderbookUpdates = append(finalizedOrderbookUpdates, *event.OrderbookUpdate)
case *clobtypes.StagedFinalizeBlockEvent_PriceUpdate:
finalizedPriceUpdates = append(finalizedPriceUpdates, *event.PriceUpdate)
default:
panic(fmt.Sprintf("Unhandled staged event type: %v\n", stagedEvent.Event))
}
Expand All @@ -1076,7 +1165,7 @@ func (sm *FullNodeStreamingManagerImpl) getStagedEventsFromFinalizeBlock(
float32(len(finalizedFills)),
)

return finalizedFills, finalizedSubaccountUpdates, finalizedOrderbookUpdates
return finalizedFills, finalizedSubaccountUpdates, finalizedOrderbookUpdates, finalizedPriceUpdates
}

func (sm *FullNodeStreamingManagerImpl) InitializeNewStreams(
Expand Down
10 changes: 10 additions & 0 deletions protocol/streaming/noop_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ func (sm *NoopGrpcStreamingManager) TracksSubaccountId(id satypes.SubaccountId)
return false
}

func (sm *NoopGrpcStreamingManager) TracksMarketId(id uint32) bool {
return false
}

func (sm *NoopGrpcStreamingManager) GetSubaccountSnapshotsForInitStreams(
getSubaccountSnapshot func(subaccountId satypes.SubaccountId) *satypes.StreamSubaccountUpdate,
) map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate {
Expand Down Expand Up @@ -90,6 +94,12 @@ func (sm *NoopGrpcStreamingManager) SendSubaccountUpdate(
) {
}

func (sm *NoopGrpcStreamingManager) SendPriceUpdate(
ctx sdk.Context,
priceUpdate pricestypes.StreamPriceUpdate,
) {
}

func (sm *NoopGrpcStreamingManager) StreamBatchUpdatesAfterFinalizeBlock(
ctx sdk.Context,
orderBookUpdatesToSyncLocalOpsQueue *clobtypes.OffchainUpdates,
Expand Down
5 changes: 5 additions & 0 deletions protocol/streaming/types/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,15 @@ type FullNodeStreamingManager interface {
ctx sdk.Context,
subaccountUpdate satypes.StreamSubaccountUpdate,
)
SendPriceUpdate(
ctx sdk.Context,
priceUpdate pricestypes.StreamPriceUpdate,
)
GetStagedFinalizeBlockEvents(
ctx sdk.Context,
) []clobtypes.StagedFinalizeBlockEvent
TracksSubaccountId(id satypes.SubaccountId) bool
TracksMarketId(marketId uint32) bool
StreamBatchUpdatesAfterFinalizeBlock(
ctx sdk.Context,
orderBookUpdatesToSyncLocalOpsQueue *clobtypes.OffchainUpdates,
Expand Down
2 changes: 2 additions & 0 deletions protocol/testutil/keeper/prices.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"testing"

streaming "github.com/dydxprotocol/v4-chain/protocol/streaming"
revsharetypes "github.com/dydxprotocol/v4-chain/protocol/x/revshare/types"

"github.com/cosmos/gogoproto/proto"
Expand Down Expand Up @@ -124,6 +125,7 @@ func createPricesKeeper(
},
revShareKeeper,
marketMapKeeper,
streaming.NewNoopGrpcStreamingManager(),
)

return k, storeKey, indexPriceCache, mockTimeProvider
Expand Down
9 changes: 9 additions & 0 deletions protocol/x/prices/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/dydxprotocol/v4-chain/protocol/indexer/indexer_manager"
"github.com/dydxprotocol/v4-chain/protocol/lib"
libtime "github.com/dydxprotocol/v4-chain/protocol/lib/time"
streamingtypes "github.com/dydxprotocol/v4-chain/protocol/streaming/types"
"github.com/dydxprotocol/v4-chain/protocol/x/prices/types"
)

Expand All @@ -24,6 +25,8 @@ type (
authorities map[string]struct{}
RevShareKeeper types.RevShareKeeper
MarketMapKeeper types.MarketMapKeeper

streamingManager streamingtypes.FullNodeStreamingManager
}
)

Expand All @@ -38,6 +41,7 @@ func NewKeeper(
authorities []string,
revShareKeeper types.RevShareKeeper,
marketMapKeeper types.MarketMapKeeper,
streamingManager streamingtypes.FullNodeStreamingManager,
) *Keeper {
return &Keeper{
cdc: cdc,
Expand All @@ -48,6 +52,7 @@ func NewKeeper(
authorities: lib.UniqueSliceToSet(authorities),
RevShareKeeper: revShareKeeper,
MarketMapKeeper: marketMapKeeper,
streamingManager: streamingManager,
}
}

Expand All @@ -66,3 +71,7 @@ func (k Keeper) HasAuthority(authority string) bool {
_, ok := k.authorities[authority]
return ok
}

func (k Keeper) GetFullNodeStreamingManager() streamingtypes.FullNodeStreamingManager {
return k.streamingManager
}
14 changes: 14 additions & 0 deletions protocol/x/prices/keeper/market_price.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,20 @@ func (k Keeper) UpdateMarketPrices(
pricefeedmetrics.GetLabelForMarketId(marketPrice.Id),
},
)

// If GRPC streaming is on, emit a price update to stream.
if k.GetFullNodeStreamingManager().Enabled() {
if k.GetFullNodeStreamingManager().TracksMarketId(marketPrice.Id) {
k.GetFullNodeStreamingManager().SendPriceUpdate(
ctx,
types.StreamPriceUpdate{
MarketId: marketPrice.Id,
Price: marketPrice,
Snapshot: false,
},
)
}
}
Comment on lines +100 to +112
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add error handling for SendPriceUpdate

The SendPriceUpdate call lacks error handling. If the streaming operation fails, we should at least log the error to maintain observability.

Consider applying this change:

-				k.GetFullNodeStreamingManager().SendPriceUpdate(
-					ctx,
-					types.StreamPriceUpdate{
-						MarketId: marketPrice.Id,
-						Price:    marketPrice,
-						Snapshot: false,
-					},
-				)
+				if err := k.GetFullNodeStreamingManager().SendPriceUpdate(
+					ctx,
+					types.StreamPriceUpdate{
+						MarketId: marketPrice.Id,
+						Price:    marketPrice,
+						Snapshot: false,
+					},
+				); err != nil {
+					k.Logger(ctx).Error(
+						"failed to send price update stream",
+						"market_id", marketPrice.Id,
+						"error", err,
+					)
+				}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// If GRPC streaming is on, emit a price update to stream.
if k.GetFullNodeStreamingManager().Enabled() {
if k.GetFullNodeStreamingManager().TracksMarketId(marketPrice.Id) {
k.GetFullNodeStreamingManager().SendPriceUpdate(
ctx,
types.StreamPriceUpdate{
MarketId: marketPrice.Id,
Price: marketPrice,
Snapshot: false,
},
)
}
}
// If GRPC streaming is on, emit a price update to stream.
if k.GetFullNodeStreamingManager().Enabled() {
if k.GetFullNodeStreamingManager().TracksMarketId(marketPrice.Id) {
if err := k.GetFullNodeStreamingManager().SendPriceUpdate(
ctx,
types.StreamPriceUpdate{
MarketId: marketPrice.Id,
Price: marketPrice,
Snapshot: false,
},
); err != nil {
k.Logger(ctx).Error(
"failed to send price update stream",
"market_id", marketPrice.Id,
"error", err,
)
}
}
}

}

// Generate indexer events.
Expand Down
Loading