Skip to content

Commit

Permalink
Merge "Fix incorrect channel register"
Browse files Browse the repository at this point in the history
  • Loading branch information
gaborh-da authored and Gerrit Code Review committed Sep 12, 2016
2 parents c891561 + 506e786 commit 491eb19
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 22 deletions.
4 changes: 1 addition & 3 deletions consensus/helper/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,8 @@ func NewConsensusHandler(coord peer.MessageHandlerCoordinator,
consensusQueueSize = DefaultConsensusQueueSize
}

pe, _ := handler.To()

handler.consenterChan = make(chan *util.Message, consensusQueueSize)
getEngineImpl().consensusFan.RegisterChannel(pe.ID, handler.consenterChan)
getEngineImpl().consensusFan.AddFaninChannel(handler.consenterChan)

return handler, nil
}
Expand Down
27 changes: 15 additions & 12 deletions consensus/util/messagefan.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,43 +38,46 @@ type Message struct {

// MessageFan contains the reference to the peer's MessageHandlerCoordinator
type MessageFan struct {
ins map[*pb.PeerID]<-chan *Message
ins []<-chan *Message
out chan *Message
lock sync.Mutex
}

// NewMessageFan will return an initialized MessageFan
func NewMessageFan() *MessageFan {
return &MessageFan{
ins: make(map[*pb.PeerID]<-chan *Message),
ins: []<-chan *Message{},
out: make(chan *Message),
}
}

// RegisterChannel is intended to be invoked by Handler to add a channel to be fan-ed in
func (fan *MessageFan) RegisterChannel(sender *pb.PeerID, channel <-chan *Message) {
// AddFaninChannel is intended to be invoked by Handler to add a channel to be fan-ed in
func (fan *MessageFan) AddFaninChannel(channel <-chan *Message) {
fan.lock.Lock()
defer fan.lock.Unlock()

if _, ok := fan.ins[sender]; ok {
logger.Warningf("Received duplicate connection from %v, switching to new connection", sender)
} else {
logger.Infof("Registering connection from %v", sender)
for _, c := range fan.ins {
if c == channel {
logger.Warningf("Received duplicate connection")
return
}
}

fan.ins[sender] = channel
fan.ins = append(fan.ins, channel)

go func() {
for msg := range channel {
fan.out <- msg
}

logger.Infof("Connection from peer %v terminated", sender)

fan.lock.Lock()
defer fan.lock.Unlock()

delete(fan.ins, sender)
for i, c := range fan.ins {
if c == channel {
fan.ins = append(fan.ins[:i], fan.ins[i+1:]...)
}
}
}()
}

Expand Down
9 changes: 2 additions & 7 deletions consensus/util/messagefan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,8 @@ limitations under the License.
package util

import (
"fmt"
"testing"
"time"

pb "github.com/hyperledger/fabric/protos"
)

func TestFanIn(t *testing.T) {
Expand All @@ -32,8 +29,7 @@ func TestFanIn(t *testing.T) {

for i := 0; i < Channels; i++ {
c := make(chan *Message, Messages/2)
pid := &pb.PeerID{Name: fmt.Sprintf("%d", i)}
fh.RegisterChannel(pid, c)
fh.AddFaninChannel(c)
go func() {
for j := 0; j < Messages; j++ {
c <- &Message{}
Expand Down Expand Up @@ -67,8 +63,7 @@ func TestFanIn(t *testing.T) {
func TestFanChannelClose(t *testing.T) {
fh := NewMessageFan()
c := make(chan *Message)
pid := &pb.PeerID{Name: "1"}
fh.RegisterChannel(pid, c)
fh.AddFaninChannel(c)
close(c)

for i := 0; i < 100; i++ {
Expand Down

0 comments on commit 491eb19

Please sign in to comment.