Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: State Streaming Docs + Explicit Config Support #13894

Merged
merged 8 commits into from
Nov 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ Ref: https://keepachangelog.com/en/1.0.0/

### Improvements

* (config) [#13894](https://github.com/cosmos/cosmos-sdk/pull/13894) Support state streaming configuration in `app.toml` template and default configuration.
* (x/nft) [#13836](https://github.com/cosmos/cosmos-sdk/pull/13836) Remove the validation for `classID` and `nftID` from the NFT module.
* [#13789](https://github.com/cosmos/cosmos-sdk/pull/13789) Add tx `encode` and `decode` endpoints to tx service.
> Note: This endpoint will only encode proto messages, Amino encoding is not supported.
Expand Down
35 changes: 35 additions & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ const (
// DefaultGRPCMaxSendMsgSize defines the default gRPC max message size in
// bytes the server can send.
DefaultGRPCMaxSendMsgSize = math.MaxInt32

// FileStreamer defines the store streaming type for file streaming.
FileStreamer = "file"
)

// BaseConfig defines the server's basic configuration
Expand Down Expand Up @@ -196,6 +199,28 @@ type StateSyncConfig struct {
SnapshotKeepRecent uint32 `mapstructure:"snapshot-keep-recent"`
}

type (
// StoreConfig defines application configuration for state streaming and other
// storage related operations.
StoreConfig struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

still a bit confused how this gets fed into the the streamer? is it viper magic?

Copy link
Collaborator

@yihuang yihuang Nov 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

streamer just do appOpts.Get("streamers.file.keys")

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah the magic is already there (granted that too could use some cleanup), but this PR just sets up the default config and template so it's clear to devs how to structure the config.

Streamers []string `mapstructure:"streamers"`
}

// StreamersConfig defines concrete state streaming configuration options. These
// fields are required to be set when state streaming is enabled via a non-empty
// list defined by 'StoreConfig.Streamers'.
StreamersConfig struct {
File FileStreamerConfig `mapstructure:"file"`
}

// FileStreamerConfig defines the file streaming configuration options.
FileStreamerConfig struct {
Keys []string `mapstructure:"keys"`
WriteDir string `mapstructure:"write_dir"`
Prefix string `mapstructure:"prefix"`
}
)

// Config defines the server's top level configuration
type Config struct {
BaseConfig `mapstructure:",squash"`
Expand All @@ -207,6 +232,8 @@ type Config struct {
Rosetta RosettaConfig `mapstructure:"rosetta"`
GRPCWeb GRPCWebConfig `mapstructure:"grpc-web"`
StateSync StateSyncConfig `mapstructure:"state-sync"`
Store StoreConfig `mapstructure:"store"`
Streamers StreamersConfig `mapstructure:"streamers"`
}

// SetMinGasPrices sets the validator's minimum gas prices.
Expand Down Expand Up @@ -288,6 +315,14 @@ func DefaultConfig() *Config {
SnapshotInterval: 0,
SnapshotKeepRecent: 2,
},
Store: StoreConfig{
Streamers: []string{},
},
Streamers: StreamersConfig{
File: FileStreamerConfig{
Keys: []string{"*"},
},
},
}
}

Expand Down
35 changes: 30 additions & 5 deletions server/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"testing"

"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

sdk "github.com/cosmos/cosmos-sdk/types"
Expand All @@ -32,28 +31,54 @@ func TestIndexEventsMarshalling(t *testing.T) {
err := configTemplate.Execute(&buffer, cfg)
require.NoError(t, err, "executing template")
actual := buffer.String()
assert.Contains(t, actual, expectedIn, "config file contents")
require.Contains(t, actual, expectedIn, "config file contents")
}

func TestParseStoreStreaming(t *testing.T) {
expectedContents := `[store]
streamers = ["file", ]

[streamers]
[streamers.file]
keys = ["*", ]
write_dir = "/foo/bar"
prefix = ""`

cfg := DefaultConfig()
cfg.Store.Streamers = []string{FileStreamer}
cfg.Streamers.File.Keys = []string{"*"}
cfg.Streamers.File.WriteDir = "/foo/bar"

var buffer bytes.Buffer
require.NoError(t, configTemplate.Execute(&buffer, cfg), "executing template")
require.Contains(t, buffer.String(), expectedContents, "config file contents")
}

func TestIndexEventsWriteRead(t *testing.T) {
expected := []string{"key3", "key4"}

// Create config with two IndexEvents entries, and write it to a file.
confFile := filepath.Join(t.TempDir(), "app.toml")
conf := DefaultConfig()
conf.IndexEvents = expected

WriteConfigFile(confFile, conf)

// Read that file into viper.
// read the file into Viper
vpr := viper.New()
vpr.SetConfigFile(confFile)

err := vpr.ReadInConfig()
require.NoError(t, err, "reading config file into viper")

// Check that the raw viper value is correct.
actualRaw := vpr.GetStringSlice("index-events")
require.Equal(t, expected, actualRaw, "viper's index events")

// Check that it is parsed into the config correctly.
cfg, perr := ParseConfig(vpr)
require.NoError(t, perr, "parsing config")

actual := cfg.IndexEvents
require.Equal(t, expected, actual, "config value")
}
Expand All @@ -62,15 +87,15 @@ func TestGlobalLabelsEventsMarshalling(t *testing.T) {
expectedIn := `global-labels = [
["labelname1", "labelvalue1"],
["labelname2", "labelvalue2"],
]` + "\n"
]`
cfg := DefaultConfig()
cfg.Telemetry.GlobalLabels = [][]string{{"labelname1", "labelvalue1"}, {"labelname2", "labelvalue2"}}
var buffer bytes.Buffer

err := configTemplate.Execute(&buffer, cfg)
require.NoError(t, err, "executing template")
actual := buffer.String()
assert.Contains(t, actual, expectedIn, "config file contents")
require.Contains(t, actual, expectedIn, "config file contents")
}

func TestGlobalLabelsWriteRead(t *testing.T) {
Expand Down
13 changes: 13 additions & 0 deletions server/config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,19 @@ snapshot-interval = {{ .StateSync.SnapshotInterval }}

# snapshot-keep-recent specifies the number of recent snapshots to keep and serve (0 to keep all).
snapshot-keep-recent = {{ .StateSync.SnapshotKeepRecent }}

###############################################################################
### Store / State Streaming ###
###############################################################################

[store]
streamers = [{{ range .Store.Streamers }}{{ printf "%q, " . }}{{end}}]

[streamers]
[streamers.file]
keys = [{{ range .Streamers.File.Keys }}{{ printf "%q, " . }}{{end}}]
write_dir = "{{ .Streamers.File.WriteDir }}"
prefix = "{{ .Streamers.File.Prefix }}"
`

var configTemplate *template.Template
Expand Down
6 changes: 3 additions & 3 deletions simapp/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
dbm "github.com/tendermint/tm-db"

"cosmossdk.io/depinject"

"github.com/cosmos/cosmos-sdk/baseapp"
"github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/codec"
Expand Down Expand Up @@ -266,10 +267,9 @@ func NewSimApp(

app.App = appBuilder.Build(logger, db, traceStore, baseAppOptions...)

// configure state listening capabilities using AppOptions
// we are doing nothing with the returned streamingServices and waitGroup in this case
// load state streaming if enabled
if _, _, err := streaming.LoadStreamingServices(app.App.BaseApp, appOpts, app.appCodec, app.keys); err != nil {
fmt.Println(err.Error())
fmt.Printf("failed to load state streaming: %s", err)
os.Exit(1)
}

Expand Down
6 changes: 3 additions & 3 deletions simapp/app_legacy.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
dbm "github.com/tendermint/tm-db"

simappparams "cosmossdk.io/simapp/params"

"github.com/cosmos/cosmos-sdk/baseapp"
"github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/client/flags"
Expand Down Expand Up @@ -246,10 +247,9 @@ func NewSimApp(
// not include this key.
memKeys := sdk.NewMemoryStoreKeys(capabilitytypes.MemStoreKey, "testingkey")

// configure state listening capabilities using AppOptions
// we are doing nothing with the returned streamingServices and waitGroup in this case
// load state streaming if enabled
if _, _, err := streaming.LoadStreamingServices(bApp, appOpts, appCodec, keys); err != nil {
fmt.Println(err.Error())
fmt.Printf("failed to load state streaming: %s", err)
os.Exit(1)
}

Expand Down
73 changes: 47 additions & 26 deletions store/streaming/README.md
Original file line number Diff line number Diff line change
@@ -1,29 +1,38 @@
# State Streaming Service

This package contains the constructors for the `StreamingService`s used to write state changes out from individual KVStores to a
file or stream, as described in [ADR-038](https://github.com/cosmos/cosmos-sdk/blob/main/docs/architecture/adr-038-state-listening.md) and defined in [types/streaming.go](https://github.com/cosmos/cosmos-sdk/blob/main/baseapp/streaming.go).
This package contains the constructors for the `StreamingService`s used to write
state changes out from individual KVStores to a file or stream, as described in
[ADR-038](https://github.com/cosmos/cosmos-sdk/blob/main/docs/architecture/adr-038-state-listening.md)
and defined in [types/streaming.go](https://github.com/cosmos/cosmos-sdk/blob/main/baseapp/streaming.go).
The child directories contain the implementations for specific output destinations.

Currently, a `StreamingService` implementation that writes state changes out to files is supported, in the future support for additional
output destinations can be added.
Currently, a `StreamingService` implementation that writes state changes out to
files is supported, in the future support for additional output destinations can
be added.

The `StreamingService` is configured from within an App using the `AppOptions` loaded from the app.toml file:
The `StreamingService` is configured from within an App using the `AppOptions`
loaded from the `app.toml` file:

```toml
# ...

[store]
streamers = [ # if len(streamers) > 0 we are streaming
"file", # name of the streaming service, used by constructor
]
# streaming is enabled if one or more streamers are defined
streamers = [
# name of the streaming service, used by constructor
"file"
]

[streamers]
[streamers.file]
keys = ["list", "of", "store", "keys", "we", "want", "to", "expose", "for", "this", "streaming", "service"]
write_dir = "path to the write directory"
prefix = "optional prefix to prepend to the generated file names"
[streamers.file]
keys = ["list", "of", "store", "keys", "we", "want", "to", "expose", "for", "this", "streaming", "service"]
write_dir = "path to the write directory"
prefix = "optional prefix to prepend to the generated file names"
```

`store.streamers` contains a list of the names of the `StreamingService` implementations to employ which are used by `ServiceTypeFromString`
to return the `ServiceConstructor` for that particular implementation:
The `store.streamers` field contains a list of the names of the `StreamingService`
implementations to employ which are used by `ServiceTypeFromString` to return
the `ServiceConstructor` for that particular implementation:

```go
listeners := cast.ToStringSlice(appOpts.Get("store.streamers"))
Expand All @@ -35,18 +44,27 @@ for _, listenerName := range listeners {
}
```

`streamers` contains a mapping of the specific `StreamingService` implementation name to the configuration parameters for that specific service.
`streamers.x.keys` contains the list of `StoreKey` names for the KVStores to expose using this service and is required by every type of `StreamingService`.
In order to expose *all* KVStores, we can include `*` in this list. An empty list is equivalent to turning the service off.
The `streamers` field contains a mapping of the specific `StreamingService`
implementation name to the configuration parameters for that specific service.

The `streamers.x.keys` field contains the list of `StoreKey` names for the
KVStores to expose using this service and is required by every type of
`StreamingService`. In order to expose *ALL* KVStores, we can include `*` in
this list. An empty list is equivalent to turning the service off.

Additional configuration parameters are optional and specific to the implementation.
In the case of the file streaming service, `streamers.file.write_dir` contains the path to the
directory to write the files to, and `streamers.file.prefix` contains an optional prefix to prepend to the output files to prevent potential collisions
with other App `StreamingService` output files.
In the case of the file streaming service, the `streamers.file.write_dir` field
contains the path to the directory to write the files to, and `streamers.file.prefix`
contains an optional prefix to prepend to the output files to prevent potential
collisions with other App `StreamingService` output files.

The `ServiceConstructor` accepts `AppOptions`, the store keys collected using `streamers.x.keys`, a `BinaryMarshaller` and
returns a `StreamingService` implementation. The `AppOptions` are passed in to provide access to any implementation specific configuration options,
e.g. in the case of the file streaming service the `streamers.file.write_dir` and `streamers.file.prefix`.
The `ServiceConstructor` accepts `AppOptions`, the store keys collected using
`streamers.x.keys`, a `BinaryMarshaller` and returns a `StreamingService
implementation.

The `AppOptions` are passed in to provide access to any implementation specific
configuration options, e.g. in the case of the file streaming service the
`streamers.file.write_dir` and `streamers.file.prefix`.

```go
streamingService, err := constructor(appOpts, exposeStoreKeys, appCodec)
Expand All @@ -55,9 +73,12 @@ if err != nil {
}
```

The returned `StreamingService` is loaded into the BaseApp using the BaseApp's `SetStreamingService` method.
The `Stream` method is called on the service to begin the streaming process. Depending on the implementation this process
may be synchronous or asynchronous with the message processing of the state machine.
The returned `StreamingService` is loaded into the BaseApp using the BaseApp's
`SetStreamingService` method.

The `Stream` method is called on the service to begin the streaming process.
Depending on the implementation this process may be synchronous or asynchronous
with the message processing of the state machine.

```go
bApp.SetStreamingService(streamingService)
Expand Down