Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send AppErrors from p2p SDK #2753

Merged
merged 23 commits into from
Jul 4, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/DataDog/zstd v1.5.2
github.com/NYTimes/gziphandler v1.1.1
github.com/antithesishq/antithesis-sdk-go v0.3.8
github.com/ava-labs/coreth v0.13.5-rc.0
github.com/ava-labs/coreth v0.13.6-rc.0.0.20240702181806-8d07639a0335
github.com/ava-labs/ledger-avalanche/go v0.0.0-20240610153809-9c955cc90a95
github.com/btcsuite/btcd/btcutil v1.1.3
github.com/cockroachdb/pebble v0.0.0-20230928194634-aa077af62593
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax
github.com/antithesishq/antithesis-sdk-go v0.3.8 h1:OvGoHxIcOXFJLyn9IJQ5DzByZ3YVAWNBc394ObzDRb8=
github.com/antithesishq/antithesis-sdk-go v0.3.8/go.mod h1:IUpT2DPAKh6i/YhSbt6Gl3v2yvUZjmKncl7U91fup7E=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/ava-labs/coreth v0.13.5-rc.0 h1:PJQbR9o2RrW3j9ba4r1glXnmM2PNAP3xR569+gMcBd0=
github.com/ava-labs/coreth v0.13.5-rc.0/go.mod h1:cm5c12xo5NiTgtbmeduv8i2nYdzgkczz9Wm3yiwwTRU=
github.com/ava-labs/coreth v0.13.6-rc.0.0.20240702181806-8d07639a0335 h1:HNQK9tY77AY+s7FTGYJIPYVSOjFKjeFtMfWum/M75E4=
github.com/ava-labs/coreth v0.13.6-rc.0.0.20240702181806-8d07639a0335/go.mod h1:x/anZpvTZaQoYqpeYBoSoAd3Bfm8dVPgqoAGuCvdP+E=
github.com/ava-labs/ledger-avalanche/go v0.0.0-20240610153809-9c955cc90a95 h1:dOVbtdnZL++pENdTCNZ1nu41eYDQkTML4sWebDnnq8c=
github.com/ava-labs/ledger-avalanche/go v0.0.0-20240610153809-9c955cc90a95/go.mod h1:pJxaT9bUgeRNVmNRgtCHb7sFDIRKy7CzTQVi8gGNT6g=
github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g=
Expand Down
33 changes: 33 additions & 0 deletions network/p2p/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package p2p

import "github.com/ava-labs/avalanchego/snow/engine/common"

var (
// ErrUnexpected should be used to indicate that a request failed due to a
// generic error
ErrUnexpected = &common.AppError{
Code: -1,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure how to handle this - we decided to reserve negative error codes for our use + positive error codes for developer use. This is a weird intersection since the p2p package is technically a vm implementation detail.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is fine, the VM opts into this standard when it uses the p2p sdk, we can probably highlight it in the readme for the sdk.

Message: "unexpected error",
}
// ErrUnregisteredHandler should be used to indicate that a request failed
// due to it not matching a registered handler
ErrUnregisteredHandler = &common.AppError{
Code: -2,
Message: "unregistered handler",
}
// ErrNotValidator should be used to indicate that a request failed due to
// the requesting peer not being a validator
ErrNotValidator = &common.AppError{
Code: -3,
Message: "not a validator",
}
// ErrThrottled should be used to indicate that a request failed due to the
// requesting peer exceeding a rate limit
ErrThrottled = &common.AppError{
Code: -4,
Message: "throttled",
}
)
16 changes: 11 additions & 5 deletions network/p2p/gossip/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/network/p2p"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/utils/bloom"
"github.com/ava-labs/avalanchego/utils/logging"
)
Expand Down Expand Up @@ -43,10 +44,10 @@ type Handler[T Gossipable] struct {
targetResponseSize int
}

func (h Handler[T]) AppRequest(_ context.Context, _ ids.NodeID, _ time.Time, requestBytes []byte) ([]byte, error) {
func (h Handler[T]) AppRequest(_ context.Context, _ ids.NodeID, _ time.Time, requestBytes []byte) ([]byte, *common.AppError) {
filter, salt, err := ParseAppRequest(requestBytes)
if err != nil {
return nil, err
return nil, p2p.ErrUnexpected
}

responseSize := 0
Expand All @@ -73,14 +74,19 @@ func (h Handler[T]) AppRequest(_ context.Context, _ ids.NodeID, _ time.Time, req
return responseSize <= h.targetResponseSize
})
if err != nil {
return nil, err
return nil, p2p.ErrUnexpected
}

if err := h.metrics.observeMessage(sentPullLabels, len(gossipBytes), responseSize); err != nil {
return nil, err
return nil, p2p.ErrUnexpected
}

return MarshalAppResponse(gossipBytes)
response, err := MarshalAppResponse(gossipBytes)
if err != nil {
return nil, p2p.ErrUnexpected
}

return response, nil
}

func (h Handler[_]) AppGossip(_ context.Context, nodeID ids.NodeID, gossipBytes []byte) {
Expand Down
18 changes: 8 additions & 10 deletions network/p2p/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package p2p

import (
"context"
"errors"
"time"

"go.uber.org/zap"
Expand All @@ -17,8 +16,6 @@ import (
)

var (
ErrNotValidator = errors.New("not a validator")

_ Handler = (*NoOpHandler)(nil)
_ Handler = (*TestHandler)(nil)
_ Handler = (*ValidatorHandler)(nil)
Expand All @@ -33,13 +30,14 @@ type Handler interface {
gossipBytes []byte,
)
// AppRequest is called when handling an AppRequest message.
// Returns the bytes for the response corresponding to [requestBytes]
// Sends a response with the response corresponding to [requestBytes] or
// an application-defined error.
AppRequest(
ctx context.Context,
nodeID ids.NodeID,
deadline time.Time,
requestBytes []byte,
) ([]byte, error)
) ([]byte, *common.AppError)
StephenButtolph marked this conversation as resolved.
Show resolved Hide resolved
// CrossChainAppRequest is called when handling a CrossChainAppRequest
// message.
// Returns the bytes for the response corresponding to [requestBytes]
Expand All @@ -56,7 +54,7 @@ type NoOpHandler struct{}

func (NoOpHandler) AppGossip(context.Context, ids.NodeID, []byte) {}

func (NoOpHandler) AppRequest(context.Context, ids.NodeID, time.Time, []byte) ([]byte, error) {
func (NoOpHandler) AppRequest(context.Context, ids.NodeID, time.Time, []byte) ([]byte, *common.AppError) {
return nil, nil
}

Expand Down Expand Up @@ -95,7 +93,7 @@ func (v ValidatorHandler) AppGossip(ctx context.Context, nodeID ids.NodeID, goss
v.handler.AppGossip(ctx, nodeID, gossipBytes)
}

func (v ValidatorHandler) AppRequest(ctx context.Context, nodeID ids.NodeID, deadline time.Time, requestBytes []byte) ([]byte, error) {
func (v ValidatorHandler) AppRequest(ctx context.Context, nodeID ids.NodeID, deadline time.Time, requestBytes []byte) ([]byte, *common.AppError) {
if !v.validatorSet.Has(ctx, nodeID) {
return nil, ErrNotValidator
}
Expand Down Expand Up @@ -128,7 +126,7 @@ func (r *responder) AppRequest(ctx context.Context, nodeID ids.NodeID, requestID
zap.Binary("message", request),
zap.Error(err),
)
return nil
return r.sender.SendAppError(ctx, nodeID, requestID, err.Code, err.Message)
}

return r.sender.SendAppResponse(ctx, nodeID, requestID, appResponse)
Expand All @@ -155,7 +153,7 @@ func (r *responder) CrossChainAppRequest(ctx context.Context, chainID ids.ID, re

type TestHandler struct {
AppGossipF func(ctx context.Context, nodeID ids.NodeID, gossipBytes []byte)
AppRequestF func(ctx context.Context, nodeID ids.NodeID, deadline time.Time, requestBytes []byte) ([]byte, error)
AppRequestF func(ctx context.Context, nodeID ids.NodeID, deadline time.Time, requestBytes []byte) ([]byte, *common.AppError)
CrossChainAppRequestF func(ctx context.Context, chainID ids.ID, deadline time.Time, requestBytes []byte) ([]byte, error)
}

Expand All @@ -167,7 +165,7 @@ func (t TestHandler) AppGossip(ctx context.Context, nodeID ids.NodeID, gossipByt
t.AppGossipF(ctx, nodeID, gossipBytes)
}

func (t TestHandler) AppRequest(ctx context.Context, nodeID ids.NodeID, deadline time.Time, requestBytes []byte) ([]byte, error) {
func (t TestHandler) AppRequest(ctx context.Context, nodeID ids.NodeID, deadline time.Time, requestBytes []byte) ([]byte, *common.AppError) {
if t.AppRequestF == nil {
return nil, nil
}
Expand Down
3 changes: 2 additions & 1 deletion network/p2p/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/set"
)
Expand Down Expand Up @@ -79,7 +80,7 @@ func TestValidatorHandlerAppRequest(t *testing.T) {
name string
validatorSet ValidatorSet
nodeID ids.NodeID
expected error
expected *common.AppError
}{
{
name: "message dropped",
Expand Down
108 changes: 96 additions & 12 deletions network/p2p/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestMessageRouting(t *testing.T) {
require.Equal(wantNodeID, nodeID)
require.Equal(wantMsg, msg)
},
AppRequestF: func(_ context.Context, nodeID ids.NodeID, _ time.Time, msg []byte) ([]byte, error) {
AppRequestF: func(_ context.Context, nodeID ids.NodeID, _ time.Time, msg []byte) ([]byte, *common.AppError) {
appRequestCalled = true
require.Equal(wantNodeID, nodeID)
require.Equal(wantMsg, msg)
Expand Down Expand Up @@ -352,7 +352,7 @@ func TestCrossChainAppRequestFailed(t *testing.T) {
}

// Messages for unregistered handlers should be dropped gracefully
func TestMessageForUnregisteredHandler(t *testing.T) {
func TestAppGossipMessageForUnregisteredHandler(t *testing.T) {
tests := []struct {
name string
msg []byte
Expand All @@ -379,26 +379,110 @@ func TestMessageForUnregisteredHandler(t *testing.T) {
AppGossipF: func(context.Context, ids.NodeID, []byte) {
require.Fail("should not be called")
},
AppRequestF: func(context.Context, ids.NodeID, time.Time, []byte) ([]byte, error) {
require.Fail("should not be called")
return nil, nil
},
CrossChainAppRequestF: func(context.Context, ids.ID, time.Time, []byte) ([]byte, error) {
}
network, err := NewNetwork(logging.NoLog{}, nil, prometheus.NewRegistry(), "")
require.NoError(err)
require.NoError(network.AddHandler(handlerID, handler))
require.NoError(network.AppGossip(ctx, ids.EmptyNodeID, tt.msg))
})
}
}

// An unregistered handler should gracefully drop messages by responding
// to the requester with a common.AppError
func TestAppRequestMessageForUnregisteredHandler(t *testing.T) {
tests := []struct {
name string
msg []byte
}{
{
name: "nil",
msg: nil,
},
{
name: "empty",
msg: []byte{},
},
{
name: "non-empty",
msg: []byte("foobar"),
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
require := require.New(t)
ctx := context.Background()
handler := &TestHandler{
AppRequestF: func(context.Context, ids.NodeID, time.Time, []byte) ([]byte, *common.AppError) {
require.Fail("should not be called")
return nil, nil
},
}
network, err := NewNetwork(logging.NoLog{}, nil, prometheus.NewRegistry(), "")

wantNodeID := ids.GenerateTestNodeID()
wantRequestID := uint32(111)

done := make(chan struct{})
sender := &common.SenderTest{}
sender.SendAppErrorF = func(_ context.Context, nodeID ids.NodeID, requestID uint32, errorCode int32, errorMessage string) error {
defer close(done)

require.Equal(wantNodeID, nodeID)
require.Equal(wantRequestID, requestID)
require.Equal(ErrUnregisteredHandler.Code, errorCode)
require.Equal(ErrUnregisteredHandler.Message, errorMessage)

return nil
}
network, err := NewNetwork(logging.NoLog{}, sender, prometheus.NewRegistry(), "")
require.NoError(err)
require.NoError(network.AddHandler(handlerID, handler))

require.NoError(network.AppRequest(ctx, ids.EmptyNodeID, 0, time.Time{}, tt.msg))
require.NoError(network.AppGossip(ctx, ids.EmptyNodeID, tt.msg))
require.NoError(network.CrossChainAppRequest(ctx, ids.Empty, 0, time.Time{}, tt.msg))
require.NoError(network.AppRequest(ctx, wantNodeID, wantRequestID, time.Time{}, tt.msg))
<-done
})
}
}

// A handler that errors should send an AppError to the requesting peer
func TestAppError(t *testing.T) {
require := require.New(t)
ctx := context.Background()
appError := &common.AppError{
Code: 123,
Message: "foo",
}
handler := &TestHandler{
AppRequestF: func(context.Context, ids.NodeID, time.Time, []byte) ([]byte, *common.AppError) {
return nil, appError
},
}

wantNodeID := ids.GenerateTestNodeID()
wantRequestID := uint32(111)

done := make(chan struct{})
sender := &common.SenderTest{}
sender.SendAppErrorF = func(_ context.Context, nodeID ids.NodeID, requestID uint32, errorCode int32, errorMessage string) error {
defer close(done)

require.Equal(wantNodeID, nodeID)
require.Equal(wantRequestID, requestID)
require.Equal(appError.Code, errorCode)
require.Equal(appError.Message, errorMessage)

return nil
}
network, err := NewNetwork(logging.NoLog{}, sender, prometheus.NewRegistry(), "")
require.NoError(err)
require.NoError(network.AddHandler(handlerID, handler))
msg := PrefixMessage(ProtocolPrefix(handlerID), []byte("message"))

require.NoError(network.AppRequest(ctx, wantNodeID, wantRequestID, time.Time{}, msg))
<-done
}

// A response or timeout for a request we never made should return an error
func TestResponseForUnrequestedRequest(t *testing.T) {
tests := []struct {
Expand Down Expand Up @@ -427,7 +511,7 @@ func TestResponseForUnrequestedRequest(t *testing.T) {
AppGossipF: func(context.Context, ids.NodeID, []byte) {
require.Fail("should not be called")
},
AppRequestF: func(context.Context, ids.NodeID, time.Time, []byte) ([]byte, error) {
AppRequestF: func(context.Context, ids.NodeID, time.Time, []byte) ([]byte, *common.AppError) {
require.Fail("should not be called")
return nil, nil
},
Expand Down
5 changes: 3 additions & 2 deletions network/p2p/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,15 @@ func (r *router) AppRequest(ctx context.Context, nodeID ids.NodeID, requestID ui
start := time.Now()
parsedMsg, handler, handlerID, ok := r.parse(request)
if !ok {
r.log.Debug("failed to process message",
r.log.Debug("received message for unregistered handler",
joshua-kim marked this conversation as resolved.
Show resolved Hide resolved
zap.Stringer("messageOp", message.AppRequestOp),
zap.Stringer("nodeID", nodeID),
zap.Uint32("requestID", requestID),
zap.Time("deadline", deadline),
zap.Binary("message", request),
)
return nil

return r.sender.SendAppError(ctx, nodeID, requestID, ErrUnregisteredHandler.Code, ErrUnregisteredHandler.Message)
}

// call the corresponding handler and send back a response to nodeID
Expand Down
12 changes: 4 additions & 8 deletions network/p2p/throttler_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,16 @@ package p2p

import (
"context"
"errors"
"fmt"
"time"

"go.uber.org/zap"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/utils/logging"
)

var (
ErrThrottled = errors.New("throttled")
_ Handler = (*ThrottlerHandler)(nil)
)
var _ Handler = (*ThrottlerHandler)(nil)

func NewThrottlerHandler(handler Handler, throttler Throttler, log logging.Logger) *ThrottlerHandler {
return &ThrottlerHandler{
Expand Down Expand Up @@ -46,9 +42,9 @@ func (t ThrottlerHandler) AppGossip(ctx context.Context, nodeID ids.NodeID, goss
t.handler.AppGossip(ctx, nodeID, gossipBytes)
}

func (t ThrottlerHandler) AppRequest(ctx context.Context, nodeID ids.NodeID, deadline time.Time, requestBytes []byte) ([]byte, error) {
func (t ThrottlerHandler) AppRequest(ctx context.Context, nodeID ids.NodeID, deadline time.Time, requestBytes []byte) ([]byte, *common.AppError) {
if !t.throttler.Handle(nodeID) {
return nil, fmt.Errorf("dropping message from %s: %w", nodeID, ErrThrottled)
return nil, ErrThrottled
StephenButtolph marked this conversation as resolved.
Show resolved Hide resolved
}

return t.handler.AppRequest(ctx, nodeID, deadline, requestBytes)
Expand Down
Loading
Loading