Skip to content

Commit

Permalink
Merge pull request #6795 from onflow/yurii/6785-dkg-index-map-backwar…
Browse files Browse the repository at this point in the history
…d-compatible
  • Loading branch information
durkmurder authored Dec 16, 2024
2 parents c7c7e5a + 92ec89a commit b1b6523
Show file tree
Hide file tree
Showing 13 changed files with 687 additions and 29 deletions.
118 changes: 117 additions & 1 deletion model/convert/service_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,12 +203,46 @@ func convertServiceEventEpochSetup(event flow.Event) (*flow.ServiceEvent, error)
return serviceEvent, nil
}

// convertServiceEventEpochCommit is a wrapper function to support backward-compatible event parsing for [flow.EpochCommit] events.
// It delegates to the version-specific conversion function based on the number of fields in the event.
// TODO(EFM, #6794): Replace this function with the body of `convertServiceEventEpochCommitV1` once the network upgrade is complete.
func convertServiceEventEpochCommit(event flow.Event) (*flow.ServiceEvent, error) {
// decode bytes using ccf
payload, err := ccf.Decode(nil, event.Payload)
if err != nil {
return nil, fmt.Errorf("could not unmarshal event payload: %w", err)
}

cdcEvent, ok := payload.(cadence.Event)
if !ok {
return nil, invalidCadenceTypeError("payload", payload, cadence.Event{})
}

if cdcEvent.Type() == nil {
return nil, fmt.Errorf("EpochCommit event doesn't have type")
}

fields := cadence.FieldsMappedByName(cdcEvent)

switch len(fields) {
case 3:
return convertServiceEventEpochCommitV0(event)
case 5:
return convertServiceEventEpochCommitV1(event)
default:
return nil, fmt.Errorf(
"invalid number of fields in EpochCommit event, expect 3 or 5, got: %d",
len(fields),
)
}
}

// convertServiceEventEpochCommit converts a service event encoded as the generic
// flow.Event type to a ServiceEvent type for an EpochCommit event.
// CAUTION: This function must only be used for input events computed locally, by an
// Execution or Verification Node; it is not resilient to malicious inputs.
// No errors are expected during normal operation.
func convertServiceEventEpochCommit(event flow.Event) (*flow.ServiceEvent, error) {
func convertServiceEventEpochCommitV1(event flow.Event) (*flow.ServiceEvent, error) {
// decode bytes using ccf
payload, err := ccf.Decode(nil, event.Payload)
if err != nil {
Expand Down Expand Up @@ -304,6 +338,88 @@ func convertServiceEventEpochCommit(event flow.Event) (*flow.ServiceEvent, error
return serviceEvent, nil
}

// convertServiceEventEpochCommit converts a service event encoded as the generic
// flow.Event type to a ServiceEvent type for an EpochCommit event.
// CAUTION: This function must only be used for input events computed locally, by an
// Execution or Verification Node; it is not resilient to malicious inputs.
// No errors are expected during normal operation.
// TODO(EFM, #6794): Remove this once we complete the network upgrade
func convertServiceEventEpochCommitV0(event flow.Event) (*flow.ServiceEvent, error) {
// decode bytes using ccf
payload, err := ccf.Decode(nil, event.Payload)
if err != nil {
return nil, fmt.Errorf("could not unmarshal event payload: %w", err)
}

cdcEvent, ok := payload.(cadence.Event)
if !ok {
return nil, invalidCadenceTypeError("payload", payload, cadence.Event{})
}

if cdcEvent.Type() == nil {
return nil, fmt.Errorf("EpochCommit event doesn't have type")
}

fields := cadence.FieldsMappedByName(cdcEvent)

const expectedFieldCount = 3
if len(fields) < expectedFieldCount {
return nil, fmt.Errorf(
"insufficient fields in EpochCommit event (%d < %d)",
len(fields),
expectedFieldCount,
)
}

// Extract EpochCommit event fields

counter, err := getField[cadence.UInt64](fields, "counter")
if err != nil {
return nil, fmt.Errorf("failed to decode EpochCommit event: %w", err)
}

cdcClusterQCVotes, err := getField[cadence.Array](fields, "clusterQCs")
if err != nil {
return nil, fmt.Errorf("failed to decode EpochCommit event: %w", err)
}

cdcDKGKeys, err := getField[cadence.Array](fields, "dkgPubKeys")
if err != nil {
return nil, fmt.Errorf("failed to decode EpochCommit event: %w", err)
}

commit := &flow.EpochCommit{
Counter: uint64(counter),
}

// parse cluster qc votes
commit.ClusterQCs, err = convertClusterQCVotes(cdcClusterQCVotes.Values)
if err != nil {
return nil, fmt.Errorf("could not convert cluster qc votes: %w", err)
}

// parse DKG group key and participants
// Note: this is read in the same order as `DKGClient.SubmitResult` ie. with the group public key first followed by individual keys
// https://github.com/onflow/flow-go/blob/feature/dkg/module/dkg/client.go#L182-L183
commit.DKGGroupKey, err = convertDKGKey(cdcDKGKeys.Values[0])
if err != nil {
return nil, fmt.Errorf("could not convert DKG group key: %w", err)
}
commit.DKGParticipantKeys, err = convertDKGKeys(cdcDKGKeys.Values[1:])
if err != nil {
return nil, fmt.Errorf("could not convert DKG keys: %w", err)
}
commit.DKGIndexMap = nil

// create the service event
serviceEvent := &flow.ServiceEvent{
Type: flow.ServiceEventCommit,
Event: commit,
}

return serviceEvent, nil
}

// convertServiceEventEpochRecover converts a service event encoded as the generic
// flow.Event type to a ServiceEvent type for an EpochRecover event.
// CAUTION: This function must only be used for input events computed locally, by an
Expand Down
19 changes: 19 additions & 0 deletions model/convert/service_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,25 @@ func TestEventConversion(t *testing.T) {
},
)

// TODO(EFM, #6794): Remove this once we complete the network upgrade, this is only to test
// backward compatibility of EpochCommit service event
t.Run("epoch commit, backward compatibility", func(t *testing.T) {

fixture, expected := unittest.EpochCommitV0FixtureByChainID(chainID)

// convert Cadence types to Go types
event, err := convert.ServiceEvent(chainID, fixture)
require.NoError(t, err)
require.NotNil(t, event)

// cast event type to epoch commit
actual, ok := event.Event.(*flow.EpochCommit)
require.True(t, ok)

assert.Equal(t, expected, actual)
},
)

t.Run("epoch recover", func(t *testing.T) {
fixture, expected := unittest.EpochRecoverFixtureByChainID(chainID)

Expand Down
57 changes: 42 additions & 15 deletions model/flow/epoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,23 +399,50 @@ func (commit *EpochCommit) UnmarshalMsgpack(b []byte) error {
// differently from JSON/msgpack, because it does not handle custom encoders
// within map types.
// NOTE: DecodeRLP is not needed, as this is only used for hashing.
// TODO(EFM, #6794): Currently we implement RLP encoding based on availability of DKGIndexMap
// this is needed to support backward compatibility, to guarantee that we will produce same hash
// for the same event. This should be removed once we complete the network upgrade.
func (commit *EpochCommit) EncodeRLP(w io.Writer) error {
rlpEncodable := struct {
Counter uint64
ClusterQCs []ClusterQCVoteData
DKGGroupKey []byte
DKGParticipantKeys [][]byte
}{
Counter: commit.Counter,
ClusterQCs: commit.ClusterQCs,
DKGGroupKey: commit.DKGGroupKey.Encode(),
DKGParticipantKeys: make([][]byte, 0, len(commit.DKGParticipantKeys)),
}
for _, key := range commit.DKGParticipantKeys {
rlpEncodable.DKGParticipantKeys = append(rlpEncodable.DKGParticipantKeys, key.Encode())
}
if commit.DKGIndexMap == nil {
rlpEncodable := struct {
Counter uint64
ClusterQCs []ClusterQCVoteData
DKGGroupKey []byte
DKGParticipantKeys [][]byte
}{
Counter: commit.Counter,
ClusterQCs: commit.ClusterQCs,
DKGGroupKey: commit.DKGGroupKey.Encode(),
DKGParticipantKeys: make([][]byte, 0, len(commit.DKGParticipantKeys)),
}
for _, key := range commit.DKGParticipantKeys {
rlpEncodable.DKGParticipantKeys = append(rlpEncodable.DKGParticipantKeys, key.Encode())
}

return rlp.Encode(w, rlpEncodable)
} else {
rlpEncodable := struct {
Counter uint64
ClusterQCs []ClusterQCVoteData
DKGGroupKey []byte
DKGParticipantKeys [][]byte
DKGIndexMap IdentifierList
}{
Counter: commit.Counter,
ClusterQCs: commit.ClusterQCs,
DKGGroupKey: commit.DKGGroupKey.Encode(),
DKGParticipantKeys: make([][]byte, 0, len(commit.DKGParticipantKeys)),
DKGIndexMap: make(IdentifierList, len(commit.DKGIndexMap)),
}
for _, key := range commit.DKGParticipantKeys {
rlpEncodable.DKGParticipantKeys = append(rlpEncodable.DKGParticipantKeys, key.Encode())
}
for id, index := range commit.DKGIndexMap {
rlpEncodable.DKGIndexMap[index] = id
}

return rlp.Encode(w, rlpEncodable)
return rlp.Encode(w, rlpEncodable)
}
}

// ID returns the hash of the event contents.
Expand Down
70 changes: 69 additions & 1 deletion state/protocol/inmem/dkg.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/onflow/crypto"

"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/model/flow/filter"
"github.com/onflow/flow-go/state/protocol"
)

Expand All @@ -14,7 +15,15 @@ type DKG flow.EpochCommit

var _ protocol.DKG = (*DKG)(nil)

func NewDKG(commit *flow.EpochCommit) *DKG { return (*DKG)(commit) }
// NewDKG creates a new DKG instance from the given setup and commit events.
// TODO(EFM, #6794): Remove branch `commit.DKGIndexMap == nil` once we complete the network upgrade.
func NewDKG(setup *flow.EpochSetup, commit *flow.EpochCommit) protocol.DKG {
if commit.DKGIndexMap == nil {
return NewDKGv0(setup, commit)
} else {
return (*DKG)(commit)
}
}

func (d *DKG) Size() uint { return uint(len(d.DKGParticipantKeys)) }
func (d *DKG) GroupKey() crypto.PublicKey { return d.DKGGroupKey }
Expand Down Expand Up @@ -56,3 +65,62 @@ func (d *DKG) NodeID(index uint) (flow.Identifier, error) {
}
return flow.ZeroID, fmt.Errorf("inconsistent DKG state: missing index %d", index)
}

// DKGv0 implements the protocol.DKG interface for the EpochCommit model used before Protocol State Version 2.
// This model is used for [flow.EpochCommit] events without the DKGIndexMap field.
// TODO(EFM, #6794): Remove this once we complete the network upgrade
type DKGv0 struct {
Participants flow.IdentitySkeletonList
Commit *flow.EpochCommit
}

var _ protocol.DKG = (*DKGv0)(nil)

func NewDKGv0(setup *flow.EpochSetup, commit *flow.EpochCommit) *DKGv0 {
return &DKGv0{
Participants: setup.Participants.Filter(filter.IsConsensusCommitteeMember),
Commit: commit,
}
}

func (d DKGv0) Size() uint {
return uint(len(d.Participants))
}

func (d DKGv0) GroupKey() crypto.PublicKey {
return d.Commit.DKGGroupKey
}

// Index returns the DKG index for the given node.
// Expected error during normal operations:
// - protocol.IdentityNotFoundError if nodeID is not a known DKG participant
func (d DKGv0) Index(nodeID flow.Identifier) (uint, error) {
index, exists := d.Participants.GetIndex(nodeID)
if !exists {
return 0, protocol.IdentityNotFoundError{NodeID: nodeID}
}
return index, nil
}

// KeyShare returns the public key share for the given node.
// Expected error during normal operations:
// - protocol.IdentityNotFoundError if nodeID is not a known DKG participant
func (d DKGv0) KeyShare(nodeID flow.Identifier) (crypto.PublicKey, error) {
index, err := d.Index(nodeID)
if err != nil {
return nil, err
}
return d.Commit.DKGParticipantKeys[index], nil
}

// NodeID returns the node identifier for the given index.
// An exception is returned if the index is ≥ Size().
// Intended for use outside the hotpath, with runtime
// scaling linearly in the number of DKG participants (ie. Size())
func (d DKGv0) NodeID(index uint) (flow.Identifier, error) {
identity, exists := d.Participants.ByIndex(index)
if !exists {
return flow.ZeroID, fmt.Errorf("inconsistent DKG state: missing index %d", index)
}
return identity.NodeID, nil
}
59 changes: 59 additions & 0 deletions state/protocol/inmem/dkg_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package inmem_test

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/state/protocol"
"github.com/onflow/flow-go/state/protocol/inmem"
"github.com/onflow/flow-go/utils/unittest"
)

// TestDKGv0 tests that the [inmem.DKG] is backward compatible with the v0 DKG protocol model.
// We test this by creating a [inmem.DKG] instance from a v0 [flow.EpochSetup] and [flow.EpochCommit]
// and verifying that the DKG methods return the expected values.
// TODO(EFM, #6794): Update this test to use the new DKG model.
func TestDKG(t *testing.T) {
consensusParticipants := unittest.IdentityListFixture(5, unittest.WithRole(flow.RoleConsensus)).Sort(flow.Canonical[flow.Identity])
otherParticipants := unittest.IdentityListFixture(10, unittest.WithAllRolesExcept(flow.RoleConsensus))
setup := unittest.EpochSetupFixture(unittest.WithParticipants(append(consensusParticipants, otherParticipants...).ToSkeleton()))
commit := unittest.EpochCommitFixture(unittest.WithDKGFromParticipants(setup.Participants))
dkg := inmem.NewDKGv0(setup, commit)
t.Run("Index", func(t *testing.T) {
for i, participant := range consensusParticipants {
index, err := dkg.Index(participant.NodeID)
require.NoError(t, err)
require.Equal(t, uint(i), index)
}
_, err := dkg.Index(otherParticipants[0].NodeID)
require.Error(t, err)
require.True(t, protocol.IsIdentityNotFound(err))
})
t.Run("NodeID", func(t *testing.T) {
for i, participant := range consensusParticipants {
nodeID, err := dkg.NodeID(uint(i))
require.NoError(t, err)
require.Equal(t, participant.NodeID, nodeID)
}
_, err := dkg.NodeID(uint(len(consensusParticipants)))
require.Error(t, err)
})
t.Run("KeyShare", func(t *testing.T) {
for i, participant := range consensusParticipants {
keyShare, err := dkg.KeyShare(participant.NodeID)
require.NoError(t, err)
require.Equal(t, commit.DKGParticipantKeys[i], keyShare)
}
_, err := dkg.KeyShare(otherParticipants[0].NodeID)
require.Error(t, err)
require.True(t, protocol.IsIdentityNotFound(err))
})
t.Run("Size", func(t *testing.T) {
require.Equal(t, uint(len(consensusParticipants)), dkg.Size())
})
t.Run("GroupKey", func(t *testing.T) {
require.Equal(t, commit.DKGGroupKey, dkg.GroupKey())
})
}
2 changes: 1 addition & 1 deletion state/protocol/inmem/epoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func (es *committedEpoch) ClusterByChainID(chainID flow.ChainID) (protocol.Clust
}

func (es *committedEpoch) DKG() (protocol.DKG, error) {
return NewDKG(es.commitEvent), nil
return NewDKG(es.setupEvent, es.commitEvent), nil
}

// heightBoundedEpoch represents an epoch (with counter N) for which we know either
Expand Down
2 changes: 1 addition & 1 deletion state/protocol/inmem/epoch_protocol_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (s *EpochProtocolStateAdapter) EpochCommit() *flow.EpochCommit {
// DKG returns the DKG information for the current epoch.
// No errors are expected during normal operations.
func (s *EpochProtocolStateAdapter) DKG() (protocol.DKG, error) {
return NewDKG(s.CurrentEpochCommit), nil
return NewDKG(s.CurrentEpochSetup, s.CurrentEpochCommit), nil
}

// Entry Returns low-level protocol state entry that was used to initialize this object.
Expand Down
Loading

0 comments on commit b1b6523

Please sign in to comment.