Skip to content

Commit

Permalink
[FAB-9132] Fix data race in core/chaincode/shim
Browse files Browse the repository at this point in the history
Fix data race in the shim

A data race existed between reading and setting the error value returned
from stream.Recv(). Modify the shim to avoid the race and add a bit more
alignment between the peer and shim stream processing functions.

Fix data races in MockCCComm

- Quit was causing a race between channel send and close. Resolved by
  wiring a channel to Run and closing send and receive after the done
  channel had closed.
- The keep alive go routine was racing on the keepAlive message during
  send. Wired the done channel into the keep-alive go routine.
- Some tests are setting responses while go routines are running. Work
  around this bad behavior with a lock to avoid restructuring tests.

Change-Id: I49bab82336bbbeb941b7db2d8bd279f254b4ec9f
Signed-off-by: Matthew Sykes <sykesmat@us.ibm.com>
  • Loading branch information
sykesm committed Jul 2, 2018
1 parent d1c39f9 commit 635bce5
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 111 deletions.
39 changes: 19 additions & 20 deletions common/mocks/peer/mockccstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package peer

import (
"fmt"
"sync"
"time"

pb "github.com/hyperledger/fabric/protos/peer"
Expand Down Expand Up @@ -58,8 +59,10 @@ type MockCCComm struct {
recvStream chan *pb.ChaincodeMessage
sendStream chan *pb.ChaincodeMessage
respIndex int
respLock sync.Mutex
respSet *MockResponseSet
pong bool
skipClose bool
}

func (s *MockCCComm) SetName(newname string) {
Expand All @@ -68,9 +71,6 @@ func (s *MockCCComm) SetName(newname string) {

//Send sends a message
func (s *MockCCComm) Send(msg *pb.ChaincodeMessage) error {
defer func() {
recover()
}()
s.sendStream <- msg
return nil
}
Expand All @@ -96,16 +96,11 @@ func (s *MockCCComm) GetSendStream() chan *pb.ChaincodeMessage {
return s.sendStream
}

//Quit closes the channels...this will also close chaincode side
//Quit closes the channels...
func (s *MockCCComm) Quit() {
if s.recvStream != nil {
if !s.skipClose {
close(s.recvStream)
s.recvStream = nil
}

if s.sendStream != nil {
close(s.sendStream)
s.sendStream = nil
}
}

Expand All @@ -126,31 +121,32 @@ func (s *MockCCComm) SetKeepAlive(ka *pb.ChaincodeMessage) {

//SetResponses sets responses for an Init or Invoke
func (s *MockCCComm) SetResponses(respSet *MockResponseSet) {
s.respLock.Lock()
s.respSet = respSet
s.respIndex = 0
s.respLock.Unlock()
}

//keepAlive
func (s *MockCCComm) ka() {
defer recover()
func (s *MockCCComm) ka(done <-chan struct{}) {
for {
if s.keepAlive == nil {
return
}
s.Send(s.keepAlive)
time.Sleep(10 * time.Millisecond)
select {
case <-time.After(10 * time.Millisecond):
case <-done:
return
}
}
}

//Run receives and sends indefinitely
func (s *MockCCComm) Run() error {
func (s *MockCCComm) Run(done <-chan struct{}) error {
//start the keepalive
go s.ka()

//if we started keep alive this will kill it
defer func() {
s.keepAlive = nil
}()
go s.ka(done)
defer s.Quit()

for {
msg, err := s.Recv()
Expand Down Expand Up @@ -181,6 +177,9 @@ func (s *MockCCComm) respond(msg *pb.ChaincodeMessage) error {
return nil
}

s.respLock.Lock()
defer s.respLock.Unlock()

var err error
if s.respIndex < len(s.respSet.Responses) {
mockResp := s.respSet.Responses[s.respIndex]
Expand Down
2 changes: 1 addition & 1 deletion common/mocks/peer/mockpeerccsupport.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (mp *MockPeerCCSupport) GetCCMirror(name string) *MockCCComm {
return nil
}

return &MockCCComm{name: name, recvStream: s.sendStream, sendStream: s.recvStream}
return &MockCCComm{name: name, recvStream: s.sendStream, sendStream: s.recvStream, skipClose: true}
}

//RemoveCC removes a cc
Expand Down
5 changes: 4 additions & 1 deletion core/chaincode/chaincode_support_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,9 @@ func startCC(t *testing.T, channelID string, ccname string, chaincodeSupport *Ch
done <- err
}

ccDone := make(chan struct{})
defer close(ccDone)

//start the mock peer
go func() {
respSet := &mockpeer.MockResponseSet{
Expand All @@ -346,7 +349,7 @@ func startCC(t *testing.T, channelID string, ccname string, chaincodeSupport *Ch
},
}
ccSide.SetResponses(respSet)
ccSide.Run()
ccSide.Run(ccDone)
}()

ccSide.Send(&pb.ChaincodeMessage{Type: pb.ChaincodeMessage_REGISTER, Payload: putils.MarshalOrPanic(&pb.ChaincodeID{Name: ccname + ":0"}), Txid: "0", ChannelId: channelID})
Expand Down
37 changes: 18 additions & 19 deletions core/chaincode/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ func (h *Handler) ProcessStream(stream ccintf.ChaincodeStream) error {
msg *pb.ChaincodeMessage
err error
}
msgAvail := make(chan *recvMsg)
msgAvail := make(chan *recvMsg, 1)

receiveMessage := func() {
in, err := h.chatStream.Recv()
Expand All @@ -374,32 +374,31 @@ func (h *Handler) ProcessStream(stream ccintf.ChaincodeStream) error {
go receiveMessage()
for {
select {
case rMsg := <-msgAvail:
case rmsg := <-msgAvail:
switch {
// Defer the deregistering of the this handler.
if rMsg.err == io.EOF {
chaincodeLogger.Debugf("received EOF, ending chaincode support stream: %s", rMsg.err)
return rMsg.err
} else if rMsg.err != nil {
err := errors.Wrap(rMsg.err, "receive failed")
case rmsg.err == io.EOF:
chaincodeLogger.Debugf("received EOF, ending chaincode support stream: %s", rmsg.err)
return rmsg.err
case rmsg.err != nil:
err := errors.Wrap(rmsg.err, "receive failed")
chaincodeLogger.Errorf("handling chaincode support stream: %+v", err)
return err
} else if rMsg.msg == nil {
case rmsg.msg == nil:
err := errors.New("received nil message, ending chaincode support stream")
chaincodeLogger.Debugf("%+v", err)
return err
default:
err := h.handleMessage(rmsg.msg)
if err != nil {
err = errors.WithMessage(err, "error handling message, ending stream")
chaincodeLogger.Errorf("[%s] %+v", shorttxid(rmsg.msg.Txid), err)
return err
}

go receiveMessage()
}

in := rMsg.msg

err := h.handleMessage(in)
if err != nil {
err = errors.WithMessage(err, "error handling message, ending stream")
chaincodeLogger.Errorf("[%s] %+v", shorttxid(in.Txid), err)
return err
}

go receiveMessage()

case sendErr := <-h.errChan:
err := errors.Wrapf(sendErr, "received error while sending message, ending chaincode support stream")
chaincodeLogger.Errorf("%s", err)
Expand Down
100 changes: 44 additions & 56 deletions core/chaincode/shim/chaincode.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,83 +262,71 @@ func newPeerClientConnection() (*grpc.ClientConn, error) {
}

func chatWithPeer(chaincodename string, stream PeerChaincodeStream, cc Chaincode) error {

// Create the shim handler responsible for all control logic
handler := newChaincodeHandler(stream, cc)

defer stream.CloseSend()

// Send the ChaincodeID during register.
chaincodeID := &pb.ChaincodeID{Name: chaincodename}
payload, err := proto.Marshal(chaincodeID)
if err != nil {
return errors.Wrap(err, "error marshalling chaincodeID during chaincode registration")
}

// Register on the stream
chaincodeLogger.Debugf("Registering.. sending %s", pb.ChaincodeMessage_REGISTER)
if err = handler.serialSend(&pb.ChaincodeMessage{Type: pb.ChaincodeMessage_REGISTER, Payload: payload}); err != nil {
return errors.WithMessage(err, "error sending chaincode REGISTER")
}
waitc := make(chan struct{})

// holds return values from gRPC Recv below
type recvMsg struct {
msg *pb.ChaincodeMessage
err error
}
msgAvail := make(chan *recvMsg, 1)
errc := make(chan error)
go func() {
defer close(waitc)
msgAvail := make(chan *pb.ChaincodeMessage)
var in *pb.ChaincodeMessage
recv := true
for {
in = nil
err = nil
if recv {
recv = false
go func() {
var in2 *pb.ChaincodeMessage
in2, err = stream.Recv()
errc <- err
msgAvail <- in2
}()
}
select {
case sendErr := <-errc:
//serialSendAsync successful?
if sendErr == nil {
continue
}
//no, bail
err = errors.Wrap(sendErr, "error sending")
return
case in = <-msgAvail:
if err == io.EOF {
err = errors.Wrapf(err, "received EOF, ending chaincode stream")
chaincodeLogger.Debugf("%+v", err)
return
} else if err != nil {
chaincodeLogger.Errorf("Received error from server, ending chaincode stream: %+v", err)
return
} else if in == nil {
err = errors.New("received nil message, ending chaincode stream")
chaincodeLogger.Debugf("%+v", err)
return

receiveMessage := func() {
in, err := stream.Recv()
msgAvail <- &recvMsg{in, err}
}

go receiveMessage()
for {
select {
case rmsg := <-msgAvail:
switch {
case rmsg.err == io.EOF:
err = errors.Wrapf(rmsg.err, "received EOF, ending chaincode stream")
chaincodeLogger.Debugf("%+v", err)
return err
case rmsg.err != nil:
err := errors.Wrap(rmsg.err, "receive failed")
chaincodeLogger.Errorf("Received error from server, ending chaincode stream: %+v", err)
return err
case rmsg.msg == nil:
err := errors.New("received nil message, ending chaincode stream")
chaincodeLogger.Debugf("%+v", err)
return err
default:
chaincodeLogger.Debugf("[%s]Received message %s from peer", shorttxid(rmsg.msg.Txid), rmsg.msg.Type)
err := handler.handleMessage(rmsg.msg, errc)
if err != nil {
err = errors.WithMessage(err, "error handling message")
return err
}
chaincodeLogger.Debugf("[%s]Received message %s from peer", shorttxid(in.Txid), in.Type)
recv = true
}

err = handler.handleMessage(in, errc)
if err != nil {
err = errors.WithMessage(err, "error handling message")
return
go receiveMessage()
}

//keepalive messages are PONGs to the fabric's PINGs
if in.Type == pb.ChaincodeMessage_KEEPALIVE {
chaincodeLogger.Debug("Sending KEEPALIVE response")
//ignore any errors, maybe next KEEPALIVE will work
handler.serialSendAsync(in, nil)
case sendErr := <-errc:
if sendErr != nil {
err := errors.Wrap(sendErr, "error sending")
return err
}
}
}()
<-waitc
return err
}
}

// -- init stub ---
Expand Down
3 changes: 2 additions & 1 deletion core/chaincode/shim/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -728,7 +728,8 @@ func (handler *Handler) handleCreated(msg *pb.ChaincodeMessage, errc chan error)
// handleMessage message handles loop for shim side of chaincode/peer stream.
func (handler *Handler) handleMessage(msg *pb.ChaincodeMessage, errc chan error) error {
if msg.Type == pb.ChaincodeMessage_KEEPALIVE {
// Received a keep alive message, we don't do anything with it for now
chaincodeLogger.Debug("Sending KEEPALIVE response")
handler.serialSendAsync(msg, nil) // ignore errors, maybe next KEEPALIVE will work
return nil
}
chaincodeLogger.Debugf("[%s] Handling ChaincodeMessage of type: %s(state:%s)", shorttxid(msg.Txid), msg.Type, handler.state)
Expand Down
Loading

0 comments on commit 635bce5

Please sign in to comment.