diff --git a/dot/parachain/collator-protocol/validator_side.go b/dot/parachain/collator-protocol/validator_side.go index 75aefdfc17..7e22184261 100644 --- a/dot/parachain/collator-protocol/validator_side.go +++ b/dot/parachain/collator-protocol/validator_side.go @@ -7,10 +7,13 @@ import ( "context" "errors" "fmt" + "strconv" + "strings" "time" "github.com/ChainSafe/gossamer/dot/network" collatorprotocolmessages "github.com/ChainSafe/gossamer/dot/parachain/collator-protocol/messages" + "github.com/ChainSafe/gossamer/dot/parachain/network-bridge/events" networkbridgeevents "github.com/ChainSafe/gossamer/dot/parachain/network-bridge/events" networkbridgemessages "github.com/ChainSafe/gossamer/dot/parachain/network-bridge/messages" parachaintypes "github.com/ChainSafe/gossamer/dot/parachain/types" @@ -18,6 +21,7 @@ import ( "github.com/ChainSafe/gossamer/dot/state" "github.com/ChainSafe/gossamer/internal/log" "github.com/ChainSafe/gossamer/lib/common" + "github.com/ChainSafe/gossamer/lib/crypto/sr25519" "github.com/ChainSafe/gossamer/lib/keystore" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" @@ -119,6 +123,211 @@ func (cpvs *CollatorProtocolValidatorSide) ProcessActiveLeavesUpdateSignal( return nil } +func (cpvs *CollatorProtocolValidatorSide) handleOurViewChange(view events.View) error { + // 1. Find out removed leaves (hashes) and newly added leaves + // 2. Go over each new leaves, + // - check if perspective parachain mode is enabled + // - assign incoming + // - insert active leaves and per relay parent + activeLeaves := cpvs.activeLeaves + + removed := []common.Hash{} + for activeLeaf := range activeLeaves { + if !slices.Contains(view.Heads, activeLeaf) { + removed = append(removed, activeLeaf) + } + } + + newlyAdded := []common.Hash{} + for _, head := range view.Heads { + if _, ok := activeLeaves[head]; !ok { + newlyAdded = append(newlyAdded, head) + } + } + + // handled newly added leaves + for _, leaf := range newlyAdded { + mode := prospectiveParachainMode() + + perRelayParent := &PerRelayParent{ + prospectiveParachainMode: mode, + } + + err := cpvs.assignIncoming(leaf, perRelayParent) + if err != nil { + return fmt.Errorf("assigning incoming: %w", err) + } + cpvs.activeLeaves[leaf] = mode + cpvs.perRelayParent[leaf] = *perRelayParent + + //nolint:staticcheck + if mode.IsEnabled { + // TODO: Add it when we have async backing + // https://github.com/paritytech/polkadot-sdk/blob/aa68ea58f389c2aa4eefab4bf7bc7b787dd56580/polkadot/node/network/collator-protocol/src/validator_side/mod.rs#L1303 //nolint + } + } + + // handle removed leaves + for _, leaf := range removed { + delete(cpvs.activeLeaves, leaf) + + mode := prospectiveParachainMode() + pruned := []common.Hash{} + if mode.IsEnabled { + // TODO: Do this when we have async backing + // https://github.com/paritytech/polkadot-sdk/blob/aa68ea58f389c2aa4eefab4bf7bc7b787dd56580/polkadot/node/network/collator-protocol/src/validator_side/mod.rs#L1340 //nolint + } else { + pruned = append(pruned, leaf) + } + + for _, prunedLeaf := range pruned { + perRelayParent, ok := cpvs.perRelayParent[prunedLeaf] + if ok { + cpvs.removeOutgoing(perRelayParent) + delete(cpvs.perRelayParent, prunedLeaf) + } + + for fetchedCandidateStr := range cpvs.fetchedCandidates { + fetchedCollation, err := fetchedCandidateFromString(fetchedCandidateStr) + if err != nil { + // this should never really happen + return fmt.Errorf("getting fetched collation from string: %w", err) + } + + if fetchedCollation.relayParent == prunedLeaf { + delete(cpvs.fetchedCandidates, fetchedCandidateStr) + } + } + } + + // TODO + // Remove blocked advertisements that left the view. cpvs.BlockedAdvertisements + // Re-trigger previously failed requests again. requestUnBlockedCollations + // prune old advertisements + // https://github.com/paritytech/polkadot-sdk/blob/aa68ea58f389c2aa4eefab4bf7bc7b787dd56580/polkadot/node/network/collator-protocol/src/validator_side/mod.rs#L1361-L1396 + + } + + return nil +} + +func (cpvs *CollatorProtocolValidatorSide) removeOutgoing(perRelayParent PerRelayParent) { + if perRelayParent.assignment != nil { + entry := cpvs.currentAssignments[*perRelayParent.assignment] + entry-- + if entry == 0 { + logger.Infof("unassigned from parachain with ID %d", *perRelayParent.assignment) + delete(cpvs.currentAssignments, *perRelayParent.assignment) + return + } + + cpvs.currentAssignments[*perRelayParent.assignment] = entry + } +} + +func (cpvs *CollatorProtocolValidatorSide) assignIncoming(relayParent common.Hash, perRelayParent *PerRelayParent, +) error { + // TODO: get this instance using relay parent + instance, err := cpvs.BlockState.GetRuntime(relayParent) + if err != nil { + return fmt.Errorf("getting runtime instance: %w", err) + } + + validators, err := instance.ParachainHostValidators() + if err != nil { + return fmt.Errorf("getting validators: %w", err) + } + + validatorGroups, err := instance.ParachainHostValidatorGroups() + if err != nil { + return fmt.Errorf("getting validator groups: %w", err) + } + + availabilityCores, err := instance.ParachainHostAvailabilityCores() + if err != nil { + return fmt.Errorf("getting availability cores: %w", err) + } + + validator, validatorIndex := signingKeyAndIndex(validators, cpvs.Keystore) + if validator == nil { + // return with an error? + return nil + } + + groupIndex, ok := findValidatorGroup(validatorIndex, *validatorGroups) + if !ok { + logger.Trace("not a validator") + return nil + } + + coreIndexNow := validatorGroups.GroupRotationInfo.CoreForGroup(groupIndex, uint8(len(availabilityCores))) + coreNow, err := availabilityCores[coreIndexNow.Index].Value() + if err != nil { + return fmt.Errorf("getting core now: %w", err) + } + var paraNow *parachaintypes.ParaID + + switch c := coreNow.(type) /*coreNow.Index()*/ { + case parachaintypes.OccupiedCore: + *paraNow = parachaintypes.ParaID(c.CandidateDescriptor.ParaID) + case parachaintypes.ScheduledCore: + *paraNow = c.ParaID + case parachaintypes.Free: + // Nothing to do in case of free + + } + + if paraNow != nil { + entry := cpvs.currentAssignments[*paraNow] + entry++ + cpvs.currentAssignments[*paraNow] = entry + if entry == 1 { + logger.Infof("got assigned to parachain with ID %d", *paraNow) + } + } + + perRelayParent.assignment = paraNow + return nil +} + +func findValidatorGroup(validatorIndex parachaintypes.ValidatorIndex, validatorGroups parachaintypes.ValidatorGroups, +) (parachaintypes.GroupIndex, bool) { + for groupIndex, validatorGroup := range validatorGroups.Validators { + for _, i := range validatorGroup { + if i == validatorIndex { + return parachaintypes.GroupIndex(groupIndex), true + } + } + } + + return 0, false +} + +// signingKeyAndIndex finds the first key we can sign with from the given set of validators, +// if any, and returns it along with the validator index. +func signingKeyAndIndex(validators []parachaintypes.ValidatorID, ks keystore.Keystore, +) (*parachaintypes.ValidatorID, parachaintypes.ValidatorIndex) { + for i, validator := range validators { + publicKey, _ := sr25519.NewPublicKey(validator[:]) + keypair := ks.GetKeypair(publicKey) + + if keypair != nil { + return &validator, parachaintypes.ValidatorIndex(i) + } + } + + return nil, 0 +} + +func prospectiveParachainMode() parachaintypes.ProspectiveParachainsMode { + // TODO: complete this method by calling the runtime function + // https://github.com/paritytech/polkadot-sdk/blob/aa68ea58f389c2aa4eefab4bf7bc7b787dd56580/polkadot/node/subsystem-util/src/runtime/mod.rs#L496 //nolint + // NOTE: We will return false until we have support for async backing + return parachaintypes.ProspectiveParachainsMode{ + IsEnabled: false, + } +} + type SortableActivatedLeaves []parachaintypes.ActivatedLeaf func (s SortableActivatedLeaves) Len() int { @@ -441,6 +650,46 @@ func (f fetchedCollationInfo) String() string { f.relayParent.String(), f.paraID, f.candidateHash.Value.String(), f.collatorID) } +func fetchedCandidateFromString(str string) (fetchedCollationInfo, error) { + splits := strings.Split(str, ",") + if len(splits) != 4 { + return fetchedCollationInfo{}, fmt.Errorf("%w: %s", ErrInvalidStringFormat, str) + } + + relayParent, err := common.HexToHash(strings.TrimSpace(splits[0])) + if err != nil { + return fetchedCollationInfo{}, fmt.Errorf("getting relay parent: %w", err) + } + + paraID, err := strconv.ParseUint(strings.TrimSpace(splits[1]), 10, 64) + if err != nil { + return fetchedCollationInfo{}, fmt.Errorf("getting para id: %w", err) + } + + candidateHashBytes, err := common.HexToBytes(strings.TrimSpace(splits[2])) + if err != nil { + return fetchedCollationInfo{}, fmt.Errorf("getting candidate hash bytes: %w", err) + } + + candidateHash := parachaintypes.CandidateHash{ + Value: common.NewHash(candidateHashBytes), + } + + var collatorID parachaintypes.CollatorID + collatorIDBytes, err := common.HexToBytes(strings.TrimSpace(splits[3])) + if err != nil { + return fetchedCollationInfo{}, fmt.Errorf("getting collator id bytes: %w", err) + } + copy(collatorID[:], collatorIDBytes) + + return fetchedCollationInfo{ + relayParent: relayParent, + paraID: parachaintypes.ParaID(paraID), + candidateHash: candidateHash, + collatorID: collatorID, + }, nil +} + type PerRelayParent struct { prospectiveParachainMode parachaintypes.ProspectiveParachainsMode assignment *parachaintypes.ParaID @@ -499,6 +748,7 @@ func (cpvs CollatorProtocolValidatorSide) handleNetworkBridgeEvents(msg any) err // TODO #4155 case networkbridgeevents.OurViewChange: // TODO #4156 + cpvs.handleOurViewChange(msg.View) case networkbridgeevents.UpdatedAuthorityIDs: // NOTE: The validator side doesn't deal with AuthorityDiscovery IDs case networkbridgeevents.PeerMessage[collatorprotocolmessages.CollationProtocol]: diff --git a/dot/parachain/network-bridge/events/events.go b/dot/parachain/network-bridge/events/events.go index 08d6bdb2ea..727999f913 100644 --- a/dot/parachain/network-bridge/events/events.go +++ b/dot/parachain/network-bridge/events/events.go @@ -55,9 +55,9 @@ type PeerViewChange struct { // Up to `N` (5?) chain heads. type View struct { // a bounded amount of chain heads - heads []common.Hash //nolint + Heads []common.Hash //nolint // the highest known finalized number - finalizedNumber uint32 //nolint + FinalizedNumber uint32 //nolint } type OurViewChange struct { diff --git a/dot/parachain/network-bridge/receiver.go b/dot/parachain/network-bridge/receiver.go index af5181fb19..9aa4c10695 100644 --- a/dot/parachain/network-bridge/receiver.go +++ b/dot/parachain/network-bridge/receiver.go @@ -285,11 +285,15 @@ func (nbr *NetworkBridgeReceiver) updateOurView() error { finalizedNumber: nbr.finalizedNumber, } + // If this is the first view update since becoming active, but our view is empty, + // there is no need to send anything. if nbr.localView == nil { *nbr.localView = newView return nil } + // we only want to send a view update if the heads have changed. + // A change in finalized block is not enough to trigger a view update. if nbr.localView.checkHeadsEqual(newView) { // nothing to update return nil