Skip to content

Commit

Permalink
refactor: State Streaming Docs + Explicit Config Support (#13894)
Browse files Browse the repository at this point in the history
(cherry picked from commit 2c0d445)

# Conflicts:
#	CHANGELOG.md
#	simapp/app.go
#	simapp/app_legacy.go
  • Loading branch information
alexanderbez authored and mergify[bot] committed Nov 17, 2022
1 parent e5fef13 commit 413958b
Show file tree
Hide file tree
Showing 7 changed files with 1,028 additions and 31 deletions.
170 changes: 170 additions & 0 deletions CHANGELOG.md

Large diffs are not rendered by default.

35 changes: 35 additions & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ const (
// DefaultGRPCMaxSendMsgSize defines the default gRPC max message size in
// bytes the server can send.
DefaultGRPCMaxSendMsgSize = math.MaxInt32

// FileStreamer defines the store streaming type for file streaming.
FileStreamer = "file"
)

// BaseConfig defines the server's basic configuration
Expand Down Expand Up @@ -196,6 +199,28 @@ type StateSyncConfig struct {
SnapshotKeepRecent uint32 `mapstructure:"snapshot-keep-recent"`
}

type (
// StoreConfig defines application configuration for state streaming and other
// storage related operations.
StoreConfig struct {
Streamers []string `mapstructure:"streamers"`
}

// StreamersConfig defines concrete state streaming configuration options. These
// fields are required to be set when state streaming is enabled via a non-empty
// list defined by 'StoreConfig.Streamers'.
StreamersConfig struct {
File FileStreamerConfig `mapstructure:"file"`
}

// FileStreamerConfig defines the file streaming configuration options.
FileStreamerConfig struct {
Keys []string `mapstructure:"keys"`
WriteDir string `mapstructure:"write_dir"`
Prefix string `mapstructure:"prefix"`
}
)

// Config defines the server's top level configuration
type Config struct {
BaseConfig `mapstructure:",squash"`
Expand All @@ -207,6 +232,8 @@ type Config struct {
Rosetta RosettaConfig `mapstructure:"rosetta"`
GRPCWeb GRPCWebConfig `mapstructure:"grpc-web"`
StateSync StateSyncConfig `mapstructure:"state-sync"`
Store StoreConfig `mapstructure:"store"`
Streamers StreamersConfig `mapstructure:"streamers"`
}

// SetMinGasPrices sets the validator's minimum gas prices.
Expand Down Expand Up @@ -288,6 +315,14 @@ func DefaultConfig() *Config {
SnapshotInterval: 0,
SnapshotKeepRecent: 2,
},
Store: StoreConfig{
Streamers: []string{},
},
Streamers: StreamersConfig{
File: FileStreamerConfig{
Keys: []string{"*"},
},
},
}
}

Expand Down
35 changes: 30 additions & 5 deletions server/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"testing"

"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

sdk "github.com/cosmos/cosmos-sdk/types"
Expand All @@ -32,28 +31,54 @@ func TestIndexEventsMarshalling(t *testing.T) {
err := configTemplate.Execute(&buffer, cfg)
require.NoError(t, err, "executing template")
actual := buffer.String()
assert.Contains(t, actual, expectedIn, "config file contents")
require.Contains(t, actual, expectedIn, "config file contents")
}

func TestParseStoreStreaming(t *testing.T) {
expectedContents := `[store]
streamers = ["file", ]
[streamers]
[streamers.file]
keys = ["*", ]
write_dir = "/foo/bar"
prefix = ""`

cfg := DefaultConfig()
cfg.Store.Streamers = []string{FileStreamer}
cfg.Streamers.File.Keys = []string{"*"}
cfg.Streamers.File.WriteDir = "/foo/bar"

var buffer bytes.Buffer
require.NoError(t, configTemplate.Execute(&buffer, cfg), "executing template")
require.Contains(t, buffer.String(), expectedContents, "config file contents")
}

func TestIndexEventsWriteRead(t *testing.T) {
expected := []string{"key3", "key4"}

// Create config with two IndexEvents entries, and write it to a file.
confFile := filepath.Join(t.TempDir(), "app.toml")
conf := DefaultConfig()
conf.IndexEvents = expected

WriteConfigFile(confFile, conf)

// Read that file into viper.
// read the file into Viper
vpr := viper.New()
vpr.SetConfigFile(confFile)

err := vpr.ReadInConfig()
require.NoError(t, err, "reading config file into viper")

// Check that the raw viper value is correct.
actualRaw := vpr.GetStringSlice("index-events")
require.Equal(t, expected, actualRaw, "viper's index events")

// Check that it is parsed into the config correctly.
cfg, perr := ParseConfig(vpr)
require.NoError(t, perr, "parsing config")

actual := cfg.IndexEvents
require.Equal(t, expected, actual, "config value")
}
Expand All @@ -62,15 +87,15 @@ func TestGlobalLabelsEventsMarshalling(t *testing.T) {
expectedIn := `global-labels = [
["labelname1", "labelvalue1"],
["labelname2", "labelvalue2"],
]` + "\n"
]`
cfg := DefaultConfig()
cfg.Telemetry.GlobalLabels = [][]string{{"labelname1", "labelvalue1"}, {"labelname2", "labelvalue2"}}
var buffer bytes.Buffer

err := configTemplate.Execute(&buffer, cfg)
require.NoError(t, err, "executing template")
actual := buffer.String()
assert.Contains(t, actual, expectedIn, "config file contents")
require.Contains(t, actual, expectedIn, "config file contents")
}

func TestGlobalLabelsWriteRead(t *testing.T) {
Expand Down
13 changes: 13 additions & 0 deletions server/config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,19 @@ snapshot-interval = {{ .StateSync.SnapshotInterval }}
# snapshot-keep-recent specifies the number of recent snapshots to keep and serve (0 to keep all).
snapshot-keep-recent = {{ .StateSync.SnapshotKeepRecent }}
###############################################################################
### Store / State Streaming ###
###############################################################################
[store]
streamers = [{{ range .Store.Streamers }}{{ printf "%q, " . }}{{end}}]
[streamers]
[streamers.file]
keys = [{{ range .Streamers.File.Keys }}{{ printf "%q, " . }}{{end}}]
write_dir = "{{ .Streamers.File.WriteDir }}"
prefix = "{{ .Streamers.File.Prefix }}"
`

var configTemplate *template.Template
Expand Down
12 changes: 12 additions & 0 deletions simapp/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ import (
tmos "github.com/tendermint/tendermint/libs/os"
dbm "github.com/tendermint/tm-db"

<<<<<<< HEAD
=======
"cosmossdk.io/depinject"

>>>>>>> 2c0d445ad (refactor: State Streaming Docs + Explicit Config Support (#13894))
"github.com/cosmos/cosmos-sdk/baseapp"
"github.com/cosmos/cosmos-sdk/client"
nodeservice "github.com/cosmos/cosmos-sdk/client/grpc/node"
Expand Down Expand Up @@ -228,6 +233,7 @@ func NewSimApp(
// not include this key.
memKeys := sdk.NewMemoryStoreKeys(capabilitytypes.MemStoreKey, "testingkey")

<<<<<<< HEAD
// configure state listening capabilities using AppOptions
// we are doing nothing with the returned streamingServices and waitGroup in this case
if _, _, err := streaming.LoadStreamingServices(bApp, appOpts, appCodec, keys); err != nil {
Expand All @@ -243,6 +249,12 @@ func NewSimApp(
keys: keys,
tkeys: tkeys,
memKeys: memKeys,
=======
// load state streaming if enabled
if _, _, err := streaming.LoadStreamingServices(app.App.BaseApp, appOpts, app.appCodec, app.keys); err != nil {
fmt.Printf("failed to load state streaming: %s", err)
os.Exit(1)
>>>>>>> 2c0d445ad (refactor: State Streaming Docs + Explicit Config Support (#13894))
}

app.ParamsKeeper = initParamsKeeper(appCodec, legacyAmino, keys[paramstypes.StoreKey], tkeys[paramstypes.TStoreKey])
Expand Down
Loading

0 comments on commit 413958b

Please sign in to comment.