-
Notifications
You must be signed in to change notification settings - Fork 115
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
FNS subaccount implementation #2059
Conversation
func (sm *FullNodeStreamingManagerImpl) AddSubaccountUpdatesToCache( | ||
updates []clobtypes.StreamUpdate, | ||
subaccountIds []*satypes.SubaccountId, | ||
numUpdatesToAdd uint32, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't need this field, can just use len(updates)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
// AddUpdatesToCache adds a series of updates to the full node streaming cache. | ||
// SendSubaccountUpdates groups subaccount updates by their subaccount ids and | ||
// sends messages to the subscribers. | ||
func (sm *FullNodeStreamingManagerImpl) SendSubaccountUpdates( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could just do SendSubaccountUpdate
and not take in a series of subaccount updates. It doesn't quite make sense to have a couple subaccount updates in a row for the same subaccount id since we would only take the last one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Going to add this as a TODO for now. The current code replicates what's done by Indexer & the client code should cover the aggregation. Created https://linear.app/dydx/issue/CT-1117/aggregate-subaccount-update-per-subaccount-id to track
// Group subaccount updates by subaccount id. | ||
streamUpdates := make([]clobtypes.StreamUpdate, 0) | ||
subaccountIds := make([]*satypes.SubaccountId, 0) | ||
for _, subaccountUpdate := range subaccountUpdates { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could eliminate the grouping logic by making it take in one subaccount update, same as above comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
StreamUpdate is 1:1 with SubaccountUpdate so we will still need to send multiple StreamUpdates. But yes, if we implement above (added a todo for now), we will only have 1 SubaccountUpdate per subaccountid.
} | ||
|
||
// Remove all subscriptions and wipe the buffer if buffer overflows. | ||
if len(sm.streamUpdateCache) > int(sm.maxUpdatesInCache) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
factor this common code out?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
defer metrics.ModuleMeasureSince( | ||
metrics.FullNodeGrpc, | ||
metrics.GrpcSendSubaccountUpdatesLatency, | ||
time.Now(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you update the datadog dashboards with subaccount metrics? would be good to know what % of the messages in our update cache are subaccounts, etc
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@@ -544,6 +718,14 @@ func (sm *FullNodeStreamingManagerImpl) InitializeNewStreams( | |||
} | |||
|
|||
sm.SendSnapshot(allUpdates, subscriptionId, blockHeight, execMode) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we have one stream, can we combine everything in just one snapshot function? Users might be confused they get snapshot messages in two chunks, even though they are for subaccounts and orderbooks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -262,6 +263,33 @@ func (k Keeper) InitializeNewStreams(ctx sdk.Context) { | |||
clobPairId, | |||
) | |||
}, | |||
func(subaccountId satypes.SubaccountId) *satypes.StreamSubaccountUpdate { | |||
subaccount := k.subaccountsKeeper.GetSubaccount( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we might need to use GetSettledSubaccount
to account for funding rates. Depends on what research wants us to emit. Whatever we do, should make sure all our subaccount emissions are settled or non settled.
@jayy04 any thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the subaccount snapshot code. We are accounting for funding rates during the updates.
@@ -369,6 +411,17 @@ func (k Keeper) UpdateSubaccounts( | |||
), | |||
) | |||
|
|||
// if GRPC streaming is on, emit a generated subaccount update to stream. | |||
if streamingManager := k.GetFullNodeStreamingManager(); streamingManager.Enabled() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit for later: we might want to early return if fns doesn't have a current subscription to this subaccount id. This is definitely a hot path in our code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can't have that outside the FNS manager due to accessing this without the lock. Think we need to define:
def streamingManager.CheckIfSubaccountRegistered :
lock.acquire()
defer lock.release
// check subaccount to subscirption id mapping
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, i see this function GetSubaccountIdToSubscriptionIdMapping
acquires the lock and returns the map -- I don't think we should expose the pointer to the map in case it is modified somewhere without the lock.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done, created TracksSubaccountId
function instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (4)
- protocol/streaming/full_node_streaming_manager.go (17 hunks)
- protocol/streaming/noop_streaming_manager.go (3 hunks)
- protocol/streaming/types/interface.go (4 hunks)
- protocol/x/subaccounts/keeper/subaccount.go (4 hunks)
Files skipped from review as they are similar to previous changes (3)
- protocol/streaming/noop_streaming_manager.go
- protocol/streaming/types/interface.go
- protocol/x/subaccounts/keeper/subaccount.go
Additional comments not posted (10)
protocol/streaming/full_node_streaming_manager.go (10)
42-43
: Addition ofsubaccountIdToSubscriptionIdMapping
is appropriate.This field is necessary for managing subaccount-specific subscriptions effectively.
59-60
: Addition ofsubaccountIds
inOrderbookSubscription
is appropriate.This field supports the inclusion of subaccount IDs in subscription management.
86-86
: Initialization ofsubaccountIdToSubscriptionIdMapping
is appropriate.This ensures the map is ready for managing subaccount subscriptions.
168-178
: Update logic forsubaccountIdToSubscriptionIdMapping
is consistent.The logic mirrors that of
clobPairIdToSubscriptionIdMapping
, ensuring uniform subscription management.
256-269
: Removal logic forsubaccountIdToSubscriptionIdMapping
is consistent.This consistency helps maintain the integrity of subscription management.
303-319
:getSubaccountStreamUpdates
method is well-implemented.The method correctly transforms subaccount updates into stream updates.
505-532
:SendSubaccountUpdates
method is well-implemented.The method effectively groups and sends subaccount updates. Note the existing TODO for aggregation.
563-584
:AddSubaccountUpdatesToCache
method is consistent with existing caching logic.The method follows established patterns for adding updates to the cache.
Line range hint
664-691
:InitializeNewStreams
method appropriately includes subaccount snapshots.The method effectively integrates subaccount snapshot logic with existing orderbook snapshot functionality.
146-153
: Handling of subaccount IDs inSubscribe
is consistent.The conversion from pointers to values and the mapping logic align with existing patterns.
Ensure that subaccount ID handling is consistent throughout the codebase.
Verification successful
Handling of subaccount IDs is consistent across the codebase.
The conversion from pointers to values and the mapping logic in the
Subscribe
method align with existing patterns throughout the codebase. No inconsistencies were found.Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify consistent handling of subaccount IDs across the codebase. # Test: Search for subaccount ID handling patterns. Expect: Consistent usage. rg --type go -A 5 $'subaccountId'Length of output: 203726
|
||
var streamUpdates []clobtypes.StreamUpdate | ||
streamUpdates = append(streamUpdates, sm.getOrderbookStreamUpdate(offchainUpdates, blockHeight, execMode)...) | ||
streamUpdates = append(streamUpdates, sm.getSubaccountStreamUpdates(saUpdates, blockHeight, execMode)...) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
meganit: getSubaccountStreamUpdate
seems like some getter function for the full node streaming manager whereas it is a struct wrapping function, can we rename that to something else or just inline define the struct definitions in this function?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
renamed to toSubaccountStreamUpdates
and toOrderbookStreamUpdate
@@ -456,7 +597,6 @@ func (sm *FullNodeStreamingManagerImpl) AddUpdatesToCache( | |||
sm.streamUpdateCache = nil | |||
sm.streamUpdateSubscriptionCache = nil | |||
} | |||
sm.EmitMetrics() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this intended?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nvm, i see it was pulled out
@@ -410,7 +485,7 @@ func (sm *FullNodeStreamingManagerImpl) SendTakerOrderStatus( | |||
clobPairId = takerOrder.OrderId.ClobPairId | |||
} | |||
|
|||
sm.AddUpdatesToCache( | |||
sm.AddOrderbookUpdatesToCache( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: rename function since this is used for Taker Orders as well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
renamed to AddOrderUpdatesToCache
// FlushCacheIfFull removes all subscriptions and wipes the buffer if buffer overflows. | ||
// Note this method requires the lock and assumes that the lock has already been | ||
// acquired by the caller. | ||
func (sm *FullNodeStreamingManagerImpl) FlushCacheIfFull() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: can we rename this? FlushCacheIfFull
-> RemoveSubscriptionsAndClearBufferIfFull
. Want to make sure it's explicit it's not sending out updates since all our other update emission functions are prefixed by Flush
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (4)
- protocol/streaming/full_node_streaming_manager.go (17 hunks)
- protocol/streaming/noop_streaming_manager.go (3 hunks)
- protocol/streaming/types/interface.go (4 hunks)
- protocol/x/subaccounts/keeper/subaccount.go (4 hunks)
Files skipped from review as they are similar to previous changes (3)
- protocol/streaming/full_node_streaming_manager.go
- protocol/streaming/types/interface.go
- protocol/x/subaccounts/keeper/subaccount.go
Additional comments not posted (5)
protocol/streaming/noop_streaming_manager.go (5)
Line range hint
23-28
:
LGTM! The function signature update is appropriate.The addition of the
SubaccountId
parameter aligns with the intended subaccount streaming functionality.
54-59
: LGTM! The new method aligns with the class's purpose.The
SendSubaccountUpdates
method is correctly defined with appropriate parameters for handling subaccount updates.
61-63
: LGTM! The function is consistent with the class's purpose.The
TracksSubaccountId
method correctly reflects the no-op nature by returningfalse
.
64-69
: LGTM! The function signature update is appropriate.The addition of the
getSubaccountSnapshot
parameter aligns with the intended subaccount streaming functionality.
Line range hint
71-73
: No changes detected in theStop
function.The function remains unchanged and consistent with the class's purpose.
Changelist
Test Plan
Tested in dev env
Author/Reviewer Checklist
state-breaking
label.indexer-postgres-breaking
label.PrepareProposal
orProcessProposal
, manually add the labelproposal-breaking
.feature:[feature-name]
.backport/[branch-name]
.refactor
,chore
,bug
.Summary by CodeRabbit
New Features
Bug Fixes
Documentation