Skip to content

Commit

Permalink
[occ] Add concurrency worker configuration (#324)
Browse files Browse the repository at this point in the history
## Describe your changes and provide context
- `ConcurrencyWorkers` represents the number of workers to use for
concurrent transactions
- since concurrrency-workers is a baseapp-level setting, implementations
(like sei-chain) shouldn't have to pass it (but can)
- it defaults to 10 if not set (via cli default value)
- it defaults to 10 in app.toml only if that file is being created (and
doesn't exist)
- if explicitly set to zero on command line, it will override with the
default (for safety)
- cli takes precedence over the config file
- no one has to do anything to get it to be 10 (no config changes no
sei-chain changes required (aside from new cosmos version))

## Testing performed to validate your change
- Unit Tests for setting the value
- Manually testing scenarios with sei-chain
  • Loading branch information
stevenlanders authored and udpatil committed Jan 18, 2024
1 parent 953836e commit b67a28e
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 1 deletion.
21 changes: 20 additions & 1 deletion baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"go.opentelemetry.io/otel/trace"

"github.com/armon/go-metrics"
"github.com/cosmos/cosmos-sdk/server/config"
"github.com/cosmos/cosmos-sdk/utils/tracing"
"github.com/gogo/protobuf/proto"
sdbm "github.com/sei-protocol/sei-tm-db/backends"
Expand Down Expand Up @@ -56,7 +57,8 @@ const (
FlagArchivalArweaveIndexDBFullPath = "archival-arweave-index-db-full-path"
FlagArchivalArweaveNodeURL = "archival-arweave-node-url"

FlagChainID = "chain-id"
FlagChainID = "chain-id"
FlagConcurrencyWorkers = "concurrency-workers"
)

var (
Expand Down Expand Up @@ -163,6 +165,8 @@ type BaseApp struct { //nolint: maligned
TmConfig *tmcfg.Config

TracingInfo *tracing.Info

concurrencyWorkers int
}

type appStore struct {
Expand Down Expand Up @@ -287,6 +291,16 @@ func NewBaseApp(
}
app.startCompactionRoutine(db)

// if no option overrode already, initialize to the flags value
// this avoids forcing every implementation to pass an option, but allows it
if app.concurrencyWorkers == 0 {
app.concurrencyWorkers = cast.ToInt(appOpts.Get(FlagConcurrencyWorkers))
}
// safely default this to the default value if 0
if app.concurrencyWorkers == 0 {
app.concurrencyWorkers = config.DefaultConcurrencyWorkers
}

return app
}

Expand All @@ -300,6 +314,11 @@ func (app *BaseApp) AppVersion() uint64 {
return app.appVersion
}

// ConcurrencyWorkers returns the number of concurrent workers for the BaseApp.
func (app *BaseApp) ConcurrencyWorkers() int {
return app.concurrencyWorkers
}

// Version returns the application's version string.
func (app *BaseApp) Version() string {
return app.version
Expand Down
11 changes: 11 additions & 0 deletions baseapp/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ func SetSnapshotInterval(interval uint64) func(*BaseApp) {
return func(app *BaseApp) { app.SetSnapshotInterval(interval) }
}

func SetConcurrencyWorkers(workers int) func(*BaseApp) {
return func(app *BaseApp) { app.SetConcurrencyWorkers(workers) }
}

// SetSnapshotKeepRecent sets the recent snapshots to keep.
func SetSnapshotKeepRecent(keepRecent uint32) func(*BaseApp) {
return func(app *BaseApp) { app.SetSnapshotKeepRecent(keepRecent) }
Expand Down Expand Up @@ -290,6 +294,13 @@ func (app *BaseApp) SetSnapshotInterval(snapshotInterval uint64) {
app.snapshotInterval = snapshotInterval
}

func (app *BaseApp) SetConcurrencyWorkers(workers int) {
if app.sealed {
panic("SetConcurrencyWorkers() on sealed BaseApp")
}
app.concurrencyWorkers = workers
}

// SetSnapshotKeepRecent sets the number of recent snapshots to keep.
func (app *BaseApp) SetSnapshotKeepRecent(snapshotKeepRecent uint32) {
if app.sealed {
Expand Down
9 changes: 9 additions & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ const (

// DefaultGRPCWebAddress defines the default address to bind the gRPC-web server to.
DefaultGRPCWebAddress = "0.0.0.0:9091"

// DefaultConcurrencyWorkers defines the default workers to use for concurrent transactions
DefaultConcurrencyWorkers = 10
)

// BaseConfig defines the server's basic configuration
Expand Down Expand Up @@ -88,6 +91,10 @@ type BaseConfig struct {
SeparateOrphanVersionsToKeep int64 `mapstructure:"separate-orphan-versions-to-keep"`
NumOrphanPerFile int `mapstructure:"num-orphan-per-file"`
OrphanDirectory string `mapstructure:"orphan-dir"`

// ConcurrencyWorkers defines the number of workers to use for concurrent
// transaction execution. A value of -1 means unlimited workers. Default value is 10.
ConcurrencyWorkers int `mapstructure:"concurrency-workers"`
}

// APIConfig defines the API listener configuration.
Expand Down Expand Up @@ -238,6 +245,7 @@ func DefaultConfig() *Config {
IAVLDisableFastNode: true,
CompactionInterval: 0,
NoVersioning: false,
ConcurrencyWorkers: DefaultConcurrencyWorkers,
},
Telemetry: telemetry.Config{
Enabled: false,
Expand Down Expand Up @@ -314,6 +322,7 @@ func GetConfig(v *viper.Viper) (Config, error) {
SeparateOrphanVersionsToKeep: v.GetInt64("separate-orphan-versions-to-keep"),
NumOrphanPerFile: v.GetInt("num-orphan-per-file"),
OrphanDirectory: v.GetString("orphan-dir"),
ConcurrencyWorkers: v.GetInt("concurrency-workers"),
},
Telemetry: telemetry.Config{
ServiceName: v.GetString("telemetry.service-name"),
Expand Down
5 changes: 5 additions & 0 deletions server/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,8 @@ func TestSetSnapshotDirectory(t *testing.T) {
cfg := DefaultConfig()
require.Equal(t, "", cfg.StateSync.SnapshotDirectory)
}

func TestSetConcurrencyWorkers(t *testing.T) {
cfg := DefaultConfig()
require.Equal(t, DefaultConcurrencyWorkers, cfg.ConcurrencyWorkers)
}
3 changes: 3 additions & 0 deletions server/config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ num-orphan-per-file = {{ .BaseConfig.NumOrphanPerFile }}
# if separate-orphan-storage is true, where to store orphan data
orphan-dir = "{{ .BaseConfig.OrphanDirectory }}"
# concurrency-workers defines how many workers to run for concurrent transaction execution
# concurrency-workers = {{ .BaseConfig.ConcurrencyWorkers }}
###############################################################################
### Telemetry Configuration ###
###############################################################################
Expand Down
2 changes: 2 additions & 0 deletions server/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ const (
FlagSeparateOrphanVersionsToKeep = "separate-orphan-versions-to-keep"
FlagNumOrphanPerFile = "num-orphan-per-file"
FlagOrphanDirectory = "orphan-dir"
FlagConcurrencyWorkers = "concurrency-workers"

// state sync-related flags
FlagStateSyncSnapshotInterval = "state-sync.snapshot-interval"
Expand Down Expand Up @@ -252,6 +253,7 @@ is performed. Note, when enabled, gRPC will also be automatically enabled.
cmd.Flags().Int64(FlagSeparateOrphanVersionsToKeep, 2, "Number of versions to keep if storing orphans separately")
cmd.Flags().Int(FlagNumOrphanPerFile, 100000, "Number of orphans to store on each file if storing orphans separately")
cmd.Flags().String(FlagOrphanDirectory, path.Join(defaultNodeHome, "orphans"), "Directory to store orphan files if storing orphans separately")
cmd.Flags().Int(FlagConcurrencyWorkers, config.DefaultConcurrencyWorkers, "Number of workers to process concurrent transactions")

cmd.Flags().Bool(flagGRPCOnly, false, "Start the node in gRPC query only mode (no Tendermint process is started)")
cmd.Flags().Bool(flagGRPCEnable, true, "Define if the gRPC server should be enabled")
Expand Down

0 comments on commit b67a28e

Please sign in to comment.