Skip to content

Commit

Permalink
feat(server/v2/stf): delayed marshalling of typed event (partial back…
Browse files Browse the repository at this point in the history
…port #22684) (#22751)

Co-authored-by: Julien Robert <julien@rbrt.fr>
  • Loading branch information
mergify[bot] and julienrbrt authored Dec 4, 2024
1 parent a595e3c commit 4c5632c
Show file tree
Hide file tree
Showing 11 changed files with 97 additions and 70 deletions.
28 changes: 21 additions & 7 deletions server/v2/cometbft/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ type consensus[T transaction.Tx] struct {
streamingManager streaming.Manager
mempool mempool.Mempool[T]

cfg Config
chainID string
indexedEvents map[string]struct{}
cfg Config
chainID string
indexedABCIEvents map[string]struct{}

initialHeight uint64
// this is only available after this node has committed a block (in FinalizeBlock),
Expand Down Expand Up @@ -105,9 +105,16 @@ func (c *consensus[T]) CheckTx(ctx context.Context, req *abciproto.CheckTxReques
return nil, err
}

events, err := intoABCIEvents(resp.Events, c.indexedEvents)
if err != nil {
return nil, err
events := make([]abci.Event, 0)
if !c.cfg.AppTomlConfig.DisableABCIEvents {
events, err = intoABCIEvents(
resp.Events,
c.indexedABCIEvents,
c.cfg.AppTomlConfig.DisableIndexABCIEvents,
)
if err != nil {
return nil, err
}
}

cometResp := &abciproto.CheckTxResponse{
Expand All @@ -116,6 +123,7 @@ func (c *consensus[T]) CheckTx(ctx context.Context, req *abciproto.CheckTxReques
GasUsed: uint64ToInt64(resp.GasUsed),
Events: events,
}

if resp.Error != nil {
space, code, log := errorsmod.ABCIInfo(resp.Error, c.cfg.AppTomlConfig.Trace)
cometResp.Code = code
Expand Down Expand Up @@ -557,7 +565,13 @@ func (c *consensus[T]) FinalizeBlock(
return nil, err
}

return finalizeBlockResponse(resp, cp, appHash, c.indexedEvents, c.cfg.AppTomlConfig.Trace)
return finalizeBlockResponse(
resp,
cp,
appHash,
c.indexedABCIEvents,
c.cfg.AppTomlConfig,
)
}

func (c *consensus[T]) internalFinalizeBlock(
Expand Down
26 changes: 15 additions & 11 deletions server/v2/cometbft/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ type Config struct {
func DefaultAppTomlConfig() *AppTomlConfig {
return &AppTomlConfig{
MinRetainBlocks: 0,
IndexEvents: make([]string, 0),
HaltHeight: 0,
HaltTime: 0,
Address: "tcp://127.0.0.1:26658",
Expand All @@ -28,22 +27,27 @@ func DefaultAppTomlConfig() *AppTomlConfig {
Target: make(map[string]indexer.Config),
ChannelBufferSize: 1024,
},
IndexABCIEvents: make([]string, 0),
DisableIndexABCIEvents: false,
DisableABCIEvents: false,
}
}

type AppTomlConfig struct {
MinRetainBlocks uint64 `mapstructure:"min-retain-blocks" toml:"min-retain-blocks" comment:"min-retain-blocks defines the minimum block height offset from the current block being committed, such that all blocks past this offset are pruned from CometBFT. A value of 0 indicates that no blocks should be pruned."`
IndexEvents []string `mapstructure:"index-events" toml:"index-events" comment:"index-events defines the set of events in the form {eventType}.{attributeKey}, which informs CometBFT what to index. If empty, all events will be indexed."`
HaltHeight uint64 `mapstructure:"halt-height" toml:"halt-height" comment:"halt-height contains a non-zero block height at which a node will gracefully halt and shutdown that can be used to assist upgrades and testing."`
HaltTime uint64 `mapstructure:"halt-time" toml:"halt-time" comment:"halt-time contains a non-zero minimum block time (in Unix seconds) at which a node will gracefully halt and shutdown that can be used to assist upgrades and testing."`
Address string `mapstructure:"address" toml:"address" comment:"address defines the CometBFT RPC server address to bind to."`
Transport string `mapstructure:"transport" toml:"transport" comment:"transport defines the CometBFT RPC server transport protocol: socket, grpc"`
Trace bool `mapstructure:"trace" toml:"trace" comment:"trace enables the CometBFT RPC server to output trace information about its internal operations."`
Standalone bool `mapstructure:"standalone" toml:"standalone" comment:"standalone starts the application without the CometBFT node. The node should be started separately."`
MinRetainBlocks uint64 `mapstructure:"min-retain-blocks" toml:"min-retain-blocks" comment:"min-retain-blocks defines the minimum block height offset from the current block being committed, such that all blocks past this offset are pruned from CometBFT. A value of 0 indicates that no blocks should be pruned."`
HaltHeight uint64 `mapstructure:"halt-height" toml:"halt-height" comment:"halt-height contains a non-zero block height at which a node will gracefully halt and shutdown that can be used to assist upgrades and testing."`
HaltTime uint64 `mapstructure:"halt-time" toml:"halt-time" comment:"halt-time contains a non-zero minimum block time (in Unix seconds) at which a node will gracefully halt and shutdown that can be used to assist upgrades and testing."`
Address string `mapstructure:"address" toml:"address" comment:"address defines the CometBFT RPC server address to bind to."`
Transport string `mapstructure:"transport" toml:"transport" comment:"transport defines the CometBFT RPC server transport protocol: socket, grpc"`
Trace bool `mapstructure:"trace" toml:"trace" comment:"trace enables the CometBFT RPC server to output trace information about its internal operations."`
Standalone bool `mapstructure:"standalone" toml:"standalone" comment:"standalone starts the application without the CometBFT node. The node should be started separately."`

// Sub configs
Mempool mempool.Config `mapstructure:"mempool" toml:"mempool" comment:"mempool defines the configuration for the SDK built-in app-side mempool implementations."`
Indexer indexer.IndexingConfig `mapstructure:"indexer" toml:"indexer" comment:"indexer defines the configuration for the SDK built-in indexer implementation."`
Mempool mempool.Config `mapstructure:"mempool" toml:"mempool" comment:"mempool defines the configuration for the SDK built-in app-side mempool implementations."`
Indexer indexer.IndexingConfig `mapstructure:"indexer" toml:"indexer" comment:"indexer defines the configuration for the SDK built-in indexer implementation."`
IndexABCIEvents []string `mapstructure:"index-abci-events" toml:"index-abci-events" comment:"index-abci-events defines the set of events in the form {eventType}.{attributeKey}, which informs CometBFT what to index. If empty, all events will be indexed."`
DisableIndexABCIEvents bool `mapstructure:"disable-index-abci-events" toml:"disable-index-abci-events" comment:"disable-index-abci-events disables the ABCI event indexing done by CometBFT. Useful when relying on the SDK indexer for event indexing, but still want events to be included in FinalizeBlockResponse."`
DisableABCIEvents bool `mapstructure:"disable-abci-events" toml:"disable-abci-events" comment:"disable-abci-events disables all ABCI events. Useful when relying on the SDK indexer for event indexing."`
}

// CfgOption is a function that allows to overwrite the default server configuration.
Expand Down
2 changes: 1 addition & 1 deletion server/v2/cometbft/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
cosmossdk.io/schema v0.3.1-0.20241128094659-bd76b47e1d8b //main
cosmossdk.io/server/v2 v2.0.0-20241203212527-7d117425d880 // main
cosmossdk.io/server/v2/appmanager v0.0.0-20241203212527-7d117425d880 // main
cosmossdk.io/server/v2/stf v0.0.0-20241203212527-7d117425d880 // main
cosmossdk.io/server/v2/stf v0.0.0-20241204101618-7fa2356c07aa // main
cosmossdk.io/store/v2 v2.0.0-20241203212527-7d117425d880 // main
cosmossdk.io/x/consensus v0.0.0-00010101000000-000000000000
github.com/cometbft/cometbft v1.0.0-rc2.0.20241127125717-4ce33b646ac9
Expand Down
4 changes: 2 additions & 2 deletions server/v2/cometbft/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ cosmossdk.io/server/v2 v2.0.0-20241203212527-7d117425d880 h1:B/8gRehsQg7ewpg9wrn
cosmossdk.io/server/v2 v2.0.0-20241203212527-7d117425d880/go.mod h1:rHeg391S9rUcPlF/1WRpGUaw0AJ0CGUVGlxUVaNhSGk=
cosmossdk.io/server/v2/appmanager v0.0.0-20241203212527-7d117425d880 h1:0mtB8fSvDjD835WwWF4rGk9qy5TjVjk2jsW14L37v0E=
cosmossdk.io/server/v2/appmanager v0.0.0-20241203212527-7d117425d880/go.mod h1:elhlrldWtm+9U4PxE0G3wjz83yQwVVGVAOncXJPY1Xc=
cosmossdk.io/server/v2/stf v0.0.0-20241203212527-7d117425d880 h1:DWpt/apUddpsKmoVOCRLMcVVkZ9r5ENjGuyYuj3LP1w=
cosmossdk.io/server/v2/stf v0.0.0-20241203212527-7d117425d880/go.mod h1:4e9SzLyeGptQ3tSR6nKCNwCu7Ye4uUS2WIJih29dG2c=
cosmossdk.io/server/v2/stf v0.0.0-20241204101618-7fa2356c07aa h1:2V9nqgL50nw45HcQw1UBRQ/y0QBzrgfLIStPSxFnMtY=
cosmossdk.io/server/v2/stf v0.0.0-20241204101618-7fa2356c07aa/go.mod h1:4e9SzLyeGptQ3tSR6nKCNwCu7Ye4uUS2WIJih29dG2c=
cosmossdk.io/store v1.0.0-rc.0.0.20241202115147-f350775d0ed2 h1:UCe04NMBR+1M5JRpZJvM+I0EZzD3zXrk9YOm2RZdKDg=
cosmossdk.io/store v1.0.0-rc.0.0.20241202115147-f350775d0ed2/go.mod h1:oZBBY4BrkYnghr6MFL0MP5mGqpkPedHcWkXwXddd6tU=
cosmossdk.io/store/v2 v2.0.0-20241203212527-7d117425d880 h1:oQiiB1e2yg6/ttUCKLBAdbLuyUp9UYW2K8QGnCs7dWg=
Expand Down
6 changes: 5 additions & 1 deletion server/v2/cometbft/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,11 @@ func (c *consensus[T]) handleQueryApp(ctx context.Context, path []string, req *a
return nil, errorsmod.Wrap(err, "failed to simulate tx")
}

bz, err := intoABCISimulationResponse(txResult, c.indexedEvents)
bz, err := intoABCISimulationResponse(
txResult,
c.indexedABCIEvents,
c.cfg.AppTomlConfig.DisableIndexABCIEvents,
)
if err != nil {
return nil, errorsmod.Wrap(err, "failed to marshal txResult")
}
Expand Down
8 changes: 4 additions & 4 deletions server/v2/cometbft/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,9 @@ func New[T transaction.Tx](
}
}

indexEvents := make(map[string]struct{}, len(srv.config.AppTomlConfig.IndexEvents))
for _, e := range srv.config.AppTomlConfig.IndexEvents {
indexEvents[e] = struct{}{}
indexedABCIEvents := make(map[string]struct{}, len(srv.config.AppTomlConfig.IndexABCIEvents))
for _, e := range srv.config.AppTomlConfig.IndexABCIEvents {
indexedABCIEvents[e] = struct{}{}
}

sc := store.GetStateCommitment().(snapshots.CommitSnapshotter)
Expand Down Expand Up @@ -183,7 +183,7 @@ func New[T transaction.Tx](
checkTxHandler: srv.serverOptions.CheckTxHandler,
extendVote: srv.serverOptions.ExtendVoteHandler,
chainID: chainID,
indexedEvents: indexEvents,
indexedABCIEvents: indexedABCIEvents,
initialHeight: 0,
queryHandlersMap: queryHandlers,
getProtoRegistry: sync.OnceValues(gogoproto.MergedRegistry),
Expand Down
77 changes: 39 additions & 38 deletions server/v2/cometbft/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,23 @@ func finalizeBlockResponse(
cp *cmtproto.ConsensusParams,
appHash []byte,
indexSet map[string]struct{},
debug bool,
cfg *AppTomlConfig,
) (*abci.FinalizeBlockResponse, error) {
allEvents := append(in.BeginBlockEvents, in.EndBlockEvents...)

events, err := intoABCIEvents(allEvents, indexSet)
if err != nil {
return nil, err
events := make([]abci.Event, 0)

if !cfg.DisableABCIEvents {
var err error
events, err = intoABCIEvents(
append(in.BeginBlockEvents, in.EndBlockEvents...),
indexSet,
cfg.DisableIndexABCIEvents,
)
if err != nil {
return nil, err
}
}

txResults, err := intoABCITxResults(in.TxResults, indexSet, debug)
txResults, err := intoABCITxResults(in.TxResults, indexSet, cfg)
if err != nil {
return nil, err
}
Expand All @@ -91,6 +98,7 @@ func finalizeBlockResponse(
AppHash: appHash,
ConsensusParamUpdates: cp,
}

return resp, nil
}

Expand All @@ -108,72 +116,65 @@ func intoABCIValidatorUpdates(updates []appmodulev2.ValidatorUpdate) []abci.Vali
return valsetUpdates
}

func intoABCITxResults(results []server.TxResult, indexSet map[string]struct{}, debug bool) ([]*abci.ExecTxResult, error) {
func intoABCITxResults(
results []server.TxResult,
indexSet map[string]struct{},
cfg *AppTomlConfig,
) ([]*abci.ExecTxResult, error) {
res := make([]*abci.ExecTxResult, len(results))
for i := range results {
events, err := intoABCIEvents(results[i].Events, indexSet)
if err != nil {
return nil, err
var err error
events := make([]abci.Event, 0)

if !cfg.DisableABCIEvents {
events, err = intoABCIEvents(results[i].Events, indexSet, cfg.DisableIndexABCIEvents)
if err != nil {
return nil, err
}
}

res[i] = responseExecTxResultWithEvents(
results[i].Error,
results[i].GasWanted,
results[i].GasUsed,
events,
debug,
cfg.Trace,
)
}

return res, nil
}

func intoABCIEvents(events []event.Event, indexSet map[string]struct{}) ([]abci.Event, error) {
func intoABCIEvents(events []event.Event, indexSet map[string]struct{}, indexNone bool) ([]abci.Event, error) {
indexAll := len(indexSet) == 0
abciEvents := make([]abci.Event, len(events))
for i, e := range events {
attributes, err := e.Attributes()
attrs, err := e.Attributes()
if err != nil {
return nil, err
}

abciEvents[i] = abci.Event{
Type: e.Type,
Attributes: make([]abci.EventAttribute, len(attributes)),
Attributes: make([]abci.EventAttribute, len(attrs)),
}

for j, attr := range attributes {
for j, attr := range attrs {
_, index := indexSet[fmt.Sprintf("%s.%s", e.Type, attr.Key)]
abciEvents[i].Attributes[j] = abci.EventAttribute{
Key: attr.Key,
Value: attr.Value,
Index: index || indexAll,
Index: !indexNone && (index || indexAll),
}
}
}
return abciEvents, nil
}

func intoABCISimulationResponse(txRes server.TxResult, indexSet map[string]struct{}) ([]byte, error) {
indexAll := len(indexSet) == 0
abciEvents := make([]abci.Event, len(txRes.Events))
for i, e := range txRes.Events {
attributes, err := e.Attributes()
if err != nil {
return nil, err
}
abciEvents[i] = abci.Event{
Type: e.Type,
Attributes: make([]abci.EventAttribute, len(attributes)),
}

for j, attr := range attributes {
_, index := indexSet[fmt.Sprintf("%s.%s", e.Type, attr.Key)]
abciEvents[i].Attributes[j] = abci.EventAttribute{
Key: attr.Key,
Value: attr.Value,
Index: index || indexAll,
}
}
func intoABCISimulationResponse(txRes server.TxResult, indexSet map[string]struct{}, indexNone bool) ([]byte, error) {
abciEvents, err := intoABCIEvents(txRes.Events, indexSet, indexNone)
if err != nil {
return nil, err
}

msgResponses := make([]*gogoany.Any, len(txRes.Resp))
Expand Down
2 changes: 1 addition & 1 deletion simapp/v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ require (
cosmossdk.io/errors/v2 v2.0.0-20240731132947-df72853b3ca5 // indirect
cosmossdk.io/schema v0.3.1-0.20241128094659-bd76b47e1d8b // indirect
cosmossdk.io/server/v2/appmanager v0.0.0-20241203212527-7d117425d880 // indirect; main
cosmossdk.io/server/v2/stf v0.0.0-20241203212527-7d117425d880 // indirect; main
cosmossdk.io/server/v2/stf v0.0.0-20241204101618-7fa2356c07aa // indirect; main
cosmossdk.io/store v1.1.1-0.20240909133312-50288938d1b6 // indirect; main
cosmossdk.io/x/tx v1.0.0-alpha.2 // indirect; main
filippo.io/edwards25519 v1.1.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions simapp/v2/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,8 @@ cosmossdk.io/server/v2 v2.0.0-20241203212527-7d117425d880 h1:B/8gRehsQg7ewpg9wrn
cosmossdk.io/server/v2 v2.0.0-20241203212527-7d117425d880/go.mod h1:rHeg391S9rUcPlF/1WRpGUaw0AJ0CGUVGlxUVaNhSGk=
cosmossdk.io/server/v2/appmanager v0.0.0-20241203212527-7d117425d880 h1:0mtB8fSvDjD835WwWF4rGk9qy5TjVjk2jsW14L37v0E=
cosmossdk.io/server/v2/appmanager v0.0.0-20241203212527-7d117425d880/go.mod h1:elhlrldWtm+9U4PxE0G3wjz83yQwVVGVAOncXJPY1Xc=
cosmossdk.io/server/v2/stf v0.0.0-20241203212527-7d117425d880 h1:DWpt/apUddpsKmoVOCRLMcVVkZ9r5ENjGuyYuj3LP1w=
cosmossdk.io/server/v2/stf v0.0.0-20241203212527-7d117425d880/go.mod h1:4e9SzLyeGptQ3tSR6nKCNwCu7Ye4uUS2WIJih29dG2c=
cosmossdk.io/server/v2/stf v0.0.0-20241204101618-7fa2356c07aa h1:2V9nqgL50nw45HcQw1UBRQ/y0QBzrgfLIStPSxFnMtY=
cosmossdk.io/server/v2/stf v0.0.0-20241204101618-7fa2356c07aa/go.mod h1:4e9SzLyeGptQ3tSR6nKCNwCu7Ye4uUS2WIJih29dG2c=
cosmossdk.io/store v1.0.0-rc.0.0.20241119134933-d697a3de0f95 h1:5hIgRL6VsicdJ7FVK6AG7cSy1C8tiVbCp6W3Y+QQ5ko=
cosmossdk.io/store v1.0.0-rc.0.0.20241119134933-d697a3de0f95/go.mod h1:ceNwMZIU8ZIDoeUdA9+sGxz3GVt0orEGoVpkBfa/UtU=
cosmossdk.io/store/v2 v2.0.0-20241203212527-7d117425d880 h1:oQiiB1e2yg6/ttUCKLBAdbLuyUp9UYW2K8QGnCs7dWg=
Expand Down
8 changes: 6 additions & 2 deletions tools/confix/data/v2-app.toml
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
[comet]
# min-retain-blocks defines the minimum block height offset from the current block being committed, such that all blocks past this offset are pruned from CometBFT. A value of 0 indicates that no blocks should be pruned.
min-retain-blocks = 0
# index-events defines the set of events in the form {eventType}.{attributeKey}, which informs CometBFT what to index. If empty, all events will be indexed.
index-events = []
# halt-height contains a non-zero block height at which a node will gracefully halt and shutdown that can be used to assist upgrades and testing.
halt-height = 0
# halt-time contains a non-zero minimum block time (in Unix seconds) at which a node will gracefully halt and shutdown that can be used to assist upgrades and testing.
Expand All @@ -15,6 +13,12 @@ transport = 'socket'
trace = false
# standalone starts the application without the CometBFT node. The node should be started separately.
standalone = false
# index-abci-events defines the set of events in the form {eventType}.{attributeKey}, which informs CometBFT what to index. If empty, all events will be indexed.
index-abci-events = []
# disable-index-abci-events disables the ABCI event indexing done by CometBFT. Useful when relying on the SDK indexer for event indexing, but still want events to be included in FinalizeBlockResponse.
disable-index-abci-events = false
# disable-abci-events disables all ABCI events. Useful when relying on the SDK indexer for event indexing.
disable-abci-events = false

# mempool defines the configuration for the SDK built-in app-side mempool implementations.
[comet.mempool]
Expand Down
2 changes: 1 addition & 1 deletion tools/confix/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type v2KeyChangesMap map[string][]string
var v2KeyChanges = v2KeyChangesMap{
"minimum-gas-prices": []string{"server.minimum-gas-prices"},
"min-retain-blocks": []string{"comet.min-retain-blocks"},
"index-events": []string{"comet.index-events"},
"index-events": []string{"comet.index-abci-events"},
"halt-height": []string{"comet.halt-height"},
"halt-time": []string{"comet.halt-time"},
"app-db-backend": []string{"store.app-db-backend"},
Expand Down

0 comments on commit 4c5632c

Please sign in to comment.