From 635bce55c087e7029caf9513b706bef00618a609 Mon Sep 17 00:00:00 2001 From: Matthew Sykes Date: Mon, 18 Jun 2018 11:48:33 -0400 Subject: [PATCH] [FAB-9132] Fix data race in core/chaincode/shim Fix data race in the shim A data race existed between reading and setting the error value returned from stream.Recv(). Modify the shim to avoid the race and add a bit more alignment between the peer and shim stream processing functions. Fix data races in MockCCComm - Quit was causing a race between channel send and close. Resolved by wiring a channel to Run and closing send and receive after the done channel had closed. - The keep alive go routine was racing on the keepAlive message during send. Wired the done channel into the keep-alive go routine. - Some tests are setting responses while go routines are running. Work around this bad behavior with a lock to avoid restructuring tests. Change-Id: I49bab82336bbbeb941b7db2d8bd279f254b4ec9f Signed-off-by: Matthew Sykes --- common/mocks/peer/mockccstream.go | 39 +++++---- common/mocks/peer/mockpeerccsupport.go | 2 +- core/chaincode/chaincode_support_test.go | 5 +- core/chaincode/handler.go | 37 ++++----- core/chaincode/shim/chaincode.go | 100 ++++++++++------------- core/chaincode/shim/handler.go | 3 +- core/chaincode/shim/shim_test.go | 25 +++--- 7 files changed, 100 insertions(+), 111 deletions(-) diff --git a/common/mocks/peer/mockccstream.go b/common/mocks/peer/mockccstream.go index 7e1f5e2726f..44a50b572d8 100644 --- a/common/mocks/peer/mockccstream.go +++ b/common/mocks/peer/mockccstream.go @@ -18,6 +18,7 @@ package peer import ( "fmt" + "sync" "time" pb "github.com/hyperledger/fabric/protos/peer" @@ -58,8 +59,10 @@ type MockCCComm struct { recvStream chan *pb.ChaincodeMessage sendStream chan *pb.ChaincodeMessage respIndex int + respLock sync.Mutex respSet *MockResponseSet pong bool + skipClose bool } func (s *MockCCComm) SetName(newname string) { @@ -68,9 +71,6 @@ func (s *MockCCComm) SetName(newname string) { //Send sends a message func (s *MockCCComm) Send(msg *pb.ChaincodeMessage) error { - defer func() { - recover() - }() s.sendStream <- msg return nil } @@ -96,16 +96,11 @@ func (s *MockCCComm) GetSendStream() chan *pb.ChaincodeMessage { return s.sendStream } -//Quit closes the channels...this will also close chaincode side +//Quit closes the channels... func (s *MockCCComm) Quit() { - if s.recvStream != nil { + if !s.skipClose { close(s.recvStream) - s.recvStream = nil - } - - if s.sendStream != nil { close(s.sendStream) - s.sendStream = nil } } @@ -126,31 +121,32 @@ func (s *MockCCComm) SetKeepAlive(ka *pb.ChaincodeMessage) { //SetResponses sets responses for an Init or Invoke func (s *MockCCComm) SetResponses(respSet *MockResponseSet) { + s.respLock.Lock() s.respSet = respSet s.respIndex = 0 + s.respLock.Unlock() } //keepAlive -func (s *MockCCComm) ka() { - defer recover() +func (s *MockCCComm) ka(done <-chan struct{}) { for { if s.keepAlive == nil { return } s.Send(s.keepAlive) - time.Sleep(10 * time.Millisecond) + select { + case <-time.After(10 * time.Millisecond): + case <-done: + return + } } } //Run receives and sends indefinitely -func (s *MockCCComm) Run() error { +func (s *MockCCComm) Run(done <-chan struct{}) error { //start the keepalive - go s.ka() - - //if we started keep alive this will kill it - defer func() { - s.keepAlive = nil - }() + go s.ka(done) + defer s.Quit() for { msg, err := s.Recv() @@ -181,6 +177,9 @@ func (s *MockCCComm) respond(msg *pb.ChaincodeMessage) error { return nil } + s.respLock.Lock() + defer s.respLock.Unlock() + var err error if s.respIndex < len(s.respSet.Responses) { mockResp := s.respSet.Responses[s.respIndex] diff --git a/common/mocks/peer/mockpeerccsupport.go b/common/mocks/peer/mockpeerccsupport.go index 7ddd02cd178..c7724957cd4 100644 --- a/common/mocks/peer/mockpeerccsupport.go +++ b/common/mocks/peer/mockpeerccsupport.go @@ -58,7 +58,7 @@ func (mp *MockPeerCCSupport) GetCCMirror(name string) *MockCCComm { return nil } - return &MockCCComm{name: name, recvStream: s.sendStream, sendStream: s.recvStream} + return &MockCCComm{name: name, recvStream: s.sendStream, sendStream: s.recvStream, skipClose: true} } //RemoveCC removes a cc diff --git a/core/chaincode/chaincode_support_test.go b/core/chaincode/chaincode_support_test.go index c3ec78c82bf..039e9b18ec5 100644 --- a/core/chaincode/chaincode_support_test.go +++ b/core/chaincode/chaincode_support_test.go @@ -335,6 +335,9 @@ func startCC(t *testing.T, channelID string, ccname string, chaincodeSupport *Ch done <- err } + ccDone := make(chan struct{}) + defer close(ccDone) + //start the mock peer go func() { respSet := &mockpeer.MockResponseSet{ @@ -346,7 +349,7 @@ func startCC(t *testing.T, channelID string, ccname string, chaincodeSupport *Ch }, } ccSide.SetResponses(respSet) - ccSide.Run() + ccSide.Run(ccDone) }() ccSide.Send(&pb.ChaincodeMessage{Type: pb.ChaincodeMessage_REGISTER, Payload: putils.MarshalOrPanic(&pb.ChaincodeID{Name: ccname + ":0"}), Txid: "0", ChannelId: channelID}) diff --git a/core/chaincode/handler.go b/core/chaincode/handler.go index 7dbfd4d945f..d47eb814e98 100644 --- a/core/chaincode/handler.go +++ b/core/chaincode/handler.go @@ -364,7 +364,7 @@ func (h *Handler) ProcessStream(stream ccintf.ChaincodeStream) error { msg *pb.ChaincodeMessage err error } - msgAvail := make(chan *recvMsg) + msgAvail := make(chan *recvMsg, 1) receiveMessage := func() { in, err := h.chatStream.Recv() @@ -374,32 +374,31 @@ func (h *Handler) ProcessStream(stream ccintf.ChaincodeStream) error { go receiveMessage() for { select { - case rMsg := <-msgAvail: + case rmsg := <-msgAvail: + switch { // Defer the deregistering of the this handler. - if rMsg.err == io.EOF { - chaincodeLogger.Debugf("received EOF, ending chaincode support stream: %s", rMsg.err) - return rMsg.err - } else if rMsg.err != nil { - err := errors.Wrap(rMsg.err, "receive failed") + case rmsg.err == io.EOF: + chaincodeLogger.Debugf("received EOF, ending chaincode support stream: %s", rmsg.err) + return rmsg.err + case rmsg.err != nil: + err := errors.Wrap(rmsg.err, "receive failed") chaincodeLogger.Errorf("handling chaincode support stream: %+v", err) return err - } else if rMsg.msg == nil { + case rmsg.msg == nil: err := errors.New("received nil message, ending chaincode support stream") chaincodeLogger.Debugf("%+v", err) return err + default: + err := h.handleMessage(rmsg.msg) + if err != nil { + err = errors.WithMessage(err, "error handling message, ending stream") + chaincodeLogger.Errorf("[%s] %+v", shorttxid(rmsg.msg.Txid), err) + return err + } + + go receiveMessage() } - in := rMsg.msg - - err := h.handleMessage(in) - if err != nil { - err = errors.WithMessage(err, "error handling message, ending stream") - chaincodeLogger.Errorf("[%s] %+v", shorttxid(in.Txid), err) - return err - } - - go receiveMessage() - case sendErr := <-h.errChan: err := errors.Wrapf(sendErr, "received error while sending message, ending chaincode support stream") chaincodeLogger.Errorf("%s", err) diff --git a/core/chaincode/shim/chaincode.go b/core/chaincode/shim/chaincode.go index 06bf6b391f1..aa68216223e 100644 --- a/core/chaincode/shim/chaincode.go +++ b/core/chaincode/shim/chaincode.go @@ -262,83 +262,71 @@ func newPeerClientConnection() (*grpc.ClientConn, error) { } func chatWithPeer(chaincodename string, stream PeerChaincodeStream, cc Chaincode) error { - // Create the shim handler responsible for all control logic handler := newChaincodeHandler(stream, cc) - defer stream.CloseSend() + // Send the ChaincodeID during register. chaincodeID := &pb.ChaincodeID{Name: chaincodename} payload, err := proto.Marshal(chaincodeID) if err != nil { return errors.Wrap(err, "error marshalling chaincodeID during chaincode registration") } + // Register on the stream chaincodeLogger.Debugf("Registering.. sending %s", pb.ChaincodeMessage_REGISTER) if err = handler.serialSend(&pb.ChaincodeMessage{Type: pb.ChaincodeMessage_REGISTER, Payload: payload}); err != nil { return errors.WithMessage(err, "error sending chaincode REGISTER") } - waitc := make(chan struct{}) + + // holds return values from gRPC Recv below + type recvMsg struct { + msg *pb.ChaincodeMessage + err error + } + msgAvail := make(chan *recvMsg, 1) errc := make(chan error) - go func() { - defer close(waitc) - msgAvail := make(chan *pb.ChaincodeMessage) - var in *pb.ChaincodeMessage - recv := true - for { - in = nil - err = nil - if recv { - recv = false - go func() { - var in2 *pb.ChaincodeMessage - in2, err = stream.Recv() - errc <- err - msgAvail <- in2 - }() - } - select { - case sendErr := <-errc: - //serialSendAsync successful? - if sendErr == nil { - continue - } - //no, bail - err = errors.Wrap(sendErr, "error sending") - return - case in = <-msgAvail: - if err == io.EOF { - err = errors.Wrapf(err, "received EOF, ending chaincode stream") - chaincodeLogger.Debugf("%+v", err) - return - } else if err != nil { - chaincodeLogger.Errorf("Received error from server, ending chaincode stream: %+v", err) - return - } else if in == nil { - err = errors.New("received nil message, ending chaincode stream") - chaincodeLogger.Debugf("%+v", err) - return + + receiveMessage := func() { + in, err := stream.Recv() + msgAvail <- &recvMsg{in, err} + } + + go receiveMessage() + for { + select { + case rmsg := <-msgAvail: + switch { + case rmsg.err == io.EOF: + err = errors.Wrapf(rmsg.err, "received EOF, ending chaincode stream") + chaincodeLogger.Debugf("%+v", err) + return err + case rmsg.err != nil: + err := errors.Wrap(rmsg.err, "receive failed") + chaincodeLogger.Errorf("Received error from server, ending chaincode stream: %+v", err) + return err + case rmsg.msg == nil: + err := errors.New("received nil message, ending chaincode stream") + chaincodeLogger.Debugf("%+v", err) + return err + default: + chaincodeLogger.Debugf("[%s]Received message %s from peer", shorttxid(rmsg.msg.Txid), rmsg.msg.Type) + err := handler.handleMessage(rmsg.msg, errc) + if err != nil { + err = errors.WithMessage(err, "error handling message") + return err } - chaincodeLogger.Debugf("[%s]Received message %s from peer", shorttxid(in.Txid), in.Type) - recv = true - } - err = handler.handleMessage(in, errc) - if err != nil { - err = errors.WithMessage(err, "error handling message") - return + go receiveMessage() } - //keepalive messages are PONGs to the fabric's PINGs - if in.Type == pb.ChaincodeMessage_KEEPALIVE { - chaincodeLogger.Debug("Sending KEEPALIVE response") - //ignore any errors, maybe next KEEPALIVE will work - handler.serialSendAsync(in, nil) + case sendErr := <-errc: + if sendErr != nil { + err := errors.Wrap(sendErr, "error sending") + return err } } - }() - <-waitc - return err + } } // -- init stub --- diff --git a/core/chaincode/shim/handler.go b/core/chaincode/shim/handler.go index c80b5ce1097..2312a197c18 100644 --- a/core/chaincode/shim/handler.go +++ b/core/chaincode/shim/handler.go @@ -728,7 +728,8 @@ func (handler *Handler) handleCreated(msg *pb.ChaincodeMessage, errc chan error) // handleMessage message handles loop for shim side of chaincode/peer stream. func (handler *Handler) handleMessage(msg *pb.ChaincodeMessage, errc chan error) error { if msg.Type == pb.ChaincodeMessage_KEEPALIVE { - // Received a keep alive message, we don't do anything with it for now + chaincodeLogger.Debug("Sending KEEPALIVE response") + handler.serialSendAsync(msg, nil) // ignore errors, maybe next KEEPALIVE will work return nil } chaincodeLogger.Debugf("[%s] Handling ChaincodeMessage of type: %s(state:%s)", shorttxid(msg.Txid), msg.Type, handler.state) diff --git a/core/chaincode/shim/shim_test.go b/core/chaincode/shim/shim_test.go index 7ff93ebd7f4..b882aee4636 100644 --- a/core/chaincode/shim/shim_test.go +++ b/core/chaincode/shim/shim_test.go @@ -22,7 +22,6 @@ import ( "strconv" "strings" "testing" - "time" "github.com/hyperledger/fabric/common/flogging" mockpeer "github.com/hyperledger/fabric/common/mocks/peer" @@ -567,13 +566,16 @@ func TestInvoke(t *testing.T) { done <- err } + peerDone := make(chan struct{}) + defer close(peerDone) + //start the mock peer go func() { respSet := &mockpeer.MockResponseSet{errorFunc, nil, []*mockpeer.MockResponse{ {&pb.ChaincodeMessage{Type: pb.ChaincodeMessage_REGISTER}, &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_REGISTERED}}}} peerSide.SetResponses(respSet) peerSide.SetKeepAlive(&pb.ChaincodeMessage{Type: pb.ChaincodeMessage_KEEPALIVE}) - err := peerSide.Run() + err := peerSide.Run(peerDone) assert.NoError(t, err, "peer side run failed") }() @@ -831,9 +833,6 @@ func TestInvoke(t *testing.T) { //wait for done processDone(t, done, false) - - time.Sleep(1 * time.Second) - peerSide.Quit() } func TestStartInProc(t *testing.T) { @@ -849,13 +848,16 @@ func TestStartInProc(t *testing.T) { done <- err } + peerDone := make(chan struct{}) + defer close(peerDone) + //start the mock peer go func() { respSet := &mockpeer.MockResponseSet{doneFunc, nil, []*mockpeer.MockResponse{ {&pb.ChaincodeMessage{Type: pb.ChaincodeMessage_REGISTER}, &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_REGISTERED}}}} peerSide.SetResponses(respSet) peerSide.SetKeepAlive(&pb.ChaincodeMessage{Type: pb.ChaincodeMessage_KEEPALIVE}) - err := peerSide.Run() + err := peerSide.Run(peerDone) assert.NoError(t, err, "peer side run failed") }() @@ -867,9 +869,6 @@ func TestStartInProc(t *testing.T) { channelId := "testchannel" peerSide.Send(&pb.ChaincodeMessage{Type: pb.ChaincodeMessage_READY, Txid: "1", ChannelId: channelId}) - - time.Sleep(1 * time.Second) - peerSide.Quit() } func TestCC2CC(t *testing.T) { @@ -888,13 +887,16 @@ func TestCC2CC(t *testing.T) { done <- err } + peerDone := make(chan struct{}) + defer close(peerDone) + //start the mock peer go func() { respSet := &mockpeer.MockResponseSet{errorFunc, nil, []*mockpeer.MockResponse{ {&pb.ChaincodeMessage{Type: pb.ChaincodeMessage_REGISTER}, &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_REGISTERED}}}} peerSide.SetResponses(respSet) peerSide.SetKeepAlive(&pb.ChaincodeMessage{Type: pb.ChaincodeMessage_KEEPALIVE}) - err := peerSide.Run() + err := peerSide.Run(peerDone) assert.NoError(t, err, "peer side run failed") }() @@ -944,9 +946,6 @@ func TestCC2CC(t *testing.T) { //wait for done processDone(t, done, false) - - time.Sleep(1 * time.Second) - peerSide.Quit() } func TestRealPeerStream(t *testing.T) {