Skip to content

Commit

Permalink
fix DeliverService stop
Browse files Browse the repository at this point in the history
Currently when peer stopes, DeliverService.stop will be blocked. So you
can't use "ctrl+c" or "kill" to interrupt or stop peer. Because
DeliverService use a unbuffered channel stopChan to send stop signal.
When peer is a gossip.orgLeader, DeliverService don't receive from
stopChan. So DeliverService.stop will block at "d.stopChan <- true".
Fix the block bug and use a atomic flag to distinguash unexpected
connection error and initiative stop.

Change-Id: If2afd226c5b074e3b78157d84e2f267e741208aa
Signed-off-by: jiangyaoguo <jiangyaoguo@gmail.com>
  • Loading branch information
jiangyaoguo committed Nov 30, 2016
1 parent 2c22539 commit 8e868b8
Showing 1 changed file with 15 additions and 1 deletion.
16 changes: 15 additions & 1 deletion core/committer/noopssinglechain/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package noopssinglechain

import (
"sync/atomic"
"time"

"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -60,6 +61,7 @@ type DeliverService struct {
gossip gossip.Gossip
conn *grpc.ClientConn

stopFlag int32
stopChan chan bool
}

Expand Down Expand Up @@ -172,6 +174,8 @@ func (d *DeliverService) Start() {

// Stop all service and release resources
func (d *DeliverService) Stop() {
atomic.StoreInt32(&d.stopFlag, 1)
d.stopDeliver()
d.stopChan <- true
d.stateProvider.Stop()
d.gossip.Stop()
Expand Down Expand Up @@ -211,13 +215,23 @@ func (d *DeliverService) seekLatestFromCommitter(height uint64) error {
})
}

// Internal function to check whenever we need to finish listening
// for new messages to arrive
func (d *DeliverService) isDone() bool {

return atomic.LoadInt32(&d.stopFlag) == 1
}

func (d *DeliverService) readUntilClose() {
for {
msg, err := d.client.Recv()
if err != nil {
logger.Warningf("Receive error: %s", err.Error())
if d.isDone() {
<-d.stopChan
}
return
}

switch t := msg.Type.(type) {
case *orderer.DeliverResponse_Error:
if t.Error == common.Status_SUCCESS {
Expand Down

0 comments on commit 8e868b8

Please sign in to comment.