From dd98f31dd985583ee862282b6478f0a6eac1f6f5 Mon Sep 17 00:00:00 2001 From: Jason Yellick Date: Thu, 29 Jun 2017 16:19:02 -0400 Subject: [PATCH] [FAB-5273] Perf oriented broadcast_msg The existing broadcast_timestamp only operates on one go routine. This CR allows the number of go routines to be specified on the CLI. It also signs messages as the signature verification is the piece which CPU bounds the orderer. Additionally, the message size may now be specified as a parameter. Change-Id: I2edf0c891161b66044d4713e3e928c1067fd6470 Signed-off-by: Jason Yellick --- .../sample_clients/broadcast_msg/client.go | 137 ++++++++++++++++++ .../broadcast_timestamp/client.go | 105 -------------- 2 files changed, 137 insertions(+), 105 deletions(-) create mode 100644 orderer/sample_clients/broadcast_msg/client.go delete mode 100644 orderer/sample_clients/broadcast_timestamp/client.go diff --git a/orderer/sample_clients/broadcast_msg/client.go b/orderer/sample_clients/broadcast_msg/client.go new file mode 100644 index 00000000000..d2f8dc44ff1 --- /dev/null +++ b/orderer/sample_clients/broadcast_msg/client.go @@ -0,0 +1,137 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package main + +import ( + "flag" + "fmt" + "os" + "sync" + "time" + + "github.com/hyperledger/fabric/common/crypto" + "github.com/hyperledger/fabric/common/localmsp" + "github.com/hyperledger/fabric/common/tools/configtxgen/provisional" + mspmgmt "github.com/hyperledger/fabric/msp/mgmt" + "github.com/hyperledger/fabric/orderer/common/localconfig" + cb "github.com/hyperledger/fabric/protos/common" + ab "github.com/hyperledger/fabric/protos/orderer" + "github.com/hyperledger/fabric/protos/utils" + + "golang.org/x/net/context" + "google.golang.org/grpc" +) + +type broadcastClient struct { + client ab.AtomicBroadcast_BroadcastClient + signer crypto.LocalSigner + chainID string +} + +// newBroadcastClient creates a simple instance of the broadcastClient interface +func newBroadcastClient(client ab.AtomicBroadcast_BroadcastClient, chainID string, signer crypto.LocalSigner) *broadcastClient { + return &broadcastClient{client: client, chainID: chainID, signer: signer} +} + +func (s *broadcastClient) broadcast(transaction []byte) error { + env, err := utils.CreateSignedEnvelope(cb.HeaderType_MESSAGE, s.chainID, s.signer, &cb.Envelope{Signature: transaction}, 0, 0) + if err != nil { + panic(err) + } + time.Sleep(time.Second) + return s.client.Send(env) +} + +func (s *broadcastClient) getAck() error { + msg, err := s.client.Recv() + if err != nil { + return err + } + if msg.Status != cb.Status_SUCCESS { + return fmt.Errorf("Got unexpected status: %v - %s", msg.Status, msg.Info) + } + return nil +} + +func main() { + config := config.Load() + + // Load local MSP + err := mspmgmt.LoadLocalMsp(config.General.LocalMSPDir, config.General.BCCSP, config.General.LocalMSPID) + if err != nil { // Handle errors reading the config file + fmt.Println("Failed to initialize local MSP:", err) + os.Exit(0) + } + + signer := localmsp.NewSigner() + + var chainID string + var serverAddr string + var messages uint64 + var goroutines uint64 + var msgSize uint64 + + flag.StringVar(&serverAddr, "server", fmt.Sprintf("%s:%d", config.General.ListenAddress, config.General.ListenPort), "The RPC server to connect to.") + flag.StringVar(&chainID, "chainID", provisional.TestChainID, "The chain ID to broadcast to.") + flag.Uint64Var(&messages, "messages", 1, "The number of messages to broadcast.") + flag.Uint64Var(&goroutines, "goroutines", 1, "The number of concurrent go routines to broadcast the messages on") + flag.Uint64Var(&msgSize, "size", 1024, "The size in bytes of the data section for the payload") + flag.Parse() + + conn, err := grpc.Dial(serverAddr, grpc.WithInsecure()) + defer func() { + _ = conn.Close() + }() + if err != nil { + fmt.Println("Error connecting:", err) + return + } + + msgsPerGo := messages / goroutines + roundMsgs := msgsPerGo * goroutines + if roundMsgs != messages { + fmt.Println("Rounding messages to", roundMsgs) + } + + msgData := make([]byte, msgSize) + + var wg sync.WaitGroup + wg.Add(int(goroutines)) + for i := uint64(0); i < goroutines; i++ { + go func(i uint64) { + client, err := ab.NewAtomicBroadcastClient(conn).Broadcast(context.TODO()) + time.Sleep(10 * time.Second) + if err != nil { + fmt.Println("Error connecting:", err) + return + } + + s := newBroadcastClient(client, chainID, signer) + done := make(chan (struct{})) + go func() { + for i := uint64(0); i < msgsPerGo; i++ { + err = s.getAck() + } + if err != nil { + fmt.Printf("\nError: %v\n", err) + } + close(done) + }() + for i := uint64(0); i < msgsPerGo; i++ { + if err := s.broadcast(msgData); err != nil { + panic(err) + } + } + <-done + wg.Done() + client.CloseSend() + fmt.Println("Go routine", i, "exiting") + }(i) + } + + wg.Wait() +} diff --git a/orderer/sample_clients/broadcast_timestamp/client.go b/orderer/sample_clients/broadcast_timestamp/client.go deleted file mode 100644 index 640b6670dc1..00000000000 --- a/orderer/sample_clients/broadcast_timestamp/client.go +++ /dev/null @@ -1,105 +0,0 @@ -/* -Copyright IBM Corp. 2016 All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package main - -import ( - "flag" - "fmt" - "time" - - "github.com/golang/protobuf/proto" - "github.com/hyperledger/fabric/common/tools/configtxgen/provisional" - "github.com/hyperledger/fabric/orderer/common/localconfig" - cb "github.com/hyperledger/fabric/protos/common" - ab "github.com/hyperledger/fabric/protos/orderer" - "github.com/hyperledger/fabric/protos/utils" - "golang.org/x/net/context" - "google.golang.org/grpc" -) - -type broadcastClient struct { - client ab.AtomicBroadcast_BroadcastClient - chainID string -} - -// newBroadcastClient creates a simple instance of the broadcastClient interface -func newBroadcastClient(client ab.AtomicBroadcast_BroadcastClient, chainID string) *broadcastClient { - return &broadcastClient{client: client, chainID: chainID} -} - -func (s *broadcastClient) broadcast(transaction []byte) error { - payload, err := proto.Marshal(&cb.Payload{ - Header: &cb.Header{ - ChannelHeader: utils.MarshalOrPanic(&cb.ChannelHeader{ - ChannelId: s.chainID, - }), - SignatureHeader: utils.MarshalOrPanic(&cb.SignatureHeader{}), - }, - Data: transaction, - }) - if err != nil { - panic(err) - } - return s.client.Send(&cb.Envelope{Payload: payload}) -} - -func (s *broadcastClient) getAck() error { - msg, err := s.client.Recv() - if err != nil { - return err - } - if msg.Status != cb.Status_SUCCESS { - return fmt.Errorf("Got unexpected status: %v", msg.Status) - } - return nil -} - -func main() { - config := config.Load() - - var chainID string - var serverAddr string - var messages uint64 - - flag.StringVar(&serverAddr, "server", fmt.Sprintf("%s:%d", config.General.ListenAddress, config.General.ListenPort), "The RPC server to connect to.") - flag.StringVar(&chainID, "chainID", provisional.TestChainID, "The chain ID to broadcast to.") - flag.Uint64Var(&messages, "messages", 1, "The number of messages to broadcast.") - flag.Parse() - - conn, err := grpc.Dial(serverAddr, grpc.WithInsecure()) - defer func() { - _ = conn.Close() - }() - if err != nil { - fmt.Println("Error connecting:", err) - return - } - client, err := ab.NewAtomicBroadcastClient(conn).Broadcast(context.TODO()) - if err != nil { - fmt.Println("Error connecting:", err) - return - } - - s := newBroadcastClient(client, chainID) - for i := uint64(0); i < messages; i++ { - s.broadcast([]byte(fmt.Sprintf("Testing %v", time.Now()))) - err = s.getAck() - } - if err != nil { - fmt.Printf("\nError: %v\n", err) - } -}