diff --git a/consensus/helper/handler.go b/consensus/helper/handler.go index 6d6e62a9167..3848b80a6f6 100644 --- a/consensus/helper/handler.go +++ b/consensus/helper/handler.go @@ -69,10 +69,8 @@ func NewConsensusHandler(coord peer.MessageHandlerCoordinator, consensusQueueSize = DefaultConsensusQueueSize } - pe, _ := handler.To() - handler.consenterChan = make(chan *util.Message, consensusQueueSize) - getEngineImpl().consensusFan.RegisterChannel(pe.ID, handler.consenterChan) + getEngineImpl().consensusFan.AddFaninChannel(handler.consenterChan) return handler, nil } diff --git a/consensus/util/messagefan.go b/consensus/util/messagefan.go index 4d4a66edcd0..dad3b0e962a 100644 --- a/consensus/util/messagefan.go +++ b/consensus/util/messagefan.go @@ -38,7 +38,7 @@ type Message struct { // MessageFan contains the reference to the peer's MessageHandlerCoordinator type MessageFan struct { - ins map[*pb.PeerID]<-chan *Message + ins []<-chan *Message out chan *Message lock sync.Mutex } @@ -46,35 +46,38 @@ type MessageFan struct { // NewMessageFan will return an initialized MessageFan func NewMessageFan() *MessageFan { return &MessageFan{ - ins: make(map[*pb.PeerID]<-chan *Message), + ins: []<-chan *Message{}, out: make(chan *Message), } } -// RegisterChannel is intended to be invoked by Handler to add a channel to be fan-ed in -func (fan *MessageFan) RegisterChannel(sender *pb.PeerID, channel <-chan *Message) { +// AddFaninChannel is intended to be invoked by Handler to add a channel to be fan-ed in +func (fan *MessageFan) AddFaninChannel(channel <-chan *Message) { fan.lock.Lock() defer fan.lock.Unlock() - if _, ok := fan.ins[sender]; ok { - logger.Warningf("Received duplicate connection from %v, switching to new connection", sender) - } else { - logger.Infof("Registering connection from %v", sender) + for _, c := range fan.ins { + if c == channel { + logger.Warningf("Received duplicate connection") + return + } } - fan.ins[sender] = channel + fan.ins = append(fan.ins, channel) go func() { for msg := range channel { fan.out <- msg } - logger.Infof("Connection from peer %v terminated", sender) - fan.lock.Lock() defer fan.lock.Unlock() - delete(fan.ins, sender) + for i, c := range fan.ins { + if c == channel { + fan.ins = append(fan.ins[:i], fan.ins[i+1:]...) + } + } }() } diff --git a/consensus/util/messagefan_test.go b/consensus/util/messagefan_test.go index b30d983cc75..ebf19e2f9ed 100644 --- a/consensus/util/messagefan_test.go +++ b/consensus/util/messagefan_test.go @@ -17,11 +17,8 @@ limitations under the License. package util import ( - "fmt" "testing" "time" - - pb "github.com/hyperledger/fabric/protos" ) func TestFanIn(t *testing.T) { @@ -32,8 +29,7 @@ func TestFanIn(t *testing.T) { for i := 0; i < Channels; i++ { c := make(chan *Message, Messages/2) - pid := &pb.PeerID{Name: fmt.Sprintf("%d", i)} - fh.RegisterChannel(pid, c) + fh.AddFaninChannel(c) go func() { for j := 0; j < Messages; j++ { c <- &Message{} @@ -67,8 +63,7 @@ func TestFanIn(t *testing.T) { func TestFanChannelClose(t *testing.T) { fh := NewMessageFan() c := make(chan *Message) - pid := &pb.PeerID{Name: "1"} - fh.RegisterChannel(pid, c) + fh.AddFaninChannel(c) close(c) for i := 0; i < 100; i++ {