Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: ADR-038 Part 3: State change files auxiliary gRPC server #9647

Closed
wants to merge 46 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
0a88c29
adjust KVStores to fit new CacheWrapper interface
i-norden Feb 8, 2021
875f23e
adjust multistores to fit new MultiStore interface and enable wrappin…
i-norden Feb 8, 2021
5e02e3d
update server mock KVStore and MultiStore
i-norden Feb 9, 2021
c394e5d
fix bug identified in CI
i-norden Feb 9, 2021
c92556a
improve codecov, minor fixes/adjustments
i-norden Feb 10, 2021
18bf622
review fixes
i-norden Feb 24, 2021
d92f1d0
review updates; flip set to delete in KVStorePair, updated proto-docs…
i-norden Mar 5, 2021
72d6a88
adjust KVStores to fit new CacheWrapper interface
i-norden Feb 8, 2021
602867e
review fixes
i-norden Feb 24, 2021
93dbea9
adjust KVStores to fit new CacheWrapper interface
i-norden Feb 8, 2021
441af0e
review fixes
i-norden Feb 24, 2021
f7ecbcb
hook and streaming service interfaces
i-norden Feb 8, 2021
5f74b68
integrate Hooks and StreamingService into BaseApp
i-norden Feb 8, 2021
d3c44a4
begin file streaming service implementation
i-norden Feb 8, 2021
7cc4dbd
update Hook interface to return errors so that they can be logged at …
i-norden Feb 11, 2021
7d2e9d6
finish implementation of the file streaming service
i-norden Feb 11, 2021
8f72e00
streaming service unit tests; minor adjustments
i-norden Feb 18, 2021
613a4c5
streaming service constuctor, constructor unit test, update adr
i-norden Feb 22, 2021
c6ad1ed
example toml configuration
i-norden Feb 22, 2021
7198289
ci/linting fixes
i-norden Feb 22, 2021
2fffbd0
simapp integration
i-norden Mar 1, 2021
d398cbf
update changelog
i-norden Mar 3, 2021
e0a1f32
documentation for configuring and using a StreamingService
i-norden Mar 3, 2021
caf96a0
update to use new KVStorePair type
i-norden Mar 5, 2021
6fdb3c1
fix double cache wrap issue; prefer wrapping with listener vs tracer
i-norden Mar 30, 2021
921f289
review refactor
i-norden Apr 16, 2021
d5bbb0a
fix linting
i-norden Apr 16, 2021
901e62f
review fixes
i-norden Apr 20, 2021
6cf023f
adjustments after rebase
i-norden Jun 4, 2021
317f7f1
adjust KVStores to fit new CacheWrapper interface
i-norden Feb 8, 2021
d6ae81f
review fixes
i-norden Feb 24, 2021
327972a
minor updates
i-norden Jul 6, 2021
95be3c1
file server protobuf definitions
i-norden Jul 6, 2021
6972af6
file server protobuf generated Go types
i-norden Jul 6, 2021
0d26da6
state file server
i-norden Jul 6, 2021
29f09ab
move pkg
i-norden Jul 6, 2021
eb8570a
finish server, server types, handler, config
i-norden Jul 6, 2021
ee2ac55
begin cmd and context
i-norden Jul 6, 2021
f62ec6f
cmd and main.go
i-norden Jul 7, 2021
415aad8
finish config; config template
i-norden Jul 7, 2021
94465f8
simple client
i-norden Jul 7, 2021
10915ae
work on grpc backend, handler, and server- enable filtering of served…
i-norden Jul 7, 2021
55d45de
update proto types
i-norden Jul 7, 2021
e23d7d6
update changelog
i-norden Jul 7, 2021
e6da8c7
linting fix
i-norden Jul 7, 2021
2549cf0
improve shutdown comms; improve readability; filter stream by file pr…
i-norden Jul 8, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ Ref: https://keepachangelog.com/en/1.0.0/

* [\#9533](https://github.com/cosmos/cosmos-sdk/pull/9533) Added a new gRPC method, `DenomOwners`, in `x/bank` to query for all account holders of a specific denomination.
* (bank) [\#9618](https://github.com/cosmos/cosmos-sdk/pull/9618) Update bank.Metadata: add URI and URIHash attributes.
* (store) [\#8664](https://github.com/cosmos/cosmos-sdk/pull/8664) Implementation of ADR-038 file StreamingService
* (store) [\#9647](https://github.com/cosmos/cosmos-sdk/pull/9647) Implementation of ADR-038 file auxiliary gRPC server

### API Breaking Changes

Expand Down
35 changes: 33 additions & 2 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,14 @@ func (app *BaseApp) BeginBlock(req abci.RequestBeginBlock) (res abci.ResponseBeg
}
// set the signed validators for addition to context in deliverTx
app.voteInfos = req.LastCommitInfo.GetVotes()

// call the hooks with the BeginBlock messages
for _, hook := range app.hooks {
if err := hook.ListenBeginBlock(app.deliverState.ctx, req, res); err != nil {
app.logger.Error("BeginBlock listening hook failed", "height", req.Header.Height, "err", err)
}
}

return res
}

Expand All @@ -215,6 +223,13 @@ func (app *BaseApp) EndBlock(req abci.RequestEndBlock) (res abci.ResponseEndBloc
res.ConsensusParamUpdates = cp
}

// call the streaming service hooks with the EndBlock messages
for _, hook := range app.hooks {
if err := hook.ListenEndBlock(app.deliverState.ctx, req, res); err != nil {
app.logger.Error("EndBlock listening hook failed", "height", req.Height, "err", err)
}
}

return res
}

Expand Down Expand Up @@ -275,16 +290,32 @@ func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx
gInfo, result, err := app.runTx(runTxModeDeliver, req.Tx)
if err != nil {
resultStr = "failed"
return sdkerrors.ResponseDeliverTx(err, gInfo.GasWanted, gInfo.GasUsed, app.trace)
res := sdkerrors.ResponseDeliverTx(err, gInfo.GasWanted, gInfo.GasUsed, app.trace)
// if we throw and error, be sure to still call the streaming service's hook
for _, hook := range app.hooks {
if err := hook.ListenDeliverTx(app.deliverState.ctx, req, res); err != nil {
app.logger.Error("DeliverTx listening hook failed", "err", err)
}
}
return res
}

return abci.ResponseDeliverTx{
res := abci.ResponseDeliverTx{
GasWanted: int64(gInfo.GasWanted), // TODO: Should type accept unsigned ints?
GasUsed: int64(gInfo.GasUsed), // TODO: Should type accept unsigned ints?
Log: result.Log,
Data: result.Data,
Events: sdk.MarkEventsToIndex(result.Events, app.indexEvents),
}

// call the streaming service hooks with the DeliverTx messages
for _, hook := range app.hooks {
if err := hook.ListenDeliverTx(app.deliverState.ctx, req, res); err != nil {
app.logger.Error("DeliverTx listening hook failed", "err", err)
}
}

return res
}

// Commit implements the ABCI interface. It will commit all state that exists in
Expand Down
4 changes: 4 additions & 0 deletions baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ type BaseApp struct { // nolint: maligned
// indexEvents defines the set of events in the form {eventType}.{attributeKey},
// which informs Tendermint what to index. If empty, all events will be indexed.
indexEvents map[string]struct{}

// hooked services
// these hooks will have the ABCI messages routed through them
hooks []Hook
}

// NewBaseApp returns a reference to an initialized BaseApp. It accepts a
Expand Down
11 changes: 11 additions & 0 deletions baseapp/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,3 +237,14 @@ func (app *BaseApp) SetInterfaceRegistry(registry types.InterfaceRegistry) {
app.grpcQueryRouter.SetInterfaceRegistry(registry)
app.msgServiceRouter.SetInterfaceRegistry(registry)
}

// SetStreamingService is used to set a streaming service into the BaseApp hooks and load the listeners into the multistore
func (app *BaseApp) SetStreamingService(s StreamingService) {
// add the listeners for each StoreKey
for key, lis := range s.Listeners() {
app.cms.AddListeners(key, lis)
}
// register the streaming service hooks within the BaseApp
// BaseApp will pass BeginBlock, DeliverTx, and EndBlock requests and responses to the streaming services to update their ABCI context using these hooks
app.hooks = append(app.hooks, s)
}
30 changes: 30 additions & 0 deletions baseapp/streaming.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package baseapp

import (
"sync"

abci "github.com/tendermint/tendermint/abci/types"

store "github.com/cosmos/cosmos-sdk/store/types"
"github.com/cosmos/cosmos-sdk/types"
)

// Hook interface used to hook into the ABCI message processing of the BaseApp
type Hook interface {
// update the streaming service with the latest BeginBlock messages
ListenBeginBlock(ctx types.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error
// update the steaming service with the latest EndBlock messages
ListenEndBlock(ctx types.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error
// update the steaming service with the latest DeliverTx messages
ListenDeliverTx(ctx types.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error
}

// StreamingService interface for registering WriteListeners with the BaseApp and updating the service with the ABCI messages using the hooks
type StreamingService interface {
// streaming service loop, awaits kv pairs and writes them to some destination stream or file
Stream(wg *sync.WaitGroup, quitChan <-chan struct{})
// returns the streaming service's listeners for the BaseApp to register
Listeners() map[types.StoreKey][]store.WriteListener
// interface for hooking into the ABCI messages from inside the BaseApp
Hook
}
126 changes: 63 additions & 63 deletions docs/architecture/adr-038-state-listening.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ In a new file, `store/types/listening.go`, we will create a `WriteListener` inte
type WriteListener interface {
// if value is nil then it was deleted
// storeKey indicates the source KVStore, to facilitate using the the same WriteListener across separate KVStores
// set bool indicates if it was a set; true: set, false: delete
// delete bool indicates if it was a delete; true: delete, false: set
OnWrite(storeKey StoreKey, key []byte, value []byte, delete bool) error
}
```
Expand All @@ -48,10 +48,10 @@ and determine the source of each KV pair.

```protobuf
message StoreKVPair {
optional string store_key = 1; // the store key for the KVStore this pair originates from
required bool set = 2; // true indicates a set operation, false indicates a delete operation
required bytes key = 3;
required bytes value = 4;
string store_key = 1; // the store key for the KVStore this pair originates from
bool set = 2; // true indicates a set operation, false indicates a delete operation
bytes key = 3;
bytes value = 4;
}
```

Expand Down Expand Up @@ -209,9 +209,9 @@ We will introduce a new `StreamingService` interface for exposing `WriteListener
```go
// Hook interface used to hook into the ABCI message processing of the BaseApp
type Hook interface {
ListenBeginBlock(ctx sdk.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) // update the streaming service with the latest BeginBlock messages
ListenEndBlock(ctx sdk.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) // update the steaming service with the latest EndBlock messages
ListenDeliverTx(ctx sdk.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) // update the steaming service with the latest DeliverTx messages
ListenBeginBlock(ctx sdk.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error // update the streaming service with the latest BeginBlock messages
ListenEndBlock(ctx sdk.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error// update the steaming service with the latest EndBlock messages
ListenDeliverTx(ctx sdk.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error // update the steaming service with the latest DeliverTx messages
}

// StreamingService interface for registering WriteListeners with the BaseApp and updating the service with the ABCI messages using the hooks
Expand Down Expand Up @@ -384,8 +384,8 @@ using the provided `AppOptions` and TOML configuration fields.
We will add a new method to the `BaseApp` to enable the registration of `StreamingService`s:

```go
// RegisterStreamingService is used to register a streaming service with the BaseApp
func (app *BaseApp) RegisterHooks(s StreamingService) {
// SetStreamingService is used to register a streaming service with the BaseApp
func (app *BaseApp) SetStreamingService(s StreamingService) {
// set the listeners for each StoreKey
for key, lis := range s.Listeners() {
app.cms.AddListeners(key, lis)
Expand Down Expand Up @@ -482,60 +482,60 @@ We will also provide a mapping of the TOML `store.streamers` "file" configuratio
streaming service. In the future, as other streaming services are added, their constructors will be added here as well.

```go
// StreamingServiceConstructor is used to construct a streaming service
type StreamingServiceConstructor func(opts servertypes.AppOptions, keys []sdk.StoreKey) (StreamingService, error)
// ServiceConstructor is used to construct a streaming service
type ServiceConstructor func(opts serverTypes.AppOptions, keys []sdk.StoreKey, marshaller codec.BinaryMarshaler) (sdk.StreamingService, error)

// StreamingServiceType enum for specifying the type of StreamingService
type StreamingServiceType int
// ServiceType enum for specifying the type of StreamingService
type ServiceType int

const (
Unknown StreamingServiceType = iota
File
// add more in the future
Unknown ServiceType = iota
File
// add more in the future
)

// NewStreamingServiceType returns the StreamingServiceType corresponding to the provided name
func NewStreamingServiceType(name string) StreamingServiceType {
switch strings.ToLower(name) {
case "file", "f":
return File
default:
return Unknown
}
}

// String returns the string name of a StreamingServiceType
func (sst StreamingServiceType) String() string {
switch sst {
case File:
return "file"
default:
return ""
}
}

// StreamingServiceConstructorLookupTable is a mapping of StreamingServiceTypes to StreamingServiceConstructors
var StreamingServiceConstructorLookupTable = map[StreamingServiceType]StreamingServiceConstructor{
File: FileStreamingConstructor,
}

// NewStreamingServiceConstructor returns the StreamingServiceConstructor corresponding to the provided name
func NewStreamingServiceConstructor(name string) (StreamingServiceConstructor, error) {
ssType := NewStreamingServiceType(name)
if ssType == Unknown {
return nil, fmt.Errorf("unrecognized streaming service name %s", name)
}
if constructor, ok := StreamingServiceConstructorLookupTable[ssType]; ok {
return constructor, nil
}
return nil, fmt.Errorf("streaming service constructor of type %s not found", ssType.String())
}

// FileStreamingConstructor is the StreamingServiceConstructor function for creating a FileStreamingService
func FileStreamingConstructor(opts servertypes.AppOptions, keys []sdk.StoreKey) (StreamingService, error) {
filePrefix := cast.ToString(opts.Get("streamers.file.prefix"))
fileDir := cast.ToString(opts.Get("streamers.file.writeDir"))
return streaming.NewFileStreamingService(fileDir, filePrefix, keys), nil
// NewStreamingServiceType returns the streaming.ServiceType corresponding to the provided name
func NewStreamingServiceType(name string) ServiceType {
switch strings.ToLower(name) {
case "file", "f":
return File
default:
return Unknown
}
}

// String returns the string name of a streaming.ServiceType
func (sst ServiceType) String() string {
switch sst {
case File:
return "file"
default:
return ""
}
}

// ServiceConstructorLookupTable is a mapping of streaming.ServiceTypes to streaming.ServiceConstructors
var ServiceConstructorLookupTable = map[ServiceType]ServiceConstructor{
File: FileStreamingConstructor,
}

// NewServiceConstructor returns the streaming.ServiceConstructor corresponding to the provided name
func NewServiceConstructor(name string) (ServiceConstructor, error) {
ssType := NewStreamingServiceType(name)
if ssType == Unknown {
return nil, fmt.Errorf("unrecognized streaming service name %s", name)
}
if constructor, ok := ServiceConstructorLookupTable[ssType]; ok {
return constructor, nil
}
return nil, fmt.Errorf("streaming service constructor of type %s not found", ssType.String())
}

// FileStreamingConstructor is the streaming.ServiceConstructor function for creating a FileStreamingService
func FileStreamingConstructor(opts serverTypes.AppOptions, keys []sdk.StoreKey, marshaller codec.BinaryMarshaler) (sdk.StreamingService, error) {
filePrefix := cast.ToString(opts.Get("streamers.file.prefix"))
fileDir := cast.ToString(opts.Get("streamers.file.writeDir"))
return file.NewStreamingService(fileDir, filePrefix, keys, marshaller)
}
```

Expand Down Expand Up @@ -564,8 +564,8 @@ func NewSimApp(
listeners := cast.ToStringSlice(appOpts.Get("store.streamers"))
for _, listenerName := range listeners {
// get the store keys allowed to be exposed for this streaming service/state listeners
exposeKeyStrs := cast.ToStringSlice(appOpts.Get(fmt.Sprintf("streamers.%s.keys", listenerName))
exposeStoreKeys = make([]storeTypes.StoreKey, 0, len(exposeKeyStrs))
exposeKeyStrs := cast.ToStringSlice(appOpts.Get(fmt.Sprintf("streamers.%s.keys", listenerName)))
exposeStoreKeys := make([]storeTypes.StoreKey, 0, len(exposeKeyStrs))
for _, keyStr := range exposeKeyStrs {
if storeKey, ok := keys[keyStr]; ok {
exposeStoreKeys = append(exposeStoreKeys, storeKey)
Expand All @@ -577,15 +577,15 @@ func NewSimApp(
tmos.Exit(err.Error()) // or continue?
}
// generate the streaming service using the constructor, appOptions, and the StoreKeys we want to expose
streamingService, err := constructor(appOpts, exposeStoreKeys)
streamingService, err := constructor(appOpts, exposeStoreKeys, appCodec)
if err != nil {
tmos.Exit(err.Error())
}
// register the streaming service with the BaseApp
bApp.RegisterStreamingService(streamingService)
// waitgroup and quit channel for optional shutdown coordination of the streaming service
wg := new(sync.WaitGroup)
quitChan := new(chan struct{}))
quitChan := make(chan struct{}))
// kick off the background streaming service loop
streamingService.Stream(wg, quitChan) // maybe this should be done from inside BaseApp instead?
}
Expand Down
Loading