Skip to content

Commit

Permalink
temp
Browse files Browse the repository at this point in the history
  • Loading branch information
kishansagathiya committed Sep 24, 2024
1 parent 7a9bef9 commit 4a1d525
Show file tree
Hide file tree
Showing 3 changed files with 256 additions and 2 deletions.
250 changes: 250 additions & 0 deletions dot/parachain/collator-protocol/validator_side.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,21 @@ import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"time"

"github.com/ChainSafe/gossamer/dot/network"
collatorprotocolmessages "github.com/ChainSafe/gossamer/dot/parachain/collator-protocol/messages"
"github.com/ChainSafe/gossamer/dot/parachain/network-bridge/events"
networkbridgeevents "github.com/ChainSafe/gossamer/dot/parachain/network-bridge/events"
networkbridgemessages "github.com/ChainSafe/gossamer/dot/parachain/network-bridge/messages"
parachaintypes "github.com/ChainSafe/gossamer/dot/parachain/types"
"github.com/ChainSafe/gossamer/dot/peerset"
"github.com/ChainSafe/gossamer/dot/state"
"github.com/ChainSafe/gossamer/internal/log"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/crypto/sr25519"
"github.com/ChainSafe/gossamer/lib/keystore"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
Expand Down Expand Up @@ -119,6 +123,211 @@ func (cpvs *CollatorProtocolValidatorSide) ProcessActiveLeavesUpdateSignal(
return nil
}

func (cpvs *CollatorProtocolValidatorSide) handleOurViewChange(view events.View) error {
// 1. Find out removed leaves (hashes) and newly added leaves
// 2. Go over each new leaves,
// - check if perspective parachain mode is enabled
// - assign incoming
// - insert active leaves and per relay parent
activeLeaves := cpvs.activeLeaves

removed := []common.Hash{}
for activeLeaf := range activeLeaves {
if !slices.Contains(view.Heads, activeLeaf) {
removed = append(removed, activeLeaf)
}
}

newlyAdded := []common.Hash{}
for _, head := range view.Heads {
if _, ok := activeLeaves[head]; !ok {
newlyAdded = append(newlyAdded, head)
}
}

// handled newly added leaves
for _, leaf := range newlyAdded {
mode := prospectiveParachainMode()

perRelayParent := &PerRelayParent{
prospectiveParachainMode: mode,
}

err := cpvs.assignIncoming(leaf, perRelayParent)
if err != nil {
return fmt.Errorf("assigning incoming: %w", err)
}
cpvs.activeLeaves[leaf] = mode
cpvs.perRelayParent[leaf] = *perRelayParent

//nolint:staticcheck
if mode.IsEnabled {
// TODO: Add it when we have async backing
// https://github.com/paritytech/polkadot-sdk/blob/aa68ea58f389c2aa4eefab4bf7bc7b787dd56580/polkadot/node/network/collator-protocol/src/validator_side/mod.rs#L1303 //nolint
}
}

// handle removed leaves
for _, leaf := range removed {
delete(cpvs.activeLeaves, leaf)

mode := prospectiveParachainMode()
pruned := []common.Hash{}
if mode.IsEnabled {
// TODO: Do this when we have async backing
// https://github.com/paritytech/polkadot-sdk/blob/aa68ea58f389c2aa4eefab4bf7bc7b787dd56580/polkadot/node/network/collator-protocol/src/validator_side/mod.rs#L1340 //nolint
} else {
pruned = append(pruned, leaf)
}

for _, prunedLeaf := range pruned {
perRelayParent, ok := cpvs.perRelayParent[prunedLeaf]
if ok {
cpvs.removeOutgoing(perRelayParent)
delete(cpvs.perRelayParent, prunedLeaf)
}

for fetchedCandidateStr := range cpvs.fetchedCandidates {
fetchedCollation, err := fetchedCandidateFromString(fetchedCandidateStr)
if err != nil {
// this should never really happen
return fmt.Errorf("getting fetched collation from string: %w", err)
}

if fetchedCollation.relayParent == prunedLeaf {
delete(cpvs.fetchedCandidates, fetchedCandidateStr)
}
}
}

// TODO
// Remove blocked advertisements that left the view. cpvs.BlockedAdvertisements
// Re-trigger previously failed requests again. requestUnBlockedCollations
// prune old advertisements
// https://github.com/paritytech/polkadot-sdk/blob/aa68ea58f389c2aa4eefab4bf7bc7b787dd56580/polkadot/node/network/collator-protocol/src/validator_side/mod.rs#L1361-L1396

}

return nil
}

func (cpvs *CollatorProtocolValidatorSide) removeOutgoing(perRelayParent PerRelayParent) {
if perRelayParent.assignment != nil {
entry := cpvs.currentAssignments[*perRelayParent.assignment]
entry--
if entry == 0 {
logger.Infof("unassigned from parachain with ID %d", *perRelayParent.assignment)
delete(cpvs.currentAssignments, *perRelayParent.assignment)
return
}

cpvs.currentAssignments[*perRelayParent.assignment] = entry
}
}

func (cpvs *CollatorProtocolValidatorSide) assignIncoming(relayParent common.Hash, perRelayParent *PerRelayParent,
) error {
// TODO: get this instance using relay parent
instance, err := cpvs.BlockState.GetRuntime(relayParent)
if err != nil {
return fmt.Errorf("getting runtime instance: %w", err)
}

validators, err := instance.ParachainHostValidators()
if err != nil {
return fmt.Errorf("getting validators: %w", err)
}

validatorGroups, err := instance.ParachainHostValidatorGroups()
if err != nil {
return fmt.Errorf("getting validator groups: %w", err)
}

availabilityCores, err := instance.ParachainHostAvailabilityCores()
if err != nil {
return fmt.Errorf("getting availability cores: %w", err)
}

validator, validatorIndex := signingKeyAndIndex(validators, cpvs.Keystore)
if validator == nil {
// return with an error?
return nil
}

groupIndex, ok := findValidatorGroup(validatorIndex, *validatorGroups)
if !ok {
logger.Trace("not a validator")
return nil
}

coreIndexNow := validatorGroups.GroupRotationInfo.CoreForGroup(groupIndex, uint8(len(availabilityCores)))
coreNow, err := availabilityCores[coreIndexNow.Index].Value()
if err != nil {
return fmt.Errorf("getting core now: %w", err)
}
var paraNow *parachaintypes.ParaID

switch c := coreNow.(type) /*coreNow.Index()*/ {
case parachaintypes.OccupiedCore:
*paraNow = parachaintypes.ParaID(c.CandidateDescriptor.ParaID)
case parachaintypes.ScheduledCore:
*paraNow = c.ParaID
case parachaintypes.Free:
// Nothing to do in case of free

}

if paraNow != nil {
entry := cpvs.currentAssignments[*paraNow]
entry++
cpvs.currentAssignments[*paraNow] = entry
if entry == 1 {
logger.Infof("got assigned to parachain with ID %d", *paraNow)
}
}

perRelayParent.assignment = paraNow
return nil
}

func findValidatorGroup(validatorIndex parachaintypes.ValidatorIndex, validatorGroups parachaintypes.ValidatorGroups,
) (parachaintypes.GroupIndex, bool) {
for groupIndex, validatorGroup := range validatorGroups.Validators {
for _, i := range validatorGroup {
if i == validatorIndex {
return parachaintypes.GroupIndex(groupIndex), true
}
}
}

return 0, false
}

// signingKeyAndIndex finds the first key we can sign with from the given set of validators,
// if any, and returns it along with the validator index.
func signingKeyAndIndex(validators []parachaintypes.ValidatorID, ks keystore.Keystore,
) (*parachaintypes.ValidatorID, parachaintypes.ValidatorIndex) {
for i, validator := range validators {
publicKey, _ := sr25519.NewPublicKey(validator[:])
keypair := ks.GetKeypair(publicKey)

if keypair != nil {
return &validator, parachaintypes.ValidatorIndex(i)
}
}

return nil, 0
}

func prospectiveParachainMode() parachaintypes.ProspectiveParachainsMode {
// TODO: complete this method by calling the runtime function
// https://github.com/paritytech/polkadot-sdk/blob/aa68ea58f389c2aa4eefab4bf7bc7b787dd56580/polkadot/node/subsystem-util/src/runtime/mod.rs#L496 //nolint
// NOTE: We will return false until we have support for async backing
return parachaintypes.ProspectiveParachainsMode{
IsEnabled: false,
}
}

type SortableActivatedLeaves []parachaintypes.ActivatedLeaf

func (s SortableActivatedLeaves) Len() int {
Expand Down Expand Up @@ -441,6 +650,46 @@ func (f fetchedCollationInfo) String() string {
f.relayParent.String(), f.paraID, f.candidateHash.Value.String(), f.collatorID)
}

func fetchedCandidateFromString(str string) (fetchedCollationInfo, error) {
splits := strings.Split(str, ",")
if len(splits) != 4 {
return fetchedCollationInfo{}, fmt.Errorf("%w: %s", ErrInvalidStringFormat, str)
}

relayParent, err := common.HexToHash(strings.TrimSpace(splits[0]))
if err != nil {
return fetchedCollationInfo{}, fmt.Errorf("getting relay parent: %w", err)
}

paraID, err := strconv.ParseUint(strings.TrimSpace(splits[1]), 10, 64)
if err != nil {
return fetchedCollationInfo{}, fmt.Errorf("getting para id: %w", err)
}

candidateHashBytes, err := common.HexToBytes(strings.TrimSpace(splits[2]))
if err != nil {
return fetchedCollationInfo{}, fmt.Errorf("getting candidate hash bytes: %w", err)
}

candidateHash := parachaintypes.CandidateHash{
Value: common.NewHash(candidateHashBytes),
}

var collatorID parachaintypes.CollatorID
collatorIDBytes, err := common.HexToBytes(strings.TrimSpace(splits[3]))
if err != nil {
return fetchedCollationInfo{}, fmt.Errorf("getting collator id bytes: %w", err)
}
copy(collatorID[:], collatorIDBytes)

return fetchedCollationInfo{
relayParent: relayParent,
paraID: parachaintypes.ParaID(paraID),
candidateHash: candidateHash,
collatorID: collatorID,
}, nil
}

type PerRelayParent struct {
prospectiveParachainMode parachaintypes.ProspectiveParachainsMode
assignment *parachaintypes.ParaID
Expand Down Expand Up @@ -499,6 +748,7 @@ func (cpvs CollatorProtocolValidatorSide) handleNetworkBridgeEvents(msg any) err
// TODO #4155
case networkbridgeevents.OurViewChange:
// TODO #4156
cpvs.handleOurViewChange(msg.View)
case networkbridgeevents.UpdatedAuthorityIDs:
// NOTE: The validator side doesn't deal with AuthorityDiscovery IDs
case networkbridgeevents.PeerMessage[collatorprotocolmessages.CollationProtocol]:
Expand Down
4 changes: 2 additions & 2 deletions dot/parachain/network-bridge/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ type PeerViewChange struct {
// Up to `N` (5?) chain heads.
type View struct {
// a bounded amount of chain heads
heads []common.Hash //nolint
Heads []common.Hash //nolint
// the highest known finalized number
finalizedNumber uint32 //nolint
FinalizedNumber uint32 //nolint
}

type OurViewChange struct {
Expand Down
4 changes: 4 additions & 0 deletions dot/parachain/network-bridge/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,11 +285,15 @@ func (nbr *NetworkBridgeReceiver) updateOurView() error {
finalizedNumber: nbr.finalizedNumber,
}

// If this is the first view update since becoming active, but our view is empty,
// there is no need to send anything.
if nbr.localView == nil {
*nbr.localView = newView
return nil
}

// we only want to send a view update if the heads have changed.
// A change in finalized block is not enough to trigger a view update.
if nbr.localView.checkHeadsEqual(newView) {
// nothing to update
return nil
Expand Down

0 comments on commit 4a1d525

Please sign in to comment.