Skip to content

Commit

Permalink
GRPC Streaming Batching (backport #1633) (#1663)
Browse files Browse the repository at this point in the history
Co-authored-by: Jonathan Fung <121899091+jonfung-dydx@users.noreply.github.com>
Co-authored-by: Jonathan Fung <jonathan@dydx.exchange>
  • Loading branch information
3 people committed Jun 13, 2024
1 parent b3d7db4 commit ece4b42
Show file tree
Hide file tree
Showing 16 changed files with 338 additions and 86 deletions.
9 changes: 8 additions & 1 deletion protocol/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,9 @@ func New(
if app.SlinkyClient != nil {
app.SlinkyClient.Stop()
}
if app.GrpcStreamingManager != nil {
app.GrpcStreamingManager.Stop()
}
return nil
},
)
Expand Down Expand Up @@ -1930,7 +1933,11 @@ func getGrpcStreamingManagerFromOptions(
) (manager streamingtypes.GrpcStreamingManager) {
if appFlags.GrpcStreamingEnabled {
logger.Info("GRPC streaming is enabled")
return streaming.NewGrpcStreamingManager(logger)
return streaming.NewGrpcStreamingManager(
logger,
appFlags.GrpcStreamingFlushIntervalMs,
appFlags.GrpcStreamingMaxBufferSize,
)
}
return streaming.NewNoopGrpcStreamingManager()
}
53 changes: 46 additions & 7 deletions protocol/app/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ type Flags struct {
GrpcEnable bool

// Grpc Streaming
GrpcStreamingEnabled bool
VEOracleEnabled bool // Slinky Vote Extensions
GrpcStreamingEnabled bool
GrpcStreamingFlushIntervalMs uint32
GrpcStreamingMaxBufferSize uint32

VEOracleEnabled bool // Slinky Vote Extensions
}

// List of CLI flags.
Expand All @@ -37,7 +40,9 @@ const (
GrpcEnable = "grpc.enable"

// Grpc Streaming
GrpcStreamingEnabled = "grpc-streaming-enabled"
GrpcStreamingEnabled = "grpc-streaming-enabled"
GrpcStreamingFlushIntervalMs = "grpc-streaming-flush-interval-ms"
GrpcStreamingMaxBufferSize = "grpc-streaming-max-buffer-size"

// Slinky VEs enabled
VEOracleEnabled = "slinky-vote-extension-oracle-enabled"
Expand All @@ -50,8 +55,11 @@ const (
DefaultNonValidatingFullNode = false
DefaultDdErrorTrackingFormat = false

DefaultGrpcStreamingEnabled = false
DefaultVEOracleEnabled = true
DefaultGrpcStreamingEnabled = false
DefaultGrpcStreamingFlushIntervalMs = 50
DefaultGrpcStreamingMaxBufferSize = 10000

DefaultVEOracleEnabled = true
)

// AddFlagsToCmd adds flags to app initialization.
Expand Down Expand Up @@ -85,6 +93,16 @@ func AddFlagsToCmd(cmd *cobra.Command) {
DefaultGrpcStreamingEnabled,
"Whether to enable grpc streaming for full nodes",
)
cmd.Flags().Uint32(
GrpcStreamingFlushIntervalMs,
DefaultGrpcStreamingFlushIntervalMs,
"Flush interval (in ms) for grpc streaming",
)
cmd.Flags().Uint32(
GrpcStreamingMaxBufferSize,
DefaultGrpcStreamingMaxBufferSize,
"Maximum buffer size before grpc streaming cancels all connections",
)
cmd.Flags().Bool(
VEOracleEnabled,
DefaultVEOracleEnabled,
Expand All @@ -104,6 +122,12 @@ func (f *Flags) Validate() error {
if !f.GrpcEnable {
return fmt.Errorf("grpc.enable must be set to true - grpc streaming requires gRPC server")
}
if f.GrpcStreamingMaxBufferSize == 0 {
return fmt.Errorf("grpc streaming buffer size must be positive number")
}
if f.GrpcStreamingFlushIntervalMs == 0 {
return fmt.Errorf("grpc streaming flush interval must be positive number")
}
}
return nil
}
Expand All @@ -124,8 +148,11 @@ func GetFlagValuesFromOptions(
GrpcAddress: config.DefaultGRPCAddress,
GrpcEnable: true,

GrpcStreamingEnabled: DefaultGrpcStreamingEnabled,
VEOracleEnabled: true,
GrpcStreamingEnabled: DefaultGrpcStreamingEnabled,
GrpcStreamingFlushIntervalMs: DefaultGrpcStreamingFlushIntervalMs,
GrpcStreamingMaxBufferSize: DefaultGrpcStreamingMaxBufferSize,

VEOracleEnabled: true,
}

// Populate the flags if they exist.
Expand Down Expand Up @@ -171,6 +198,18 @@ func GetFlagValuesFromOptions(
}
}

if option := appOpts.Get(GrpcStreamingFlushIntervalMs); option != nil {
if v, err := cast.ToUint32E(option); err == nil {
result.GrpcStreamingFlushIntervalMs = v
}
}

if option := appOpts.Get(GrpcStreamingMaxBufferSize); option != nil {
if v, err := cast.ToUint32E(option); err == nil {
result.GrpcStreamingMaxBufferSize = v
}
}

if option := appOpts.Get(VEOracleEnabled); option != nil {
if v, err := cast.ToBoolE(option); err == nil {
result.VEOracleEnabled = v
Expand Down
69 changes: 60 additions & 9 deletions protocol/app/flags/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ func TestAddFlagsToCommand(t *testing.T) {
fmt.Sprintf("Has %s flag", flags.GrpcStreamingEnabled): {
flagName: flags.GrpcStreamingEnabled,
},
fmt.Sprintf("Has %s flag", flags.GrpcStreamingFlushIntervalMs): {
flagName: flags.GrpcStreamingFlushIntervalMs,
},
fmt.Sprintf("Has %s flag", flags.GrpcStreamingMaxBufferSize): {
flagName: flags.GrpcStreamingMaxBufferSize,
},
}

for name, tc := range tests {
Expand Down Expand Up @@ -63,9 +69,11 @@ func TestValidate(t *testing.T) {
},
"success - gRPC streaming enabled for validating nodes": {
flags: flags.Flags{
NonValidatingFullNode: false,
GrpcEnable: true,
GrpcStreamingEnabled: true,
NonValidatingFullNode: false,
GrpcEnable: true,
GrpcStreamingEnabled: true,
GrpcStreamingFlushIntervalMs: 100,
GrpcStreamingMaxBufferSize: 10000,
},
},
"failure - gRPC disabled": {
Expand All @@ -82,6 +90,26 @@ func TestValidate(t *testing.T) {
},
expectedErr: fmt.Errorf("grpc.enable must be set to true - grpc streaming requires gRPC server"),
},
"failure - gRPC streaming enabled with zero buffer size": {
flags: flags.Flags{
NonValidatingFullNode: true,
GrpcEnable: true,
GrpcStreamingEnabled: true,
GrpcStreamingFlushIntervalMs: 100,
GrpcStreamingMaxBufferSize: 0,
},
expectedErr: fmt.Errorf("grpc streaming buffer size must be positive number"),
},
"failure - gRPC streaming enabled with zero flush interval ms": {
flags: flags.Flags{
NonValidatingFullNode: true,
GrpcEnable: true,
GrpcStreamingEnabled: true,
GrpcStreamingFlushIntervalMs: 0,
GrpcStreamingMaxBufferSize: 10000,
},
expectedErr: fmt.Errorf("grpc streaming flush interval must be positive number"),
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
Expand All @@ -107,6 +135,8 @@ func TestGetFlagValuesFromOptions(t *testing.T) {
expectedGrpcAddress string
expectedGrpcEnable bool
expectedGrpcStreamingEnable bool
expectedGrpcStreamingFlushMs uint32
expectedGrpcStreamingBufferSize uint32
}{
"Sets to default if unset": {
expectedNonValidatingFullNodeFlag: false,
Expand All @@ -115,22 +145,28 @@ func TestGetFlagValuesFromOptions(t *testing.T) {
expectedGrpcAddress: "localhost:9090",
expectedGrpcEnable: true,
expectedGrpcStreamingEnable: false,
expectedGrpcStreamingFlushMs: 50,
expectedGrpcStreamingBufferSize: 10000,
},
"Sets values from options": {
optsMap: map[string]any{
flags.NonValidatingFullNodeFlag: true,
flags.DdAgentHost: "agentHostTest",
flags.DdTraceAgentPort: uint16(777),
flags.GrpcEnable: false,
flags.GrpcAddress: "localhost:9091",
flags.GrpcStreamingEnabled: "true",
flags.NonValidatingFullNodeFlag: true,
flags.DdAgentHost: "agentHostTest",
flags.DdTraceAgentPort: uint16(777),
flags.GrpcEnable: false,
flags.GrpcAddress: "localhost:9091",
flags.GrpcStreamingEnabled: "true",
flags.GrpcStreamingFlushIntervalMs: uint32(408),
flags.GrpcStreamingMaxBufferSize: uint32(650),
},
expectedNonValidatingFullNodeFlag: true,
expectedDdAgentHost: "agentHostTest",
expectedDdTraceAgentPort: 777,
expectedGrpcEnable: false,
expectedGrpcAddress: "localhost:9091",
expectedGrpcStreamingEnable: true,
expectedGrpcStreamingFlushMs: 408,
expectedGrpcStreamingBufferSize: 650,
},
}

Expand Down Expand Up @@ -168,6 +204,21 @@ func TestGetFlagValuesFromOptions(t *testing.T) {
tc.expectedGrpcAddress,
flags.GrpcAddress,
)
require.Equal(
t,
tc.expectedGrpcStreamingEnable,
flags.GrpcStreamingEnabled,
)
require.Equal(
t,
tc.expectedGrpcStreamingFlushMs,
flags.GrpcStreamingFlushIntervalMs,
)
require.Equal(
t,
tc.expectedGrpcStreamingBufferSize,
flags.GrpcStreamingMaxBufferSize,
)
})
}
}
3 changes: 3 additions & 0 deletions protocol/lib/metrics/metric_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,13 @@ const (
// Full node grpc
FullNodeGrpc = "full_node_grpc"
GrpcSendOrderbookUpdatesLatency = "grpc_send_orderbook_updates_latency"
GrpcSendOrderbookSnapshotLatency = "grpc_send_orderbook_snapshot_latency"
GrpcSendOrderbookFillsLatency = "grpc_send_orderbook_fills_latency"
GrpcEmitProtocolUpdateCount = "grpc_emit_protocol_update_count"
GrpcSendResponseToSubscriberCount = "grpc_send_response_to_subscriber_count"
GrpcStreamSubscriberCount = "grpc_stream_subscriber_count"
GrpcStreamNumUpdatesBuffered = "grpc_stream_num_updates_buffered"
GrpcFlushUpdatesLatency = "grpc_flush_updates_latency"

EndBlocker = "end_blocker"
EndBlockerLag = "end_blocker_lag"
Expand Down
6 changes: 3 additions & 3 deletions protocol/mocks/ClobKeeper.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions protocol/mocks/MemClobKeeper.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit ece4b42

Please sign in to comment.