Skip to content

Commit

Permalink
[CT-652] add command line flag for full node streaming (#1145)
Browse files Browse the repository at this point in the history
  • Loading branch information
jayy04 committed Mar 15, 2024
1 parent ea73729 commit fb750c8
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 6 deletions.
10 changes: 6 additions & 4 deletions protocol/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,7 @@ func New(
indexerFlags.SendOffchainData,
)

app.GrpcStreamingManager = getGrpcStreamingManagerFromOptions(appFlags, appOpts, logger)
app.GrpcStreamingManager = getGrpcStreamingManagerFromOptions(appFlags, logger)

timeProvider := &timelib.TimeProviderImpl{}

Expand Down Expand Up @@ -1608,9 +1608,11 @@ func getIndexerFromOptions(
// options. This function will default to returning a no-op instance.
func getGrpcStreamingManagerFromOptions(
appFlags flags.Flags,
appOpts servertypes.AppOptions,
logger log.Logger,
) (manager streamingtypes.GrpcStreamingManager) {
// TODO(CT-625): add command line flags for full node streaming.
return streaming.NewGrpcStreamingManager()
if appFlags.GrpcStreamingEnabled {
logger.Info("GRPC streaming is enabled")
return streaming.NewGrpcStreamingManager()
}
return streaming.NewNoopGrpcStreamingManager()
}
32 changes: 32 additions & 0 deletions protocol/app/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ type Flags struct {
// Existing flags
GrpcAddress string
GrpcEnable bool

// Grpc Streaming
GrpcStreamingEnabled bool
}

// List of CLI flags.
Expand All @@ -31,6 +34,9 @@ const (
// Cosmos flags below. These config values can be set as flags or in config.toml.
GrpcAddress = "grpc.address"
GrpcEnable = "grpc.enable"

// Grpc Streaming
GrpcStreamingEnabled = "grpc-streaming-enabled"
)

// Default values.
Expand All @@ -39,6 +45,8 @@ const (
DefaultDdTraceAgentPort = 8126
DefaultNonValidatingFullNode = false
DefaultDdErrorTrackingFormat = false

DefaultGrpcStreamingEnabled = false
)

// AddFlagsToCmd adds flags to app initialization.
Expand Down Expand Up @@ -67,6 +75,11 @@ func AddFlagsToCmd(cmd *cobra.Command) {
DefaultDdErrorTrackingFormat,
"Enable formatting of log error tags to datadog error tracking format",
)
cmd.Flags().Bool(
GrpcStreamingEnabled,
DefaultGrpcStreamingEnabled,
"Whether to enable grpc streaming for full nodes",
)
}

// Validate checks that the flags are valid.
Expand All @@ -75,6 +88,17 @@ func (f *Flags) Validate() error {
if !f.NonValidatingFullNode && !f.GrpcEnable {
return fmt.Errorf("grpc.enable must be set to true - validating requires gRPC server")
}

// Grpc streaming
if f.GrpcStreamingEnabled {
if !f.GrpcEnable {
return fmt.Errorf("grpc.enable must be set to true - grpc streaming requires gRPC server")
}

if !f.NonValidatingFullNode {
return fmt.Errorf("grpc-streaming-enabled can only be set to true for non-validating full nodes")
}
}
return nil
}

Expand All @@ -93,6 +117,8 @@ func GetFlagValuesFromOptions(
// These are the default values from the Cosmos flags.
GrpcAddress: config.DefaultGRPCAddress,
GrpcEnable: true,

GrpcStreamingEnabled: DefaultGrpcStreamingEnabled,
}

// Populate the flags if they exist.
Expand Down Expand Up @@ -132,5 +158,11 @@ func GetFlagValuesFromOptions(
}
}

if option := appOpts.Get(GrpcStreamingEnabled); option != nil {
if v, err := cast.ToBoolE(option); err == nil {
result.GrpcStreamingEnabled = v
}
}

return result
}
29 changes: 27 additions & 2 deletions protocol/app/flags/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package flags_test

import (
"fmt"
"github.com/cosmos/cosmos-sdk/server/config"
"testing"

"github.com/cosmos/cosmos-sdk/server/config"

"github.com/dydxprotocol/v4-chain/protocol/app/flags"
"github.com/dydxprotocol/v4-chain/protocol/mocks"
"github.com/spf13/cobra"
Expand All @@ -27,7 +28,11 @@ func TestAddFlagsToCommand(t *testing.T) {
},
fmt.Sprintf("Has %s flag", flags.DdTraceAgentPort): {
flagName: flags.DdTraceAgentPort,
}}
},
fmt.Sprintf("Has %s flag", flags.GrpcStreamingEnabled): {
flagName: flags.GrpcStreamingEnabled,
},
}

for name, tc := range tests {
t.Run(name, func(t *testing.T) {
Expand Down Expand Up @@ -62,6 +67,22 @@ func TestValidate(t *testing.T) {
},
expectedErr: fmt.Errorf("grpc.enable must be set to true - validating requires gRPC server"),
},
"failure - gRPC streaming enabled for validating nodes": {
flags: flags.Flags{
NonValidatingFullNode: false,
GrpcEnable: true,
GrpcStreamingEnabled: true,
},
expectedErr: fmt.Errorf("grpc-streaming-enabled can only be set to true for non-validating full nodes"),
},
"failure - gRPC streaming enabled with gRPC disabled": {
flags: flags.Flags{
NonValidatingFullNode: true,
GrpcEnable: false,
GrpcStreamingEnabled: true,
},
expectedErr: fmt.Errorf("grpc.enable must be set to true - grpc streaming requires gRPC server"),
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
Expand All @@ -86,13 +107,15 @@ func TestGetFlagValuesFromOptions(t *testing.T) {
expectedDdTraceAgentPort uint16
expectedGrpcAddress string
expectedGrpcEnable bool
expectedGrpcStreamingEnable bool
}{
"Sets to default if unset": {
expectedNonValidatingFullNodeFlag: false,
expectedDdAgentHost: "",
expectedDdTraceAgentPort: 8126,
expectedGrpcAddress: "localhost:9090",
expectedGrpcEnable: true,
expectedGrpcStreamingEnable: false,
},
"Sets values from options": {
optsMap: map[string]any{
Expand All @@ -101,12 +124,14 @@ func TestGetFlagValuesFromOptions(t *testing.T) {
flags.DdTraceAgentPort: uint16(777),
flags.GrpcEnable: false,
flags.GrpcAddress: "localhost:9091",
flags.GrpcStreamingEnabled: "true",
},
expectedNonValidatingFullNodeFlag: true,
expectedDdAgentHost: "agentHostTest",
expectedDdTraceAgentPort: 777,
expectedGrpcEnable: false,
expectedGrpcAddress: "localhost:9091",
expectedGrpcStreamingEnable: true,
},
}

Expand Down

0 comments on commit fb750c8

Please sign in to comment.