diff --git a/bddtests/docker-compose-with-orderer.yml b/bddtests/docker-compose-with-orderer.yml new file mode 100644 index 00000000000..8098d5a580e --- /dev/null +++ b/bddtests/docker-compose-with-orderer.yml @@ -0,0 +1,76 @@ +version: '2' +networks: + bridge: + +services: + orderer: + image: hyperledger/fabric-orderer + environment: + - ORDERER_GENERAL_LEDGERTYPE=ram + - ORDERER_GENERAL_BATCHTIMEOUT=10s + - ORDERER_GENERAL_BATCHSIZE=2 + - ORDERER_GENERAL_MAXWINDOWSIZE=1000 + - ORDERER_GENERAL_ORDERERTYPE=solo + - ORDERER_GENERAL_LISTENADDRESS=0.0.0.0 + - ORDERER_GENERAL_LISTENPORT=5005 + - ORDERER_RAMLEDGER_HISTORY_SIZE=100 + working_dir: /opt/gopath/src/github.com/hyperledger/fabric/orderer + command: orderer + networks: + - bridge + + + vpserv: + image: hyperledger/fabric-peer + environment: + - CORE_PEER_ADDRESSAUTODETECT=true + - CORE_VM_ENDPOINT=unix:///host/var/run/docker.sock + - CORE_PEER_NETWORKID=${CORE_PEER_NETWORKID} + - CORE_NEXT=true + - CORE_PEER_ENDORSER_ENABLED=true + - CORE_PEER_COMMITTER_ENABLED=true + - CORE_PEER_COMMITTER_LEDGER_ORDERER=orderer:5005 + volumes: + - /var/run/:/host/var/run/ + networks: + - bridge + + vp0: + extends: + service: vpserv + environment: + - CORE_PEER_ID=vp0 + - CORE_PEER_PROFILE_ENABLED=true + - CORE_PEER_GOSSIP_ORGLEADER=true + ports: + - 7051:7051 + - 7053:7053 + command: peer node start + links: + - orderer + + vp1: + extends: + service: vpserv + environment: + - CORE_PEER_ID=vp1 + - CORE_PEER_PROFILE_ENABLED=true + - CORE_PEER_GOSSIP_BOOTSTRAP=vp0:7051 + - CORE_PEER_GOSSIP_ORGLEADER=false + command: peer node start + links: + - orderer + + vp2: + extends: + service: vpserv + environment: + - CORE_PEER_ID=vp2 + - CORE_PEER_PROFILE_ENABLED=true + - CORE_PEER_COMMITTER_LEDGER_ORDERER=orderer:5005 + - CORE_PEER_GOSSIP_BOOTSTRAP=vp0:10000 + - CORE_PEER_GOSSIP_ORGLEADER=false + command: peer node start + links: + - orderer + diff --git a/core/committer/committer_impl.go b/core/committer/committer_impl.go index c524c9c4764..8c9af970a75 100644 --- a/core/committer/committer_impl.go +++ b/core/committer/committer_impl.go @@ -77,7 +77,7 @@ func (lc *LedgerCommitter) GetBlocks(blockSeqs []uint64) []*pb.Block2 { logger.Errorf("Not able to acquire block num %d, from the ledger skipping...\n", seqNum) continue } else { - logger.Debug("Appending next ", blck, " to the resulting set") + logger.Debug("Appending next block with seqNum = ", seqNum, " to the resulting set") blocks = append(blocks, blck) } } diff --git a/core/committer/noopssinglechain/client.go b/core/committer/noopssinglechain/client.go index e05447f7705..997e60a336c 100644 --- a/core/committer/noopssinglechain/client.go +++ b/core/committer/noopssinglechain/client.go @@ -17,7 +17,6 @@ limitations under the License. 
package noopssinglechain import ( - "fmt" "time" "github.com/golang/protobuf/proto" @@ -32,7 +31,13 @@ import ( "golang.org/x/net/context" "google.golang.org/grpc" + "fmt" + "github.com/hyperledger/fabric/core/peer" + "github.com/hyperledger/fabric/gossip/gossip" + "github.com/hyperledger/fabric/gossip/integration" + gossip_proto "github.com/hyperledger/fabric/gossip/proto" + "github.com/hyperledger/fabric/gossip/state" pb "github.com/hyperledger/fabric/protos/peer" ) @@ -40,6 +45,7 @@ var logger *logging.Logger // package-level logger func init() { logger = logging.MustGetLogger("committer") + logging.SetLevel(logging.DEBUG, logger.Module) } // DeliverService used to communicate with orderers to obtain @@ -49,55 +55,139 @@ type DeliverService struct { windowSize uint64 unAcknowledged uint64 committer *committer.LedgerCommitter + + stateProvider state.GossipStateProvider + gossip gossip.Gossip + conn *grpc.ClientConn + + stopChan chan bool +} + +// StopDeliveryService sends stop to the delivery service reference +func StopDeliveryService(service *DeliverService) { + if service != nil { + service.Stop() + } } // NewDeliverService construction function to create and initilize // delivery service instance -func NewDeliverService() *DeliverService { +func NewDeliverService(address string, grpcServer *grpc.Server) *DeliverService { if viper.GetBool("peer.committer.enabled") { logger.Infof("Creating committer for single noops endorser") - var opts []grpc.DialOption - opts = append(opts, grpc.WithInsecure()) - opts = append(opts, grpc.WithTimeout(3*time.Second)) - opts = append(opts, grpc.WithBlock()) - endpoint := viper.GetString("peer.committer.ledger.orderer") - conn, err := grpc.Dial(endpoint, opts...) - if err != nil { - logger.Errorf("Cannot dial to %s, because of %s", endpoint, err) - return nil - } - var abc orderer.AtomicBroadcast_DeliverClient - abc, err = orderer.NewAtomicBroadcastClient(conn).Deliver(context.TODO()) - if err != nil { - logger.Errorf("Unable to initialize atomic broadcast, due to %s", err) - return nil - } - deliverService := &DeliverService{ - // Atomic Broadcast Deliver Clienet - client: abc, // Instance of RawLedger committer: committer.NewLedgerCommitter(kvledger.GetLedger(string(chaincode.DefaultChain))), windowSize: 10, + stopChan: make(chan bool), } + + deliverService.initStateProvider(address, grpcServer) + return deliverService } logger.Infof("Committer disabled") return nil } -// Start the delivery service to read the block via delivery -// protocol from the orderers -func (d *DeliverService) Start() error { - if err := d.seekOldest(); err != nil { +func (d *DeliverService) startDeliver() error { + logger.Info("Starting deliver service client") + err := d.initDeliver() + + if err != nil { + logger.Errorf("Can't initiate deliver protocol [%s]", err) return err } + height, err := d.committer.LedgerHeight() + if err != nil { + logger.Errorf("Can't get ledger height from committer [%s]", err) + return err + } + + if height > 0 { + logger.Debugf("Starting deliver with block [%d]", height) + if err := d.seekLatestFromCommitter(height); err != nil { + return err + } + + } else { + logger.Debug("Starting deliver with oldest block") + if err := d.seekOldest(); err != nil { + return err + } + + } + d.readUntilClose() + + return nil +} + +func (d *DeliverService) initDeliver() error { + opts := []grpc.DialOption{grpc.WithInsecure(), grpc.WithTimeout(3 * time.Second), grpc.WithBlock()} + endpoint := viper.GetString("peer.committer.ledger.orderer") + conn, err := 
grpc.Dial(endpoint, opts...) + if err != nil { + logger.Errorf("Cannot dial to %s, because of %s", endpoint, err) + return err + } + var abc orderer.AtomicBroadcast_DeliverClient + abc, err = orderer.NewAtomicBroadcastClient(conn).Deliver(context.TODO()) + if err != nil { + logger.Errorf("Unable to initialize atomic broadcast, due to %s", err) + return err + } + + // Atomic Broadcast Deliver Client + d.client = abc + d.conn = conn + return nil + +} + +func (d *DeliverService) stopDeliver() { + if d.conn != nil { + d.conn.Close() + } +} + +func (d *DeliverService) initStateProvider(address string, grpcServer *grpc.Server) error { + bootstrap := viper.GetStringSlice("peer.gossip.bootstrap") + logger.Debug("Initializing state provider, endpoint = ", address, " bootstrap set = ", bootstrap) + + gossip, gossipComm := integration.NewGossipComponent(address, grpcServer, bootstrap...) + + d.gossip = gossip + d.stateProvider = state.NewGossipStateProvider(gossip, gossipComm, d.committer) return nil } +// Start the delivery service to read the block via delivery +// protocol from the orderers +func (d *DeliverService) Start() { + go d.checkLeaderAndRunDeliver() +} + +// Stop all services and release resources +func (d *DeliverService) Stop() { + d.stopChan <- true + d.stateProvider.Stop() + d.gossip.Stop() +} + +func (d *DeliverService) checkLeaderAndRunDeliver() { + + isLeader := viper.GetBool("peer.gossip.orgLeader") + + if isLeader { + d.startDeliver() + } else { + <-d.stopChan + } +} + func (d *DeliverService) seekOldest() error { return d.client.Send(&orderer.DeliverUpdate{ Type: &orderer.DeliverUpdate_Seek{ @@ -109,6 +199,18 @@ func (d *DeliverService) seekOldest() error { }) } +func (d *DeliverService) seekLatestFromCommitter(height uint64) error { + return d.client.Send(&orderer.DeliverUpdate{ + Type: &orderer.DeliverUpdate_Seek{ + Seek: &orderer.SeekInfo{ + Start: orderer.SeekInfo_SPECIFIED, + WindowSize: d.windowSize, + SpecifiedNumber: height, + }, + }, + }) +} + func (d *DeliverService) readUntilClose() { for { msg, err := d.client.Recv() @@ -119,12 +221,13 @@ func (d *DeliverService) readUntilClose() { switch t := msg.Type.(type) { case *orderer.DeliverResponse_Error: if t.Error == common.Status_SUCCESS { - fmt.Println("ERROR! Received success in error field") + logger.Warning("ERROR! 
Received success in error field") return } - fmt.Println("Got error ", t) + logger.Warning("Got error ", t) case *orderer.DeliverResponse_Block: block := &pb.Block2{} + seqNum := t.Block.Header.Number for _, d := range t.Block.Data.Data { if d != nil { if env, err := putils.GetEnvelopeFromBlock(d); err != nil { @@ -137,7 +240,9 @@ func (d *DeliverService) readUntilClose() { // job for VSCC below _, err := peer.ValidateTransaction(env) if err != nil { - // TODO: this code needs to receive a bit more attention and discussion: it's not clear what it means if a transaction which causes a failure in validation is just dropped on the floor + // TODO: this code needs to receive a bit more attention and discussion: + // it's not clear what it means if a transaction which causes a failure + // in validation is just dropped on the floor logger.Errorf("Invalid transaction, error %s", err) } else { // TODO: call VSCC now @@ -148,24 +253,30 @@ func (d *DeliverService) readUntilClose() { } } } else { - fmt.Printf("Nil tx from block\n") + logger.Warning("Nil tx from block") } } } - // Once block is constructed need to commit into the ledger - if err = d.committer.CommitBlock(block); err != nil { - fmt.Printf("Got error while committing(%s)\n", err) - } else { - fmt.Printf("Commit success, created a block!\n") - } + + numberOfPeers := len(d.gossip.GetPeers()) + // Create payload with a block received + payload := createPayload(seqNum, block) + // Use payload to create gossip message + gossipMsg := createGossipMsg(payload) + logger.Debugf("Adding payload locally, buffer seqNum = [%d], peers number [%d]", seqNum, numberOfPeers) + // Add payload to local state payloads buffer + d.stateProvider.AddPayload(payload) + // Gossip messages with other nodes + logger.Debugf("Gossiping block [%d], peers number [%d]", seqNum, numberOfPeers) + d.gossip.Gossip(gossipMsg) d.unAcknowledged++ if d.unAcknowledged >= d.windowSize/2 { - fmt.Println("Sending acknowledgement") + logger.Warningf("Sending acknowledgement [%d]", t.Block.Header.Number) err = d.client.Send(&orderer.DeliverUpdate{ Type: &orderer.DeliverUpdate_Acknowledgement{ Acknowledgement: &orderer.Acknowledgement{ - Number: t.Block.Header.Number, + Number: seqNum, }, }, }) @@ -175,8 +286,28 @@ func (d *DeliverService) readUntilClose() { d.unAcknowledged = 0 } default: - fmt.Println("Received unknown: ", t) + logger.Warning("Received unknown: ", t) return } } } + +func createGossipMsg(payload *gossip_proto.Payload) *gossip_proto.GossipMessage { + gossipMsg := &gossip_proto.GossipMessage{ + Nonce: 0, + Content: &gossip_proto.GossipMessage_DataMsg{ + DataMsg: &gossip_proto.DataMessage{ + Payload: payload, + }, + }, + } + return gossipMsg +} + +func createPayload(seqNum uint64, block2 *pb.Block2) *gossip_proto.Payload { + marshaledBlock, _ := proto.Marshal(block2) + return &gossip_proto.Payload{ + Data: marshaledBlock, + SeqNum: seqNum, + } +} diff --git a/gossip/state/state.go b/gossip/state/state.go index a11eb195804..0c8cf56139f 100644 --- a/gossip/state/state.go +++ b/gossip/state/state.go @@ -17,17 +17,18 @@ limitations under the License. 
package state import ( - "github.com/hyperledger/fabric/gossip/gossip" - "github.com/hyperledger/fabric/gossip/proto" - pb "github.com/golang/protobuf/proto" - "github.com/hyperledger/fabric/gossip/comm" - "github.com/op/go-logging" + "math/rand" "sync" "sync/atomic" "time" - "math/rand" - "github.com/hyperledger/fabric/protos/peer" + + pb "github.com/golang/protobuf/proto" "github.com/hyperledger/fabric/core/committer" + "github.com/hyperledger/fabric/gossip/comm" + "github.com/hyperledger/fabric/gossip/gossip" + "github.com/hyperledger/fabric/gossip/proto" + "github.com/hyperledger/fabric/protos/peer" + "github.com/op/go-logging" ) // GossipStateProvider is the interface to acquire sequences of the ledger blocks @@ -49,7 +50,7 @@ var logFormat = logging.MustStringFormatter( ) const ( - defPollingPeriod = 200 * time.Millisecond + defPollingPeriod = 200 * time.Millisecond defAntiEntropyInterval = 10 * time.Second ) @@ -58,33 +59,34 @@ const ( // new ledger block to be acquired by hyper ledger type GossipStateProviderImpl struct { // The gossiping service - gossip gossip.Gossip; + gossip gossip.Gossip // Channel to read gossip messages from - gossipChan <- chan *proto.GossipMessage; + gossipChan <-chan *proto.GossipMessage - commChan <- chan comm.ReceivedMessage; + commChan <-chan comm.ReceivedMessage // Flag which signals for termination - stopFlag int32; + stopFlag int32 - mutex sync.RWMutex; + mutex sync.RWMutex // Queue of payloads which wasn't acquired yet - payloads PayloadsBuffer; + payloads PayloadsBuffer - comm comm.Comm; + comm comm.Comm - committer committer.Committer; + committer committer.Committer - logger *logging.Logger; + logger *logging.Logger - done sync.WaitGroup; + done sync.WaitGroup } // NewGossipStateProvider creates initialized instance of gossip state provider func NewGossipStateProvider(g gossip.Gossip, c comm.Comm, committer committer.Committer) GossipStateProvider { logger, _ := logging.GetLogger("GossipStateProvider") + logging.SetLevel(logging.DEBUG, logger.Module) gossipChan := g.Accept(func(message interface{}) bool { // Get only data messages @@ -108,7 +110,7 @@ func NewGossipStateProvider(g gossip.Gossip, c comm.Comm, committer committer.Co s := &GossipStateProviderImpl{ // Instance of the gossip - gossip : g, + gossip: g, // Channel to read new messages from gossipChan: gossipChan, @@ -120,7 +122,7 @@ func NewGossipStateProvider(g gossip.Gossip, c comm.Comm, committer committer.Co // Create a queue for payload received payloads: NewPayloadsBuffer(height + 1), - comm : c, + comm: c, committer: committer, @@ -156,28 +158,27 @@ func (s *GossipStateProviderImpl) listen() { // Do not block on waiting message from channel // check each 500ms whenever is done indicates to // finish - next: - select { - case msg := <-s.gossipChan: - { - s.logger.Debug("Received new message via gossip channel") - go s.queueNewMessage(msg) - } - case msg := <-s.commChan: - { - s.logger.Debug("Direct message ", msg) - go s.directMessage(msg) - } - case <-time.After(defPollingPeriod): - break next + next: + select { + case msg := <-s.gossipChan: + { + s.logger.Debug("Received new message via gossip channel") + go s.queueNewMessage(msg) } + case msg := <-s.commChan: + { + go s.directMessage(msg) + } + case <-time.After(defPollingPeriod): + break next + } } s.logger.Debug("[XXX]: Stop listening for new messages") s.done.Done() } func (s *GossipStateProviderImpl) directMessage(msg comm.ReceivedMessage) { - s.logger.Debugf("[ENTER] -> directMessage, ", msg) + s.logger.Debug("[ENTER] -> 
directMessage") defer s.logger.Debug("[EXIT] -> directMessage") if msg == nil { @@ -196,7 +197,7 @@ func (s *GossipStateProviderImpl) directMessage(msg comm.ReceivedMessage) { func (s *GossipStateProviderImpl) handleStateRequest(msg comm.ReceivedMessage) { request := msg.GetGossipMessage().GetStateRequest() - response := &proto.RemoteStateResponse{Payloads:make([]*proto.Payload, 0)} + response := &proto.RemoteStateResponse{Payloads: make([]*proto.Payload, 0)} for _, seqNum := range request.SeqNums { s.logger.Debug("Reading block ", seqNum, " from the committer service") blocks := s.committer.GetBlocks([]uint64{seqNum}) @@ -217,7 +218,7 @@ func (s *GossipStateProviderImpl) handleStateRequest(msg comm.ReceivedMessage) { response.Payloads = append(response.Payloads, &proto.Payload{ SeqNum: seqNum, - Data: blockBytes, + Data: blockBytes, // TODO: Check hash generation for given block from the ledger Hash: "", }) @@ -249,12 +250,13 @@ func (s *GossipStateProviderImpl) isDone() bool { func (s *GossipStateProviderImpl) Stop() { atomic.StoreInt32(&s.stopFlag, 1) s.done.Wait() + s.committer.Close() } // New message notification/handler func (s *GossipStateProviderImpl) queueNewMessage(msg *proto.GossipMessage) { dataMsg := msg.GetDataMsg() - if (dataMsg != nil) { + if dataMsg != nil { // Add new payload to ordered set s.logger.Debugf("Received new payload with sequence number = [%d]", dataMsg.Payload.SeqNum) s.payloads.Push(dataMsg.GetPayload()) @@ -265,29 +267,29 @@ func (s *GossipStateProviderImpl) queueNewMessage(msg *proto.GossipMessage) { func (s *GossipStateProviderImpl) deliverPayloads() { for !s.isDone() { - next: - select { - // Wait for notification that next seq has arrived - case <-s.payloads.Ready(): - { - s.logger.Debugf("Ready to transfer payloads to the ledger, next sequence number is = [%d]", s.payloads.Next()) - // Collect all subsequent payloads - for payload := s.payloads.Pop(); payload != nil; payload = s.payloads.Pop() { - rawblock := &peer.Block2{} - if err := pb.Unmarshal(payload.Data, rawblock); err != nil { - s.logger.Errorf("Error getting block with seqNum = %d due to (%s)...dropping block\n", payload.SeqNum, err) - continue - } - s.logger.Debug("New block with sequence number ", payload.SeqNum, " is ", rawblock) - - s.commitBlock(rawblock, payload.SeqNum) + next: + select { + // Wait for notification that next seq has arrived + case <-s.payloads.Ready(): + { + s.logger.Debugf("Ready to transfer payloads to the ledger, next sequence number is = [%d]", s.payloads.Next()) + // Collect all subsequent payloads + for payload := s.payloads.Pop(); payload != nil; payload = s.payloads.Pop() { + rawblock := &peer.Block2{} + if err := pb.Unmarshal(payload.Data, rawblock); err != nil { + s.logger.Errorf("Error getting block with seqNum = %d due to (%s)...dropping block\n", payload.SeqNum, err) + continue } + s.logger.Debug("New block with sequence number ", payload.SeqNum, " transactions num ", len(rawblock.Transactions)) + + s.commitBlock(rawblock, payload.SeqNum) } - case <-time.After(defPollingPeriod): - { - break next - } } + case <-time.After(defPollingPeriod): + { + break next + } + } } s.logger.Debug("State provider has been stoped, finishing to push new blocks.") s.done.Done() @@ -295,7 +297,7 @@ func (s *GossipStateProviderImpl) deliverPayloads() { func (s *GossipStateProviderImpl) antiEntropy() { checkPoint := time.Now() - for (!s.isDone()) { + for !s.isDone() { time.Sleep(defPollingPeriod) if time.Since(checkPoint).Nanoseconds() <= defAntiEntropyInterval.Nanoseconds() { 
continue @@ -306,21 +308,21 @@ func (s *GossipStateProviderImpl) antiEntropy() { max, _ := s.committer.LedgerHeight() for _, p := range s.gossip.GetPeers() { - if state, err:= FromBytes(p.Metadata); err == nil { + if state, err := FromBytes(p.Metadata); err == nil { if max < state.LedgerHeight { max = state.LedgerHeight } } } - if current == max { + if current == max { // No messages in the buffer or there are no gaps s.logger.Debugf("Current ledger height is the same as ledger height on other peers.") continue } - s.logger.Debugf("Requesting new blocks in range [%d...%d].", current + 1, max) - s.requestBlocksInRange(uint64(current + 1), uint64(max)) + s.logger.Debugf("Requesting new blocks in range [%d...%d].", current+1, max) + s.requestBlocksInRange(uint64(current+1), uint64(max)) } s.logger.Debug("[XXX]: Stateprovider stopped, stoping anti entropy procedure.") s.done.Done() @@ -335,7 +337,7 @@ func (s *GossipStateProviderImpl) requestBlocksInRange(start uint64, end uint64) nodeMetadata, err := FromBytes(value.Metadata) if err == nil { if nodeMetadata.LedgerHeight >= end { - peers = append(peers, &comm.RemotePeer{Endpoint: value.Endpoint, PKIID:value.PKIid}) + peers = append(peers, &comm.RemotePeer{Endpoint: value.Endpoint, PKIID: value.PKIid}) } } else { s.logger.Errorf("Unable to de-serialize node meta state, error = %s", err) @@ -343,7 +345,7 @@ func (s *GossipStateProviderImpl) requestBlocksInRange(start uint64, end uint64) } n := len(peers) - if (n == 0) { + if n == 0 { s.logger.Warningf("There is not peer nodes to ask for missing blocks in range [%d, %d)", start, end) return } @@ -356,7 +358,7 @@ func (s *GossipStateProviderImpl) requestBlocksInRange(start uint64, end uint64) } for i := start; i <= end; i++ { - request.SeqNums = append(request.SeqNums, uint64(i)); + request.SeqNums = append(request.SeqNums, uint64(i)) } s.logger.Debug("[$$$$$$$$$$$$$$$$]: Sending direct request to complete missing blocks, ", request) @@ -397,4 +399,4 @@ func (s *GossipStateProviderImpl) commitBlock(block *peer.Block2, seqNum uint64) s.logger.Debug("[XXX]: Commit success, created a block!") return nil -} \ No newline at end of file +} diff --git a/orderer/main.go b/orderer/main.go index 7be69ed7f23..0256e09d37b 100644 --- a/orderer/main.go +++ b/orderer/main.go @@ -258,7 +258,7 @@ func launchKafka(conf *config.TopLevel) { signal.Notify(signalChan, os.Interrupt) for range signalChan { - fmt.Println("Server shutting down") + logger.Info("Server shutting down") return } } diff --git a/peer/core.yaml b/peer/core.yaml index 589cd6b9856..4a10913310e 100644 --- a/peer/core.yaml +++ b/peer/core.yaml @@ -1,5 +1,6 @@ ############################################################################### # + # # CLI section # ############################################################################### @@ -100,6 +101,12 @@ peer: gomaxprocs: -1 workers: 2 + # Gossip related configuration + gossip: + bootstrap: 0.0.0.0:7051 + # For debug - whether this peer is its org's leader and should pass blocks from the orderer to other peers in the org + orgLeader: true + # Sync related configuration sync: blocks: diff --git a/peer/node/start.go b/peer/node/start.go index fb18d974316..b9b530e98bc 100755 --- a/peer/node/start.go +++ b/peer/node/start.go @@ -150,18 +150,14 @@ func serve(args []string) error { serverEndorser := endorser.NewEndorserServer() pb.RegisterEndorserServer(grpcServer, serverEndorser) - // !!!IMPORTANT!!! - as mentioned in core.yaml, peer-orderer-committer // interaction is closely tied to bootstrapping. 
This is to be viewed - // as temporary implementation to test the end-to-end flows in the - // system outside of multi-ledger, multi-channel work - if deliverService := noopssinglechain.NewDeliverService(); deliverService != nil { - go func() { - if err := deliverService.Start(); err != nil { - fmt.Printf("Could not start solo committer(%s), continuing without committer\n", err) - } - }() + deliverService := noopssinglechain.NewDeliverService(peerEndpoint.Address, grpcServer) + + if deliverService != nil { + deliverService.Start() } + defer noopssinglechain.StopDeliveryService(deliverService) + logger.Infof("Starting peer with ID=%s, network ID=%s, address=%s, rootnodes=%v, validator=%v", peerEndpoint.ID, viper.GetString("peer.networkId"), peerEndpoint.Address, viper.GetString("peer.discovery.rootnode"), peer.ValidatorEnabled())