Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Full Node Streaming Order Filtering by Subaccount impl and tests #2693

Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -278,6 +278,13 @@ export interface StreamOrderbookUpdatesRequest {
/** Market ids for price updates. */

marketIds: number[];
/**
* Filter order updates by subaccount IDs.
* If true, the orderbook updates only include orders from provided subaccount
* IDs.
*/

filterOrdersBySubaccountId: boolean;
}
/**
* StreamOrderbookUpdatesRequest is a request message for the
@@ -293,6 +300,13 @@ export interface StreamOrderbookUpdatesRequestSDKType {
/** Market ids for price updates. */

market_ids: number[];
/**
* Filter order updates by subaccount IDs.
* If true, the orderbook updates only include orders from provided subaccount
* IDs.
*/

filter_orders_by_subaccount_id: boolean;
}
/**
* StreamOrderbookUpdatesResponse is a response message for the
@@ -1298,7 +1312,8 @@ function createBaseStreamOrderbookUpdatesRequest(): StreamOrderbookUpdatesReques
return {
clobPairId: [],
subaccountIds: [],
marketIds: []
marketIds: [],
filterOrdersBySubaccountId: false
};
}

@@ -1323,6 +1338,11 @@ export const StreamOrderbookUpdatesRequest = {
}

writer.ldelim();

if (message.filterOrdersBySubaccountId === true) {
writer.uint32(32).bool(message.filterOrdersBySubaccountId);
}

return writer;
},

@@ -1365,6 +1385,10 @@ export const StreamOrderbookUpdatesRequest = {

break;

case 4:
message.filterOrdersBySubaccountId = reader.bool();
break;

default:
reader.skipType(tag & 7);
break;
@@ -1379,6 +1403,7 @@ export const StreamOrderbookUpdatesRequest = {
message.clobPairId = object.clobPairId?.map(e => e) || [];
message.subaccountIds = object.subaccountIds?.map(e => SubaccountId.fromPartial(e)) || [];
message.marketIds = object.marketIds?.map(e => e) || [];
message.filterOrdersBySubaccountId = object.filterOrdersBySubaccountId ?? false;
return message;
}

5 changes: 5 additions & 0 deletions proto/dydxprotocol/clob/query.proto
Original file line number Diff line number Diff line change
@@ -186,6 +186,11 @@ message StreamOrderbookUpdatesRequest {

// Market ids for price updates.
repeated uint32 market_ids = 3;

// Filter order updates by subaccount IDs.
// If true, the orderbook updates only include orders from provided subaccount
// IDs.
bool filter_orders_by_subaccount_id = 4;
}

// StreamOrderbookUpdatesResponse is a response message for the
123 changes: 103 additions & 20 deletions protocol/streaming/full_node_streaming_manager.go
Original file line number Diff line number Diff line change
@@ -2,27 +2,25 @@ package streaming

import (
"fmt"
"slices"
"sync"
"sync/atomic"
"time"

"github.com/dydxprotocol/v4-chain/protocol/lib"
pricestypes "github.com/dydxprotocol/v4-chain/protocol/x/prices/types"
satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types"

"cosmossdk.io/log"
storetypes "cosmossdk.io/store/types"
"github.com/cosmos/cosmos-sdk/codec"
sdk "github.com/cosmos/cosmos-sdk/types"
ante_types "github.com/dydxprotocol/v4-chain/protocol/app/ante/types"
"github.com/dydxprotocol/v4-chain/protocol/finalizeblock"
ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types"
"github.com/dydxprotocol/v4-chain/protocol/lib"
"github.com/dydxprotocol/v4-chain/protocol/lib/metrics"
"github.com/dydxprotocol/v4-chain/protocol/streaming/types"
streaming_util "github.com/dydxprotocol/v4-chain/protocol/streaming/util"
clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types"

ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types"

"github.com/dydxprotocol/v4-chain/protocol/finalizeblock"
pricestypes "github.com/dydxprotocol/v4-chain/protocol/x/prices/types"
satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types"
)

var _ types.FullNodeStreamingManager = (*FullNodeStreamingManagerImpl)(nil)
@@ -96,6 +94,41 @@ type OrderbookSubscription struct {
nextSnapshotBlock uint32
}

func NewOrderbookSubscription(
subscriptionId uint32,
clobPairIds []uint32,
subaccountIds []satypes.SubaccountId,
marketIds []uint32,
messageSender types.OutgoingMessageSender,
updatesChannel chan []clobtypes.StreamUpdate,
) *OrderbookSubscription {
return &OrderbookSubscription{
subscriptionId: subscriptionId,
initialized: &atomic.Bool{}, // False by default.
clobPairIds: clobPairIds,
subaccountIds: subaccountIds,
marketIds: marketIds,
messageSender: messageSender,
updatesChannel: updatesChannel,
}
}

func (sm *FullNodeStreamingManagerImpl) NewOrderbookSubscription(
clobPairIds []uint32,
subaccountIds []satypes.SubaccountId,
marketIds []uint32,
messageSender types.OutgoingMessageSender,
) *OrderbookSubscription {
return NewOrderbookSubscription(
sm.getNextAvailableSubscriptionId(),
clobPairIds,
subaccountIds,
marketIds,
messageSender,
make(chan []clobtypes.StreamUpdate, sm.maxSubscriptionChannelSize),
)
}

func (sub *OrderbookSubscription) IsInitialized() bool {
return sub.initialized.Load()
}
@@ -187,11 +220,58 @@ func (sm *FullNodeStreamingManagerImpl) getNextAvailableSubscriptionId() uint32
return id
}

func doFilterStreamUpdateBySubaccount(
orderBookUpdate *clobtypes.StreamUpdate_OrderbookUpdate,
subaccountIdNumbers []uint32,
logger log.Logger,
) bool {
for _, orderBookUpdate := range orderBookUpdate.OrderbookUpdate.Updates {
orderBookUpdateSubaccountIdNumber, err := streaming_util.GetOffChainUpdateV1SubaccountIdNumber(orderBookUpdate)
if err == nil {
if slices.Contains(subaccountIdNumbers, orderBookUpdateSubaccountIdNumber) {
return true
}
} else {
logger.Error(err.Error())
}
}
return false
}

// Filter StreamUpdates for subaccountIdNumbers
// If a StreamUpdate_OrderUpdate contains no updates for subscribed subaccounts, drop message
// If a StreamUpdate_OrderUpdate contains updates for subscribed subaccounts, construct a new
// StreamUpdate_OrderUpdate with updates only for subscribed subaccounts
func FilterStreamUpdateBySubaccount(
updates []clobtypes.StreamUpdate,
subaccountIdNumbers []uint32,
logger log.Logger,
) *[]clobtypes.StreamUpdate {
// If reflection becomes too expensive, split updatesChannel by message type
filteredUpdates := []clobtypes.StreamUpdate{}
for _, update := range updates {
switch updateMessage := update.UpdateMessage.(type) {
case *clobtypes.StreamUpdate_OrderbookUpdate:
if doFilterStreamUpdateBySubaccount(updateMessage, subaccountIdNumbers, logger) {
filteredUpdates = append(filteredUpdates, update)
}
default:
filteredUpdates = append(filteredUpdates, update)
}
}

if len(filteredUpdates) > 0 {
return &filteredUpdates
}
return nil
}

// Subscribe subscribes to the orderbook updates stream.
func (sm *FullNodeStreamingManagerImpl) Subscribe(
clobPairIds []uint32,
subaccountIds []*satypes.SubaccountId,
marketIds []uint32,
filterOrdersBySubAccountId bool,
messageSender types.OutgoingMessageSender,
) (
err error,
@@ -200,24 +280,21 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
if len(clobPairIds) == 0 && len(subaccountIds) == 0 && len(marketIds) == 0 {
return types.ErrInvalidStreamingRequest
}
if filterOrdersBySubAccountId && (len(subaccountIds) == 0) {
sm.logger.Error("filterOrdersBySubaccountId with no subaccountIds")
return types.ErrInvalidStreamingRequest
}

sm.Lock()
sIds := make([]satypes.SubaccountId, len(subaccountIds))
subaccountIdNumbers := make([]uint32, len(subaccountIds))
for i, subaccountId := range subaccountIds {
sIds[i] = *subaccountId
subaccountIdNumbers[i] = subaccountId.Number
}

subscriptionId := sm.getNextAvailableSubscriptionId()
subscription := sm.NewOrderbookSubscription(clobPairIds, sIds, marketIds, messageSender)

subscription := &OrderbookSubscription{
subscriptionId: subscriptionId,
initialized: &atomic.Bool{}, // False by default.
clobPairIds: clobPairIds,
subaccountIds: sIds,
marketIds: marketIds,
messageSender: messageSender,
updatesChannel: make(chan []clobtypes.StreamUpdate, sm.maxSubscriptionChannelSize),
}
for _, clobPairId := range clobPairIds {
// if clobPairId exists in the map, append the subscription id to the slice
// otherwise, create a new slice with the subscription id
@@ -268,6 +345,12 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
// Use current goroutine to consistently poll subscription channel for updates
// to send through stream.
for updates := range subscription.updatesChannel {
if filterOrdersBySubAccountId {
filteredUpdates := FilterStreamUpdateBySubaccount(updates, subaccountIdNumbers, sm.logger)
if filteredUpdates != nil {
updates = *filteredUpdates
}
}
metrics.IncrCounterWithLabels(
metrics.GrpcSendResponseToSubscriberCount,
1,
@@ -1080,12 +1163,12 @@ func (sm *FullNodeStreamingManagerImpl) StreamBatchUpdatesAfterFinalizeBlock(
sm.FlushStreamUpdatesWithLock()

// Cache updates to sync local ops queue
sycnLocalUpdates, syncLocalClobPairIds := getStreamUpdatesFromOffchainUpdates(
syncLocalUpdates, syncLocalClobPairIds := getStreamUpdatesFromOffchainUpdates(
streaming_util.GetOffchainUpdatesV1(orderBookUpdatesToSyncLocalOpsQueue),
lib.MustConvertIntegerToUint32(ctx.BlockHeight()),
ctx.ExecMode(),
)
sm.cacheStreamUpdatesByClobPairWithLock(sycnLocalUpdates, syncLocalClobPairIds)
sm.cacheStreamUpdatesByClobPairWithLock(syncLocalUpdates, syncLocalClobPairIds)

// Cache updates for finalized fills.
fillStreamUpdates, fillClobPairIds := sm.getStreamUpdatesForOrderbookFills(
Loading