Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport v4.x] backport full node streaming to v4.x branch #1270

Merged
merged 11 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Rpc } from "../../helpers";
import * as _m0 from "protobufjs/minimal";
import { QueryClient, createProtobufRpcClient } from "@cosmjs/stargate";
import { QueryGetClobPairRequest, QueryClobPairResponse, QueryAllClobPairRequest, QueryClobPairAllResponse, MevNodeToNodeCalculationRequest, MevNodeToNodeCalculationResponse, QueryEquityTierLimitConfigurationRequest, QueryEquityTierLimitConfigurationResponse, QueryBlockRateLimitConfigurationRequest, QueryBlockRateLimitConfigurationResponse, QueryLiquidationsConfigurationRequest, QueryLiquidationsConfigurationResponse } from "./query";
import { QueryGetClobPairRequest, QueryClobPairResponse, QueryAllClobPairRequest, QueryClobPairAllResponse, MevNodeToNodeCalculationRequest, MevNodeToNodeCalculationResponse, QueryEquityTierLimitConfigurationRequest, QueryEquityTierLimitConfigurationResponse, QueryBlockRateLimitConfigurationRequest, QueryBlockRateLimitConfigurationResponse, QueryLiquidationsConfigurationRequest, QueryLiquidationsConfigurationResponse, StreamOrderbookUpdatesRequest, StreamOrderbookUpdatesResponse } from "./query";
/** Query defines the gRPC querier service. */

export interface Query {
Expand All @@ -22,6 +22,9 @@ export interface Query {
/** Queries LiquidationsConfiguration. */

liquidationsConfiguration(request?: QueryLiquidationsConfigurationRequest): Promise<QueryLiquidationsConfigurationResponse>;
/** Streams orderbook updates. */

streamOrderbookUpdates(request: StreamOrderbookUpdatesRequest): Promise<StreamOrderbookUpdatesResponse>;
}
export class QueryClientImpl implements Query {
private readonly rpc: Rpc;
Expand All @@ -34,6 +37,7 @@ export class QueryClientImpl implements Query {
this.equityTierLimitConfiguration = this.equityTierLimitConfiguration.bind(this);
this.blockRateLimitConfiguration = this.blockRateLimitConfiguration.bind(this);
this.liquidationsConfiguration = this.liquidationsConfiguration.bind(this);
this.streamOrderbookUpdates = this.streamOrderbookUpdates.bind(this);
}

clobPair(request: QueryGetClobPairRequest): Promise<QueryClobPairResponse> {
Expand Down Expand Up @@ -74,6 +78,12 @@ export class QueryClientImpl implements Query {
return promise.then(data => QueryLiquidationsConfigurationResponse.decode(new _m0.Reader(data)));
}

streamOrderbookUpdates(request: StreamOrderbookUpdatesRequest): Promise<StreamOrderbookUpdatesResponse> {
const data = StreamOrderbookUpdatesRequest.encode(request).finish();
const promise = this.rpc.request("dydxprotocol.clob.Query", "StreamOrderbookUpdates", data);
return promise.then(data => StreamOrderbookUpdatesResponse.decode(new _m0.Reader(data)));
}

}
export const createRpcQueryExtension = (base: QueryClient) => {
const rpc = createProtobufRpcClient(base);
Expand Down Expand Up @@ -101,6 +111,10 @@ export const createRpcQueryExtension = (base: QueryClient) => {

liquidationsConfiguration(request?: QueryLiquidationsConfigurationRequest): Promise<QueryLiquidationsConfigurationResponse> {
return queryService.liquidationsConfiguration(request);
},

streamOrderbookUpdates(request: StreamOrderbookUpdatesRequest): Promise<StreamOrderbookUpdatesResponse> {
return queryService.streamOrderbookUpdates(request);
}

};
Expand Down
203 changes: 203 additions & 0 deletions indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { ClobPair, ClobPairSDKType } from "./clob_pair";
import { EquityTierLimitConfiguration, EquityTierLimitConfigurationSDKType } from "./equity_tier_limit_config";
import { BlockRateLimitConfiguration, BlockRateLimitConfigurationSDKType } from "./block_rate_limit_config";
import { LiquidationsConfig, LiquidationsConfigSDKType } from "./liquidations_config";
import { OffChainUpdateV1, OffChainUpdateV1SDKType } from "../indexer/off_chain_updates/off_chain_updates";
import * as _m0 from "protobufjs/minimal";
import { DeepPartial, Long } from "../../helpers";
/** QueryGetClobPairRequest is request type for the ClobPair method. */
Expand Down Expand Up @@ -198,6 +199,76 @@ export interface QueryLiquidationsConfigurationResponse {
export interface QueryLiquidationsConfigurationResponseSDKType {
liquidations_config?: LiquidationsConfigSDKType;
}
/**
* StreamOrderbookUpdatesRequest is a request message for the
* StreamOrderbookUpdates method.
*/

export interface StreamOrderbookUpdatesRequest {
/** Clob pair ids to stream orderbook updates for. */
clobPairId: number[];
}
/**
* StreamOrderbookUpdatesRequest is a request message for the
* StreamOrderbookUpdates method.
*/

export interface StreamOrderbookUpdatesRequestSDKType {
/** Clob pair ids to stream orderbook updates for. */
clob_pair_id: number[];
}
/**
* StreamOrderbookUpdatesResponse is a response message for the
* StreamOrderbookUpdates method.
*/

export interface StreamOrderbookUpdatesResponse {
/** Orderbook updates for the clob pair. */
updates: OffChainUpdateV1[];
/**
* Snapshot indicates if the response is from a snapshot of the orderbook.
* This is true for the initial response and false for all subsequent updates.
* Note that if the snapshot is true, then all previous entries should be
* discarded and the orderbook should be resynced.
*/

snapshot: boolean;
/**
* ---Additional fields used to debug issues---
* Block height of the updates.
*/

blockHeight: number;
/** Exec mode of the updates. */

execMode: number;
}
/**
* StreamOrderbookUpdatesResponse is a response message for the
* StreamOrderbookUpdates method.
*/

export interface StreamOrderbookUpdatesResponseSDKType {
/** Orderbook updates for the clob pair. */
updates: OffChainUpdateV1SDKType[];
/**
* Snapshot indicates if the response is from a snapshot of the orderbook.
* This is true for the initial response and false for all subsequent updates.
* Note that if the snapshot is true, then all previous entries should be
* discarded and the orderbook should be resynced.
*/

snapshot: boolean;
/**
* ---Additional fields used to debug issues---
* Block height of the updates.
*/

block_height: number;
/** Exec mode of the updates. */

exec_mode: number;
}

function createBaseQueryGetClobPairRequest(): QueryGetClobPairRequest {
return {
Expand Down Expand Up @@ -789,4 +860,136 @@ export const QueryLiquidationsConfigurationResponse = {
return message;
}

};

function createBaseStreamOrderbookUpdatesRequest(): StreamOrderbookUpdatesRequest {
return {
clobPairId: []
};
}

export const StreamOrderbookUpdatesRequest = {
encode(message: StreamOrderbookUpdatesRequest, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer {
writer.uint32(10).fork();

for (const v of message.clobPairId) {
writer.uint32(v);
}

writer.ldelim();
return writer;
},

decode(input: _m0.Reader | Uint8Array, length?: number): StreamOrderbookUpdatesRequest {
const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input);
let end = length === undefined ? reader.len : reader.pos + length;
const message = createBaseStreamOrderbookUpdatesRequest();

while (reader.pos < end) {
const tag = reader.uint32();

switch (tag >>> 3) {
case 1:
if ((tag & 7) === 2) {
const end2 = reader.uint32() + reader.pos;

while (reader.pos < end2) {
message.clobPairId.push(reader.uint32());
}
} else {
message.clobPairId.push(reader.uint32());
}

break;

default:
reader.skipType(tag & 7);
break;
}
}

return message;
},

fromPartial(object: DeepPartial<StreamOrderbookUpdatesRequest>): StreamOrderbookUpdatesRequest {
const message = createBaseStreamOrderbookUpdatesRequest();
message.clobPairId = object.clobPairId?.map(e => e) || [];
return message;
}

};

function createBaseStreamOrderbookUpdatesResponse(): StreamOrderbookUpdatesResponse {
return {
updates: [],
snapshot: false,
blockHeight: 0,
execMode: 0
};
}

export const StreamOrderbookUpdatesResponse = {
encode(message: StreamOrderbookUpdatesResponse, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer {
for (const v of message.updates) {
OffChainUpdateV1.encode(v!, writer.uint32(10).fork()).ldelim();
}

if (message.snapshot === true) {
writer.uint32(16).bool(message.snapshot);
}

if (message.blockHeight !== 0) {
writer.uint32(24).uint32(message.blockHeight);
}

if (message.execMode !== 0) {
writer.uint32(32).uint32(message.execMode);
}

return writer;
},

decode(input: _m0.Reader | Uint8Array, length?: number): StreamOrderbookUpdatesResponse {
const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input);
let end = length === undefined ? reader.len : reader.pos + length;
const message = createBaseStreamOrderbookUpdatesResponse();

while (reader.pos < end) {
const tag = reader.uint32();

switch (tag >>> 3) {
case 1:
message.updates.push(OffChainUpdateV1.decode(reader, reader.uint32()));
break;

case 2:
message.snapshot = reader.bool();
break;

case 3:
message.blockHeight = reader.uint32();
break;

case 4:
message.execMode = reader.uint32();
break;

default:
reader.skipType(tag & 7);
break;
}
}

return message;
},

fromPartial(object: DeepPartial<StreamOrderbookUpdatesResponse>): StreamOrderbookUpdatesResponse {
const message = createBaseStreamOrderbookUpdatesResponse();
message.updates = object.updates?.map(e => OffChainUpdateV1.fromPartial(e)) || [];
message.snapshot = object.snapshot ?? false;
message.blockHeight = object.blockHeight ?? 0;
message.execMode = object.execMode ?? 0;
return message;
}

};
35 changes: 35 additions & 0 deletions proto/dydxprotocol/clob/query.proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import "dydxprotocol/clob/clob_pair.proto";
import "dydxprotocol/clob/equity_tier_limit_config.proto";
import "dydxprotocol/clob/liquidations_config.proto";
import "dydxprotocol/clob/mev.proto";
import "dydxprotocol/indexer/off_chain_updates/off_chain_updates.proto";

option go_package = "github.com/dydxprotocol/v4-chain/protocol/x/clob/types";

Expand Down Expand Up @@ -50,6 +51,12 @@ service Query {
returns (QueryLiquidationsConfigurationResponse) {
option (google.api.http).get = "/dydxprotocol/clob/liquidations_config";
}

// GRPC Streams

// Streams orderbook updates.
rpc StreamOrderbookUpdates(StreamOrderbookUpdatesRequest)
returns (stream StreamOrderbookUpdatesResponse);
}

// QueryGetClobPairRequest is request type for the ClobPair method.
Expand Down Expand Up @@ -126,3 +133,31 @@ message QueryLiquidationsConfigurationRequest {}
message QueryLiquidationsConfigurationResponse {
LiquidationsConfig liquidations_config = 1 [ (gogoproto.nullable) = false ];
}

// StreamOrderbookUpdatesRequest is a request message for the
// StreamOrderbookUpdates method.
message StreamOrderbookUpdatesRequest {
// Clob pair ids to stream orderbook updates for.
repeated uint32 clob_pair_id = 1;
}

// StreamOrderbookUpdatesResponse is a response message for the
// StreamOrderbookUpdates method.
message StreamOrderbookUpdatesResponse {
// Orderbook updates for the clob pair.
repeated dydxprotocol.indexer.off_chain_updates.OffChainUpdateV1 updates = 1
[ (gogoproto.nullable) = false ];

// Snapshot indicates if the response is from a snapshot of the orderbook.
// This is true for the initial response and false for all subsequent updates.
// Note that if the snapshot is true, then all previous entries should be
// discarded and the orderbook should be resynced.
bool snapshot = 2;

// ---Additional fields used to debug issues---
// Block height of the updates.
uint32 block_height = 3;

// Exec mode of the updates.
uint32 exec_mode = 4;
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ package dydxprotocol.indexer.off_chain_updates;
import "dydxprotocol/indexer/shared/removal_reason.proto";
import "dydxprotocol/indexer/protocol/v1/clob.proto";

option go_package = "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates";
option go_package = "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types";

// Do not make any breaking changes to these protos, a new version should be
// created if a breaking change is needed.
Expand Down
2 changes: 1 addition & 1 deletion proto/dydxprotocol/indexer/protocol/v1/clob.proto
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ package dydxprotocol.indexer.protocol.v1;
import "dydxprotocol/indexer/protocol/v1/subaccount.proto";
import "gogoproto/gogo.proto";

option go_package = "github.com/dydxprotocol/v4-chain/protocol/indexer/protocol/v1";
option go_package = "github.com/dydxprotocol/v4-chain/protocol/indexer/protocol/v1/types";

// Initial copy of protos from dYdX chain application state protos for the clob
// module for use to send Indexer specific messages. Do not make any breaking
Expand Down
2 changes: 1 addition & 1 deletion proto/dydxprotocol/indexer/protocol/v1/subaccount.proto
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ package dydxprotocol.indexer.protocol.v1;
import "cosmos_proto/cosmos.proto";
import "gogoproto/gogo.proto";

option go_package = "github.com/dydxprotocol/v4-chain/protocol/indexer/protocol/v1";
option go_package = "github.com/dydxprotocol/v4-chain/protocol/indexer/protocol/v1/types";

// Initial copy of protos from dYdX chain application state protos for the
// subaccount module for use to send Indexer specific messages. Do not make any
Expand Down
2 changes: 1 addition & 1 deletion proto/dydxprotocol/indexer/shared/removal_reason.proto
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
syntax = "proto3";
package dydxprotocol.indexer.shared;

option go_package = "github.com/dydxprotocol/v4-chain/protocol/indexer/shared";
option go_package = "github.com/dydxprotocol/v4-chain/protocol/indexer/shared/types";

// TODO(DEC-869): Update reasons/statuses for Advanced Orders.

Expand Down
Loading
Loading