From 319c851b9e6c1e5879d52f12e33ca182df019dee Mon Sep 17 00:00:00 2001 From: yacovm Date: Thu, 9 Apr 2020 12:14:26 +0300 Subject: [PATCH] Properly handle malformed gossip envelopes If a malformed envelope is read from the stream, an error is propagated synchronously up the stack. However, the envelope is unmarshaled into a nil message which is also propagated further up the stack asynchronously. Under very rare circumstances, the error is picked up later than the message, and a nil pointer panic occurs. This patch fixes this by returning early in case of an error. Change-Id: Ia17767ec2483d83d5fa4e7e22514c539232108a8 Signed-off-by: yacovm --- gossip/comm/comm_test.go | 121 ++++++++++++++++++++++++- gossip/comm/conn.go | 7 ++ gossip/comm/mocks/mock_stream.go | 151 +++++++++++++++++++++++++++++++ 3 files changed, 276 insertions(+), 3 deletions(-) create mode 100644 gossip/comm/mocks/mock_stream.go diff --git a/gossip/comm/comm_test.go b/gossip/comm/comm_test.go index 812dc75e204..257dda87009 100644 --- a/gossip/comm/comm_test.go +++ b/gossip/comm/comm_test.go @@ -12,7 +12,9 @@ import ( "crypto/hmac" "crypto/sha256" "crypto/tls" + "errors" "fmt" + "io" "math/rand" "net" "strconv" @@ -22,9 +24,11 @@ import ( "time" "github.com/hyperledger/fabric/bccsp/factory" + "github.com/hyperledger/fabric/common/flogging" "github.com/hyperledger/fabric/common/metrics/disabled" "github.com/hyperledger/fabric/core/comm" "github.com/hyperledger/fabric/gossip/api" + gmocks "github.com/hyperledger/fabric/gossip/comm/mocks" "github.com/hyperledger/fabric/gossip/common" "github.com/hyperledger/fabric/gossip/identity" "github.com/hyperledger/fabric/gossip/metrics" @@ -928,15 +932,126 @@ func TestPresumedDead(t *testing.T) { } } +func TestReadFromStream(t *testing.T) { + stream := &gmocks.MockStream{} + stream.On("CloseSend").Return(nil) + stream.On("Recv").Return(&proto.Envelope{Payload: []byte{1}}, nil).Once() + stream.On("Recv").Return(nil, errors.New("stream closed")).Once() + + conn := newConnection(nil, nil, stream, nil, disabledMetrics, ConnConfig{1, 1}) + conn.logger = flogging.MustGetLogger("test") + + errChan := make(chan error, 2) + msgChan := make(chan *proto.SignedGossipMessage, 1) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + conn.readFromStream(errChan, msgChan) + }() + + select { + case <-msgChan: + assert.Fail(t, "malformed message shouldn't have been received") + case <-time.After(time.Millisecond * 100): + assert.Len(t, errChan, 1) + } + + conn.close() + wg.Wait() +} + +func TestSendBadEnvelope(t *testing.T) { + comm1, port := newCommInstance(t, naiveSec) + defer comm1.Stop() + + stream, err := establishSession(t, port) + assert.NoError(t, err) + + inc := comm1.Accept(acceptAll) + + goodMsg := createGossipMsg() + err = stream.Send(goodMsg.Envelope) + assert.NoError(t, err) + + select { + case goodMsgReceived := <-inc: + assert.Equal(t, goodMsg.Envelope.Payload, goodMsgReceived.GetSourceEnvelope().Payload) + case <-time.After(time.Minute): + assert.Fail(t, "Didn't receive message within a timely manner") + return + } + + // Next, we corrupt a message and send it until the stream is closed forcefully from the remote peer + start := time.Now() + for { + badMsg := createGossipMsg() + badMsg.Envelope.Payload = []byte{1} + err = stream.Send(badMsg.Envelope) + if err != nil { + assert.Equal(t, io.EOF, err) + break + } + if time.Now().After(start.Add(time.Second * 30)) { + assert.Fail(t, "Didn't close stream within a timely manner") + return + } + } +} + +func establishSession(t *testing.T, port int) (proto.Gossip_GossipStreamClient, error) { + cert := GenerateCertificatesOrPanic() + secureOpts := grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{ + InsecureSkipVerify: true, + Certificates: []tls.Certificate{cert}, + })) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + endpoint := fmt.Sprintf("127.0.0.1:%d", port) + conn, err := grpc.DialContext(ctx, endpoint, secureOpts, grpc.WithBlock()) + assert.NoError(t, err, "%v", err) + if err != nil { + return nil, err + } + cl := proto.NewGossipClient(conn) + stream, err := cl.GossipStream(context.Background()) + assert.NoError(t, err, "%v", err) + if err != nil { + return nil, err + } + + clientCertHash := certHashFromRawCert(cert.Certificate[0]) + pkiID := common.PKIidType([]byte{1, 2, 3}) + c := &commImpl{} + assert.NoError(t, err, "%v", err) + msg, _ := c.createConnectionMsg(pkiID, clientCertHash, []byte{1, 2, 3}, func(msg []byte) ([]byte, error) { + mac := hmac.New(sha256.New, hmacKey) + mac.Write(msg) + return mac.Sum(nil), nil + }) + // Send your own connection message + stream.Send(msg.Envelope) + // Wait for connection message from the other side + envelope, err := stream.Recv() + if err != nil { + return nil, err + } + assert.NotNil(t, envelope) + return stream, nil +} + func createGossipMsg() *proto.SignedGossipMessage { - msg, _ := (&proto.GossipMessage{ + msg := &proto.GossipMessage{ Tag: proto.GossipMessage_EMPTY, Nonce: uint64(rand.Int()), Content: &proto.GossipMessage_DataMsg{ DataMsg: &proto.DataMessage{}, }, - }).NoopSign() - return msg + } + sMsg, _ := msg.NoopSign() + return sMsg } func remotePeer(port int) *RemotePeer { diff --git a/gossip/comm/conn.go b/gossip/comm/conn.go index 5555ac1b32a..5e6ff325559 100644 --- a/gossip/comm/conn.go +++ b/gossip/comm/conn.go @@ -385,6 +385,7 @@ func (conn *connection) readFromStream(errChan chan error, msgChan chan *proto.S if err != nil { errChan <- err conn.logger.Warningf("Got error, aborting: %v", err) + return } select { case msgChan <- msg: @@ -419,3 +420,9 @@ type msgSending struct { envelope *proto.Envelope onErr func(error) } + +//go:generate mockery -dir . -name MockStream -case underscore -output mocks/ + +type MockStream interface { + proto.Gossip_GossipStreamClient +} diff --git a/gossip/comm/mocks/mock_stream.go b/gossip/comm/mocks/mock_stream.go new file mode 100644 index 00000000000..8ed9d81af62 --- /dev/null +++ b/gossip/comm/mocks/mock_stream.go @@ -0,0 +1,151 @@ +// Code generated by mockery v1.0.0. DO NOT EDIT. + +package mocks + +import ( + context "context" + + gossip "github.com/hyperledger/fabric/protos/gossip" + metadata "google.golang.org/grpc/metadata" + + mock "github.com/stretchr/testify/mock" +) + +// MockStream is an autogenerated mock type for the MockStream type +type MockStream struct { + mock.Mock +} + +// CloseSend provides a mock function with given fields: +func (_m *MockStream) CloseSend() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Context provides a mock function with given fields: +func (_m *MockStream) Context() context.Context { + ret := _m.Called() + + var r0 context.Context + if rf, ok := ret.Get(0).(func() context.Context); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(context.Context) + } + } + + return r0 +} + +// Header provides a mock function with given fields: +func (_m *MockStream) Header() (metadata.MD, error) { + ret := _m.Called() + + var r0 metadata.MD + if rf, ok := ret.Get(0).(func() metadata.MD); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(metadata.MD) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Recv provides a mock function with given fields: +func (_m *MockStream) Recv() (*gossip.Envelope, error) { + ret := _m.Called() + + var r0 *gossip.Envelope + if rf, ok := ret.Get(0).(func() *gossip.Envelope); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*gossip.Envelope) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// RecvMsg provides a mock function with given fields: m +func (_m *MockStream) RecvMsg(m interface{}) error { + ret := _m.Called(m) + + var r0 error + if rf, ok := ret.Get(0).(func(interface{}) error); ok { + r0 = rf(m) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Send provides a mock function with given fields: _a0 +func (_m *MockStream) Send(_a0 *gossip.Envelope) error { + ret := _m.Called(_a0) + + var r0 error + if rf, ok := ret.Get(0).(func(*gossip.Envelope) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// SendMsg provides a mock function with given fields: m +func (_m *MockStream) SendMsg(m interface{}) error { + ret := _m.Called(m) + + var r0 error + if rf, ok := ret.Get(0).(func(interface{}) error); ok { + r0 = rf(m) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Trailer provides a mock function with given fields: +func (_m *MockStream) Trailer() metadata.MD { + ret := _m.Called() + + var r0 metadata.MD + if rf, ok := ret.Get(0).(func() metadata.MD); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(metadata.MD) + } + } + + return r0 +}