From 2c942e7f3b53a81b7ceb43256da6ddc8416683d7 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Thu, 28 Nov 2024 15:34:46 +0100 Subject: [PATCH 1/5] feat(server/v2/stf): delayed marshalling of typed event --- server/v2/stf/core_event_service.go | 49 +++++++---------------------- 1 file changed, 12 insertions(+), 37 deletions(-) diff --git a/server/v2/stf/core_event_service.go b/server/v2/stf/core_event_service.go index 9f9b48080cae..a8b2c265d1e8 100644 --- a/server/v2/stf/core_event_service.go +++ b/server/v2/stf/core_event_service.go @@ -4,8 +4,6 @@ import ( "bytes" "context" "encoding/json" - "maps" - "slices" "github.com/cosmos/gogoproto/jsonpb" gogoproto "github.com/cosmos/gogoproto/proto" @@ -39,12 +37,20 @@ type eventManager struct { // Emit emits an typed event that is defined in the protobuf file. // In the future these events will be added to consensus. func (em *eventManager) Emit(tev transaction.Msg) error { - res, err := TypedEventToEvent(tev) - if err != nil { - return err + event := event.Event{ + Type: gogoproto.MessageName(tev), + Data: func() (json.RawMessage, error) { + buf := new(bytes.Buffer) + jm := &jsonpb.Marshaler{OrigName: true, EmitDefaults: true, AnyResolver: nil} + if err := jm.Marshal(buf, tev); err != nil { + return nil, err + } + + return buf.Bytes(), nil + }, } - em.executionContext.events = append(em.executionContext.events, res) + em.executionContext.events = append(em.executionContext.events, event) return nil } @@ -53,34 +59,3 @@ func (em *eventManager) EmitKV(eventType string, attrs ...event.Attribute) error em.executionContext.events = append(em.executionContext.events, event.NewEvent(eventType, attrs...)) return nil } - -// TypedEventToEvent takes typed event and converts to Event object -func TypedEventToEvent(tev transaction.Msg) (event.Event, error) { - evtType := gogoproto.MessageName(tev) - buf := new(bytes.Buffer) - jm := &jsonpb.Marshaler{OrigName: true, EmitDefaults: true, AnyResolver: nil} - if err := jm.Marshal(buf, tev); err != nil { - return event.Event{}, err - } - - var attrMap map[string]json.RawMessage - if err := json.Unmarshal(buf.Bytes(), &attrMap); err != nil { - return event.Event{}, err - } - - // sort the keys to ensure the order is always the same - keys := slices.Sorted(maps.Keys(attrMap)) - attrs := make([]event.Attribute, 0, len(attrMap)) - for _, k := range keys { - v := attrMap[k] - attrs = append(attrs, event.Attribute{ - Key: k, - Value: string(v), - }) - } - - return event.Event{ - Type: evtType, - Attributes: func() ([]event.Attribute, error) { return attrs, nil }, - }, nil -} From 816183f57d2551edb1626077038a0ef446461791 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Fri, 29 Nov 2024 12:41:48 +0100 Subject: [PATCH 2/5] updates --- server/v2/cometbft/abci.go | 18 ++++--- server/v2/cometbft/config.go | 24 +++++---- server/v2/cometbft/query.go | 2 +- server/v2/cometbft/server.go | 10 ++-- server/v2/cometbft/utils.go | 94 ++++++++++++++++++++++------------- tools/confix/data/v2-app.toml | 46 +++++++++++++++-- tools/confix/migrations.go | 2 +- 7 files changed, 134 insertions(+), 62 deletions(-) diff --git a/server/v2/cometbft/abci.go b/server/v2/cometbft/abci.go index e6a07fd86f07..771c1ef1fa6a 100644 --- a/server/v2/cometbft/abci.go +++ b/server/v2/cometbft/abci.go @@ -62,9 +62,9 @@ type consensus[T transaction.Tx] struct { streamingManager streaming.Manager mempool mempool.Mempool[T] - cfg Config - chainID string - indexedEvents map[string]struct{} + cfg Config + chainID string + indexedABCIEvents map[string]struct{} initialHeight uint64 // this is only available after this node has committed a block (in FinalizeBlock), @@ -105,9 +105,12 @@ func (c *consensus[T]) CheckTx(ctx context.Context, req *abciproto.CheckTxReques return nil, err } - events, err := intoABCIEvents(resp.Events, c.indexedEvents) - if err != nil { - return nil, err + events := make([]abci.Event, 0) + if !c.cfg.AppTomlConfig.DisableABCIEvents { + events, err = intoABCIEvents(resp.Events, c.indexedABCIEvents) + if err != nil { + return nil, err + } } cometResp := &abciproto.CheckTxResponse{ @@ -116,6 +119,7 @@ func (c *consensus[T]) CheckTx(ctx context.Context, req *abciproto.CheckTxReques GasUsed: uint64ToInt64(resp.GasUsed), Events: events, } + if resp.Error != nil { space, code, log := errorsmod.ABCIInfo(resp.Error, c.cfg.AppTomlConfig.Trace) cometResp.Code = code @@ -557,7 +561,7 @@ func (c *consensus[T]) FinalizeBlock( return nil, err } - return finalizeBlockResponse(resp, cp, appHash, c.indexedEvents, c.cfg.AppTomlConfig.Trace) + return finalizeBlockResponse(resp, cp, appHash, c.indexedABCIEvents, c.cfg.AppTomlConfig.DisableABCIEvents, c.cfg.AppTomlConfig.Trace) } func (c *consensus[T]) internalFinalizeBlock( diff --git a/server/v2/cometbft/config.go b/server/v2/cometbft/config.go index d8e591a9695c..6fc4629270f9 100644 --- a/server/v2/cometbft/config.go +++ b/server/v2/cometbft/config.go @@ -16,7 +16,6 @@ type Config struct { func DefaultAppTomlConfig() *AppTomlConfig { return &AppTomlConfig{ MinRetainBlocks: 0, - IndexEvents: make([]string, 0), HaltHeight: 0, HaltTime: 0, Address: "tcp://127.0.0.1:26658", @@ -28,22 +27,25 @@ func DefaultAppTomlConfig() *AppTomlConfig { Target: make(map[string]indexer.Config), ChannelBufferSize: 1024, }, + IndexABCIEvents: make([]string, 0), + DisableABCIEvents: false, } } type AppTomlConfig struct { - MinRetainBlocks uint64 `mapstructure:"min-retain-blocks" toml:"min-retain-blocks" comment:"min-retain-blocks defines the minimum block height offset from the current block being committed, such that all blocks past this offset are pruned from CometBFT. A value of 0 indicates that no blocks should be pruned."` - IndexEvents []string `mapstructure:"index-events" toml:"index-events" comment:"index-events defines the set of events in the form {eventType}.{attributeKey}, which informs CometBFT what to index. If empty, all events will be indexed."` - HaltHeight uint64 `mapstructure:"halt-height" toml:"halt-height" comment:"halt-height contains a non-zero block height at which a node will gracefully halt and shutdown that can be used to assist upgrades and testing."` - HaltTime uint64 `mapstructure:"halt-time" toml:"halt-time" comment:"halt-time contains a non-zero minimum block time (in Unix seconds) at which a node will gracefully halt and shutdown that can be used to assist upgrades and testing."` - Address string `mapstructure:"address" toml:"address" comment:"address defines the CometBFT RPC server address to bind to."` - Transport string `mapstructure:"transport" toml:"transport" comment:"transport defines the CometBFT RPC server transport protocol: socket, grpc"` - Trace bool `mapstructure:"trace" toml:"trace" comment:"trace enables the CometBFT RPC server to output trace information about its internal operations."` - Standalone bool `mapstructure:"standalone" toml:"standalone" comment:"standalone starts the application without the CometBFT node. The node should be started separately."` + MinRetainBlocks uint64 `mapstructure:"min-retain-blocks" toml:"min-retain-blocks" comment:"min-retain-blocks defines the minimum block height offset from the current block being committed, such that all blocks past this offset are pruned from CometBFT. A value of 0 indicates that no blocks should be pruned."` + HaltHeight uint64 `mapstructure:"halt-height" toml:"halt-height" comment:"halt-height contains a non-zero block height at which a node will gracefully halt and shutdown that can be used to assist upgrades and testing."` + HaltTime uint64 `mapstructure:"halt-time" toml:"halt-time" comment:"halt-time contains a non-zero minimum block time (in Unix seconds) at which a node will gracefully halt and shutdown that can be used to assist upgrades and testing."` + Address string `mapstructure:"address" toml:"address" comment:"address defines the CometBFT RPC server address to bind to."` + Transport string `mapstructure:"transport" toml:"transport" comment:"transport defines the CometBFT RPC server transport protocol: socket, grpc"` + Trace bool `mapstructure:"trace" toml:"trace" comment:"trace enables the CometBFT RPC server to output trace information about its internal operations."` + Standalone bool `mapstructure:"standalone" toml:"standalone" comment:"standalone starts the application without the CometBFT node. The node should be started separately."` // Sub configs - Mempool mempool.Config `mapstructure:"mempool" toml:"mempool" comment:"mempool defines the configuration for the SDK built-in app-side mempool implementations."` - Indexer indexer.IndexingConfig `mapstructure:"indexer" toml:"indexer" comment:"indexer defines the configuration for the SDK built-in indexer implementation."` + Mempool mempool.Config `mapstructure:"mempool" toml:"mempool" comment:"mempool defines the configuration for the SDK built-in app-side mempool implementations."` + Indexer indexer.IndexingConfig `mapstructure:"indexer" toml:"indexer" comment:"indexer defines the configuration for the SDK built-in indexer implementation."` + IndexABCIEvents []string `mapstructure:"index-abci-events" toml:"index-abci-events" comment:"index-abci-events defines the set of events in the form {eventType}.{attributeKey}, which informs CometBFT what to index. If empty, all events will be indexed."` + DisableABCIEvents bool `mapstructure:"disable-abci-events" toml:"disable-abci-events" comment:"disable-abci-events disables the ABCI event indexing. It is useful when relying on the indexer for event indexing."` } // CfgOption is a function that allows to overwrite the default server configuration. diff --git a/server/v2/cometbft/query.go b/server/v2/cometbft/query.go index f7fbe811d1b1..0049489868ce 100644 --- a/server/v2/cometbft/query.go +++ b/server/v2/cometbft/query.go @@ -61,7 +61,7 @@ func (c *consensus[T]) handleQueryApp(ctx context.Context, path []string, req *a return nil, errorsmod.Wrap(err, "failed to simulate tx") } - bz, err := intoABCISimulationResponse(txResult, c.indexedEvents) + bz, err := intoABCISimulationResponse(txResult, c.indexedABCIEvents) if err != nil { return nil, errorsmod.Wrap(err, "failed to marshal txResult") } diff --git a/server/v2/cometbft/server.go b/server/v2/cometbft/server.go index 6567e01caef0..75e9f1a1857d 100644 --- a/server/v2/cometbft/server.go +++ b/server/v2/cometbft/server.go @@ -2,7 +2,6 @@ package cometbft import ( "context" - "cosmossdk.io/server/v2/cometbft/oe" "crypto/sha256" "encoding/json" "fmt" @@ -35,6 +34,7 @@ import ( "cosmossdk.io/server/v2/appmanager" cometlog "cosmossdk.io/server/v2/cometbft/log" "cosmossdk.io/server/v2/cometbft/mempool" + "cosmossdk.io/server/v2/cometbft/oe" "cosmossdk.io/server/v2/cometbft/types" "cosmossdk.io/store/v2/snapshots" @@ -122,9 +122,9 @@ func New[T transaction.Tx]( } } - indexEvents := make(map[string]struct{}, len(srv.config.AppTomlConfig.IndexEvents)) - for _, e := range srv.config.AppTomlConfig.IndexEvents { - indexEvents[e] = struct{}{} + indexedABCIEvents := make(map[string]struct{}, len(srv.config.AppTomlConfig.IndexABCIEvents)) + for _, e := range srv.config.AppTomlConfig.IndexABCIEvents { + indexedABCIEvents[e] = struct{}{} } ss := store.GetStateStorage().(snapshots.StorageSnapshotter) @@ -185,7 +185,7 @@ func New[T transaction.Tx]( checkTxHandler: srv.serverOptions.CheckTxHandler, extendVote: srv.serverOptions.ExtendVoteHandler, chainID: chainID, - indexedEvents: indexEvents, + indexedABCIEvents: indexedABCIEvents, initialHeight: 0, queryHandlersMap: queryHandlers, getProtoRegistry: sync.OnceValues(gogoproto.MergedRegistry), diff --git a/server/v2/cometbft/utils.go b/server/v2/cometbft/utils.go index 09b32c3d00b1..3b2b506e6c35 100644 --- a/server/v2/cometbft/utils.go +++ b/server/v2/cometbft/utils.go @@ -2,9 +2,12 @@ package cometbft import ( "context" + "encoding/json" "errors" "fmt" + "maps" "math" + "slices" "strings" "time" @@ -70,16 +73,20 @@ func finalizeBlockResponse( cp *cmtproto.ConsensusParams, appHash []byte, indexSet map[string]struct{}, + disableABCIEvents, debug bool, ) (*abci.FinalizeBlockResponse, error) { - allEvents := append(in.BeginBlockEvents, in.EndBlockEvents...) + events := make([]abci.Event, 0) - events, err := intoABCIEvents(allEvents, indexSet) - if err != nil { - return nil, err + if !disableABCIEvents { + var err error + events, err = intoABCIEvents(append(in.BeginBlockEvents, in.EndBlockEvents...), indexSet) + if err != nil { + return nil, err + } } - txResults, err := intoABCITxResults(in.TxResults, indexSet, debug) + txResults, err := intoABCITxResults(in.TxResults, indexSet, disableABCIEvents, debug) if err != nil { return nil, err } @@ -91,6 +98,7 @@ func finalizeBlockResponse( AppHash: appHash, ConsensusParamUpdates: cp, } + return resp, nil } @@ -108,12 +116,21 @@ func intoABCIValidatorUpdates(updates []appmodulev2.ValidatorUpdate) []abci.Vali return valsetUpdates } -func intoABCITxResults(results []server.TxResult, indexSet map[string]struct{}, debug bool) ([]*abci.ExecTxResult, error) { +func intoABCITxResults( + results []server.TxResult, + indexSet map[string]struct{}, + disableABCIEvents, debug bool, +) ([]*abci.ExecTxResult, error) { res := make([]*abci.ExecTxResult, len(results)) for i := range results { - events, err := intoABCIEvents(results[i].Events, indexSet) - if err != nil { - return nil, err + var err error + events := make([]abci.Event, 0) + + if !disableABCIEvents { + events, err = intoABCIEvents(results[i].Events, indexSet) + if err != nil { + return nil, err + } } res[i] = responseExecTxResultWithEvents( @@ -132,16 +149,42 @@ func intoABCIEvents(events []event.Event, indexSet map[string]struct{}) ([]abci. indexAll := len(indexSet) == 0 abciEvents := make([]abci.Event, len(events)) for i, e := range events { - attributes, err := e.Attributes() - if err != nil { - return nil, err + attrs := make([]event.Attribute, 0) + + if e.Data != nil { + resp, err := e.Data() + if err != nil { + return nil, fmt.Errorf("failed to marshal event data: %w", err) + } + + var attrMap map[string]json.RawMessage + if err := json.Unmarshal(resp, &attrMap); err != nil { + return nil, fmt.Errorf("failed to unmarshal event data: %w", err) + } + + // sort the keys to ensure the order is always the same + keys := slices.Sorted(maps.Keys(attrMap)) + for _, k := range keys { + v := attrMap[k] + attrs = append(attrs, event.Attribute{ + Key: k, + Value: string(v), + }) + } + } else { + var err error + attrs, err = e.Attributes() + if err != nil { + return nil, err + } } + abciEvents[i] = abci.Event{ Type: e.Type, - Attributes: make([]abci.EventAttribute, len(attributes)), + Attributes: make([]abci.EventAttribute, len(attrs)), } - for j, attr := range attributes { + for j, attr := range attrs { _, index := indexSet[fmt.Sprintf("%s.%s", e.Type, attr.Key)] abciEvents[i].Attributes[j] = abci.EventAttribute{ Key: attr.Key, @@ -154,26 +197,9 @@ func intoABCIEvents(events []event.Event, indexSet map[string]struct{}) ([]abci. } func intoABCISimulationResponse(txRes server.TxResult, indexSet map[string]struct{}) ([]byte, error) { - indexAll := len(indexSet) == 0 - abciEvents := make([]abci.Event, len(txRes.Events)) - for i, e := range txRes.Events { - attributes, err := e.Attributes() - if err != nil { - return nil, err - } - abciEvents[i] = abci.Event{ - Type: e.Type, - Attributes: make([]abci.EventAttribute, len(attributes)), - } - - for j, attr := range attributes { - _, index := indexSet[fmt.Sprintf("%s.%s", e.Type, attr.Key)] - abciEvents[i].Attributes[j] = abci.EventAttribute{ - Key: attr.Key, - Value: attr.Value, - Index: index || indexAll, - } - } + abciEvents, err := intoABCIEvents(txRes.Events, indexSet) + if err != nil { + return nil, err } msgResponses := make([]*gogoany.Any, len(txRes.Resp)) diff --git a/tools/confix/data/v2-app.toml b/tools/confix/data/v2-app.toml index e478eed3609a..5518f2b2a323 100644 --- a/tools/confix/data/v2-app.toml +++ b/tools/confix/data/v2-app.toml @@ -1,28 +1,41 @@ [comet] + # min-retain-blocks defines the minimum block height offset from the current block being committed, such that all blocks past this offset are pruned from CometBFT. A value of 0 indicates that no blocks should be pruned. min-retain-blocks = 0 -# index-events defines the set of events in the form {eventType}.{attributeKey}, which informs CometBFT what to index. If empty, all events will be indexed. -index-events = [] + # halt-height contains a non-zero block height at which a node will gracefully halt and shutdown that can be used to assist upgrades and testing. halt-height = 0 + # halt-time contains a non-zero minimum block time (in Unix seconds) at which a node will gracefully halt and shutdown that can be used to assist upgrades and testing. halt-time = 0 + # address defines the CometBFT RPC server address to bind to. address = 'tcp://127.0.0.1:26658' + # transport defines the CometBFT RPC server transport protocol: socket, grpc transport = 'socket' + # trace enables the CometBFT RPC server to output trace information about its internal operations. trace = false + # standalone starts the application without the CometBFT node. The node should be started separately. standalone = false +# index-abci-events defines the set of events in the form {eventType}.{attributeKey}, which informs CometBFT what to index. If empty, all events will be indexed. +index-abci-events = [] + +# disable-abci-events disables the ABCI event indexing. It is useful when relying on the indexer for event indexing. +disable-abci-events = false + # mempool defines the configuration for the SDK built-in app-side mempool implementations. [comet.mempool] + # max-txs defines the maximum number of transactions that can be in the mempool. A value of 0 indicates an unbounded mempool, a negative value disables the app-side mempool. max-txs = -1 # indexer defines the configuration for the SDK built-in indexer implementation. [comet.indexer] + # Buffer size of the channels used for buffering data sent to indexer go routines. channel_buffer_size = 1024 @@ -30,79 +43,106 @@ channel_buffer_size = 1024 [comet.indexer.target] [grpc] + # Enable defines if the gRPC server should be enabled. enable = true + # Address defines the gRPC server address to bind to. address = 'localhost:9090' + # MaxRecvMsgSize defines the max message size in bytes the server can receive. # The default value is 10MB. max-recv-msg-size = 10485760 + # MaxSendMsgSize defines the max message size in bytes the server can send. # The default value is math.MaxInt32. max-send-msg-size = 2147483647 [rest] + # Enable defines if the REST server should be enabled. enable = true + # Address defines the REST server address to bind to. address = 'localhost:8080' [server] + # minimum-gas-prices defines the price which a validator is willing to accept for processing a transaction. A transaction's fees must meet the minimum of any denomination specified in this config (e.g. 0.25token1;0.0001token2). minimum-gas-prices = '0stake' [store] + # The type of database for application and snapshots databases. app-db-backend = 'goleveldb' [store.options] + # State storage database type. Currently we support: "sqlite", "pebble" and "rocksdb" ss-type = 'sqlite' + # State commitment database type. Currently we support: "iavl" and "iavl-v2" sc-type = 'iavl' # Pruning options for state storage [store.options.ss-pruning-option] + # Number of recent heights to keep on disk. keep-recent = 2 + # Height interval at which pruned heights are removed from disk. interval = 100 # Pruning options for state commitment [store.options.sc-pruning-option] + # Number of recent heights to keep on disk. keep-recent = 2 + # Height interval at which pruned heights are removed from disk. interval = 100 [store.options.iavl-config] + # CacheSize set the size of the iavl tree cache. cache-size = 100000 + # If true, the tree will work like no fast storage and always not upgrade fast storage. skip-fast-storage-upgrade = true [telemetry] + # Enable enables the application telemetry functionality. When enabled, an in-memory sink is also enabled by default. Operators may also enabled other sinks such as Prometheus. enable = true + # Address defines the metrics server address to bind to. address = 'localhost:1327' + # Prefixed with keys to separate services. service-name = '' + # Enable prefixing gauge values with hostname. enable-hostname = false + # Enable adding hostname to labels. enable-hostname-label = false + # Enable adding service to labels. enable-service-label = false + # PrometheusRetentionTime, when positive, enables a Prometheus metrics sink. It defines the retention duration in seconds. prometheus-retention-time = 0 + # GlobalLabels defines a global set of name/value label tuples applied to all metrics emitted using the wrapper functions defined in telemetry package. # Example: # [["chain_id", "cosmoshub-1"]] global-labels = [] + # MetricsSink defines the type of metrics backend to use. Default is in memory metrics-sink = '' + # StatsdAddr defines the address of a statsd server to send metrics to. Only utilized if MetricsSink is set to "statsd" or "dogstatsd". stats-addr = '' + # DatadogHostname defines the hostname to use when emitting metrics to Datadog. Only utilized if MetricsSink is set to "dogstatsd". -data-dog-hostname = '' +data-dog-hostname = '' \ No newline at end of file diff --git a/tools/confix/migrations.go b/tools/confix/migrations.go index 77fc4a671cb4..1deb1acbaa87 100644 --- a/tools/confix/migrations.go +++ b/tools/confix/migrations.go @@ -41,7 +41,7 @@ type v2KeyChangesMap map[string][]string var v2KeyChanges = v2KeyChangesMap{ "minimum-gas-prices": []string{"server.minimum-gas-prices"}, "min-retain-blocks": []string{"comet.min-retain-blocks"}, - "index-events": []string{"comet.index-events"}, + "index-events": []string{"comet.index-abci-events"}, "halt-height": []string{"comet.halt-height"}, "halt-time": []string{"comet.halt-time"}, "app-db-backend": []string{"store.app-db-backend"}, From 9331c6713a7aa2c2eb8bb5e0e51fda35e9a8232b Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Fri, 29 Nov 2024 13:03:55 +0100 Subject: [PATCH 3/5] add extra choice --- server/v2/cometbft/abci.go | 14 ++++++++++++-- server/v2/cometbft/config.go | 14 ++++++++------ server/v2/cometbft/query.go | 6 +++++- server/v2/cometbft/utils.go | 29 ++++++++++++++++------------- tools/confix/data/v2-app.toml | 7 +++++-- 5 files changed, 46 insertions(+), 24 deletions(-) diff --git a/server/v2/cometbft/abci.go b/server/v2/cometbft/abci.go index 771c1ef1fa6a..c8fc58649e41 100644 --- a/server/v2/cometbft/abci.go +++ b/server/v2/cometbft/abci.go @@ -107,7 +107,11 @@ func (c *consensus[T]) CheckTx(ctx context.Context, req *abciproto.CheckTxReques events := make([]abci.Event, 0) if !c.cfg.AppTomlConfig.DisableABCIEvents { - events, err = intoABCIEvents(resp.Events, c.indexedABCIEvents) + events, err = intoABCIEvents( + resp.Events, + c.indexedABCIEvents, + c.cfg.AppTomlConfig.DisableIndexABCIEvents, + ) if err != nil { return nil, err } @@ -561,7 +565,13 @@ func (c *consensus[T]) FinalizeBlock( return nil, err } - return finalizeBlockResponse(resp, cp, appHash, c.indexedABCIEvents, c.cfg.AppTomlConfig.DisableABCIEvents, c.cfg.AppTomlConfig.Trace) + return finalizeBlockResponse( + resp, + cp, + appHash, + c.indexedABCIEvents, + c.cfg.AppTomlConfig, + ) } func (c *consensus[T]) internalFinalizeBlock( diff --git a/server/v2/cometbft/config.go b/server/v2/cometbft/config.go index 6fc4629270f9..a97ec834c541 100644 --- a/server/v2/cometbft/config.go +++ b/server/v2/cometbft/config.go @@ -27,8 +27,9 @@ func DefaultAppTomlConfig() *AppTomlConfig { Target: make(map[string]indexer.Config), ChannelBufferSize: 1024, }, - IndexABCIEvents: make([]string, 0), - DisableABCIEvents: false, + IndexABCIEvents: make([]string, 0), + DisableIndexABCIEvents: false, + DisableABCIEvents: false, } } @@ -42,10 +43,11 @@ type AppTomlConfig struct { Standalone bool `mapstructure:"standalone" toml:"standalone" comment:"standalone starts the application without the CometBFT node. The node should be started separately."` // Sub configs - Mempool mempool.Config `mapstructure:"mempool" toml:"mempool" comment:"mempool defines the configuration for the SDK built-in app-side mempool implementations."` - Indexer indexer.IndexingConfig `mapstructure:"indexer" toml:"indexer" comment:"indexer defines the configuration for the SDK built-in indexer implementation."` - IndexABCIEvents []string `mapstructure:"index-abci-events" toml:"index-abci-events" comment:"index-abci-events defines the set of events in the form {eventType}.{attributeKey}, which informs CometBFT what to index. If empty, all events will be indexed."` - DisableABCIEvents bool `mapstructure:"disable-abci-events" toml:"disable-abci-events" comment:"disable-abci-events disables the ABCI event indexing. It is useful when relying on the indexer for event indexing."` + Mempool mempool.Config `mapstructure:"mempool" toml:"mempool" comment:"mempool defines the configuration for the SDK built-in app-side mempool implementations."` + Indexer indexer.IndexingConfig `mapstructure:"indexer" toml:"indexer" comment:"indexer defines the configuration for the SDK built-in indexer implementation."` + IndexABCIEvents []string `mapstructure:"index-abci-events" toml:"index-abci-events" comment:"index-abci-events defines the set of events in the form {eventType}.{attributeKey}, which informs CometBFT what to index. If empty, all events will be indexed."` + DisableIndexABCIEvents bool `mapstructure:"disable-index-abci-events" toml:"disable-index-abci-events" comment:"disable-index-abci-events disables the ABCI event indexing done by CometBFT. Useful when relying on the SDK indexer for event indexing, but still want events to be included in FinalizeBlockResponse."` + DisableABCIEvents bool `mapstructure:"disable-abci-events" toml:"disable-abci-events" comment:"disable-abci-events disables all ABCI events. Useful when relying on the SDK indexer for event indexing."` } // CfgOption is a function that allows to overwrite the default server configuration. diff --git a/server/v2/cometbft/query.go b/server/v2/cometbft/query.go index 0049489868ce..bf6bcfe02eb6 100644 --- a/server/v2/cometbft/query.go +++ b/server/v2/cometbft/query.go @@ -61,7 +61,11 @@ func (c *consensus[T]) handleQueryApp(ctx context.Context, path []string, req *a return nil, errorsmod.Wrap(err, "failed to simulate tx") } - bz, err := intoABCISimulationResponse(txResult, c.indexedABCIEvents) + bz, err := intoABCISimulationResponse( + txResult, + c.indexedABCIEvents, + c.cfg.AppTomlConfig.DisableIndexABCIEvents, + ) if err != nil { return nil, errorsmod.Wrap(err, "failed to marshal txResult") } diff --git a/server/v2/cometbft/utils.go b/server/v2/cometbft/utils.go index 3b2b506e6c35..5ea9635440ad 100644 --- a/server/v2/cometbft/utils.go +++ b/server/v2/cometbft/utils.go @@ -73,20 +73,23 @@ func finalizeBlockResponse( cp *cmtproto.ConsensusParams, appHash []byte, indexSet map[string]struct{}, - disableABCIEvents, - debug bool, + cfg *AppTomlConfig, ) (*abci.FinalizeBlockResponse, error) { events := make([]abci.Event, 0) - if !disableABCIEvents { + if !cfg.DisableABCIEvents { var err error - events, err = intoABCIEvents(append(in.BeginBlockEvents, in.EndBlockEvents...), indexSet) + events, err = intoABCIEvents( + append(in.BeginBlockEvents, in.EndBlockEvents...), + indexSet, + cfg.DisableIndexABCIEvents, + ) if err != nil { return nil, err } } - txResults, err := intoABCITxResults(in.TxResults, indexSet, disableABCIEvents, debug) + txResults, err := intoABCITxResults(in.TxResults, indexSet, cfg) if err != nil { return nil, err } @@ -119,15 +122,15 @@ func intoABCIValidatorUpdates(updates []appmodulev2.ValidatorUpdate) []abci.Vali func intoABCITxResults( results []server.TxResult, indexSet map[string]struct{}, - disableABCIEvents, debug bool, + cfg *AppTomlConfig, ) ([]*abci.ExecTxResult, error) { res := make([]*abci.ExecTxResult, len(results)) for i := range results { var err error events := make([]abci.Event, 0) - if !disableABCIEvents { - events, err = intoABCIEvents(results[i].Events, indexSet) + if !cfg.DisableABCIEvents { + events, err = intoABCIEvents(results[i].Events, indexSet, cfg.DisableIndexABCIEvents) if err != nil { return nil, err } @@ -138,14 +141,14 @@ func intoABCITxResults( results[i].GasWanted, results[i].GasUsed, events, - debug, + cfg.Trace, ) } return res, nil } -func intoABCIEvents(events []event.Event, indexSet map[string]struct{}) ([]abci.Event, error) { +func intoABCIEvents(events []event.Event, indexSet map[string]struct{}, indexNone bool) ([]abci.Event, error) { indexAll := len(indexSet) == 0 abciEvents := make([]abci.Event, len(events)) for i, e := range events { @@ -189,15 +192,15 @@ func intoABCIEvents(events []event.Event, indexSet map[string]struct{}) ([]abci. abciEvents[i].Attributes[j] = abci.EventAttribute{ Key: attr.Key, Value: attr.Value, - Index: index || indexAll, + Index: !indexNone && (index || indexAll), } } } return abciEvents, nil } -func intoABCISimulationResponse(txRes server.TxResult, indexSet map[string]struct{}) ([]byte, error) { - abciEvents, err := intoABCIEvents(txRes.Events, indexSet) +func intoABCISimulationResponse(txRes server.TxResult, indexSet map[string]struct{}, indexNone bool) ([]byte, error) { + abciEvents, err := intoABCIEvents(txRes.Events, indexSet, indexNone) if err != nil { return nil, err } diff --git a/tools/confix/data/v2-app.toml b/tools/confix/data/v2-app.toml index 5518f2b2a323..00a4e98335d1 100644 --- a/tools/confix/data/v2-app.toml +++ b/tools/confix/data/v2-app.toml @@ -24,7 +24,10 @@ standalone = false # index-abci-events defines the set of events in the form {eventType}.{attributeKey}, which informs CometBFT what to index. If empty, all events will be indexed. index-abci-events = [] -# disable-abci-events disables the ABCI event indexing. It is useful when relying on the indexer for event indexing. +# disable-index-abci-events disables the ABCI event indexing done by CometBFT. Useful when relying on the SDK indexer for event indexing, but still want events to be included in FinalizeBlockResponse. +disable-index-abci-events = false + +# disable-abci-events disables all ABCI events. Useful when relying on the SDK indexer for event indexing. disable-abci-events = false # mempool defines the configuration for the SDK built-in app-side mempool implementations. @@ -145,4 +148,4 @@ metrics-sink = '' stats-addr = '' # DatadogHostname defines the hostname to use when emitting metrics to Datadog. Only utilized if MetricsSink is set to "dogstatsd". -data-dog-hostname = '' \ No newline at end of file +data-dog-hostname = '' From d5302e2a92ba485355fb2a1f7f2b8687e082f050 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Mon, 2 Dec 2024 20:41:39 +0100 Subject: [PATCH 4/5] feedback --- server/v2/cometbft/utils.go | 34 ++--------------- server/v2/stf/core_event_service.go | 57 +++++++++++++++++++++++++++-- 2 files changed, 58 insertions(+), 33 deletions(-) diff --git a/server/v2/cometbft/utils.go b/server/v2/cometbft/utils.go index 5ea9635440ad..658008deb85d 100644 --- a/server/v2/cometbft/utils.go +++ b/server/v2/cometbft/utils.go @@ -2,12 +2,9 @@ package cometbft import ( "context" - "encoding/json" "errors" "fmt" - "maps" "math" - "slices" "strings" "time" @@ -153,33 +150,10 @@ func intoABCIEvents(events []event.Event, indexSet map[string]struct{}, indexNon abciEvents := make([]abci.Event, len(events)) for i, e := range events { attrs := make([]event.Attribute, 0) - - if e.Data != nil { - resp, err := e.Data() - if err != nil { - return nil, fmt.Errorf("failed to marshal event data: %w", err) - } - - var attrMap map[string]json.RawMessage - if err := json.Unmarshal(resp, &attrMap); err != nil { - return nil, fmt.Errorf("failed to unmarshal event data: %w", err) - } - - // sort the keys to ensure the order is always the same - keys := slices.Sorted(maps.Keys(attrMap)) - for _, k := range keys { - v := attrMap[k] - attrs = append(attrs, event.Attribute{ - Key: k, - Value: string(v), - }) - } - } else { - var err error - attrs, err = e.Attributes() - if err != nil { - return nil, err - } + var err error + attrs, err = e.Attributes() + if err != nil { + return nil, err } abciEvents[i] = abci.Event{ diff --git a/server/v2/stf/core_event_service.go b/server/v2/stf/core_event_service.go index a8b2c265d1e8..182848db45eb 100644 --- a/server/v2/stf/core_event_service.go +++ b/server/v2/stf/core_event_service.go @@ -4,6 +4,8 @@ import ( "bytes" "context" "encoding/json" + "maps" + "slices" "github.com/cosmos/gogoproto/jsonpb" gogoproto "github.com/cosmos/gogoproto/proto" @@ -37,8 +39,16 @@ type eventManager struct { // Emit emits an typed event that is defined in the protobuf file. // In the future these events will be added to consensus. func (em *eventManager) Emit(tev transaction.Msg) error { - event := event.Event{ + ev := event.Event{ Type: gogoproto.MessageName(tev), + Attributes: func() ([]event.Attribute, error) { + outerEvent, err := TypedEventToEvent(tev) + if err != nil { + return nil, err + } + + return outerEvent.Attributes() + }, Data: func() (json.RawMessage, error) { buf := new(bytes.Buffer) jm := &jsonpb.Marshaler{OrigName: true, EmitDefaults: true, AnyResolver: nil} @@ -50,12 +60,53 @@ func (em *eventManager) Emit(tev transaction.Msg) error { }, } - em.executionContext.events = append(em.executionContext.events, event) + em.executionContext.events = append(em.executionContext.events, ev) return nil } // EmitKV emits a key value pair event. func (em *eventManager) EmitKV(eventType string, attrs ...event.Attribute) error { - em.executionContext.events = append(em.executionContext.events, event.NewEvent(eventType, attrs...)) + ev := event.Event{ + Type: eventType, + Attributes: func() ([]event.Attribute, error) { + return attrs, nil + }, + Data: func() (json.RawMessage, error) { + return json.Marshal(attrs) + }, + } + + em.executionContext.events = append(em.executionContext.events, ev) return nil } + +// TypedEventToEvent takes typed event and converts to Event object +func TypedEventToEvent(tev transaction.Msg) (event.Event, error) { + evtType := gogoproto.MessageName(tev) + buf := new(bytes.Buffer) + jm := &jsonpb.Marshaler{OrigName: true, EmitDefaults: true, AnyResolver: nil} + if err := jm.Marshal(buf, tev); err != nil { + return event.Event{}, err + } + + var attrMap map[string]json.RawMessage + if err := json.Unmarshal(buf.Bytes(), &attrMap); err != nil { + return event.Event{}, err + } + + // sort the keys to ensure the order is always the same + keys := slices.Sorted(maps.Keys(attrMap)) + attrs := make([]event.Attribute, 0, len(attrMap)) + for _, k := range keys { + v := attrMap[k] + attrs = append(attrs, event.Attribute{ + Key: k, + Value: string(v), + }) + } + + return event.Event{ + Type: evtType, + Attributes: func() ([]event.Attribute, error) { return attrs, nil }, + }, nil +} From 5a873c5d550df352912d972da5d2f760d12121ab Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Mon, 2 Dec 2024 21:06:10 +0100 Subject: [PATCH 5/5] cleanup --- server/v2/cometbft/utils.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/server/v2/cometbft/utils.go b/server/v2/cometbft/utils.go index 658008deb85d..3929debe65a0 100644 --- a/server/v2/cometbft/utils.go +++ b/server/v2/cometbft/utils.go @@ -149,9 +149,7 @@ func intoABCIEvents(events []event.Event, indexSet map[string]struct{}, indexNon indexAll := len(indexSet) == 0 abciEvents := make([]abci.Event, len(events)) for i, e := range events { - attrs := make([]event.Attribute, 0) - var err error - attrs, err = e.Attributes() + attrs, err := e.Attributes() if err != nil { return nil, err }