Skip to content

Commit

Permalink
feat(indexer): add to modules and implement proto fields (backport #2…
Browse files Browse the repository at this point in the history
…2544) (#22676)

Co-authored-by: Facundo Medica <14063057+facundomedica@users.noreply.github.com>
Co-authored-by: Facundo <facundomedica@gmail.com>
Co-authored-by: Julien Robert <julien@rbrt.fr>
  • Loading branch information
4 people authored Nov 28, 2024
1 parent 112d0a8 commit 61691b5
Show file tree
Hide file tree
Showing 69 changed files with 549 additions and 159 deletions.
18 changes: 15 additions & 3 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -903,8 +903,12 @@ func (app *BaseApp) FinalizeBlock(req *abci.FinalizeBlockRequest) (res *abci.Fin
defer func() {
// call the streaming service hooks with the FinalizeBlock messages
for _, streamingListener := range app.streamingManager.ABCIListeners {
if err := streamingListener.ListenFinalizeBlock(app.finalizeBlockState.Context(), *req, *res); err != nil {
if streamErr := streamingListener.ListenFinalizeBlock(app.finalizeBlockState.Context(), *req, *res); streamErr != nil {
app.logger.Error("ListenFinalizeBlock listening hook failed", "height", req.Height, "err", err)
if app.streamingManager.StopNodeOnErr {
// if StopNodeOnErr is set, we should return the streamErr in order to stop the node
err = streamErr
}
}
}
}()
Expand Down Expand Up @@ -976,12 +980,12 @@ func (app *BaseApp) Commit() (*abci.CommitResponse, error) {
rms.SetCommitHeader(header)
}

app.cms.Commit()

resp := &abci.CommitResponse{
RetainHeight: retainHeight,
}

app.cms.Commit()

abciListeners := app.streamingManager.ABCIListeners
if len(abciListeners) > 0 {
ctx := app.finalizeBlockState.Context()
Expand All @@ -991,6 +995,14 @@ func (app *BaseApp) Commit() (*abci.CommitResponse, error) {
for _, abciListener := range abciListeners {
if err := abciListener.ListenCommit(ctx, *resp, changeSet); err != nil {
app.logger.Error("Commit listening hook failed", "height", blockHeight, "err", err)
if app.streamingManager.StopNodeOnErr {
err = fmt.Errorf("Commit listening hook failed: %w", err)
rollbackErr := app.cms.RollbackToVersion(blockHeight - 1)
if rollbackErr != nil {
return nil, errors.Join(err, rollbackErr)
}
return nil, err
}
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions client/v2/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module cosmossdk.io/client/v2

go 1.23.1
go 1.23.3

require (
cosmossdk.io/api v0.8.0
Expand Down Expand Up @@ -34,7 +34,7 @@ require (
require (
buf.build/gen/go/cometbft/cometbft/protocolbuffers/go v1.35.2-20241120201313-68e42a58b301.1 // indirect
buf.build/gen/go/cosmos/gogo-proto/protocolbuffers/go v1.35.2-20240130113600-88ef6483f90f.1 // indirect
cosmossdk.io/collections v0.4.1-0.20241107084833-00f3065e70ee // indirect
cosmossdk.io/collections v0.4.1-0.20241128094659-bd76b47e1d8b // indirect
cosmossdk.io/core/testing v0.0.0-20240923163230-04da382a9f29 // indirect
cosmossdk.io/errors v1.0.1 // indirect
cosmossdk.io/log v1.5.0
Expand Down
4 changes: 2 additions & 2 deletions client/v2/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cosmossdk.io/api v0.7.3-0.20240924065902-eb7653cfecdf h1:CttA/mEIxGm4E7vwrjUpju7/Iespns08d9bOza70cIc=
cosmossdk.io/api v0.7.3-0.20240924065902-eb7653cfecdf/go.mod h1:YMfx2ATpgITsoydD3hIBa8IkDHtyXp/14rmG0d3sEew=
cosmossdk.io/collections v0.4.1-0.20241107084833-00f3065e70ee h1:OLqvi9ekfShobmdgr4Q/8pu+LjzYJSrNl9tiadPg2xY=
cosmossdk.io/collections v0.4.1-0.20241107084833-00f3065e70ee/go.mod h1:DcD++Yfcq0OFtM3CJNYLIBjfZ+4DEyeJ/AUk6gkwlOE=
cosmossdk.io/collections v0.4.1-0.20241128094659-bd76b47e1d8b h1:MgU4EDOo/pXgepHCUFQFnIfUCxk/JO0AJGDTUQhhEhg=
cosmossdk.io/collections v0.4.1-0.20241128094659-bd76b47e1d8b/go.mod h1:uf12i1yKvzEIHt2ok7poNqFDQTb71O00RQLitSynmrg=
cosmossdk.io/core v1.0.0-alpha.6 h1:5ukC4JcQKmemLQXcAgu/QoOvJI50hpBkIIg4ZT2EN8E=
cosmossdk.io/core v1.0.0-alpha.6/go.mod h1:3u9cWq1FAVtiiCrDPpo4LhR+9V6k/ycSG4/Y/tREWCY=
cosmossdk.io/core/testing v0.0.0-20240923163230-04da382a9f29 h1:NxxUo0GMJUbIuVg0R70e3cbn9eFTEuMr7ev1AFvypdY=
Expand Down
245 changes: 244 additions & 1 deletion codec/collections.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
package codec

import (
"encoding/json"
"fmt"
"reflect"
"strings"

"github.com/cosmos/gogoproto/proto"
gogotypes "github.com/cosmos/gogoproto/types"
"google.golang.org/protobuf/encoding/protojson"
protov2 "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/dynamicpb"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"

"cosmossdk.io/collections"
collcodec "cosmossdk.io/collections/codec"
"cosmossdk.io/schema"
)

// BoolValue implements a ValueCodec that saves the bool value
Expand Down Expand Up @@ -51,12 +58,17 @@ type protoMessage[T any] interface {
proto.Message
}

type protoCollValueCodec[T any] interface {
collcodec.HasSchemaCodec[T]
collcodec.ValueCodec[T]
}

// CollValue inits a collections.ValueCodec for a generic gogo protobuf message.
func CollValue[T any, PT protoMessage[T]](cdc interface {
Marshal(proto.Message) ([]byte, error)
Unmarshal([]byte, proto.Message) error
},
) collcodec.ValueCodec[T] {
) protoCollValueCodec[T] {
return &collValue[T, PT]{cdc.(Codec), proto.MessageName(PT(new(T)))}
}

Expand Down Expand Up @@ -91,6 +103,139 @@ func (c collValue[T, PT]) ValueType() string {
return "github.com/cosmos/gogoproto/" + c.messageName
}

func (c collValue[T, PT]) SchemaCodec() (collcodec.SchemaCodec[T], error) {
var (
t T
pt PT
)
msgName := proto.MessageName(pt)
desc, err := proto.HybridResolver.FindDescriptorByName(protoreflect.FullName(msgName))
if err != nil {
return collcodec.SchemaCodec[T]{}, fmt.Errorf("could not find descriptor for %s: %w", msgName, err)
}
schemaFields := protoCols(desc.(protoreflect.MessageDescriptor))

kind := schema.KindForGoValue(t)
if err := kind.Validate(); err == nil {
return collcodec.SchemaCodec[T]{
Fields: []schema.Field{{
// we don't set any name so that this can be set to a good default by the caller
Name: "",
Kind: kind,
}},
// these can be nil because T maps directly to a schema value for this kind
ToSchemaType: nil,
FromSchemaType: nil,
}, nil
} else {
return collcodec.SchemaCodec[T]{
Fields: schemaFields,
ToSchemaType: func(t T) (any, error) {
values := []interface{}{}
msgDesc, ok := desc.(protoreflect.MessageDescriptor)
if !ok {
return nil, fmt.Errorf("expected message descriptor, got %T", desc)
}

nm := dynamicpb.NewMessage(msgDesc)
bz, err := c.cdc.Marshal(any(&t).(PT))
if err != nil {
return nil, err
}

err = c.cdc.Unmarshal(bz, nm)
if err != nil {
return nil, err
}

for _, field := range schemaFields {
// Find the field descriptor by the Protobuf field name
fieldDesc := msgDesc.Fields().ByName(protoreflect.Name(field.Name))
if fieldDesc == nil {
return nil, fmt.Errorf("field %q not found in message %s", field.Name, desc.FullName())
}

val := nm.ProtoReflect().Get(fieldDesc)

// if the field is a map or list, we need to convert it to a slice of values
if fieldDesc.IsList() {
repeatedVals := []interface{}{}
list := val.List()
for i := 0; i < list.Len(); i++ {
repeatedVals = append(repeatedVals, list.Get(i).Interface())
}
values = append(values, repeatedVals)
continue
}

switch fieldDesc.Kind() {
case protoreflect.BoolKind:
values = append(values, val.Bool())
case protoreflect.Int32Kind, protoreflect.Sint32Kind, protoreflect.Sfixed32Kind,
protoreflect.Int64Kind, protoreflect.Sint64Kind, protoreflect.Sfixed64Kind:
values = append(values, val.Int())
case protoreflect.Uint32Kind, protoreflect.Fixed32Kind, protoreflect.Uint64Kind,
protoreflect.Fixed64Kind:
values = append(values, val.Uint())
case protoreflect.FloatKind, protoreflect.DoubleKind:
values = append(values, val.Float())
case protoreflect.StringKind:
values = append(values, val.String())
case protoreflect.BytesKind:
values = append(values, val.Bytes())
case protoreflect.EnumKind:
// TODO: postgres uses the enum name, not the number
values = append(values, string(fieldDesc.Enum().Values().ByNumber(val.Enum()).Name()))
case protoreflect.MessageKind:
msg := val.Interface().(*dynamicpb.Message)
msgbz, err := c.cdc.Marshal(msg)
if err != nil {
return nil, err
}

if field.Kind == schema.TimeKind {
// make it a time.Time
ts := &timestamppb.Timestamp{}
err = c.cdc.Unmarshal(msgbz, ts)
if err != nil {
return nil, fmt.Errorf("error unmarshalling timestamp: %w %x %s", err, msgbz, fieldDesc.FullName())
}
values = append(values, ts.AsTime())
} else if field.Kind == schema.DurationKind {
// make it a time.Duration
dur := &durationpb.Duration{}
err = c.cdc.Unmarshal(msgbz, dur)
if err != nil {
return nil, fmt.Errorf("error unmarshalling duration: %w", err)
}
values = append(values, dur.AsDuration())
} else {
// if not a time or duration, just keep it as a JSON object
// we might want to change this to include the entire object as separate fields
bz, err := c.cdc.MarshalJSON(msg)
if err != nil {
return nil, fmt.Errorf("error marshaling message: %w", err)
}

values = append(values, json.RawMessage(bz))
}
}

}

// if there's only one value, return it directly
if len(values) == 1 {
return values[0], nil
}
return values, nil
},
FromSchemaType: func(a any) (T, error) {
panic("not implemented")
},
}, nil
}
}

type protoMessageV2[T any] interface {
*T
protov2.Message
Expand Down Expand Up @@ -179,3 +324,101 @@ func (c collInterfaceValue[T]) ValueType() string {
var t T
return fmt.Sprintf("%T", t)
}

// SchemaCodec returns a schema codec, which will always have a single JSON field
// as there is no way to know in advance the necessary fields for an interface.
func (c collInterfaceValue[T]) SchemaCodec() (collcodec.SchemaCodec[T], error) {
var pt T

kind := schema.KindForGoValue(pt)
if err := kind.Validate(); err == nil {
return collcodec.SchemaCodec[T]{
Fields: []schema.Field{{
// we don't set any name so that this can be set to a good default by the caller
Name: "",
Kind: kind,
}},
// these can be nil because T maps directly to a schema value for this kind
ToSchemaType: nil,
FromSchemaType: nil,
}, nil
} else {
return collcodec.SchemaCodec[T]{
Fields: []schema.Field{{
Name: "value",
Kind: schema.JSONKind,
}},
ToSchemaType: func(t T) (any, error) {
bz, err := c.codec.MarshalInterfaceJSON(t)
if err != nil {
return nil, err
}

return json.RawMessage(bz), nil
},
FromSchemaType: func(a any) (T, error) {
panic("not implemented")
},
}, nil
}
}

func protoCols(desc protoreflect.MessageDescriptor) []schema.Field {
nFields := desc.Fields()
cols := make([]schema.Field, 0, nFields.Len())
for i := 0; i < nFields.Len(); i++ {
f := nFields.Get(i)
cols = append(cols, protoCol(f))
}
return cols
}

func protoCol(f protoreflect.FieldDescriptor) schema.Field {
col := schema.Field{Name: string(f.Name())}
if f.IsMap() || f.IsList() {
col.Kind = schema.JSONKind
col.Nullable = true
} else {
switch f.Kind() {
case protoreflect.BoolKind:
col.Kind = schema.BoolKind
case protoreflect.Int32Kind, protoreflect.Sint32Kind, protoreflect.Sfixed32Kind:
col.Kind = schema.Int32Kind
case protoreflect.Int64Kind, protoreflect.Sint64Kind, protoreflect.Sfixed64Kind:
col.Kind = schema.Int64Kind
case protoreflect.Uint32Kind, protoreflect.Fixed32Kind:
col.Kind = schema.Int64Kind
case protoreflect.Uint64Kind, protoreflect.Fixed64Kind:
col.Kind = schema.Uint64Kind
case protoreflect.FloatKind:
col.Kind = schema.Float32Kind
case protoreflect.DoubleKind:
col.Kind = schema.Float64Kind
case protoreflect.StringKind:
col.Kind = schema.StringKind
case protoreflect.BytesKind:
col.Kind = schema.BytesKind
case protoreflect.EnumKind:
// TODO: support enums
col.Kind = schema.EnumKind
// use the full name to avoid collissions
col.ReferencedType = string(f.Enum().FullName())
col.ReferencedType = strings.ReplaceAll(col.ReferencedType, ".", "_")
case protoreflect.MessageKind:
col.Nullable = true
fullName := f.Message().FullName()
if fullName == "google.protobuf.Timestamp" {
col.Kind = schema.TimeKind
} else if fullName == "google.protobuf.Duration" {
col.Kind = schema.DurationKind
} else {
col.Kind = schema.JSONKind
}
}
if f.HasPresence() {
col.Nullable = true
}
}

return col
}
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
go 1.23.1
go 1.23.2

module github.com/cosmos/cosmos-sdk

require (
buf.build/gen/go/cometbft/cometbft/protocolbuffers/go v1.35.2-20241120201313-68e42a58b301.1 // indirect
cosmossdk.io/api v0.8.0 // main
cosmossdk.io/collections v0.4.1-0.20241107084833-00f3065e70ee // main
cosmossdk.io/collections v0.4.1-0.20241128094659-bd76b47e1d8b // main
cosmossdk.io/core v1.0.0-alpha.6
cosmossdk.io/core/testing v0.0.0-20240923163230-04da382a9f29 // main
cosmossdk.io/depinject v1.1.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cosmossdk.io/api v0.7.3-0.20241127063259-f296a5005ce8 h1:KCi0Wq5M0JST0I01HebEDHt7//tZMx2bW4tmlc4IRnI=
cosmossdk.io/api v0.7.3-0.20241127063259-f296a5005ce8/go.mod h1:vZy0Ev95gwANXt5ssiDui4L5nlMYO5bzqR77hCjIz9s=
cosmossdk.io/collections v0.4.1-0.20241107084833-00f3065e70ee h1:OLqvi9ekfShobmdgr4Q/8pu+LjzYJSrNl9tiadPg2xY=
cosmossdk.io/collections v0.4.1-0.20241107084833-00f3065e70ee/go.mod h1:DcD++Yfcq0OFtM3CJNYLIBjfZ+4DEyeJ/AUk6gkwlOE=
cosmossdk.io/collections v0.4.1-0.20241128094659-bd76b47e1d8b h1:MgU4EDOo/pXgepHCUFQFnIfUCxk/JO0AJGDTUQhhEhg=
cosmossdk.io/collections v0.4.1-0.20241128094659-bd76b47e1d8b/go.mod h1:uf12i1yKvzEIHt2ok7poNqFDQTb71O00RQLitSynmrg=
cosmossdk.io/core v1.0.0-alpha.6 h1:5ukC4JcQKmemLQXcAgu/QoOvJI50hpBkIIg4ZT2EN8E=
cosmossdk.io/core v1.0.0-alpha.6/go.mod h1:3u9cWq1FAVtiiCrDPpo4LhR+9V6k/ycSG4/Y/tREWCY=
cosmossdk.io/core/testing v0.0.0-20240923163230-04da382a9f29 h1:NxxUo0GMJUbIuVg0R70e3cbn9eFTEuMr7ev1AFvypdY=
Expand Down
Loading

0 comments on commit 61691b5

Please sign in to comment.