Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: remove base app from store #14417

Merged
merged 6 commits into from
Dec 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ Ref: https://keepachangelog.com/en/1.0.0/
* [#13881](https://github.com/cosmos/cosmos-sdk/pull/13881) Optimize iteration on nested cached KV stores and other operations in general.
* (x/gov) [#14347](https://github.com/cosmos/cosmos-sdk/pull/14347) Support `v1.Proposal` message in `v1beta1.Proposal.Content`.
* (x/gov) [#14390](https://github.com/cosmos/cosmos-sdk/pull/14390) Add title, proposer and summary to proposal struct
* (baseapp) [#14417](https://github.com/cosmos/cosmos-sdk/pull/14417) `SetStreamingService` accepts appOptions, AppCodec and Storekeys needed to set streamers.
* Store pacakge no longer has a dependency on baseapp.

### State Machine Breaking

Expand Down
2 changes: 1 addition & 1 deletion baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ type BaseApp struct { //nolint: maligned

// abciListeners for hooking into the ABCI message processing of the BaseApp
// and exposing the requests and responses to external consumers
abciListeners []ABCIListener
abciListeners []storetypes.ABCIListener
}

// NewBaseApp returns a reference to an initialized BaseApp. It accepts a
Expand Down
25 changes: 19 additions & 6 deletions baseapp/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ import (
dbm "github.com/tendermint/tm-db"

"github.com/cosmos/cosmos-sdk/codec/types"
servertypes "github.com/cosmos/cosmos-sdk/server/types"
"github.com/cosmos/cosmos-sdk/store"
pruningtypes "github.com/cosmos/cosmos-sdk/store/pruning/types"
"github.com/cosmos/cosmos-sdk/store/snapshots"
snapshottypes "github.com/cosmos/cosmos-sdk/store/snapshots/types"
"github.com/cosmos/cosmos-sdk/store/streaming"
storetypes "github.com/cosmos/cosmos-sdk/store/types"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/types/mempool"
)
Expand Down Expand Up @@ -232,14 +235,24 @@ func (app *BaseApp) SetInterfaceRegistry(registry types.InterfaceRegistry) {
}

// SetStreamingService is used to set a streaming service into the BaseApp hooks and load the listeners into the multistore
func (app *BaseApp) SetStreamingService(s StreamingService) {
func (app *BaseApp) SetStreamingService(
appOpts servertypes.AppOptions,
appCodec storetypes.Codec,
keys map[string]*storetypes.KVStoreKey) error {
streamers, _, err := streaming.LoadStreamingServices(appOpts, appCodec, app.logger, keys)
if err != nil {
return err
}
// add the listeners for each StoreKey
for key, lis := range s.Listeners() {
app.cms.AddListeners(key, lis)
for _, streamer := range streamers {
for key, lis := range streamer.Listeners() {
app.cms.AddListeners(key, lis)
}
Comment on lines +248 to +250

Check failure

Code scanning / gosec

the value in the range statement should be _ unless copying a map: want: for key := range m

the value in the range statement should be _ unless copying a map: want: for key := range m
Comment on lines +248 to +250

Check warning

Code scanning / CodeQL

Iteration over map

Iteration over map may be a possible source of non-determinism
// register the StreamingService within the BaseApp
// BaseApp will pass BeginBlock, DeliverTx, and EndBlock requests and responses to the streaming services to update their ABCI context
app.abciListeners = append(app.abciListeners, streamer)
}
// register the StreamingService within the BaseApp
// BaseApp will pass BeginBlock, DeliverTx, and EndBlock requests and responses to the streaming services to update their ABCI context
app.abciListeners = append(app.abciListeners, s)
return nil
}

// SetTxDecoder sets the TxDecoder if it wasn't provided in the BaseApp constructor.
Expand Down
5 changes: 2 additions & 3 deletions simapp/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/cosmos/cosmos-sdk/server/config"
servertypes "github.com/cosmos/cosmos-sdk/server/types"
"github.com/cosmos/cosmos-sdk/std"
"github.com/cosmos/cosmos-sdk/store/streaming"
storetypes "github.com/cosmos/cosmos-sdk/store/types"
"github.com/cosmos/cosmos-sdk/testutil/testdata_pulsar"
sdk "github.com/cosmos/cosmos-sdk/types"
Expand Down Expand Up @@ -262,8 +261,8 @@ func NewSimApp(
// not include this key.
memKeys := sdk.NewMemoryStoreKeys(capabilitytypes.MemStoreKey, "testingkey")

// load state streaming if enabled
if _, _, err := streaming.LoadStreamingServices(bApp, appOpts, appCodec, logger, keys); err != nil {
// register the streaming service with the BaseApp
if err := bApp.SetStreamingService(appOpts, appCodec, keys); err != nil {
logger.Error("failed to load state streaming", "err", err)
os.Exit(1)
}
Expand Down
4 changes: 1 addition & 3 deletions simapp/app_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/cosmos/cosmos-sdk/server/api"
"github.com/cosmos/cosmos-sdk/server/config"
servertypes "github.com/cosmos/cosmos-sdk/server/types"
"github.com/cosmos/cosmos-sdk/store/streaming"
storetypes "github.com/cosmos/cosmos-sdk/store/types"
"github.com/cosmos/cosmos-sdk/testutil/testdata_pulsar"
"github.com/cosmos/cosmos-sdk/types/module"
Expand Down Expand Up @@ -245,8 +244,7 @@ func NewSimApp(

app.App = appBuilder.Build(logger, db, traceStore, baseAppOptions...)

// load state streaming if enabled
if _, _, err := streaming.LoadStreamingServices(app.App.BaseApp, appOpts, app.appCodec, logger, app.kvStoreKeys()); err != nil {
if err := app.App.BaseApp.SetStreamingService(appOpts, app.appCodec, app.kvStoreKeys()); err != nil {
logger.Error("failed to load state streaming", "err", err)
os.Exit(1)
}
Expand Down
5 changes: 2 additions & 3 deletions store/cachekv/internal/btree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"testing"

"github.com/cosmos/cosmos-sdk/store/types"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -195,9 +194,9 @@ func verifyIterator(t *testing.T, itr types.Iterator, expected []int64, msg stri
}

func int642Bytes(i int64) []byte {
return sdk.Uint64ToBigEndian(uint64(i))
return types.Uint64ToBigEndian(uint64(i))
}

func bytes2Int64(buf []byte) int64 {
return int64(sdk.BigEndianToUint64(buf))
return int64(types.BigEndianToUint64(buf))
}
2 changes: 1 addition & 1 deletion store/cachekv/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"github.com/tendermint/tendermint/libs/math"
dbm "github.com/tendermint/tm-db"

"github.com/cosmos/cosmos-sdk/internal/conv"
"github.com/cosmos/cosmos-sdk/store/cachekv/internal"
"github.com/cosmos/cosmos-sdk/store/internal/conv"
"github.com/cosmos/cosmos-sdk/store/internal/kv"
"github.com/cosmos/cosmos-sdk/store/tracekv"
"github.com/cosmos/cosmos-sdk/store/types"
Expand Down
2 changes: 2 additions & 0 deletions store/internal/conv/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// Package conv provides internal functions for convertions and data manipulation
package conv
26 changes: 26 additions & 0 deletions store/internal/conv/string.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package conv

import (
"reflect"

Check failure

Code scanning / gosec

Blocklisted import unsafe

Blocklisted import reflect

Check notice

Code scanning / CodeQL

Sensitive package import

Certain system packages contain functions which may be a possible source of non-determinism
"unsafe"

Check failure

Code scanning / gosec

Blocklisted import unsafe

Blocklisted import unsafe

Check notice

Code scanning / CodeQL

Sensitive package import

Certain system packages contain functions which may be a possible source of non-determinism
)

// UnsafeStrToBytes uses unsafe to convert string into byte array. Returned bytes
// must not be altered after this function is called as it will cause a segmentation fault.
func UnsafeStrToBytes(s string) []byte {
var buf []byte
sHdr := (*reflect.StringHeader)(unsafe.Pointer(&s))

Check warning

Code scanning / gosec

Use of unsafe calls should be audited

Use of unsafe calls should be audited
bufHdr := (*reflect.SliceHeader)(unsafe.Pointer(&buf))

Check warning

Code scanning / gosec

Use of unsafe calls should be audited

Use of unsafe calls should be audited
bufHdr.Data = sHdr.Data
bufHdr.Cap = sHdr.Len
bufHdr.Len = sHdr.Len
return buf
}

// UnsafeBytesToStr is meant to make a zero allocation conversion
// from []byte -> string to speed up operations, it is not meant
// to be used generally, but for a specific pattern to delete keys
// from a map.
func UnsafeBytesToStr(b []byte) string {
return *(*string)(unsafe.Pointer(&b))

Check warning

Code scanning / gosec

Use of unsafe calls should be audited

Use of unsafe calls should be audited
}
54 changes: 54 additions & 0 deletions store/internal/conv/string_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package conv

import (
"runtime"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/suite"
)

func TestStringSuite(t *testing.T) {
suite.Run(t, new(StringSuite))
}

type StringSuite struct{ suite.Suite }

func unsafeConvertStr() []byte {
return UnsafeStrToBytes("abc")
}

func (s *StringSuite) TestUnsafeStrToBytes() {
// we convert in other function to trigger GC. We want to check that
// the underlying array in []bytes is accessible after GC will finish swapping.
for i := 0; i < 5; i++ {
b := unsafeConvertStr()
runtime.GC()
<-time.NewTimer(2 * time.Millisecond).C
b2 := append(b, 'd') //nolint:gocritic // append is fine here
s.Equal("abc", string(b))
s.Equal("abcd", string(b2))
}
}

func unsafeConvertBytes() string {
return UnsafeBytesToStr([]byte("abc"))
}

func (s *StringSuite) TestUnsafeBytesToStr() {
// we convert in other function to trigger GC. We want to check that
// the underlying array in []bytes is accessible after GC will finish swapping.
for i := 0; i < 5; i++ {
str := unsafeConvertBytes()
runtime.GC()
<-time.NewTimer(2 * time.Millisecond).C
s.Equal("abc", str)
}
}

func BenchmarkUnsafeStrToBytes(b *testing.B) {
for i := 0; i < b.N; i++ {
UnsafeStrToBytes(strconv.Itoa(i))
}
}
2 changes: 1 addition & 1 deletion store/streaming/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
This package contains the constructors for the `StreamingService`s used to write
state changes out from individual KVStores to a file or stream, as described in
[ADR-038](https://github.com/cosmos/cosmos-sdk/blob/main/docs/architecture/adr-038-state-listening.md)
and defined in [types/streaming.go](https://github.com/cosmos/cosmos-sdk/blob/main/baseapp/streaming.go).
and defined in [types/streaming.go](https://github.com/cosmos/cosmos-sdk/blob/main/store/types/streaming.go).
The child directories contain the implementations for specific output destinations.

Currently, a `StreamingService` implementation that writes state changes out to
Expand Down
13 changes: 4 additions & 9 deletions store/streaming/constructor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"strings"
"sync"

"github.com/cosmos/cosmos-sdk/baseapp"
"github.com/cosmos/cosmos-sdk/client/flags"
serverTypes "github.com/cosmos/cosmos-sdk/server/types"
"github.com/cosmos/cosmos-sdk/store/streaming/file"
Expand All @@ -18,7 +17,7 @@ import (
)

// ServiceConstructor is used to construct a streaming service
type ServiceConstructor func(serverTypes.AppOptions, []types.StoreKey, types.Codec, log.Logger) (baseapp.StreamingService, error)
type ServiceConstructor func(serverTypes.AppOptions, []types.StoreKey, types.Codec, log.Logger) (types.StreamingService, error)

// ServiceType enum for specifying the type of StreamingService
type ServiceType int
Expand Down Expand Up @@ -90,7 +89,7 @@ func NewFileStreamingService(
keys []types.StoreKey,
marshaller types.Codec,
logger log.Logger,
) (baseapp.StreamingService, error) {
) (types.StreamingService, error) {
homePath := cast.ToString(opts.Get(flags.FlagHome))
filePrefix := cast.ToString(opts.Get(OptStreamersFilePrefix))
fileDir := cast.ToString(opts.Get(OptStreamersFileWriteDir))
Expand Down Expand Up @@ -118,18 +117,17 @@ func NewFileStreamingService(
// WaitGroup and quit channel used to synchronize with the streaming services
// and any error that occurs during the setup.
func LoadStreamingServices(
bApp *baseapp.BaseApp,
appOpts serverTypes.AppOptions,
appCodec types.Codec,
logger log.Logger,
keys map[string]*types.KVStoreKey,
) ([]baseapp.StreamingService, *sync.WaitGroup, error) {
) ([]types.StreamingService, *sync.WaitGroup, error) {
// waitgroup and quit channel for optional shutdown coordination of the streaming service(s)
wg := new(sync.WaitGroup)

// configure state listening capabilities using AppOptions
streamers := cast.ToStringSlice(appOpts.Get(OptStoreStreamers))
activeStreamers := make([]baseapp.StreamingService, 0, len(streamers))
activeStreamers := make([]types.StreamingService, 0, len(streamers))

for _, streamerName := range streamers {
var exposeStoreKeys []types.StoreKey
Expand Down Expand Up @@ -180,9 +178,6 @@ func LoadStreamingServices(
return nil, nil, err
}

// register the streaming service with the BaseApp
bApp.SetStreamingService(streamingService)

// kick off the background streaming service loop
streamingService.Stream(wg)

Expand Down
7 changes: 2 additions & 5 deletions store/streaming/constructor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ import (

"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/libs/log"
dbm "github.com/tendermint/tm-db"

"github.com/cosmos/cosmos-sdk/baseapp"
serverTypes "github.com/cosmos/cosmos-sdk/server/types"
"github.com/cosmos/cosmos-sdk/store/streaming"
"github.com/cosmos/cosmos-sdk/store/streaming/file"
Expand Down Expand Up @@ -49,10 +47,9 @@ func TestStreamingServiceConstructor(t *testing.T) {
}

func TestLoadStreamingServices(t *testing.T) {
db := dbm.NewMemDB()

encCdc := types.NewTestCodec()
keys := types.NewKVStoreKeys("mockKey1", "mockKey2")
bApp := baseapp.NewBaseApp("appName", log.NewNopLogger(), db, nil)

testCases := map[string]struct {
appOpts serverTypes.AppOptions
Expand All @@ -76,7 +73,7 @@ func TestLoadStreamingServices(t *testing.T) {

for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
activeStreamers, _, err := streaming.LoadStreamingServices(bApp, tc.appOpts, encCdc, log.NewNopLogger(), keys)
activeStreamers, _, err := streaming.LoadStreamingServices(tc.appOpts, encCdc, log.NewNopLogger(), keys)
require.NoError(t, err)
require.Equal(t, tc.activeStreamersLen, len(activeStreamers))
})
Expand Down
2 changes: 1 addition & 1 deletion store/streaming/file/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# File Streaming Service

This pkg contains an implementation of the [StreamingService](../../../baseapp/streaming.go) that writes
This pkg contains an implementation of the [StreamingService](../../types/streaming.go) that writes
the data stream out to files on the local filesystem. This process is performed synchronously with the message processing
of the state machine.

Expand Down
3 changes: 1 addition & 2 deletions store/streaming/file/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@ import (
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/log"

"github.com/cosmos/cosmos-sdk/baseapp"
"github.com/cosmos/cosmos-sdk/store/types"
)

var _ baseapp.StreamingService = &StreamingService{}
var _ types.StreamingService = &StreamingService{}

// StreamingService is a concrete implementation of StreamingService that writes
// state changes out to files.
Expand Down
6 changes: 2 additions & 4 deletions baseapp/streaming.go → store/types/streaming.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package baseapp
package types

import (
"context"
"io"
"sync"

abci "github.com/tendermint/tendermint/abci/types"

store "github.com/cosmos/cosmos-sdk/store/types"
)

// ABCIListener interface used to hook into the ABCI message processing of the BaseApp.
Expand All @@ -29,7 +27,7 @@ type StreamingService interface {
// Stream is the streaming service loop, awaits kv pairs and writes them to some destination stream or file
Stream(wg *sync.WaitGroup) error
// Listeners returns the streaming service's listeners for the BaseApp to register
Listeners() map[store.StoreKey][]store.WriteListener
Listeners() map[StoreKey][]WriteListener
// ABCIListener interface for hooking into the ABCI messages from inside the BaseApp
ABCIListener
// Closer interface
Expand Down