Skip to content

Commit dd98f31

Browse files
author
Jason Yellick
committed
[FAB-5273] Perf oriented broadcast_msg
The existing broadcast_timestamp only operates on one go routine. This CR allows the number of go routines to be specified on the CLI. It also signs messages as the signature verification is the piece which CPU bounds the orderer. Additionally, the message size may now be specified as a parameter. Change-Id: I2edf0c891161b66044d4713e3e928c1067fd6470 Signed-off-by: Jason Yellick <jyellick@us.ibm.com>
1 parent 5101b9e commit dd98f31

File tree

2 files changed

+137
-105
lines changed

2 files changed

+137
-105
lines changed
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package main
8+
9+
import (
10+
"flag"
11+
"fmt"
12+
"os"
13+
"sync"
14+
"time"
15+
16+
"github.com/hyperledger/fabric/common/crypto"
17+
"github.com/hyperledger/fabric/common/localmsp"
18+
"github.com/hyperledger/fabric/common/tools/configtxgen/provisional"
19+
mspmgmt "github.com/hyperledger/fabric/msp/mgmt"
20+
"github.com/hyperledger/fabric/orderer/common/localconfig"
21+
cb "github.com/hyperledger/fabric/protos/common"
22+
ab "github.com/hyperledger/fabric/protos/orderer"
23+
"github.com/hyperledger/fabric/protos/utils"
24+
25+
"golang.org/x/net/context"
26+
"google.golang.org/grpc"
27+
)
28+
29+
type broadcastClient struct {
30+
client ab.AtomicBroadcast_BroadcastClient
31+
signer crypto.LocalSigner
32+
chainID string
33+
}
34+
35+
// newBroadcastClient creates a simple instance of the broadcastClient interface
36+
func newBroadcastClient(client ab.AtomicBroadcast_BroadcastClient, chainID string, signer crypto.LocalSigner) *broadcastClient {
37+
return &broadcastClient{client: client, chainID: chainID, signer: signer}
38+
}
39+
40+
func (s *broadcastClient) broadcast(transaction []byte) error {
41+
env, err := utils.CreateSignedEnvelope(cb.HeaderType_MESSAGE, s.chainID, s.signer, &cb.Envelope{Signature: transaction}, 0, 0)
42+
if err != nil {
43+
panic(err)
44+
}
45+
time.Sleep(time.Second)
46+
return s.client.Send(env)
47+
}
48+
49+
func (s *broadcastClient) getAck() error {
50+
msg, err := s.client.Recv()
51+
if err != nil {
52+
return err
53+
}
54+
if msg.Status != cb.Status_SUCCESS {
55+
return fmt.Errorf("Got unexpected status: %v - %s", msg.Status, msg.Info)
56+
}
57+
return nil
58+
}
59+
60+
func main() {
61+
config := config.Load()
62+
63+
// Load local MSP
64+
err := mspmgmt.LoadLocalMsp(config.General.LocalMSPDir, config.General.BCCSP, config.General.LocalMSPID)
65+
if err != nil { // Handle errors reading the config file
66+
fmt.Println("Failed to initialize local MSP:", err)
67+
os.Exit(0)
68+
}
69+
70+
signer := localmsp.NewSigner()
71+
72+
var chainID string
73+
var serverAddr string
74+
var messages uint64
75+
var goroutines uint64
76+
var msgSize uint64
77+
78+
flag.StringVar(&serverAddr, "server", fmt.Sprintf("%s:%d", config.General.ListenAddress, config.General.ListenPort), "The RPC server to connect to.")
79+
flag.StringVar(&chainID, "chainID", provisional.TestChainID, "The chain ID to broadcast to.")
80+
flag.Uint64Var(&messages, "messages", 1, "The number of messages to broadcast.")
81+
flag.Uint64Var(&goroutines, "goroutines", 1, "The number of concurrent go routines to broadcast the messages on")
82+
flag.Uint64Var(&msgSize, "size", 1024, "The size in bytes of the data section for the payload")
83+
flag.Parse()
84+
85+
conn, err := grpc.Dial(serverAddr, grpc.WithInsecure())
86+
defer func() {
87+
_ = conn.Close()
88+
}()
89+
if err != nil {
90+
fmt.Println("Error connecting:", err)
91+
return
92+
}
93+
94+
msgsPerGo := messages / goroutines
95+
roundMsgs := msgsPerGo * goroutines
96+
if roundMsgs != messages {
97+
fmt.Println("Rounding messages to", roundMsgs)
98+
}
99+
100+
msgData := make([]byte, msgSize)
101+
102+
var wg sync.WaitGroup
103+
wg.Add(int(goroutines))
104+
for i := uint64(0); i < goroutines; i++ {
105+
go func(i uint64) {
106+
client, err := ab.NewAtomicBroadcastClient(conn).Broadcast(context.TODO())
107+
time.Sleep(10 * time.Second)
108+
if err != nil {
109+
fmt.Println("Error connecting:", err)
110+
return
111+
}
112+
113+
s := newBroadcastClient(client, chainID, signer)
114+
done := make(chan (struct{}))
115+
go func() {
116+
for i := uint64(0); i < msgsPerGo; i++ {
117+
err = s.getAck()
118+
}
119+
if err != nil {
120+
fmt.Printf("\nError: %v\n", err)
121+
}
122+
close(done)
123+
}()
124+
for i := uint64(0); i < msgsPerGo; i++ {
125+
if err := s.broadcast(msgData); err != nil {
126+
panic(err)
127+
}
128+
}
129+
<-done
130+
wg.Done()
131+
client.CloseSend()
132+
fmt.Println("Go routine", i, "exiting")
133+
}(i)
134+
}
135+
136+
wg.Wait()
137+
}

orderer/sample_clients/broadcast_timestamp/client.go

Lines changed: 0 additions & 105 deletions
This file was deleted.

0 commit comments

Comments
 (0)