Skip to content

Commit

Permalink
handle our view change in collator protocol validator side
Browse files Browse the repository at this point in the history
network bridge will send a network bridge update message to collator protocol
when it see new active leaves.

this commit handles that network bridge event for our view change
  • Loading branch information
kishansagathiya committed Sep 24, 2024
1 parent 4a1d525 commit 5014401
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 302 deletions.
2 changes: 1 addition & 1 deletion dot/parachain/collator-protocol/validator_side.go
Original file line number Diff line number Diff line change
Expand Up @@ -748,7 +748,7 @@ func (cpvs CollatorProtocolValidatorSide) handleNetworkBridgeEvents(msg any) err
// TODO #4155
case networkbridgeevents.OurViewChange:
// TODO #4156
cpvs.handleOurViewChange(msg.View)
return cpvs.handleOurViewChange(msg.View)
case networkbridgeevents.UpdatedAuthorityIDs:
// NOTE: The validator side doesn't deal with AuthorityDiscovery IDs
case networkbridgeevents.PeerMessage[collatorprotocolmessages.CollationProtocol]:
Expand Down
4 changes: 2 additions & 2 deletions dot/parachain/network-bridge/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ type PeerViewChange struct {
// Up to `N` (5?) chain heads.
type View struct {
// a bounded amount of chain heads
Heads []common.Hash //nolint
Heads []common.Hash
// the highest known finalized number
FinalizedNumber uint32 //nolint
FinalizedNumber uint32
}

type OurViewChange struct {
Expand Down
301 changes: 2 additions & 299 deletions dot/parachain/network-bridge/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
"fmt"
"slices"
"sort"
"strconv"
"strings"

"github.com/ChainSafe/gossamer/dot/network"
collatorprotocolmessages "github.com/ChainSafe/gossamer/dot/parachain/collator-protocol/messages"
Expand All @@ -23,7 +21,6 @@ import (
"github.com/ChainSafe/gossamer/dot/state"
"github.com/ChainSafe/gossamer/internal/log"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/crypto/sr25519"
"github.com/ChainSafe/gossamer/lib/keystore"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
Expand All @@ -46,22 +43,6 @@ type NetworkBridgeReceiver struct {

localView *View

// Parachains we're currently assigned to. With async backing enabled
// this includes assignments from the implicit view.
currentAssignments map[parachaintypes.ParaID]uint

// All active leaves observed by us, including both that do and do not
// support prospective parachains. This mapping works as a replacement for
// [`polkadot_node_network_protocol::View`] and can be dropped once the transition
// to asynchronous backing is done.
activeLeaves map[common.Hash]parachaintypes.ProspectiveParachainsMode

// state tracked per relay parent
perRelayParent map[common.Hash]PerRelayParent // map[relay parent]PerRelayParent

// Collations that we have successfully requested from peers and waiting
// on validation.
fetchedCandidates map[string]CollationEvent
// heads are sorted in descending order by block number
liveHeads []parachaintypes.ActivatedLeaf

Expand Down Expand Up @@ -93,71 +74,6 @@ type CollationEvent struct {
PendingCollation PendingCollation
}

// Identifier of a fetched collation
type fetchedCollationInfo struct {
// Candidate's relay parent
relayParent common.Hash
paraID parachaintypes.ParaID
candidateHash parachaintypes.CandidateHash
// Id of the collator the collation was fetched from
collatorID parachaintypes.CollatorID
}

func (f fetchedCollationInfo) String() string {
return fmt.Sprintf("relay parent: %s, para id: %d, candidate hash: %s, collator id: %+v",
f.relayParent.String(), f.paraID, f.candidateHash.Value.String(), f.collatorID)
}

func fetchedCandidateFromString(str string) (fetchedCollationInfo, error) {
splits := strings.Split(str, ",")
if len(splits) != 4 {
return fetchedCollationInfo{}, fmt.Errorf("%w: %s", ErrInvalidStringFormat, str)
}

relayParent, err := common.HexToHash(strings.TrimSpace(splits[0]))
if err != nil {
return fetchedCollationInfo{}, fmt.Errorf("getting relay parent: %w", err)
}

paraID, err := strconv.ParseUint(strings.TrimSpace(splits[1]), 10, 64)
if err != nil {
return fetchedCollationInfo{}, fmt.Errorf("getting para id: %w", err)
}

candidateHashBytes, err := common.HexToBytes(strings.TrimSpace(splits[2]))
if err != nil {
return fetchedCollationInfo{}, fmt.Errorf("getting candidate hash bytes: %w", err)
}

candidateHash := parachaintypes.CandidateHash{
Value: common.NewHash(candidateHashBytes),
}

var collatorID parachaintypes.CollatorID
collatorIDBytes, err := common.HexToBytes(strings.TrimSpace(splits[3]))
if err != nil {
return fetchedCollationInfo{}, fmt.Errorf("getting collator id bytes: %w", err)
}
copy(collatorID[:], collatorIDBytes)

return fetchedCollationInfo{
relayParent: relayParent,
paraID: parachaintypes.ParaID(paraID),
candidateHash: candidateHash,
collatorID: collatorID,
}, nil
}

type PerRelayParent struct {
prospectiveParachainMode parachaintypes.ProspectiveParachainsMode
assignment *parachaintypes.ParaID
}

type UnfetchedCollation struct {
CollatorID parachaintypes.CollatorID
PendingCollation PendingCollation
}

type PendingCollation struct {
RelayParent common.Hash
ParaID parachaintypes.ParaID
Expand Down Expand Up @@ -275,7 +191,7 @@ func (s SortableActivatedLeaves) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}

func (nbr *NetworkBridgeReceiver) updateOurView() error {
func (nbr *NetworkBridgeReceiver) updateOurView() error { //nolint
headHashes := []common.Hash{}
for _, head := range nbr.liveHeads {
headHashes = append(headHashes, head.Hash)
Expand Down Expand Up @@ -304,134 +220,12 @@ func (nbr *NetworkBridgeReceiver) updateOurView() error {
// TODO: send ViewUpdate to all the collation peers and validation peers (v1, v2, v3)
// https://github.com/paritytech/polkadot-sdk/blob/aa68ea58f389c2aa4eefab4bf7bc7b787dd56580/polkadot/node/network/bridge/src/rx/mod.rs#L969-L1013

// TODO: Create our view and send collation events to all subsystems about our view change
// TODO #4156 Create our view and send collation events to all subsystems about our view change
// Just create the network bridge and do both of these tasks as part of those. That's the only way it makes sense.

err := nbr.handleOurViewChange(newView)
if err != nil {
return fmt.Errorf("handling our view change: %w", err)
}
return nil
}

func (nbr *NetworkBridgeReceiver) handleOurViewChange(view View) error {
// 1. Find out removed leaves (hashes) and newly added leaves
// 2. Go over each new leaves,
// - check if perspective parachain mode is enabled
// - assign incoming
// - insert active leaves and per relay parent
activeLeaves := nbr.activeLeaves

removed := []common.Hash{}
for activeLeaf := range activeLeaves {
if !slices.Contains(view.heads, activeLeaf) {
removed = append(removed, activeLeaf)
}
}

newlyAdded := []common.Hash{}
for _, head := range view.heads {
if _, ok := activeLeaves[head]; !ok {
newlyAdded = append(newlyAdded, head)
}
}

// handled newly added leaves
for _, leaf := range newlyAdded {
mode := prospectiveParachainMode()

perRelayParent := &PerRelayParent{
prospectiveParachainMode: mode,
}

err := nbr.assignIncoming(leaf, perRelayParent)
if err != nil {
return fmt.Errorf("assigning incoming: %w", err)
}
nbr.activeLeaves[leaf] = mode
nbr.perRelayParent[leaf] = *perRelayParent

//nolint:staticcheck
if mode.IsEnabled {
// TODO: Add it when we have async backing
// https://github.com/paritytech/polkadot-sdk/blob/aa68ea58f389c2aa4eefab4bf7bc7b787dd56580/polkadot/node/network/collator-protocol/src/validator_side/mod.rs#L1303 //nolint
}
}

// handle removed leaves
for _, leaf := range removed {
delete(nbr.activeLeaves, leaf)

mode := prospectiveParachainMode()
pruned := []common.Hash{}
if mode.IsEnabled {
// TODO: Do this when we have async backing
// https://github.com/paritytech/polkadot-sdk/blob/aa68ea58f389c2aa4eefab4bf7bc7b787dd56580/polkadot/node/network/collator-protocol/src/validator_side/mod.rs#L1340 //nolint
} else {
pruned = append(pruned, leaf)
}

for _, prunedLeaf := range pruned {
perRelayParent, ok := nbr.perRelayParent[prunedLeaf]
if ok {
nbr.removeOutgoing(perRelayParent)
delete(nbr.perRelayParent, prunedLeaf)
}

for fetchedCandidateStr := range nbr.fetchedCandidates {
fetchedCollation, err := fetchedCandidateFromString(fetchedCandidateStr)
if err != nil {
// this should never really happen
return fmt.Errorf("getting fetched collation from string: %w", err)
}

if fetchedCollation.relayParent == prunedLeaf {
delete(nbr.fetchedCandidates, fetchedCandidateStr)
}
}
}

// TODO
// Remove blocked advertisements that left the view. cpvs.BlockedAdvertisements
// Re-trigger previously failed requests again. requestUnBlockedCollations
// prune old advertisements
// https://github.com/paritytech/polkadot-sdk/blob/aa68ea58f389c2aa4eefab4bf7bc7b787dd56580/polkadot/node/network/collator-protocol/src/validator_side/mod.rs#L1361-L1396

}

return nil
}

func (nbr *NetworkBridgeReceiver) removeOutgoing(perRelayParent PerRelayParent) {
if perRelayParent.assignment != nil {
entry := nbr.currentAssignments[*perRelayParent.assignment]
entry--
if entry == 0 {
logger.Infof("unassigned from parachain with ID %d", *perRelayParent.assignment)
delete(nbr.currentAssignments, *perRelayParent.assignment)
return
}

nbr.currentAssignments[*perRelayParent.assignment] = entry
}
}

// signingKeyAndIndex finds the first key we can sign with from the given set of validators,
// if any, and returns it along with the validator index.
func signingKeyAndIndex(validators []parachaintypes.ValidatorID, ks keystore.Keystore,
) (*parachaintypes.ValidatorID, parachaintypes.ValidatorIndex) {
for i, validator := range validators {
publicKey, _ := sr25519.NewPublicKey(validator[:])
keypair := ks.GetKeypair(publicKey)

if keypair != nil {
return &validator, parachaintypes.ValidatorIndex(i)
}
}

return nil, 0
}

func (nbr *NetworkBridgeReceiver) handleCollationMessage(
sender peer.ID, msg network.NotificationsMessage) (bool, error) {

Expand Down Expand Up @@ -486,97 +280,6 @@ func (nbr *NetworkBridgeReceiver) handleValidationMessage(
return propagate, nil
}

func (nbr *NetworkBridgeReceiver) assignIncoming(relayParent common.Hash, perRelayParent *PerRelayParent,
) error {
// TODO: get this instance using relay parent
instance, err := nbr.BlockState.GetRuntime(relayParent)
if err != nil {
return fmt.Errorf("getting runtime instance: %w", err)
}

validators, err := instance.ParachainHostValidators()
if err != nil {
return fmt.Errorf("getting validators: %w", err)
}

validatorGroups, err := instance.ParachainHostValidatorGroups()
if err != nil {
return fmt.Errorf("getting validator groups: %w", err)
}

availabilityCores, err := instance.ParachainHostAvailabilityCores()
if err != nil {
return fmt.Errorf("getting availability cores: %w", err)
}

validator, validatorIndex := signingKeyAndIndex(validators, nbr.Keystore)
if validator == nil {
// return with an error?
return nil
}

groupIndex, ok := findValidatorGroup(validatorIndex, *validatorGroups)
if !ok {
logger.Trace("not a validator")
return nil
}

coreIndexNow := validatorGroups.GroupRotationInfo.CoreForGroup(groupIndex, uint8(len(availabilityCores)))
coreNow, err := availabilityCores[coreIndexNow.Index].Value()
if err != nil {
return fmt.Errorf("getting core now: %w", err)
}

var paraNow parachaintypes.ParaID
var paraNowSet bool
switch c := coreNow.(type) /*coreNow.Index()*/ {
case parachaintypes.OccupiedCore:
paraNow = parachaintypes.ParaID(c.CandidateDescriptor.ParaID)
paraNowSet = true
case parachaintypes.ScheduledCore:
paraNow = c.ParaID
paraNowSet = true
case parachaintypes.Free:
// Nothing to do in case of free

}

if !paraNowSet {
entry := nbr.currentAssignments[paraNow]
entry++
nbr.currentAssignments[paraNow] = entry
if entry == 1 {
logger.Infof("got assigned to parachain with ID %d", paraNow)
}
} else {
perRelayParent.assignment = &paraNow
}

return nil
}

func findValidatorGroup(validatorIndex parachaintypes.ValidatorIndex, validatorGroups parachaintypes.ValidatorGroups,
) (parachaintypes.GroupIndex, bool) {
for groupIndex, validatorGroup := range validatorGroups.Validators {
for _, i := range validatorGroup {
if i == validatorIndex {
return parachaintypes.GroupIndex(groupIndex), true
}
}
}

return 0, false
}

func prospectiveParachainMode() parachaintypes.ProspectiveParachainsMode {
// TODO: complete this method by calling the runtime function
// https://github.com/paritytech/polkadot-sdk/blob/aa68ea58f389c2aa4eefab4bf7bc7b787dd56580/polkadot/node/subsystem-util/src/runtime/mod.rs#L496 //nolint
// NOTE: We will return false until we have support for async backing
return parachaintypes.ProspectiveParachainsMode{
IsEnabled: false,
}
}

func (nbr *NetworkBridgeReceiver) ProcessBlockFinalizedSignal(signal parachaintypes.BlockFinalizedSignal) error {
if nbr.finalizedNumber >= signal.BlockNumber {
return ErrFinalizedNumber
Expand Down

0 comments on commit 5014401

Please sign in to comment.