From 506e786244729dbe3cbf1f9675d13c58b13114a0 Mon Sep 17 00:00:00 2001 From: jiangyaoguo Date: Mon, 5 Sep 2016 10:38:08 +0800 Subject: [PATCH] Fix incorrect channel register Before receiving a hello message from fanin message channel, we can't get peerID of fanin message channel. So RegisterChannel always gets RegisterChannel(nil, channel). PeerID here is supposed to do something like session management (eg. duplicated peer connection check). But at this time we can't get peerID. The session management is done by class "Impl" with RegisterHandler and DeregisterHandler. Here PeerID is unnecessary, and it will always be nil. Incorrect warning of "Received duplicate connection from nil, switching to new connection" will be emitted everytime there is a new connection. Change-Id: I9f1bf6287576497acefb46a543fd4ac62d062665 Signed-off-by: jiangyaoguo --- consensus/helper/handler.go | 4 +--- consensus/util/messagefan.go | 27 +++++++++++++++------------ consensus/util/messagefan_test.go | 9 ++------- 3 files changed, 18 insertions(+), 22 deletions(-) diff --git a/consensus/helper/handler.go b/consensus/helper/handler.go index 6d6e62a9167..3848b80a6f6 100644 --- a/consensus/helper/handler.go +++ b/consensus/helper/handler.go @@ -69,10 +69,8 @@ func NewConsensusHandler(coord peer.MessageHandlerCoordinator, consensusQueueSize = DefaultConsensusQueueSize } - pe, _ := handler.To() - handler.consenterChan = make(chan *util.Message, consensusQueueSize) - getEngineImpl().consensusFan.RegisterChannel(pe.ID, handler.consenterChan) + getEngineImpl().consensusFan.AddFaninChannel(handler.consenterChan) return handler, nil } diff --git a/consensus/util/messagefan.go b/consensus/util/messagefan.go index 4d4a66edcd0..dad3b0e962a 100644 --- a/consensus/util/messagefan.go +++ b/consensus/util/messagefan.go @@ -38,7 +38,7 @@ type Message struct { // MessageFan contains the reference to the peer's MessageHandlerCoordinator type MessageFan struct { - ins map[*pb.PeerID]<-chan *Message + ins []<-chan *Message out chan *Message lock sync.Mutex } @@ -46,35 +46,38 @@ type MessageFan struct { // NewMessageFan will return an initialized MessageFan func NewMessageFan() *MessageFan { return &MessageFan{ - ins: make(map[*pb.PeerID]<-chan *Message), + ins: []<-chan *Message{}, out: make(chan *Message), } } -// RegisterChannel is intended to be invoked by Handler to add a channel to be fan-ed in -func (fan *MessageFan) RegisterChannel(sender *pb.PeerID, channel <-chan *Message) { +// AddFaninChannel is intended to be invoked by Handler to add a channel to be fan-ed in +func (fan *MessageFan) AddFaninChannel(channel <-chan *Message) { fan.lock.Lock() defer fan.lock.Unlock() - if _, ok := fan.ins[sender]; ok { - logger.Warningf("Received duplicate connection from %v, switching to new connection", sender) - } else { - logger.Infof("Registering connection from %v", sender) + for _, c := range fan.ins { + if c == channel { + logger.Warningf("Received duplicate connection") + return + } } - fan.ins[sender] = channel + fan.ins = append(fan.ins, channel) go func() { for msg := range channel { fan.out <- msg } - logger.Infof("Connection from peer %v terminated", sender) - fan.lock.Lock() defer fan.lock.Unlock() - delete(fan.ins, sender) + for i, c := range fan.ins { + if c == channel { + fan.ins = append(fan.ins[:i], fan.ins[i+1:]...) + } + } }() } diff --git a/consensus/util/messagefan_test.go b/consensus/util/messagefan_test.go index b30d983cc75..ebf19e2f9ed 100644 --- a/consensus/util/messagefan_test.go +++ b/consensus/util/messagefan_test.go @@ -17,11 +17,8 @@ limitations under the License. package util import ( - "fmt" "testing" "time" - - pb "github.com/hyperledger/fabric/protos" ) func TestFanIn(t *testing.T) { @@ -32,8 +29,7 @@ func TestFanIn(t *testing.T) { for i := 0; i < Channels; i++ { c := make(chan *Message, Messages/2) - pid := &pb.PeerID{Name: fmt.Sprintf("%d", i)} - fh.RegisterChannel(pid, c) + fh.AddFaninChannel(c) go func() { for j := 0; j < Messages; j++ { c <- &Message{} @@ -67,8 +63,7 @@ func TestFanIn(t *testing.T) { func TestFanChannelClose(t *testing.T) { fh := NewMessageFan() c := make(chan *Message) - pid := &pb.PeerID{Name: "1"} - fh.RegisterChannel(pid, c) + fh.AddFaninChannel(c) close(c) for i := 0; i < 100; i++ {