Skip to content

Commit

Permalink
add timeout checks to precheck
Browse files Browse the repository at this point in the history
statement distribution skeleton (#4169)

Statement Distribution subsystem is responsible for distributing signed statements that we have generated and forwarding statements generated by our peers.
This commit just introduces a skeleton for it.
- implements the subsystem interface
- registers the subsystem with overseer
- add the processMessage method with messages to be handled

Issue #3583

cleanup remove un-used code

add comments

regenerate mocks

feat(dot/parachain): receiver side of network bridge (#3955)

- handled active leaves update message
- handled block finalized message
- relay network protocol messages
- handle view update message for receiver side of the network bridge #3864
- decode messages to wire message (PR #4188), Fixes #4108
- handle our view change in collator protocol validator side (PR #4197), Issue #4156
- Handle network bridge messages (UpdateAuthorityIDs and NewGossipTopology) Fixes #3862
- process network events for receiver side of network bridge Issue Fixes #3863
-  process overseer signals for network bridge Fixes #3861
  • Loading branch information
edwardmack committed Oct 4, 2024
1 parent 57936ca commit 2750b20
Show file tree
Hide file tree
Showing 39 changed files with 1,133 additions and 577 deletions.
46 changes: 46 additions & 0 deletions dot/parachain/availability-store/mock_runtime_instance_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion dot/parachain/backing/candidate_backing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
availabilitystore "github.com/ChainSafe/gossamer/dot/parachain/availability-store"
candidatevalidation "github.com/ChainSafe/gossamer/dot/parachain/candidate-validation"
collatorprotocolmessages "github.com/ChainSafe/gossamer/dot/parachain/collator-protocol/messages"
statementdistributionmessages "github.com/ChainSafe/gossamer/dot/parachain/statement-distribution/messages"
parachaintypes "github.com/ChainSafe/gossamer/dot/parachain/types"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/crypto/sr25519"
Expand Down Expand Up @@ -83,7 +84,7 @@ func mockOverseer(t *testing.T, subsystemToOverseer chan any) {
parachaintypes.ProvisionerMessageProvisionableData,
parachaintypes.ProspectiveParachainsMessageCandidateBacked,
collatorprotocolmessages.Backed,
parachaintypes.StatementDistributionMessageBacked:
statementdistributionmessages.Backed:
continue
default:
t.Errorf("unknown type: %T\n", data)
Expand Down
13 changes: 7 additions & 6 deletions dot/parachain/backing/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
candidatevalidation "github.com/ChainSafe/gossamer/dot/parachain/candidate-validation"
collatorprotocolmessages "github.com/ChainSafe/gossamer/dot/parachain/collator-protocol/messages"
"github.com/ChainSafe/gossamer/dot/parachain/overseer"
statementedistributionmessages "github.com/ChainSafe/gossamer/dot/parachain/statement-distribution/messages"
parachaintypes "github.com/ChainSafe/gossamer/dot/parachain/types"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/crypto"
Expand All @@ -34,7 +35,7 @@ func stopOverseerAndWaitForCompletion(overseer *overseer.MockableOverseer) {
func initBackingAndOverseerMock(t *testing.T) (*backing.CandidateBacking, *overseer.MockableOverseer) {
t.Helper()

overseerMock := overseer.NewMockableOverseer(t)
overseerMock := overseer.NewMockableOverseer(t, true)

backing := backing.New(overseerMock.SubsystemsToOverseer)
overseerMock.RegisterSubsystem(backing)
Expand Down Expand Up @@ -396,7 +397,7 @@ func TestSecondsValidCandidate(t *testing.T) {

distribute := func(msg any) bool {
// we have seconded a candidate and shared the statement to peers
share, ok := msg.(parachaintypes.StatementDistributionMessageShare)
share, ok := msg.(statementedistributionmessages.Share)
if !ok {
return false
}
Expand Down Expand Up @@ -539,7 +540,7 @@ func TestCandidateReachesQuorum(t *testing.T) {
validate := validResponseForValidateFromExhaustive(headData, pvd)

distribute := func(msg any) bool {
_, ok := msg.(parachaintypes.StatementDistributionMessageShare)
_, ok := msg.(statementedistributionmessages.Share)
return ok
}

Expand Down Expand Up @@ -844,7 +845,7 @@ func TestCanNotSecondMultipleCandidatesPerRelayParent(t *testing.T) {

distribute := func(msg any) bool {
// we have seconded a candidate and shared the statement to peers
share, ok := msg.(parachaintypes.StatementDistributionMessageShare)
share, ok := msg.(statementedistributionmessages.Share)
if !ok {
return false
}
Expand Down Expand Up @@ -996,7 +997,7 @@ func TestNewLeafDoesNotClobberOld(t *testing.T) {

distribute := func(msg any) bool {
// we have seconded a candidate and shared the statement to peers
share, ok := msg.(parachaintypes.StatementDistributionMessageShare)
share, ok := msg.(statementedistributionmessages.Share)
if !ok {
return false
}
Expand Down Expand Up @@ -1137,7 +1138,7 @@ func TestConflictingStatementIsMisbehavior(t *testing.T) {
validate := validResponseForValidateFromExhaustive(headData, pvd)

distribute := func(msg any) bool {
_, ok := msg.(parachaintypes.StatementDistributionMessageShare)
_, ok := msg.(statementedistributionmessages.Share)
return ok
}

Expand Down
3 changes: 2 additions & 1 deletion dot/parachain/backing/per_relay_parent_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
availabilitystore "github.com/ChainSafe/gossamer/dot/parachain/availability-store"
candidatevalidation "github.com/ChainSafe/gossamer/dot/parachain/candidate-validation"
collatorprotocolmessages "github.com/ChainSafe/gossamer/dot/parachain/collator-protocol/messages"
statementedistributionmessages "github.com/ChainSafe/gossamer/dot/parachain/statement-distribution/messages"
parachaintypes "github.com/ChainSafe/gossamer/dot/parachain/types"
"github.com/ChainSafe/gossamer/dot/parachain/util"
"github.com/ChainSafe/gossamer/lib/common"
Expand Down Expand Up @@ -170,7 +171,7 @@ func (rpState *perRelayParentState) postImportStatement(subSystemToOverseer chan
}

// Notify statement distribution of backed candidate.
subSystemToOverseer <- parachaintypes.StatementDistributionMessageBacked(candidateHash)
subSystemToOverseer <- statementedistributionmessages.Backed(candidateHash)

} else {
// TODO: figure out what this comment means by 'avoid cycles'.
Expand Down
3 changes: 2 additions & 1 deletion dot/parachain/backing/validated_candidate_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"

collatorprotocolmessages "github.com/ChainSafe/gossamer/dot/parachain/collator-protocol/messages"
statementedistributionmessages "github.com/ChainSafe/gossamer/dot/parachain/statement-distribution/messages"
parachaintypes "github.com/ChainSafe/gossamer/dot/parachain/types"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/keystore"
Expand Down Expand Up @@ -345,7 +346,7 @@ func signImportAndDistributeStatement(
}

// `Share` must always be sent before `Backed`. We send the latter in `postImportStatement` below.
subSystemToOverseer <- parachaintypes.StatementDistributionMessageShare{
subSystemToOverseer <- statementedistributionmessages.Share{
RelayParent: rpState.relayParent,
SignedFullStatementWithPVD: signedStatementWithPVD,
}
Expand Down
67 changes: 19 additions & 48 deletions dot/parachain/candidate-validation/candidate_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package candidatevalidation

import (
"bytes"
"context"
"errors"
"fmt"
Expand All @@ -13,10 +12,8 @@ import (
parachainruntime "github.com/ChainSafe/gossamer/dot/parachain/runtime"
parachaintypes "github.com/ChainSafe/gossamer/dot/parachain/types"
"github.com/ChainSafe/gossamer/dot/parachain/util"
validationprotocol "github.com/ChainSafe/gossamer/dot/parachain/validation-protocol"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/runtime"
"github.com/klauspost/compress/zstd"
)

// CandidateValidation is a parachain subsystem that validates candidate parachain blocks
Expand Down Expand Up @@ -234,6 +231,8 @@ func (cv *CandidateValidation) validateFromChainState(msg ValidateFromChainState
}
}

// precheckPvF prechecks the parachain validation function by retrieving the validation code from the runtime instance
// and calling the precheck method on the pvf host. It returns the precheck outcome.
func (cv *CandidateValidation) precheckPvF(relayParent common.Hash, validationCodeHash parachaintypes.
ValidationCodeHash) PreCheckOutcome {
runtimeInstance, err := cv.BlockState.GetRuntime(relayParent)
Expand All @@ -254,11 +253,6 @@ func (cv *CandidateValidation) precheckPvF(relayParent common.Hash, validationCo
return PreCheckOutcomeInvalid
}

codeDecompressed, err := maybeCompressedBlobDecompress(*code, validationprotocol.MaxValidationMessageSize)
if err != nil {
logger.Errorf("failed to decompress code: %w", err)
return PreCheckOutcomeInvalid
}
kind := parachaintypes.NewPvfPrepTimeoutKind()
err = kind.SetValue(parachaintypes.Precheck{})
if err != nil {
Expand All @@ -269,7 +263,7 @@ func (cv *CandidateValidation) precheckPvF(relayParent common.Hash, validationCo
prepTimeout := pvfPrepTimeout(*executorParams, kind)

pvf := PvFPrepData{
code: codeDecompressed,
code: *code,
codeHash: validationCodeHash,
executorParams: *executorParams,
prepTimeout: prepTimeout,
Expand All @@ -283,56 +277,33 @@ func (cv *CandidateValidation) precheckPvF(relayParent common.Hash, validationCo
return PreCheckOutcomeValid
}

// An arbitrary prefix, that indicates a blob beginning with should be decompressed with
// Zstd compression.
//
// This differs from the WASM magic bytes, so real WASM blobs will not have this prefix.
var zstdPrefix = []byte{82, 188, 83, 118, 70, 219, 142, 5}

func maybeCompressedBlobDecompress(blob []byte, bombLimit uint64) ([]byte, error) {
// todo handle check for bombLimit
if len(blob) < len(zstdPrefix) {
return nil, fmt.Errorf("blob is too short")
}
if bytes.Equal(blob[0:len(zstdPrefix)], zstdPrefix) {
decoder, err := zstd.NewReader(nil)
if err != nil {
return nil, fmt.Errorf("creating zstd decoder: %w", err)
}
defer decoder.Close()
return decoder.DecodeAll(blob[len(zstdPrefix):], nil)
} else {
return blob, nil
}
}

// To determine the amount of timeout time for the pvf execution.
//
// Precheck
// pvfPrepTimeout To determine the amount of timeout time for the pvf execution.
//
// The time period after which the preparation worker is considered
//
// unresponsive and will be killed.
//
// Prepare
// The time period after which the preparation worker is considered
// unresponsive and will be killed.
func pvfPrepTimeout(params parachaintypes.ExecutorParams, kind parachaintypes.PvfPrepTimeoutKind) time.Duration {
for i, param := range params {
for _, param := range params {
val, err := param.Value()
if err != nil {
fmt.Printf("some error %v", err)
logger.Errorf("determining parameter values %w", err)
}
switch val := val.(type) {
case parachaintypes.PvfPrepTimeout:
// TODO: determine if we need to covert millisec to nano seconds for duration
return time.Duration(val.Millisec)
default:
fmt.Printf("default\n")
return time.Duration(val.Millisec * 1000000)
}
fmt.Printf("i %v, p %v", i, param)
}

// todo: handle case for getting default time from kind
return time.Second
timeoutKind, err := kind.Value()
if err != nil {
return time.Second * 2
}
switch timeoutKind.(type) {
case parachaintypes.Precheck:
return time.Second * 2
case parachaintypes.Lenient:
return time.Second * 10
default:
return time.Second * 2
}
}
Loading

0 comments on commit 2750b20

Please sign in to comment.