Skip to content

Commit

Permalink
updates/fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
i-norden committed Nov 24, 2020
1 parent a7a4447 commit 94e3f10
Showing 1 changed file with 51 additions and 29 deletions.
80 changes: 51 additions & 29 deletions docs/architecture/adr-038-state-listening.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ Proposed

## Abstract

This ADR defines a set of changes to enable state change listening of individual KVStores.
This ADR defines a set of changes to enable listening to state changes of individual KVStores and exposing these data to consumers.

## Context

Expand All @@ -19,8 +19,8 @@ In addition to these request/response queries, it would be beneficial to have a

## Decision

We will modify the MultiStore interface and its concrete (`basemulti` and `cachemulti`) implementations and introduce a new `listenkv.Store` to allow listening to specific state changes in underlying KVStores and routing the output to consumers.
We will also introduce two approaches for exposing the data to consumers: writing to files and writing to a gRPC stream.
We will modify the `MultiStore` interface and its concrete (`rootmulti` and `cachemulti`) implementations and introduce a new `listenkv.Store` to allow listening to specific state changes in underlying KVStores.
We will also introduce two approaches for exposing the data to external consumers: writing to files and writing to a gRPC stream.

### Listening interface
In a new file- `store/types/listening.go`- we will create a `Listening` interface for streaming out an allowed subset of state changes from a KVStore.
Expand Down Expand Up @@ -128,12 +128,12 @@ which direct the streaming of only certain allowed subsets of keys and/or operat
// underlying listeners with the proper key and operation permissions
type Store struct {
parent types.KVStore
listeners []types.Listener
listeners []types.Listening
}

// NewStore returns a reference to a new traceKVStore given a parent
// KVStore implementation and a buffered writer.
func NewStore(parent types.KVStore, listeners []types.Listener) *Store {
func NewStore(parent types.KVStore, listeners []types.Listening) *Store {
return &Store{parent: parent, listeners: listeners}
}

Expand All @@ -143,7 +143,6 @@ func NewStore(parent types.KVStore, listeners []types.Listener) *Store {
// delegates a Get call to the parent KVStore.
func (tkv *Store) Get(key []byte) []byte {
value := tkv.parent.Get(key)

writeOperation(tkv.listeners, types.ReadOp, key, value)
return value
}
Expand Down Expand Up @@ -174,7 +173,11 @@ func (tkv *Store) Has(key []byte) bool {
// writeOperation writes a KVStore operation to the underlying io.Writer of
// every listener that has permissions to listen to that operation at the given key
// The TraceOperation is JSON-encoded with the `key` and `value` fields as base64 encoded strings
func writeOperation(listeners []types.Listener, op types.Operation, key, value []byte) {
func writeOperation(listeners []types.Listening, op types.Operation, key, value []byte) {
// short circuit if there are no listeners so we don't waste time base64 encoding `key` and `value`
if len(listeners) == 0 {
return
}
traceOp := types.TraceOperation{
Operation: op,
Key: base64.StdEncoding.EncodeToString(key),
Expand All @@ -184,15 +187,15 @@ func writeOperation(listeners []types.Listener, op types.Operation, key, value [
if !l.Allowed(op, key) {
continue
}
traceOp.Metadata = l.Context
traceOp.Metadata = l.GetContext()
raw, err := json.Marshal(traceOp)
if err != nil {
panic(errors.Wrap(err, "failed to serialize listen operation"))
}
if _, err := l.Writer.Write(raw); err != nil {
if _, err := l.Write(raw); err != nil {
panic(errors.Wrap(err, "failed to write listen operation"))
}
io.WriteString(l.Writer, "\n")
io.WriteString(l, "\n")
}
}
```
Expand All @@ -210,7 +213,7 @@ type MultiStore interface {
ListeningEnabled(key StoreKey) bool

// SetListeners sets the listener set for the KVStore belonging to the provided StoreKey
SetListeners(key StoreKey, listeners []Listener)
SetListeners(key StoreKey, listeners []Listening)

// CacheListening enables or disables KVStore listening at the cache layer
CacheListening(listen bool)
Expand All @@ -219,23 +222,23 @@ type MultiStore interface {

```go
type CacheWrap interface {
...
...

// CacheWrapWithListeners recursively wraps again with listening enabled
CacheWrapWithListeners(listeners []Listener) CacheWrap
CacheWrapWithListeners(listeners []Listening) CacheWrap
}

type CacheWrapper interface {
...

// CacheWrapWithListeners recursively wraps again with listening enabled
CacheWrapWithListeners(listeners []Listener) CacheWrap
CacheWrapWithListeners(listeners []Listening) CacheWrap
}
```

### MultiStore implementation updates
We will modify all of the Stores and MultiStores to satisfy these new interfaces, and adjust the `rootmulti` MultiStore's `GetKVStore` method
to enable wrapping the returned `KVStore` with the `listenkv.Store`.
We will modify all of the `Store` and `MultiStore` implementations to satisfy these new interfaces, and adjust the `rootmulti` `GetKVStore` method
to wrap the returned `KVStore` with a `listenkv.Store` if listening is turned on.

```go
func (rs *Store) GetKVStore(key types.StoreKey) types.KVStore {
Expand All @@ -252,15 +255,16 @@ func (rs *Store) GetKVStore(key types.StoreKey) types.KVStore {
}
```

We will also adjust the `cachemulti` constructor methods and the `rootmulti` `CacheMultiStore` method to enable cache listening when `CacheListening` is turned on.
We will also adjust the `cachemulti` constructor methods and the `rootmulti` `CacheMultiStore` method to enable listening
in the cache layer when `CacheListening` is turned on.

```go
func (rs *Store) CacheMultiStore() types.CacheMultiStore {
stores := make(map[types.StoreKey]types.CacheWrapper)
for k, v := range rs.stores {
stores[k] = v
}
var cacheListeners map[types.StoreKey][]types.Listener
var cacheListeners map[types.StoreKey][]types.Listening
if rs.cacheListening {
cacheListeners = rs.listeners
}
Expand All @@ -273,26 +277,44 @@ We will introduce and document mechanisms for exposing data from the above liste

#### Writing to file
We will document and provide examples of how to configure a listener to write out to a file.
No new type implementation is needed, a `os.File` can be used as the underlying `io.Writer` for a listener.
No new type implementation will be needed, a `os.File` can be used as the underlying `io.Writer` for a listener.

Writing to a file is the simplest approach for streaming the data out to consumers.
This approach also provide the advantages of being persistent and durable.
The files can be read directly or an auxiliary streaming services can tail the files and serve the data remotely.
This approach also provide the advantages of being persistent and durable, and the files can be read directly
or an auxiliary streaming services can tail the files and serve the data remotely.

Without pruning the file size can grow indefinitely, this will need to be managed by
the developer in an application or even module-specific manner.

#### Writing to gRPC stream
We will implement a `io.Writer` type for exposing our listeners over a gRPC server stream.
Writing to a gRPC stream gRPC allows us to expose the data over the standard gRPC interface.
This interface can be exposed directly to consumers or we can implement a message queue or streaming service logic on top.
Using gRPC provides us with all of the regular advantages of gRPC and protobuf: versioning guarantees, client side code generation, and interoperability with the many gRPC plugins and auxillary services.
We will implement and document an `io.Writer` type for exposing our listeners over a gRPC server stream.

Writing to a gRPC stream gRPC will allow us to expose the data over the standard gRPC interface.
This interface can be exposed directly to consumers or we can implement a message queue or secondary streaming service on top.
Using gRPC will provide us with all of the regular advantages of gRPC and protobuf: versioning guarantees, client side code generation, and interoperability with the many gRPC plugins and auxillary services.

Proceeding through a gRPC intermediate will provide additional overhead, in most cases this is not expected to be rate limiting but in
instances where it is the developer can implement a more performant streaming mechanism for state listening.

### Configuration
We will provide detailed documentation for how to configure the state listeners and their external streaming services from within an app's `AppCreator`,
using the provided `AppOptions`.
We will provide detailed documentation on how to configure the state listeners and their external streaming services from within an app's `AppCreator`,
using the provided `AppOptions`. We will add two methods to the `BaseApp` to enable this configuration:

```go
// SetCommitMultiStoreListeners sets the KVStore listeners for the provided StoreKey
func (app *BaseApp) SetCommitMultiStoreListeners(key sdk.StoreKey, listeners []storeTypes.Listening) {
app.cms.SetListeners(key, listeners)
}

// SetCacheListening turns on or off listening at the cache layer
func (app *BaseApp) SetCacheListening(listening bool) {
app.cms.CacheListening(listening)
}
```

As a demonstration, we will implement the state watching features as part of SimApp.
For example, the below is a very rudimentary integration of the state listening features into the SimApp `AppCreator` function:

e.g. SimApp with simple state streaming to files:

```go
func NewSimApp(
Expand Down Expand Up @@ -341,7 +363,7 @@ func loadListener(bApp *baseapp.BaseApp, writeDir string, key sdk.StoreKey) {
if err != nil {
tmos.Exit(err.Error())
}
// using single listener with all operations and keys permitted
// using single listener with all operations and keys permitted and no TraceContext
listener := storeTypes.NewDefaultStateListener(fileHandler, nil)
bApp.SetCommitMultiStoreListeners(key, []storeTypes.Listening{listener})
}
Expand Down

0 comments on commit 94e3f10

Please sign in to comment.