Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CT-1326] send price updates after block is finalized #2611

Merged
merged 2 commits into from
Nov 26, 2024
Merged

Conversation

jayy04
Copy link
Contributor

@jayy04 jayy04 commented Nov 25, 2024

Changelist

[Describe or list the changes made in this PR]

Test Plan

[Describe how this PR was tested (if applicable)]

Author/Reviewer Checklist

  • If this PR has changes that result in a different app state given the same prior state and transaction list, manually add the state-breaking label.
  • If the PR has breaking postgres changes to the indexer add the indexer-postgres-breaking label.
  • If this PR isn't state-breaking but has changes that modify behavior in PrepareProposal or ProcessProposal, manually add the label proposal-breaking.
  • If this PR is one of many that implement a specific feature, manually label them all feature:[feature-name].
  • If you wish to for mergify-bot to automatically create a PR to backport your change to a release branch, manually add the label backport/[branch-name].
  • Manually add any of the following labels: refactor, chore, bug.

Summary by CodeRabbit

Release Notes

  • New Features

    • Introduced full node streaming capabilities, enhancing real-time data handling.
    • Added support for sending price updates through the streaming manager.
    • Improved subscription management with stricter validation requirements.
    • Expanded metrics to include a new key for tracking gRPC price update counts.
  • Bug Fixes

    • Enhanced error handling during initialization and market price retrieval.

These changes significantly improve the application's functionality and user experience by enabling better data streaming and management.

@jayy04 jayy04 requested a review from a team as a code owner November 25, 2024 22:11
Copy link

linear bot commented Nov 25, 2024

Copy link
Contributor

coderabbitai bot commented Nov 25, 2024

Walkthrough

The pull request introduces significant enhancements to the application's streaming capabilities by modifying the App structure, adding the FullNodeStreamingManager, and updating initialization and routing methods. It also introduces new metrics for gRPC operations and enhances the FullNodeStreamingManagerImpl with new methods for subscription management and price updates. The changes extend to the Keeper structure, integrating the streaming manager for price updates, and include updates to related methods across various files to accommodate these functionalities.

Changes

File Path Change Summary
protocol/app/app.go Added FullNodeStreamingManager to App struct; updated New, RegisterAPIRoutes, and init functions. Enhanced error handling.
protocol/lib/metrics/metric_keys.go Added constant GrpcSendPriceUpdateCount for gRPC metrics.
protocol/streaming/full_node_streaming_manager.go Enhanced Subscribe method for validation; added SendPriceUpdate, TracksMarketId, and updated event handling methods.
protocol/streaming/noop_streaming_manager.go Added TracksMarketId and SendPriceUpdate methods to NoopGrpcStreamingManager.
protocol/streaming/types/interface.go Added SendPriceUpdate and TracksMarketId methods to FullNodeStreamingManager interface.
protocol/testutil/keeper/prices.go Updated createPricesKeeper to include streamingManager parameter.
protocol/x/prices/keeper/keeper.go Added streamingManager field and GetFullNodeStreamingManager method to Keeper struct.
protocol/x/prices/keeper/market_price.go Updated UpdateMarketPrices to send price updates via streaming manager if conditions are met.

Possibly related PRs

Suggested labels

indexer, proto

Suggested reviewers

  • dydxwill
  • teddyding

Poem

🐰 In the meadow where streams flow bright,
New updates hop in, a joyous sight!
With prices dancing, and markets in tune,
Our streaming manager sings a new tune.
So let’s celebrate this coding delight,
As rabbits rejoice in the soft moonlight! 🌙✨


Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🧹 Outside diff range and nitpick comments (8)
protocol/streaming/types/interface.go (1)

55-58: LGTM! Consider error handling in implementations.

The SendPriceUpdate method signature is clean and consistent with other streaming methods in the interface.

When implementing this method, ensure proper error handling for scenarios such as:

  • Network interruptions during streaming
  • Invalid price update data
  • Context cancellation or timeouts
protocol/x/prices/keeper/keeper.go (2)

44-44: Consider adding nil validation for streamingManager

While the streaming manager is correctly added to the constructor, consider adding nil validation to prevent potential runtime issues.

 func NewKeeper(
   ...
   streamingManager streamingtypes.FullNodeStreamingManager,
 ) *Keeper {
+  if streamingManager == nil {
+    panic("streamingManager cannot be nil")
+  }
   return &Keeper{
     ...
     streamingManager: streamingManager,
   }
 }

Also applies to: 55-55


75-77: Add documentation for the exported method

Consider adding a documentation comment to describe the purpose and usage of this exported method.

+// GetFullNodeStreamingManager returns the streaming manager instance used for price updates.
+// The streaming manager is responsible for broadcasting price updates after block finalization.
 func (k Keeper) GetFullNodeStreamingManager() streamingtypes.FullNodeStreamingManager {
   return k.streamingManager
 }
protocol/x/prices/keeper/market_price.go (1)

101-102: Consider optimizing streaming checks

The streaming manager checks (Enabled() and TracksMarketId()) are performed inside the price update loop. This could be optimized by checking Enabled() once before the loop.

Consider this optimization:

+	// Check if streaming is enabled before processing updates
+	streamingEnabled := k.GetFullNodeStreamingManager().Enabled()

	// Writes to the store are delayed so that the updates are atomically applied to state.
	for _, marketPrice := range updatedMarketPrices {
		// ... existing store update code ...

-		if k.GetFullNodeStreamingManager().Enabled() {
+		if streamingEnabled {
			if k.GetFullNodeStreamingManager().TracksMarketId(marketPrice.Id) {
				// ... streaming code ...
			}
		}
	}
protocol/streaming/full_node_streaming_manager.go (4)

200-201: Consider providing a more descriptive error message

The current error returned is types.ErrInvalidStreamingRequest. It would be more helpful to provide a specific error message indicating that at least one of clobPairIds, subaccountIds, or marketIds must be provided for a valid subscription.

Apply this diff to improve the error message:

func (sm *FullNodeStreamingManagerImpl) Subscribe(
	clobPairIds []uint32,
	subaccountIds []*satypes.SubaccountId,
	marketIds []uint32,
	messageSender types.OutgoingMessageSender,
) (
	err error,
) {
	// Perform some basic validation on the request.
	if len(clobPairIds) == 0 && len(subaccountIds) == 0 && len(marketIds) == 0 {
-		return types.ErrInvalidStreamingRequest
+		return fmt.Errorf("at least one of clobPairIds, subaccountIds, or marketIds must be provided")
	}

496-496: Align method comment with method name

The method comment uses SendPriceUpdates, but the method name is SendPriceUpdate. For clarity and consistency, the comment should match the method name.

Apply this diff to correct the comment:

-// SendPriceUpdates sends price updates to the subscribers.
+// SendPriceUpdate sends a price update to the subscribers.

819-819: Correct the comment to reflect price updates

The comment incorrectly mentions 'subaccount updates' instead of 'price updates'. It should accurately describe the function's purpose.

Apply this diff to correct the comment:

-	// Group subaccount updates by subaccount id.
+	// Group price updates by market ID.

1113-1113: Correct the comment to reflect price updates

The comment refers to 'finalized subaccount updates' but should refer to 'finalized price updates'.

Apply this diff to correct the comment:

-	// Finally, cache updates for finalized subaccount updates
+	// Finally, cache updates for finalized price updates
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 744a58b and 3cf6172.

📒 Files selected for processing (8)
  • protocol/app/app.go (1 hunks)
  • protocol/lib/metrics/metric_keys.go (1 hunks)
  • protocol/streaming/full_node_streaming_manager.go (10 hunks)
  • protocol/streaming/noop_streaming_manager.go (2 hunks)
  • protocol/streaming/types/interface.go (1 hunks)
  • protocol/testutil/keeper/prices.go (2 hunks)
  • protocol/x/prices/keeper/keeper.go (5 hunks)
  • protocol/x/prices/keeper/market_price.go (1 hunks)
🔇 Additional comments (12)
protocol/streaming/types/interface.go (1)

63-63: LGTM! Clean addition to tracking capabilities.

The TracksMarketId method complements the existing tracking functionality and follows a consistent pattern with TracksSubaccountId.

protocol/x/prices/keeper/keeper.go (2)

14-14: LGTM!

The streaming types import is correctly added and follows Go conventions.


28-29: Verify streaming manager usage across the codebase

The streaming manager field is correctly added to the Keeper struct. Let's verify its usage pattern across the codebase to ensure consistent integration.

✅ Verification successful

Based on the search results, I can now provide a final response:

Streaming manager is properly integrated and used across the codebase

The streaming manager field is correctly integrated with consistent usage patterns:

  • Properly initialized in NewKeeper constructors across multiple modules (prices, clob, subaccounts)
  • Accessed through getter methods GetFullNodeStreamingManager() in each keeper
  • Used appropriately in critical flows:
    • CLOB module uses it for streaming orderbook fills and batch updates
    • Proper nil/enabled checks before streaming (if streamingManager := k.GetFullNodeStreamingManager(); streamingManager.Enabled())
    • Used in initialization of new streams with proper context
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check how the streaming manager is used in price-related operations
# Expected: Find calls to streamingManager in price update logic

# Look for price update methods that should use streaming manager
ast-grep --pattern 'func $$$UpdatePrice$$$ {
  $$$
  streamingManager$$$
  $$$
}'

# Check for any potential nil checks on streaming manager
rg 'streamingManager\s*[!=]=\s*nil' -t go

Length of output: 127


Script:

#!/bin/bash
# Let's broaden our search to find all usages of streamingManager

# Find all references to streamingManager
rg "streamingManager" -A 3 -B 3 --type go

# Find any method calls on streamingManager
ast-grep --pattern 'k.streamingManager.$_($$$)'

# Look for initialization of streamingManager in NewKeeper
ast-grep --pattern 'func NewKeeper($$$) *Keeper {
  $$$
  streamingManager: $_,
  $$$
}'

Length of output: 20847

protocol/streaming/noop_streaming_manager.go (2)

97-101: LGTM! Verify consistency with actual implementation.

The empty implementation follows the no-op pattern correctly.

Let's verify the actual implementation's signature to ensure this no-op version matches:

✅ Verification successful

Signature matches perfectly with the no-op implementation

The verification confirms that the SendPriceUpdate method signature in the no-op implementation exactly matches the actual implementation in FullNodeStreamingManagerImpl:

  • Both accept identical parameters: ctx sdk.Context and priceUpdate pricestypes.StreamPriceUpdate
  • Both have the same return type (void)

The empty implementation in NoopGrpcStreamingManager correctly follows the interface contract while implementing the no-op pattern.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Compare with the actual implementation's signature
ast-grep --pattern 'func ($_ *FullNodeStreamingManagerImpl) SendPriceUpdate($_, $_) {
  $$$
}'

Length of output: 2085


57-59: LGTM! Verify interface compliance.

The implementation follows the no-op pattern correctly by always returning false.

Let's verify this method is properly defined in the interface:

✅ Verification successful

Implementation matches interface correctly

The TracksMarketId method in NoopGrpcStreamingManager correctly implements the method defined in the FullNodeStreamingManager interface with the exact same signature TracksMarketId(marketId uint32) bool. The no-op implementation returning false is consistent with the pattern used throughout this no-op implementation.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the method signature in the FullNodeStreamingManager interface
ast-grep --pattern 'interface FullNodeStreamingManager {
  $$$
  TracksMarketId($_) $_
  $$$
}'

Length of output: 97


Script:

#!/bin/bash
# Let's try to find the interface definition using ripgrep
rg "type\s+FullNodeStreamingManager\s+interface" -A 10

Length of output: 699


Script:

#!/bin/bash
# Let's get more context of the interface definition
rg "type\s+FullNodeStreamingManager\s+interface" -A 30

Length of output: 2241


Script:

#!/bin/bash
# Let's get even more context to find TracksMarketId method
rg "type\s+FullNodeStreamingManager\s+interface" -A 50

Length of output: 3477


Script:

#!/bin/bash
# Let's get the complete interface definition to find TracksMarketId
rg "type\s+FullNodeStreamingManager\s+interface" -A 100

Length of output: 4387

protocol/x/prices/keeper/market_price.go (1)

99-112: Verify timing of price update streaming

The PR objective mentions sending price updates "after block is finalized", but the current implementation sends updates during block execution in UpdateMarketPrices. Please verify if this meets the requirements or if we need to move this to a post-commit hook.

Let's check how other modules handle post-finalization events:

protocol/testutil/keeper/prices.go (2)

7-7: LGTM: Import statement correctly added

The new import for the streaming package is properly placed and aligns with the PR's objective of implementing price update streaming functionality.


128-128: LGTM: Appropriate use of no-op streaming manager in test utilities

The integration of NewNoopGrpcStreamingManager() is appropriate for test scenarios, as it provides a null implementation that won't interfere with test execution while satisfying the keeper's dependencies.

Let's verify the streaming manager implementation in the actual keeper:

✅ Verification successful

Based on the search results, I can now generate the final response:

LGTM: Appropriate use of no-op streaming manager in test utilities

The use of NewNoopGrpcStreamingManager() in test utilities is correct. The search results confirm that:

  • The actual prices keeper uses a proper streaming manager implementation (protocol/x/prices/keeper/keeper.go)
  • The streaming manager is a crucial component in the production system, used across multiple modules (clob, prices, subaccounts)
  • The no-op implementation is specifically designed for test scenarios where streaming functionality is not needed
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the streaming manager implementation in the prices keeper
# Expected: Find the actual keeper implementation showing how streaming manager is used

# Search for the keeper implementation
ast-grep --pattern 'type Keeper struct {
  $$$
  streamingManager $_
  $$$
}'

# Search for streaming manager usage in price updates
rg -A 5 'streamingManager.*UpdatePrice'

Length of output: 121


Script:

#!/bin/bash
# Let's try a broader search to understand the streaming manager usage

# Search for the Keeper struct definition in prices package
rg -l "type Keeper struct" --type go

# Search for any streaming manager related code
rg -A 5 "streamingManager" --type go

# Search for streaming package imports
rg -l "streaming" --type go

Length of output: 14273

protocol/streaming/full_node_streaming_manager.go (3)

575-581: Method TracksMarketId implemented correctly

The TracksMarketId method correctly checks if a market ID is being tracked by the streaming manager. The use of locking ensures thread safety.


811-834: Function getStreamUpdatesForPriceUpdates is well-implemented

The function correctly creates stream updates for price updates and collects the associated market IDs.


Line range hint 1069-1120: Integration of price updates in batch streaming is correct

The addition of handling for finalizedPriceUpdates in StreamBatchUpdatesAfterFinalizeBlock properly integrates price updates into the streaming flow after block finalization.

protocol/app/app.go (1)

1013-1013: Integration of FullNodeStreamingManager into PricesKeeper looks good

The addition of app.FullNodeStreamingManager to PricesKeeper enables streaming capabilities for price updates. This enhancement appears well-implemented and should facilitate real-time data handling as intended.

Comment on lines +100 to +112
// If GRPC streaming is on, emit a price update to stream.
if k.GetFullNodeStreamingManager().Enabled() {
if k.GetFullNodeStreamingManager().TracksMarketId(marketPrice.Id) {
k.GetFullNodeStreamingManager().SendPriceUpdate(
ctx,
types.StreamPriceUpdate{
MarketId: marketPrice.Id,
Price: marketPrice,
Snapshot: false,
},
)
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add error handling for SendPriceUpdate

The SendPriceUpdate call lacks error handling. If the streaming operation fails, we should at least log the error to maintain observability.

Consider applying this change:

-				k.GetFullNodeStreamingManager().SendPriceUpdate(
-					ctx,
-					types.StreamPriceUpdate{
-						MarketId: marketPrice.Id,
-						Price:    marketPrice,
-						Snapshot: false,
-					},
-				)
+				if err := k.GetFullNodeStreamingManager().SendPriceUpdate(
+					ctx,
+					types.StreamPriceUpdate{
+						MarketId: marketPrice.Id,
+						Price:    marketPrice,
+						Snapshot: false,
+					},
+				); err != nil {
+					k.Logger(ctx).Error(
+						"failed to send price update stream",
+						"market_id", marketPrice.Id,
+						"error", err,
+					)
+				}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// If GRPC streaming is on, emit a price update to stream.
if k.GetFullNodeStreamingManager().Enabled() {
if k.GetFullNodeStreamingManager().TracksMarketId(marketPrice.Id) {
k.GetFullNodeStreamingManager().SendPriceUpdate(
ctx,
types.StreamPriceUpdate{
MarketId: marketPrice.Id,
Price: marketPrice,
Snapshot: false,
},
)
}
}
// If GRPC streaming is on, emit a price update to stream.
if k.GetFullNodeStreamingManager().Enabled() {
if k.GetFullNodeStreamingManager().TracksMarketId(marketPrice.Id) {
if err := k.GetFullNodeStreamingManager().SendPriceUpdate(
ctx,
types.StreamPriceUpdate{
MarketId: marketPrice.Id,
Price: marketPrice,
Snapshot: false,
},
); err != nil {
k.Logger(ctx).Error(
"failed to send price update stream",
"market_id", marketPrice.Id,
"error", err,
)
}
}
}

@@ -74,6 +74,7 @@ const (
GrpcSendOrderbookUpdatesLatency = "grpc_send_orderbook_updates_latency"
GrpcSendOrderbookSnapshotLatency = "grpc_send_orderbook_snapshot_latency"
GrpcSendSubaccountUpdateCount = "grpc_send_subaccount_update_count"
GrpcSendPriceUpdateCount = "grpc_send_price_update_count"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

The metric is not being incremented when sending price updates

The GrpcSendPriceUpdateCount metric is defined but not being used in the codebase. The SendPriceUpdate method in FullNodeStreamingManagerImpl should increment this metric similar to how GrpcSendSubaccountUpdateCount is incremented in other streaming operations.

  • protocol/streaming/full_node_streaming_manager.go: Add metric increment in SendPriceUpdate method
🔗 Analysis chain

LGTM! The metric key follows established guidelines.

The new metric constant GrpcSendPriceUpdateCount is well-placed in the gRPC metrics section and follows the naming conventions. Its purpose aligns with the PR's objective of sending price updates after block finalization.

Let's verify the metric's usage in the codebase:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check if the new metric is properly utilized in the price update functionality
# Expected: The metric should be used in the streaming manager's price update logic

# Search for metric usage
rg "GrpcSendPriceUpdateCount" --type go

# Search for related price update functionality
rg "SendPriceUpdate.*metrics\." --type go

Length of output: 199


Script:

#!/bin/bash
# Let's broaden our search to find price update-related functionality and metrics
# Search for price update-related code
rg "PriceUpdate" --type go -B 2 -A 2

# Search for any metrics increment in gRPC-related files
fd "grpc" --type f --extension go --exec rg -l "metrics\.(Inc|Add)"

Length of output: 116379


Script:

#!/bin/bash
# Let's search for the usage of the new metric in gRPC-related code
rg "GrpcSendPriceUpdateCount" --type go -B 2 -A 2

# Search for price update streaming functionality in gRPC server code
fd "grpc" --type f --extension go --exec rg -l "SendPriceUpdate"

# Search for metrics usage in streaming manager
rg "metrics\." --type go -B 2 -A 2 ./protocol/streaming/

Length of output: 9331


Script:

#!/bin/bash
# Let's search for the price update streaming implementation in the streaming manager
rg "SendPriceUpdate" --type go -B 3 -A 3

# Search for price update related code in the streaming manager
rg "PriceUpdate" --type go -B 3 -A 3 ./protocol/streaming/

Length of output: 17108

Comment on lines 501 to 509
if !lib.IsDeliverTxMode(ctx) {
// If not `DeliverTx`, return since there is no optimistic price updates.
return
}

metrics.IncrCounter(
metrics.GrpcSendSubaccountUpdateCount,
1,
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Incorrect metrics counter used in SendPriceUpdate

In the SendPriceUpdate method, the metrics counter GrpcSendSubaccountUpdateCount is incremented, which is intended for subaccount updates. A dedicated metrics counter for price updates should be used instead.

Apply this diff to fix the metrics counter:

metrics.IncrCounter(
-	metrics.GrpcSendSubaccountUpdateCount,
+	metrics.GrpcSendPriceUpdateCount,
	1,
)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if !lib.IsDeliverTxMode(ctx) {
// If not `DeliverTx`, return since there is no optimistic price updates.
return
}
metrics.IncrCounter(
metrics.GrpcSendSubaccountUpdateCount,
1,
)
if !lib.IsDeliverTxMode(ctx) {
// If not `DeliverTx`, return since there is no optimistic price updates.
return
}
metrics.IncrCounter(
metrics.GrpcSendPriceUpdateCount,
1,
)

Comment on lines 1039 to 1055
// cacheStreamUpdatesByMarketIdWithLock adds stream updates to cache,
// and store corresponding market ids.
// This method requires the lock and assumes that the lock has already been
// acquired by the caller.
func (sm *FullNodeStreamingManagerImpl) cacheStreamUpdatesByMarketIdWithLock(
streamUpdates []clobtypes.StreamUpdate,
marketIds []uint32,
) {
sm.streamUpdateCache = append(sm.streamUpdateCache, streamUpdates...)
for _, marketId := range marketIds {
sm.streamUpdateSubscriptionCache = append(
sm.streamUpdateSubscriptionCache,
sm.marketIdToSubscriptionIdMapping[marketId],
)
}
}

Copy link
Contributor

@coderabbitai coderabbitai bot Nov 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Potential misalignment between stream updates and subscription IDs

In cacheStreamUpdatesByMarketIdWithLock, there may be a misalignment between streamUpdates and marketIds. This can lead to incorrect mapping of updates to subscription IDs if the lengths of streamUpdates and marketIds differ or if they are not aligned.

To ensure correct mapping, each update should correspond to its related market ID:

func (sm *FullNodeStreamingManagerImpl) cacheStreamUpdatesByMarketIdWithLock(
	streamUpdates []clobtypes.StreamUpdate,
	marketIds []uint32,
) {
+	if len(streamUpdates) != len(marketIds) {
+		sm.logger.Error("Mismatch between stream updates and market IDs lengths")
+		return
+	}
	for i, update := range streamUpdates {
		sm.streamUpdateCache = append(sm.streamUpdateCache, update)
		marketId := marketIds[i]
		sm.streamUpdateSubscriptionCache = append(
			sm.streamUpdateSubscriptionCache,
			sm.marketIdToSubscriptionIdMapping[marketId],
		)
	}
}

This ensures that each update is matched with the correct market ID and its associated subscriptions.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// cacheStreamUpdatesByMarketIdWithLock adds stream updates to cache,
// and store corresponding market ids.
// This method requires the lock and assumes that the lock has already been
// acquired by the caller.
func (sm *FullNodeStreamingManagerImpl) cacheStreamUpdatesByMarketIdWithLock(
streamUpdates []clobtypes.StreamUpdate,
marketIds []uint32,
) {
sm.streamUpdateCache = append(sm.streamUpdateCache, streamUpdates...)
for _, marketId := range marketIds {
sm.streamUpdateSubscriptionCache = append(
sm.streamUpdateSubscriptionCache,
sm.marketIdToSubscriptionIdMapping[marketId],
)
}
}
// cacheStreamUpdatesByMarketIdWithLock adds stream updates to cache,
// and store corresponding market ids.
// This method requires the lock and assumes that the lock has already been
// acquired by the caller.
func (sm *FullNodeStreamingManagerImpl) cacheStreamUpdatesByMarketIdWithLock(
streamUpdates []clobtypes.StreamUpdate,
marketIds []uint32,
) {
if len(streamUpdates) != len(marketIds) {
sm.logger.Error("Mismatch between stream updates and market IDs lengths")
return
}
for i, update := range streamUpdates {
sm.streamUpdateCache = append(sm.streamUpdateCache, update)
marketId := marketIds[i]
sm.streamUpdateSubscriptionCache = append(
sm.streamUpdateSubscriptionCache,
sm.marketIdToSubscriptionIdMapping[marketId],
)
}
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems helpful as a sanity check?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@teddyding, yes, adding this check is helpful as a sanity check to ensure that the lengths of streamUpdates and marketIds are aligned. This can prevent incorrect mapping of updates to subscription IDs if they differ or are not aligned. Here's the updated function with the check:

func (sm *FullNodeStreamingManagerImpl) cacheStreamUpdatesByMarketIdWithLock(
	streamUpdates []clobtypes.StreamUpdate,
	marketIds []uint32,
) {
	if len(streamUpdates) != len(marketIds) {
		sm.logger.Error("Mismatch between stream updates and market IDs lengths")
		return
	}
	for i, update := range streamUpdates {
		sm.streamUpdateCache = append(sm.streamUpdateCache, update)
		marketId := marketIds[i]
		sm.streamUpdateSubscriptionCache = append(
			sm.streamUpdateSubscriptionCache,
			sm.marketIdToSubscriptionIdMapping[marketId],
		)
	}
}

This ensures that each update is correctly associated with its corresponding market ID and subscriptions.

}

metrics.IncrCounter(
metrics.GrpcSendSubaccountUpdateCount,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update metric name

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Outside diff range and nitpick comments (2)
protocol/streaming/full_node_streaming_manager.go (2)

819-820: Fix incorrect comment.

The comment refers to subaccount updates but this is for price updates.

-	// Group subaccount updates by subaccount id.
+	// Group price updates by market id.

820-822: Consider pre-allocating slices for better performance.

Pre-allocate slices with the known size to avoid reallocations.

-	streamUpdates = make([]clobtypes.StreamUpdate, 0)
-	marketIds = make([]uint32, 0)
+	streamUpdates = make([]clobtypes.StreamUpdate, 0, len(priceUpdates))
+	marketIds = make([]uint32, 0, len(priceUpdates))
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 3cf6172 and b3cd32f.

📒 Files selected for processing (1)
  • protocol/streaming/full_node_streaming_manager.go (10 hunks)
🔇 Additional comments (4)
protocol/streaming/full_node_streaming_manager.go (4)

496-521: LGTM: Price update streaming implementation is correct.

The implementation follows the established pattern for streaming updates and correctly:

  • Checks for DeliverTx mode
  • Uses the appropriate metrics counter
  • Stages events for finalization

575-581: LGTM: Market ID tracking implementation is consistent.

The implementation follows the same pattern as TracksSubaccountId with proper lock handling.


1039-1058: LGTM: Market ID caching implementation is robust.

The implementation includes proper validation and follows the established caching pattern.


1073-1074: LGTM: Price updates correctly integrated into batch processing.

The changes properly integrate price updates into the finalization and batch processing flow, maintaining consistency with the existing patterns for other update types.

Also applies to: 1117-1124, 1138-1138, 1156-1157, 1172-1172

@jayy04 jayy04 merged commit 115acf7 into main Nov 26, 2024
22 checks passed
@jayy04 jayy04 deleted the jy/ct-1326 branch November 26, 2024 21:28
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Development

Successfully merging this pull request may close these issues.

2 participants