Skip to content

Commit

Permalink
Make sbft tests run concurrently to reduce time
Browse files Browse the repository at this point in the history
Made sbft tests to run in parallel by making a different port range
for each test.

Change-Id: I78ac564a886334f1e8a911825a042e27d1392ed0
Signed-off-by: Yacov Manevich <yacovm@il.ibm.com>
  • Loading branch information
yacovm committed Jan 2, 2017
1 parent 2e672af commit 6e8d216
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 35 deletions.
81 changes: 46 additions & 35 deletions orderer/sbft/main/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,13 @@ import (
"testing"
"time"

"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/orderer/common/bootstrap/provisional"
pb "github.com/hyperledger/fabric/orderer/sbft/simplebft"
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
"golang.org/x/net/context"
"google.golang.org/grpc"

"github.com/golang/protobuf/proto"
)

const keyfile = "testdata/key.pem"
Expand Down Expand Up @@ -93,17 +92,19 @@ func TestMain(m *testing.M) {
}

func TestTwoReplicasBroadcastAndDeliverUsingTheSame(t *testing.T) {
t.Parallel()
startingPort := 2000
skipInShortMode(t)
peers := InitPeers(2)
peers := InitPeers(2, startingPort)
StartPeers(peers)
r, err := Receive(peers[1])
r, err := Receive(peers[1], startingPort)
defer r.Stop()
defer StopPeers(peers)
if err != nil {
t.Errorf("Failed to start up receiver: %s", err)
}
WaitForConnection(peers)
if berr := Broadcast(peers[0], []byte{0, 1, 2, 3, 4}); berr != nil {
if berr := Broadcast(peers[0], startingPort, []byte{0, 1, 2, 3, 4}); berr != nil {
t.Errorf("Failed to broadcast message: %s", berr)
}
if !AssertWithTimeout(func() bool { return r.Received() == 2 }, 30) {
Expand All @@ -112,17 +113,19 @@ func TestTwoReplicasBroadcastAndDeliverUsingTheSame(t *testing.T) {
}

func TestTenReplicasBroadcastAndDeliverUsingDifferent(t *testing.T) {
t.Parallel()
startingPort := 3000
skipInShortMode(t)
peers := InitPeers(10)
peers := InitPeers(10, startingPort)
StartPeers(peers)
r, err := Receive(peers[9])
r, err := Receive(peers[9], startingPort)
defer r.Stop()
defer StopPeers(peers)
if err != nil {
t.Errorf("Failed to start up receiver: %s", err)
}
WaitForConnection(peers)
if berr := Broadcast(peers[1], []byte{0, 1, 2, 3, 4}); berr != nil {
if berr := Broadcast(peers[1], startingPort, []byte{0, 1, 2, 3, 4}); berr != nil {
t.Errorf("Failed to broadcast message: %s", berr)
}
if !AssertWithTimeout(func() bool { return r.Received() == 2 }, 30) {
Expand All @@ -131,21 +134,23 @@ func TestTenReplicasBroadcastAndDeliverUsingDifferent(t *testing.T) {
}

func TestFourReplicasBombedWithBroadcasts(t *testing.T) {
t.Parallel()
startingPort := 4000
skipInShortMode(t)
// Add for debug mode:
// logging.SetLevel(logging.DEBUG, "sbft")
broadcastCount := 15
peers := InitPeers(4)
peers := InitPeers(4, startingPort)
StartPeers(peers)
r, err := Receive(peers[2])
r, err := Receive(peers[2], startingPort)
defer r.Stop()
defer StopPeers(peers)
if err != nil {
t.Errorf("Failed to start up receiver: %s", err)
}
WaitForConnection(peers)
for x := 0; x < broadcastCount; x++ {
if berr := Broadcast(peers[2], []byte{0, 1, 2, byte(x), 3, 4, byte(x)}); berr != nil {
if berr := Broadcast(peers[2], startingPort, []byte{0, 1, 2, byte(x), 3, 4, byte(x)}); berr != nil {
t.Errorf("Failed to broadcast message: %s (broadcast number %d)", berr, x)
}
time.Sleep(time.Second)
Expand All @@ -156,19 +161,21 @@ func TestFourReplicasBombedWithBroadcasts(t *testing.T) {
}

func TestTenReplicasBombedWithBroadcasts(t *testing.T) {
t.Parallel()
startingPort := 5000
skipInShortMode(t)
broadcastCount := 15
peers := InitPeers(10)
peers := InitPeers(10, startingPort)
StartPeers(peers)
r, err := Receive(peers[3])
r, err := Receive(peers[3], startingPort)
defer r.Stop()
defer StopPeers(peers)
if err != nil {
t.Errorf("Failed to start up receiver: %s", err)
}
WaitForConnection(peers)
for x := 0; x < broadcastCount; x++ {
if berr := Broadcast(peers[2], []byte{0, 1, 2, byte(x), 3, 4, byte(x)}); berr != nil {
if berr := Broadcast(peers[2], startingPort, []byte{0, 1, 2, byte(x), 3, 4, byte(x)}); berr != nil {
t.Errorf("Failed to broadcast message: %s (broadcast number %d)", berr, x)
}
time.Sleep(time.Second)
Expand All @@ -179,25 +186,26 @@ func TestTenReplicasBombedWithBroadcasts(t *testing.T) {
}

func TestTenReplicasBombedWithBroadcastsIfLedgersConsistent(t *testing.T) {
t.Parallel()
startingPort := 6000
skipInShortMode(t)
broadcastCount := 15
peers := InitPeers(10)
peers := InitPeers(10, startingPort)
StartPeers(peers)
defer StopPeers(peers)

receivers := make([]*receiver, 0, len(peers))
for i := 0; i < len(peers); i++ {
r, err := Receive(peers[i])
r, err := Receive(peers[i], startingPort)
if err != nil {
t.Errorf("Failed to start up receiver: %s", err)
}
receivers = append(receivers, r)
defer r.Stop()
}
defer StopPeers(peers)

WaitForConnection(peers)
for x := 0; x < broadcastCount; x++ {
if berr := Broadcast(peers[2], []byte{0, 1, 2, byte(x), 3, 4, byte(x)}); berr != nil {
if berr := Broadcast(peers[2], startingPort, []byte{0, 1, 2, byte(x), 3, 4, byte(x)}); berr != nil {
t.Errorf("Failed to broadcast message: %s (broadcast number %d)", berr, x)
}
time.Sleep(time.Second)
Expand All @@ -209,17 +217,20 @@ func TestTenReplicasBombedWithBroadcastsIfLedgersConsistent(t *testing.T) {
t.Errorf("Failed to receive some messages. (Received %d)", r.Received())
}
}
for _, r := range receivers {
r.Stop()
}
}

func InitPeers(num uint64) []*peer {
func InitPeers(num uint64, startingPort int) []*peer {
peers := make([]*peer, 0, num)
certFiles := make([]string, 0, num)
for i := uint64(0); i < num; i++ {
certFiles = append(certFiles, generateCertificate(i, keyfile))
}
configFile := generateConfig(num, certFiles)
configFile := generateConfig(num, startingPort, certFiles)
for i := uint64(0); i < num; i++ {
peers = append(peers, initPeer(i, configFile, certFiles[i]))
peers = append(peers, initPeer(i, startingPort, configFile, certFiles[i]))
}
return peers
}
Expand All @@ -236,7 +247,7 @@ func StopPeers(peers []*peer) {
}
}

func generateConfig(peerNum uint64, certFiles []string) string {
func generateConfig(peerNum uint64, startingPort int, certFiles []string) string {
tempDir, err := ioutil.TempDir("", "sbft_test_config")
panicOnError(err)
c := pb.Config{
Expand All @@ -249,7 +260,7 @@ func generateConfig(peerNum uint64, certFiles []string) string {
for i := uint64(0); i < peerNum; i++ {
pc := make(map[string]string)
pc["Id"] = fmt.Sprintf("%d", i)
pc["Address"] = listenAddress(i)
pc["Address"] = listenAddress(i, startingPort)
pc["Cert"] = certFiles[i]
peerconfigs = append(peerconfigs, pc)
}
Expand All @@ -263,13 +274,13 @@ func generateConfig(peerNum uint64, certFiles []string) string {
return conffilepath
}

func initPeer(uid uint64, configFile string, certFile string) (p *peer) {
func initPeer(uid uint64, startingPort int, configFile string, certFile string) (p *peer) {
tempDir, err := ioutil.TempDir("", "sbft_test")
panicOnError(err)
os.RemoveAll(tempDir)
c := flags{init: configFile,
listenAddr: listenAddress(uid),
grpcAddr: grpcAddress(uid),
listenAddr: listenAddress(uid, startingPort),
grpcAddr: grpcAddress(uid, startingPort),
certFile: certFile,
keyFile: keyfile,
dataDir: tempDir}
Expand All @@ -294,9 +305,9 @@ func (p *peer) stop() {
p.cmd.Wait()
}

func Broadcast(p *peer, bytes []byte) error {
func Broadcast(p *peer, startingPort int, bytes []byte) error {
timeout := 10 * time.Second
grpcAddress := grpcAddress(p.id)
grpcAddress := grpcAddress(p.id, startingPort)
clientconn, err := grpc.Dial(grpcAddress, grpc.WithBlock(), grpc.WithTimeout(timeout), grpc.WithInsecure())
if err != nil {
return err
Expand All @@ -318,11 +329,11 @@ func Broadcast(p *peer, bytes []byte) error {
return nil
}

func Receive(p *peer) (*receiver, error) {
func Receive(p *peer, startingPort int) (*receiver, error) {
retch := make(chan []byte, 100)
signals := make(chan bool, 100)
timeout := 4 * time.Second
grpcAddress := grpcAddress(p.id)
grpcAddress := grpcAddress(p.id, startingPort)
clientconn, err := grpc.Dial(grpcAddress, grpc.WithBlock(), grpc.WithTimeout(timeout), grpc.WithInsecure())
if err != nil {
return nil, err
Expand Down Expand Up @@ -390,12 +401,12 @@ func WaitForConnection(peers []*peer) {
_ = <-time.After(time.Duration(m) * time.Second)
}

func listenAddress(id uint64) string {
return fmt.Sprintf(":%d", 6000+2*id)
func listenAddress(id uint64, startingPort int) string {
return fmt.Sprintf(":%d", startingPort+2*int(id))
}

func grpcAddress(id uint64) string {
return fmt.Sprintf(":%d", 6001+2*id)
func grpcAddress(id uint64, startingPort int) string {
return fmt.Sprintf(":%d", startingPort+1+2*int(id))
}

func generateCertificate(id uint64, keyFile string) string {
Expand Down
1 change: 1 addition & 0 deletions orderer/sbft/main/sbft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ var NEEDED_UPDATES = 2
var NEEDED_SENT = 1

func TestSbftPeer(t *testing.T) {
t.Parallel()
skipInShortMode(t)
tempDir, err := ioutil.TempDir("", "sbft_test")
if err != nil {
Expand Down

0 comments on commit 6e8d216

Please sign in to comment.