Skip to content

Commit

Permalink
[Backport v3.x] backport full node streaming PRs (#1181)
Browse files Browse the repository at this point in the history
* [CT-645] Move off chain updates and v1 to a different package (#1131)

* [CT-645] Add protos for orderbook stream query service

* move removal reasons to a separate package

* [CT-645] Add protos for orderbook stream query service (#1133)

* [CT-645] Add protos for orderbook stream query service

* make update not nullable

* fix build

* [CT-644] instantiate grpc stream manager (#1134)

* [CT-644] instantiate grpc stream manager

* update type

* update channel type

* [CT-646] stream offchain updates through stream manager (#1138)

* [CT-646] stream offchain updates through stream manager

* comments

* fix lint

* get rid of finished

* comments

* comments

* [CT-652] add command line flag for full node streaming (#1145)

* [CT-647] construct the initial orderbook snapshot (#1147)

* [CT-647] construct the initial orderbook snapshot

* [CT-647] initialize new streams and send orderbook snapshot (#1152)

* [CT-647] initialize new streams and send orderbook snapshot

* use sync once

* comments

* fix test
  • Loading branch information
jayy04 authored Mar 15, 2024
1 parent 85fb10a commit 2a47cf5
Show file tree
Hide file tree
Showing 51 changed files with 1,890 additions and 588 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Rpc } from "../../helpers";
import * as _m0 from "protobufjs/minimal";
import { QueryClient, createProtobufRpcClient } from "@cosmjs/stargate";
import { QueryGetClobPairRequest, QueryClobPairResponse, QueryAllClobPairRequest, QueryClobPairAllResponse, MevNodeToNodeCalculationRequest, MevNodeToNodeCalculationResponse, QueryEquityTierLimitConfigurationRequest, QueryEquityTierLimitConfigurationResponse, QueryBlockRateLimitConfigurationRequest, QueryBlockRateLimitConfigurationResponse, QueryLiquidationsConfigurationRequest, QueryLiquidationsConfigurationResponse } from "./query";
import { QueryGetClobPairRequest, QueryClobPairResponse, QueryAllClobPairRequest, QueryClobPairAllResponse, MevNodeToNodeCalculationRequest, MevNodeToNodeCalculationResponse, QueryEquityTierLimitConfigurationRequest, QueryEquityTierLimitConfigurationResponse, QueryBlockRateLimitConfigurationRequest, QueryBlockRateLimitConfigurationResponse, QueryLiquidationsConfigurationRequest, QueryLiquidationsConfigurationResponse, StreamOrderbookUpdatesRequest, StreamOrderbookUpdatesResponse } from "./query";
/** Query defines the gRPC querier service. */

export interface Query {
Expand All @@ -22,6 +22,9 @@ export interface Query {
/** Queries LiquidationsConfiguration. */

liquidationsConfiguration(request?: QueryLiquidationsConfigurationRequest): Promise<QueryLiquidationsConfigurationResponse>;
/** Streams orderbook updates. */

streamOrderbookUpdates(request: StreamOrderbookUpdatesRequest): Promise<StreamOrderbookUpdatesResponse>;
}
export class QueryClientImpl implements Query {
private readonly rpc: Rpc;
Expand All @@ -34,6 +37,7 @@ export class QueryClientImpl implements Query {
this.equityTierLimitConfiguration = this.equityTierLimitConfiguration.bind(this);
this.blockRateLimitConfiguration = this.blockRateLimitConfiguration.bind(this);
this.liquidationsConfiguration = this.liquidationsConfiguration.bind(this);
this.streamOrderbookUpdates = this.streamOrderbookUpdates.bind(this);
}

clobPair(request: QueryGetClobPairRequest): Promise<QueryClobPairResponse> {
Expand Down Expand Up @@ -74,6 +78,12 @@ export class QueryClientImpl implements Query {
return promise.then(data => QueryLiquidationsConfigurationResponse.decode(new _m0.Reader(data)));
}

streamOrderbookUpdates(request: StreamOrderbookUpdatesRequest): Promise<StreamOrderbookUpdatesResponse> {
const data = StreamOrderbookUpdatesRequest.encode(request).finish();
const promise = this.rpc.request("dydxprotocol.clob.Query", "StreamOrderbookUpdates", data);
return promise.then(data => StreamOrderbookUpdatesResponse.decode(new _m0.Reader(data)));
}

}
export const createRpcQueryExtension = (base: QueryClient) => {
const rpc = createProtobufRpcClient(base);
Expand Down Expand Up @@ -101,6 +111,10 @@ export const createRpcQueryExtension = (base: QueryClient) => {

liquidationsConfiguration(request?: QueryLiquidationsConfigurationRequest): Promise<QueryLiquidationsConfigurationResponse> {
return queryService.liquidationsConfiguration(request);
},

streamOrderbookUpdates(request: StreamOrderbookUpdatesRequest): Promise<StreamOrderbookUpdatesResponse> {
return queryService.streamOrderbookUpdates(request);
}

};
Expand Down
165 changes: 165 additions & 0 deletions indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { ClobPair, ClobPairSDKType } from "./clob_pair";
import { EquityTierLimitConfiguration, EquityTierLimitConfigurationSDKType } from "./equity_tier_limit_config";
import { BlockRateLimitConfiguration, BlockRateLimitConfigurationSDKType } from "./block_rate_limit_config";
import { LiquidationsConfig, LiquidationsConfigSDKType } from "./liquidations_config";
import { OffChainUpdateV1, OffChainUpdateV1SDKType } from "../indexer/off_chain_updates/off_chain_updates";
import * as _m0 from "protobufjs/minimal";
import { DeepPartial, Long } from "../../helpers";
/** QueryGetClobPairRequest is request type for the ClobPair method. */
Expand Down Expand Up @@ -198,6 +199,58 @@ export interface QueryLiquidationsConfigurationResponse {
export interface QueryLiquidationsConfigurationResponseSDKType {
liquidations_config?: LiquidationsConfigSDKType;
}
/**
* StreamOrderbookUpdatesRequest is a request message for the
* StreamOrderbookUpdates method.
*/

export interface StreamOrderbookUpdatesRequest {
/** Clob pair ids to stream orderbook updates for. */
clobPairId: number[];
}
/**
* StreamOrderbookUpdatesRequest is a request message for the
* StreamOrderbookUpdates method.
*/

export interface StreamOrderbookUpdatesRequestSDKType {
/** Clob pair ids to stream orderbook updates for. */
clob_pair_id: number[];
}
/**
* StreamOrderbookUpdatesResponse is a response message for the
* StreamOrderbookUpdates method.
*/

export interface StreamOrderbookUpdatesResponse {
/** Orderbook updates for the clob pair. */
updates: OffChainUpdateV1[];
/**
* Snapshot indicates if the response is from a snapshot of the orderbook.
* This is true for the initial response and false for all subsequent updates.
* Note that if the snapshot is true, then all previous entries should be
* discarded and the orderbook should be resynced.
*/

snapshot: boolean;
}
/**
* StreamOrderbookUpdatesResponse is a response message for the
* StreamOrderbookUpdates method.
*/

export interface StreamOrderbookUpdatesResponseSDKType {
/** Orderbook updates for the clob pair. */
updates: OffChainUpdateV1SDKType[];
/**
* Snapshot indicates if the response is from a snapshot of the orderbook.
* This is true for the initial response and false for all subsequent updates.
* Note that if the snapshot is true, then all previous entries should be
* discarded and the orderbook should be resynced.
*/

snapshot: boolean;
}

function createBaseQueryGetClobPairRequest(): QueryGetClobPairRequest {
return {
Expand Down Expand Up @@ -789,4 +842,116 @@ export const QueryLiquidationsConfigurationResponse = {
return message;
}

};

function createBaseStreamOrderbookUpdatesRequest(): StreamOrderbookUpdatesRequest {
return {
clobPairId: []
};
}

export const StreamOrderbookUpdatesRequest = {
encode(message: StreamOrderbookUpdatesRequest, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer {
writer.uint32(10).fork();

for (const v of message.clobPairId) {
writer.uint32(v);
}

writer.ldelim();
return writer;
},

decode(input: _m0.Reader | Uint8Array, length?: number): StreamOrderbookUpdatesRequest {
const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input);
let end = length === undefined ? reader.len : reader.pos + length;
const message = createBaseStreamOrderbookUpdatesRequest();

while (reader.pos < end) {
const tag = reader.uint32();

switch (tag >>> 3) {
case 1:
if ((tag & 7) === 2) {
const end2 = reader.uint32() + reader.pos;

while (reader.pos < end2) {
message.clobPairId.push(reader.uint32());
}
} else {
message.clobPairId.push(reader.uint32());
}

break;

default:
reader.skipType(tag & 7);
break;
}
}

return message;
},

fromPartial(object: DeepPartial<StreamOrderbookUpdatesRequest>): StreamOrderbookUpdatesRequest {
const message = createBaseStreamOrderbookUpdatesRequest();
message.clobPairId = object.clobPairId?.map(e => e) || [];
return message;
}

};

function createBaseStreamOrderbookUpdatesResponse(): StreamOrderbookUpdatesResponse {
return {
updates: [],
snapshot: false
};
}

export const StreamOrderbookUpdatesResponse = {
encode(message: StreamOrderbookUpdatesResponse, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer {
for (const v of message.updates) {
OffChainUpdateV1.encode(v!, writer.uint32(10).fork()).ldelim();
}

if (message.snapshot === true) {
writer.uint32(16).bool(message.snapshot);
}

return writer;
},

decode(input: _m0.Reader | Uint8Array, length?: number): StreamOrderbookUpdatesResponse {
const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input);
let end = length === undefined ? reader.len : reader.pos + length;
const message = createBaseStreamOrderbookUpdatesResponse();

while (reader.pos < end) {
const tag = reader.uint32();

switch (tag >>> 3) {
case 1:
message.updates.push(OffChainUpdateV1.decode(reader, reader.uint32()));
break;

case 2:
message.snapshot = reader.bool();
break;

default:
reader.skipType(tag & 7);
break;
}
}

return message;
},

fromPartial(object: DeepPartial<StreamOrderbookUpdatesResponse>): StreamOrderbookUpdatesResponse {
const message = createBaseStreamOrderbookUpdatesResponse();
message.updates = object.updates?.map(e => OffChainUpdateV1.fromPartial(e)) || [];
message.snapshot = object.snapshot ?? false;
return message;
}

};
28 changes: 28 additions & 0 deletions proto/dydxprotocol/clob/query.proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import "dydxprotocol/clob/clob_pair.proto";
import "dydxprotocol/clob/equity_tier_limit_config.proto";
import "dydxprotocol/clob/liquidations_config.proto";
import "dydxprotocol/clob/mev.proto";
import "dydxprotocol/indexer/off_chain_updates/off_chain_updates.proto";

option go_package = "github.com/dydxprotocol/v4-chain/protocol/x/clob/types";

Expand Down Expand Up @@ -50,6 +51,12 @@ service Query {
returns (QueryLiquidationsConfigurationResponse) {
option (google.api.http).get = "/dydxprotocol/clob/liquidations_config";
}

// GRPC Streams

// Streams orderbook updates.
rpc StreamOrderbookUpdates(StreamOrderbookUpdatesRequest)
returns (stream StreamOrderbookUpdatesResponse);
}

// QueryGetClobPairRequest is request type for the ClobPair method.
Expand Down Expand Up @@ -126,3 +133,24 @@ message QueryLiquidationsConfigurationRequest {}
message QueryLiquidationsConfigurationResponse {
LiquidationsConfig liquidations_config = 1 [ (gogoproto.nullable) = false ];
}

// StreamOrderbookUpdatesRequest is a request message for the
// StreamOrderbookUpdates method.
message StreamOrderbookUpdatesRequest {
// Clob pair ids to stream orderbook updates for.
repeated uint32 clob_pair_id = 1;
}

// StreamOrderbookUpdatesResponse is a response message for the
// StreamOrderbookUpdates method.
message StreamOrderbookUpdatesResponse {
// Orderbook updates for the clob pair.
repeated dydxprotocol.indexer.off_chain_updates.OffChainUpdateV1 updates = 1
[ (gogoproto.nullable) = false ];

// Snapshot indicates if the response is from a snapshot of the orderbook.
// This is true for the initial response and false for all subsequent updates.
// Note that if the snapshot is true, then all previous entries should be
// discarded and the orderbook should be resynced.
bool snapshot = 2;
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ package dydxprotocol.indexer.off_chain_updates;
import "dydxprotocol/indexer/shared/removal_reason.proto";
import "dydxprotocol/indexer/protocol/v1/clob.proto";

option go_package = "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates";
option go_package = "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types";

// Do not make any breaking changes to these protos, a new version should be
// created if a breaking change is needed.
Expand Down
2 changes: 1 addition & 1 deletion proto/dydxprotocol/indexer/protocol/v1/clob.proto
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ package dydxprotocol.indexer.protocol.v1;
import "dydxprotocol/indexer/protocol/v1/subaccount.proto";
import "gogoproto/gogo.proto";

option go_package = "github.com/dydxprotocol/v4-chain/protocol/indexer/protocol/v1";
option go_package = "github.com/dydxprotocol/v4-chain/protocol/indexer/protocol/v1/types";

// Initial copy of protos from dYdX chain application state protos for the clob
// module for use to send Indexer specific messages. Do not make any breaking
Expand Down
2 changes: 1 addition & 1 deletion proto/dydxprotocol/indexer/protocol/v1/subaccount.proto
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ package dydxprotocol.indexer.protocol.v1;
import "cosmos_proto/cosmos.proto";
import "gogoproto/gogo.proto";

option go_package = "github.com/dydxprotocol/v4-chain/protocol/indexer/protocol/v1";
option go_package = "github.com/dydxprotocol/v4-chain/protocol/indexer/protocol/v1/types";

// Initial copy of protos from dYdX chain application state protos for the
// subaccount module for use to send Indexer specific messages. Do not make any
Expand Down
2 changes: 1 addition & 1 deletion proto/dydxprotocol/indexer/shared/removal_reason.proto
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
syntax = "proto3";
package dydxprotocol.indexer.shared;

option go_package = "github.com/dydxprotocol/v4-chain/protocol/indexer/shared";
option go_package = "github.com/dydxprotocol/v4-chain/protocol/indexer/shared/types";

// TODO(DEC-869): Update reasons/statuses for Advanced Orders.

Expand Down
1 change: 1 addition & 0 deletions protocol/app/ante_whitebox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func newTestHandlerOptions() HandlerOptions {
nil,
nil,
nil,
nil,
flags.GetDefaultClobFlags(),
rate_limit.NewNoOpRateLimiter[*types.MsgPlaceOrder](),
rate_limit.NewNoOpRateLimiter[*types.MsgCancelOrder](),
Expand Down
30 changes: 27 additions & 3 deletions protocol/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,10 @@ import (
"github.com/dydxprotocol/v4-chain/protocol/indexer"
"github.com/dydxprotocol/v4-chain/protocol/indexer/indexer_manager"
"github.com/dydxprotocol/v4-chain/protocol/indexer/msgsender"

// Grpc Streaming
streaming "github.com/dydxprotocol/v4-chain/protocol/streaming/grpc"
streamingtypes "github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/types"
)

var (
Expand Down Expand Up @@ -284,8 +288,9 @@ type App struct {
// module configurator
configurator module.Configurator

IndexerEventManager indexer_manager.IndexerEventManager
Server *daemonserver.Server
IndexerEventManager indexer_manager.IndexerEventManager
GrpcStreamingManager streamingtypes.GrpcStreamingManager
Server *daemonserver.Server

// startDaemons encapsulates the logic that starts all daemons and daemon services. This function contains a
// closure of all relevant data structures that are shared with various keepers. Daemon services startup is
Expand Down Expand Up @@ -593,6 +598,9 @@ func New(
tkeys[indexer_manager.TransientStoreKey],
indexerFlags.SendOffchainData,
)

app.GrpcStreamingManager = getGrpcStreamingManagerFromOptions(appFlags, logger)

timeProvider := &timelib.TimeProviderImpl{}

app.EpochsKeeper = *epochsmodulekeeper.NewKeeper(
Expand Down Expand Up @@ -877,7 +885,9 @@ func New(
clobFlags := clobflags.GetClobFlagValuesFromOptions(appOpts)
logger.Info("Parsed CLOB flags", "Flags", clobFlags)

memClob := clobmodulememclob.NewMemClobPriceTimePriority(app.IndexerEventManager.Enabled())
memClob := clobmodulememclob.NewMemClobPriceTimePriority(
app.IndexerEventManager.Enabled() || app.GrpcStreamingManager.Enabled(),
)

app.ClobKeeper = clobmodulekeeper.NewKeeper(
appCodec,
Expand All @@ -899,6 +909,7 @@ func New(
app.StatsKeeper,
app.RewardsKeeper,
app.IndexerEventManager,
app.GrpcStreamingManager,
txConfig.TxDecoder(),
clobFlags,
rate_limit.NewPanicRateLimiter[*clobmoduletypes.MsgPlaceOrder](),
Expand Down Expand Up @@ -1592,3 +1603,16 @@ func getIndexerFromOptions(
}
return indexerMessageSender, indexerFlags
}

// getGrpcStreamingManagerFromOptions returns an instance of a streamingtypes.GrpcStreamingManager from the specified
// options. This function will default to returning a no-op instance.
func getGrpcStreamingManagerFromOptions(
appFlags flags.Flags,
logger log.Logger,
) (manager streamingtypes.GrpcStreamingManager) {
if appFlags.GrpcStreamingEnabled {
logger.Info("GRPC streaming is enabled")
return streaming.NewGrpcStreamingManager()
}
return streaming.NewNoopGrpcStreamingManager()
}
Loading

0 comments on commit 2a47cf5

Please sign in to comment.