Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server/v2/stf): delayed marshalling of typed event #22684

Merged
merged 8 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 21 additions & 7 deletions server/v2/cometbft/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ type consensus[T transaction.Tx] struct {
streamingManager streaming.Manager
mempool mempool.Mempool[T]

cfg Config
chainID string
indexedEvents map[string]struct{}
cfg Config
chainID string
indexedABCIEvents map[string]struct{}

initialHeight uint64
// this is only available after this node has committed a block (in FinalizeBlock),
Expand Down Expand Up @@ -105,9 +105,16 @@ func (c *consensus[T]) CheckTx(ctx context.Context, req *abciproto.CheckTxReques
return nil, err
}

events, err := intoABCIEvents(resp.Events, c.indexedEvents)
if err != nil {
return nil, err
events := make([]abci.Event, 0)
if !c.cfg.AppTomlConfig.DisableABCIEvents {
events, err = intoABCIEvents(
resp.Events,
c.indexedABCIEvents,
c.cfg.AppTomlConfig.DisableIndexABCIEvents,
)
if err != nil {
return nil, err
}
}

cometResp := &abciproto.CheckTxResponse{
Expand All @@ -116,6 +123,7 @@ func (c *consensus[T]) CheckTx(ctx context.Context, req *abciproto.CheckTxReques
GasUsed: uint64ToInt64(resp.GasUsed),
Events: events,
}

if resp.Error != nil {
space, code, log := errorsmod.ABCIInfo(resp.Error, c.cfg.AppTomlConfig.Trace)
cometResp.Code = code
Expand Down Expand Up @@ -557,7 +565,13 @@ func (c *consensus[T]) FinalizeBlock(
return nil, err
}

return finalizeBlockResponse(resp, cp, appHash, c.indexedEvents, c.cfg.AppTomlConfig.Trace)
return finalizeBlockResponse(
resp,
cp,
appHash,
c.indexedABCIEvents,
c.cfg.AppTomlConfig,
)
}

func (c *consensus[T]) internalFinalizeBlock(
Expand Down
26 changes: 15 additions & 11 deletions server/v2/cometbft/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ type Config struct {
func DefaultAppTomlConfig() *AppTomlConfig {
return &AppTomlConfig{
MinRetainBlocks: 0,
IndexEvents: make([]string, 0),
HaltHeight: 0,
HaltTime: 0,
Address: "tcp://127.0.0.1:26658",
Expand All @@ -28,22 +27,27 @@ func DefaultAppTomlConfig() *AppTomlConfig {
Target: make(map[string]indexer.Config),
ChannelBufferSize: 1024,
},
IndexABCIEvents: make([]string, 0),
DisableIndexABCIEvents: false,
DisableABCIEvents: false,
}
}

type AppTomlConfig struct {
MinRetainBlocks uint64 `mapstructure:"min-retain-blocks" toml:"min-retain-blocks" comment:"min-retain-blocks defines the minimum block height offset from the current block being committed, such that all blocks past this offset are pruned from CometBFT. A value of 0 indicates that no blocks should be pruned."`
IndexEvents []string `mapstructure:"index-events" toml:"index-events" comment:"index-events defines the set of events in the form {eventType}.{attributeKey}, which informs CometBFT what to index. If empty, all events will be indexed."`
HaltHeight uint64 `mapstructure:"halt-height" toml:"halt-height" comment:"halt-height contains a non-zero block height at which a node will gracefully halt and shutdown that can be used to assist upgrades and testing."`
HaltTime uint64 `mapstructure:"halt-time" toml:"halt-time" comment:"halt-time contains a non-zero minimum block time (in Unix seconds) at which a node will gracefully halt and shutdown that can be used to assist upgrades and testing."`
Address string `mapstructure:"address" toml:"address" comment:"address defines the CometBFT RPC server address to bind to."`
Transport string `mapstructure:"transport" toml:"transport" comment:"transport defines the CometBFT RPC server transport protocol: socket, grpc"`
Trace bool `mapstructure:"trace" toml:"trace" comment:"trace enables the CometBFT RPC server to output trace information about its internal operations."`
Standalone bool `mapstructure:"standalone" toml:"standalone" comment:"standalone starts the application without the CometBFT node. The node should be started separately."`
MinRetainBlocks uint64 `mapstructure:"min-retain-blocks" toml:"min-retain-blocks" comment:"min-retain-blocks defines the minimum block height offset from the current block being committed, such that all blocks past this offset are pruned from CometBFT. A value of 0 indicates that no blocks should be pruned."`
HaltHeight uint64 `mapstructure:"halt-height" toml:"halt-height" comment:"halt-height contains a non-zero block height at which a node will gracefully halt and shutdown that can be used to assist upgrades and testing."`
HaltTime uint64 `mapstructure:"halt-time" toml:"halt-time" comment:"halt-time contains a non-zero minimum block time (in Unix seconds) at which a node will gracefully halt and shutdown that can be used to assist upgrades and testing."`
Address string `mapstructure:"address" toml:"address" comment:"address defines the CometBFT RPC server address to bind to."`
Transport string `mapstructure:"transport" toml:"transport" comment:"transport defines the CometBFT RPC server transport protocol: socket, grpc"`
Trace bool `mapstructure:"trace" toml:"trace" comment:"trace enables the CometBFT RPC server to output trace information about its internal operations."`
Standalone bool `mapstructure:"standalone" toml:"standalone" comment:"standalone starts the application without the CometBFT node. The node should be started separately."`

// Sub configs
Mempool mempool.Config `mapstructure:"mempool" toml:"mempool" comment:"mempool defines the configuration for the SDK built-in app-side mempool implementations."`
Indexer indexer.IndexingConfig `mapstructure:"indexer" toml:"indexer" comment:"indexer defines the configuration for the SDK built-in indexer implementation."`
Mempool mempool.Config `mapstructure:"mempool" toml:"mempool" comment:"mempool defines the configuration for the SDK built-in app-side mempool implementations."`
Indexer indexer.IndexingConfig `mapstructure:"indexer" toml:"indexer" comment:"indexer defines the configuration for the SDK built-in indexer implementation."`
IndexABCIEvents []string `mapstructure:"index-abci-events" toml:"index-abci-events" comment:"index-abci-events defines the set of events in the form {eventType}.{attributeKey}, which informs CometBFT what to index. If empty, all events will be indexed."`
DisableIndexABCIEvents bool `mapstructure:"disable-index-abci-events" toml:"disable-index-abci-events" comment:"disable-index-abci-events disables the ABCI event indexing done by CometBFT. Useful when relying on the SDK indexer for event indexing, but still want events to be included in FinalizeBlockResponse."`
DisableABCIEvents bool `mapstructure:"disable-abci-events" toml:"disable-abci-events" comment:"disable-abci-events disables all ABCI events. Useful when relying on the SDK indexer for event indexing."`
Comment on lines +49 to +50
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i dont get why the first one is needed? in both cases when its true it doesnt emit events for comet but one still includes it in the finalize block response? why is that needed?

Copy link
Member Author

@julienrbrt julienrbrt Dec 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, well I didn't know if it was needed so I added it if someone was relying on that use case (block streaming, and doing manual indexing, but still while not willing comet to do it)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah okay, i see what you mean. lets make this one value. The second one. if events dont want to be indexed then the node operator should set tx_index: to null

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, thought about that too, but learned that toml doesn't support nil/null/none value, so that was our only option.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

@julienrbrt julienrbrt Dec 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, but there null is simply a string that is probably parsed by comet and do some logic (EDIT: looks like anything other than kv and psql will endup as null -> https://github.com/cometbft/cometbft/blob/e2c3fd400f52aecf50c78ec0a3bda73ebb5ff76b/state/indexer/block/indexer.go#L66-L67, so an empty string or foo would have worked too)

Our field is a string array, and we cannot have it a string array and a string.
If we have it as an any in our config, then everytime you parse it, you need to check if it is an array or a string.
IMHO the bool is a tad nicer.

}

// CfgOption is a function that allows to overwrite the default server configuration.
Expand Down
6 changes: 5 additions & 1 deletion server/v2/cometbft/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,11 @@ func (c *consensus[T]) handleQueryApp(ctx context.Context, path []string, req *a
return nil, errorsmod.Wrap(err, "failed to simulate tx")
}

bz, err := intoABCISimulationResponse(txResult, c.indexedEvents)
bz, err := intoABCISimulationResponse(
txResult,
c.indexedABCIEvents,
c.cfg.AppTomlConfig.DisableIndexABCIEvents,
)
if err != nil {
return nil, errorsmod.Wrap(err, "failed to marshal txResult")
}
Expand Down
8 changes: 4 additions & 4 deletions server/v2/cometbft/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,9 @@ func New[T transaction.Tx](
}
}

indexEvents := make(map[string]struct{}, len(srv.config.AppTomlConfig.IndexEvents))
for _, e := range srv.config.AppTomlConfig.IndexEvents {
indexEvents[e] = struct{}{}
indexedABCIEvents := make(map[string]struct{}, len(srv.config.AppTomlConfig.IndexABCIEvents))
for _, e := range srv.config.AppTomlConfig.IndexABCIEvents {
indexedABCIEvents[e] = struct{}{}
Comment on lines +125 to +127
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Inconsistent event indexing variable names across the codebase

The codebase shows two different naming conventions being used:

  • indexEvents in baseapp/ directory
  • indexedABCIEvents in server/v2/cometbft/ directory

Both variables serve the same purpose of indexing ABCI events, but the inconsistent naming could lead to confusion. Consider:

  • Unifying the naming convention across the codebase
  • Using indexedABCIEvents consistently as it's more descriptive and explicit about its ABCI-specific purpose
🔗 Analysis chain

LGTM: Efficient map initialization with pre-allocated capacity

The map initialization with pre-allocated capacity is a good performance optimization. The renamed variable better reflects its ABCI-specific purpose.

Let's verify the consistency of this change across the codebase:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the consistent usage of indexedABCIEvents across the codebase
# and ensure no old references to indexEvents remain

# Check for any remaining references to the old name
rg "indexEvents" --type go

# Check the usage pattern of the new name
rg "indexedABCIEvents" --type go -A 2 -B 2

Length of output: 2687

}

ss := store.GetStateStorage().(snapshots.StorageSnapshotter)
Expand Down Expand Up @@ -185,7 +185,7 @@ func New[T transaction.Tx](
checkTxHandler: srv.serverOptions.CheckTxHandler,
extendVote: srv.serverOptions.ExtendVoteHandler,
chainID: chainID,
indexedEvents: indexEvents,
indexedABCIEvents: indexedABCIEvents,
initialHeight: 0,
queryHandlersMap: queryHandlers,
getProtoRegistry: sync.OnceValues(gogoproto.MergedRegistry),
Expand Down
77 changes: 39 additions & 38 deletions server/v2/cometbft/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,23 @@ func finalizeBlockResponse(
cp *cmtproto.ConsensusParams,
appHash []byte,
indexSet map[string]struct{},
debug bool,
cfg *AppTomlConfig,
) (*abci.FinalizeBlockResponse, error) {
allEvents := append(in.BeginBlockEvents, in.EndBlockEvents...)

events, err := intoABCIEvents(allEvents, indexSet)
if err != nil {
return nil, err
events := make([]abci.Event, 0)

if !cfg.DisableABCIEvents {
var err error
events, err = intoABCIEvents(
append(in.BeginBlockEvents, in.EndBlockEvents...),
indexSet,
cfg.DisableIndexABCIEvents,
)
if err != nil {
return nil, err
}
}

txResults, err := intoABCITxResults(in.TxResults, indexSet, debug)
txResults, err := intoABCITxResults(in.TxResults, indexSet, cfg)
if err != nil {
return nil, err
}
Expand All @@ -91,6 +98,7 @@ func finalizeBlockResponse(
AppHash: appHash,
ConsensusParamUpdates: cp,
}

return resp, nil
}

Expand All @@ -108,72 +116,65 @@ func intoABCIValidatorUpdates(updates []appmodulev2.ValidatorUpdate) []abci.Vali
return valsetUpdates
}

func intoABCITxResults(results []server.TxResult, indexSet map[string]struct{}, debug bool) ([]*abci.ExecTxResult, error) {
func intoABCITxResults(
results []server.TxResult,
indexSet map[string]struct{},
cfg *AppTomlConfig,
) ([]*abci.ExecTxResult, error) {
res := make([]*abci.ExecTxResult, len(results))
for i := range results {
events, err := intoABCIEvents(results[i].Events, indexSet)
if err != nil {
return nil, err
var err error
events := make([]abci.Event, 0)

if !cfg.DisableABCIEvents {
events, err = intoABCIEvents(results[i].Events, indexSet, cfg.DisableIndexABCIEvents)
if err != nil {
return nil, err
}
}

res[i] = responseExecTxResultWithEvents(
results[i].Error,
results[i].GasWanted,
results[i].GasUsed,
events,
debug,
cfg.Trace,
)
}

return res, nil
}

func intoABCIEvents(events []event.Event, indexSet map[string]struct{}) ([]abci.Event, error) {
func intoABCIEvents(events []event.Event, indexSet map[string]struct{}, indexNone bool) ([]abci.Event, error) {
indexAll := len(indexSet) == 0
abciEvents := make([]abci.Event, len(events))
for i, e := range events {
attributes, err := e.Attributes()
attrs, err := e.Attributes()
if err != nil {
return nil, err
}

abciEvents[i] = abci.Event{
Type: e.Type,
Attributes: make([]abci.EventAttribute, len(attributes)),
Attributes: make([]abci.EventAttribute, len(attrs)),
}

for j, attr := range attributes {
for j, attr := range attrs {
_, index := indexSet[fmt.Sprintf("%s.%s", e.Type, attr.Key)]
abciEvents[i].Attributes[j] = abci.EventAttribute{
Key: attr.Key,
Value: attr.Value,
Index: index || indexAll,
Index: !indexNone && (index || indexAll),
}
}
}
return abciEvents, nil
}

func intoABCISimulationResponse(txRes server.TxResult, indexSet map[string]struct{}) ([]byte, error) {
indexAll := len(indexSet) == 0
abciEvents := make([]abci.Event, len(txRes.Events))
for i, e := range txRes.Events {
attributes, err := e.Attributes()
if err != nil {
return nil, err
}
abciEvents[i] = abci.Event{
Type: e.Type,
Attributes: make([]abci.EventAttribute, len(attributes)),
}

for j, attr := range attributes {
_, index := indexSet[fmt.Sprintf("%s.%s", e.Type, attr.Key)]
abciEvents[i].Attributes[j] = abci.EventAttribute{
Key: attr.Key,
Value: attr.Value,
Index: index || indexAll,
}
}
func intoABCISimulationResponse(txRes server.TxResult, indexSet map[string]struct{}, indexNone bool) ([]byte, error) {
abciEvents, err := intoABCIEvents(txRes.Events, indexSet, indexNone)
if err != nil {
return nil, err
}

msgResponses := make([]*gogoany.Any, len(txRes.Resp))
Expand Down
36 changes: 31 additions & 5 deletions server/v2/stf/core_event_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,44 @@ type eventManager struct {
// Emit emits an typed event that is defined in the protobuf file.
// In the future these events will be added to consensus.
func (em *eventManager) Emit(tev transaction.Msg) error {
res, err := TypedEventToEvent(tev)
if err != nil {
return err
ev := event.Event{
Type: gogoproto.MessageName(tev),
Attributes: func() ([]event.Attribute, error) {
outerEvent, err := TypedEventToEvent(tev)
if err != nil {
return nil, err
}

return outerEvent.Attributes()
},
Data: func() (json.RawMessage, error) {
buf := new(bytes.Buffer)
jm := &jsonpb.Marshaler{OrigName: true, EmitDefaults: true, AnyResolver: nil}
if err := jm.Marshal(buf, tev); err != nil {
return nil, err
}

return buf.Bytes(), nil
},
julienrbrt marked this conversation as resolved.
Show resolved Hide resolved
}

em.executionContext.events = append(em.executionContext.events, res)
em.executionContext.events = append(em.executionContext.events, ev)
return nil
}

// EmitKV emits a key value pair event.
func (em *eventManager) EmitKV(eventType string, attrs ...event.Attribute) error {
em.executionContext.events = append(em.executionContext.events, event.NewEvent(eventType, attrs...))
ev := event.Event{
Type: eventType,
Attributes: func() ([]event.Attribute, error) {
return attrs, nil
},
Data: func() (json.RawMessage, error) {
return json.Marshal(attrs)
},
}

em.executionContext.events = append(em.executionContext.events, ev)
return nil
}

Expand Down
Loading
Loading