Skip to content

Commit

Permalink
Merge "Tune gossip default bootstrap and skip localhost conn"
Browse files Browse the repository at this point in the history
  • Loading branch information
hacera-jonathan authored and Gerrit Code Review committed Feb 24, 2017
2 parents 9a3aa1d + e96eea9 commit 187f36a
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 2 deletions.
31 changes: 31 additions & 0 deletions gossip/discovery/discovery_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ import (
"sync/atomic"
"time"

"strconv"
"strings"

"github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/util"
proto "github.com/hyperledger/fabric/protos/gossip"
Expand Down Expand Up @@ -153,8 +156,25 @@ func (d *gossipDiscoveryImpl) Connect(member NetworkMember) {
}

func (d *gossipDiscoveryImpl) connect2BootstrapPeers(endpoints []string) {
if d.self.InternalEndpoint == nil || len(d.self.InternalEndpoint.Endpoint) == 0 {
d.logger.Panic("Internal endpoint is empty:", d.self.InternalEndpoint)
}

if len(strings.Split(d.self.InternalEndpoint.Endpoint, ":")) != 2 {
d.logger.Panicf("Self endpoint %s isn't formatted as 'host:port'", d.self.InternalEndpoint.Endpoint)
}

myPort, err := strconv.ParseInt(strings.Split(d.self.InternalEndpoint.Endpoint, ":")[1], 10, 64)
if err != nil {
d.logger.Panicf("Self endpoint %s has not valid port'", d.self.InternalEndpoint.Endpoint)
}

d.logger.Info("Entering:", endpoints)
defer d.logger.Info("Exiting")
endpoints = filterOutLocalhost(endpoints, int(myPort))
if len(endpoints) == 0 {
return
}

for !d.somePeerIsKnown() {
var wg sync.WaitGroup
Expand Down Expand Up @@ -822,3 +842,14 @@ func getAliveExpirationCheckInterval() time.Duration {
func getReconnectInterval() time.Duration {
return util.GetDurationOrDefault("peer.gossip.reconnectInterval", getAliveExpirationTimeout())
}

func filterOutLocalhost(endpoints []string, port int) []string {
var returnedEndpoints []string
for _, endpoint := range endpoints {
if endpoint == fmt.Sprintf("127.0.0.1:%d", port) || endpoint == fmt.Sprintf("localhost:%d", port) {
continue
}
returnedEndpoints = append(returnedEndpoints, endpoint)
}
return returnedEndpoints
}
42 changes: 42 additions & 0 deletions gossip/discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
proto "github.com/hyperledger/fabric/protos/gossip"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"golang.org/x/net/context"
"google.golang.org/grpc"
)
Expand All @@ -54,6 +55,7 @@ type dummyCommModule struct {
incMsgs chan *proto.GossipMessage
lastSeqs map[string]uint64
shouldGossip bool
mock *mock.Mock
}

type gossipMsg struct {
Expand Down Expand Up @@ -92,10 +94,16 @@ func (comm *dummyCommModule) Gossip(msg *proto.GossipMessage) {
}

func (comm *dummyCommModule) SendToPeer(peer *NetworkMember, msg *proto.GossipMessage) {
var mock *mock.Mock
comm.lock.RLock()
_, exists := comm.streams[peer.Endpoint]
mock = comm.mock
comm.lock.RUnlock()

if mock != nil {
mock.Called()
}

if !exists {
if comm.Ping(peer) == false {
fmt.Printf("Ping to %v failed\n", peer.Endpoint)
Expand All @@ -111,6 +119,10 @@ func (comm *dummyCommModule) Ping(peer *NetworkMember) bool {
comm.lock.Lock()
defer comm.lock.Unlock()

if comm.mock != nil {
comm.mock.Called()
}

_, alreadyExists := comm.streams[peer.Endpoint]
if !alreadyExists {
newConn, err := grpc.Dial(peer.Endpoint, grpc.WithInsecure())
Expand Down Expand Up @@ -451,6 +463,22 @@ func TestGossipDiscoveryStopping(t *testing.T) {

}

func TestGossipDiscoverySkipConnectingToLocalhostBootstrap(t *testing.T) {
t.Parallel()
inst := createDiscoveryInstance(11611, "d1", []string{"localhost:11611", "127.0.0.1:11611"})
inst.comm.lock.Lock()
inst.comm.mock = &mock.Mock{}
inst.comm.mock.On("SendToPeer", mock.Anything).Run(func(mock.Arguments) {
t.Fatal("Should not have connected to any peer")
})
inst.comm.mock.On("Ping", mock.Anything).Run(func(mock.Arguments) {
t.Fatal("Should not have connected to any peer")
})
inst.comm.lock.Unlock()
time.Sleep(time.Second * 3)
waitUntilOrFailBlocking(t, inst.Stop)
}

func TestConvergence(t *testing.T) {
t.Parallel()
// scenario:
Expand Down Expand Up @@ -522,6 +550,20 @@ func TestConfigFromFile(t *testing.T) {
assert.Equal(t, time.Duration(25)*time.Second, getReconnectInterval())
}

func TestFilterOutLocalhost(t *testing.T) {
endpoints := []string{"localhost:5611", "127.0.0.1:5611", "1.2.3.4:5611"}
assert.Len(t, filterOutLocalhost(endpoints, 5611), 1)
endpoints = []string{"1.2.3.4:5611"}
assert.Len(t, filterOutLocalhost(endpoints, 5611), 1)
endpoints = []string{"localhost:5611", "127.0.0.1:5611"}
assert.Len(t, filterOutLocalhost(endpoints, 5611), 0)
// Check slice returned is a copy
endpoints = []string{"localhost:5611", "127.0.0.1:5611", "1.2.3.4:5611"}
endpoints2 := filterOutLocalhost(endpoints, 5611)
endpoints2[0] = "bla bla"
assert.NotEqual(t, endpoints[2], endpoints[0])
}

func waitUntilOrFail(t *testing.T, pred func() bool) {
start := time.Now()
limit := start.UnixNano() + timeout.Nanoseconds()
Expand Down
4 changes: 2 additions & 2 deletions peer/core.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ peer:

# Gossip related configuration
gossip:
bootstrap: 0.0.0.0:7051
# For debug - is peer is its org leader and should pass blocks from orderer to other peers in org
bootstrap: 127.0.0.1:7051
# Is peer is its org leader and should pass blocks from orderer to other peers in org
orgLeader: true
# ID of this instance
endpoint:
Expand Down

0 comments on commit 187f36a

Please sign in to comment.