@@ -17,7 +17,6 @@ limitations under the License.
1717package noopssinglechain
1818
1919import (
20- "fmt"
2120 "time"
2221
2322 "github.com/golang/protobuf/proto"
@@ -32,14 +31,21 @@ import (
3231 "golang.org/x/net/context"
3332 "google.golang.org/grpc"
3433
34+ "fmt"
35+
3536 "github.com/hyperledger/fabric/core/peer"
37+ "github.com/hyperledger/fabric/gossip/gossip"
38+ "github.com/hyperledger/fabric/gossip/integration"
39+ gossip_proto "github.com/hyperledger/fabric/gossip/proto"
40+ "github.com/hyperledger/fabric/gossip/state"
3641 pb "github.com/hyperledger/fabric/protos/peer"
3742)
3843
3944var logger * logging.Logger // package-level logger
4045
4146func init () {
4247 logger = logging .MustGetLogger ("committer" )
48+ logging .SetLevel (logging .DEBUG , logger .Module )
4349}
4450
4551// DeliverService used to communicate with orderers to obtain
@@ -49,55 +55,139 @@ type DeliverService struct {
4955 windowSize uint64
5056 unAcknowledged uint64
5157 committer * committer.LedgerCommitter
58+
59+ stateProvider state.GossipStateProvider
60+ gossip gossip.Gossip
61+ conn * grpc.ClientConn
62+
63+ stopChan chan bool
64+ }
65+
66+ // StopDeliveryService sends stop to the delivery service reference
67+ func StopDeliveryService (service * DeliverService ) {
68+ if service != nil {
69+ service .Stop ()
70+ }
5271}
5372
5473// NewDeliverService construction function to create and initilize
5574// delivery service instance
56- func NewDeliverService () * DeliverService {
75+ func NewDeliverService (address string , grpcServer * grpc. Server ) * DeliverService {
5776 if viper .GetBool ("peer.committer.enabled" ) {
5877 logger .Infof ("Creating committer for single noops endorser" )
5978
60- var opts []grpc.DialOption
61- opts = append (opts , grpc .WithInsecure ())
62- opts = append (opts , grpc .WithTimeout (3 * time .Second ))
63- opts = append (opts , grpc .WithBlock ())
64- endpoint := viper .GetString ("peer.committer.ledger.orderer" )
65- conn , err := grpc .Dial (endpoint , opts ... )
66- if err != nil {
67- logger .Errorf ("Cannot dial to %s, because of %s" , endpoint , err )
68- return nil
69- }
70- var abc orderer.AtomicBroadcast_DeliverClient
71- abc , err = orderer .NewAtomicBroadcastClient (conn ).Deliver (context .TODO ())
72- if err != nil {
73- logger .Errorf ("Unable to initialize atomic broadcast, due to %s" , err )
74- return nil
75- }
76-
7779 deliverService := & DeliverService {
78- // Atomic Broadcast Deliver Clienet
79- client : abc ,
8080 // Instance of RawLedger
8181 committer : committer .NewLedgerCommitter (kvledger .GetLedger (string (chaincode .DefaultChain ))),
8282 windowSize : 10 ,
83+ stopChan : make (chan bool ),
8384 }
85+
86+ deliverService .initStateProvider (address , grpcServer )
87+
8488 return deliverService
8589 }
8690 logger .Infof ("Committer disabled" )
8791 return nil
8892}
8993
90- // Start the delivery service to read the block via delivery
91- // protocol from the orderers
92- func (d * DeliverService ) Start () error {
93- if err := d .seekOldest (); err != nil {
94+ func (d * DeliverService ) startDeliver () error {
95+ logger .Info ("Starting deliver service client" )
96+ err := d .initDeliver ()
97+
98+ if err != nil {
99+ logger .Errorf ("Can't initiate deliver protocol [%s]" , err )
94100 return err
95101 }
96102
103+ height , err := d .committer .LedgerHeight ()
104+ if err != nil {
105+ logger .Errorf ("Can't get legder height from committer [%s]" , err )
106+ return err
107+ }
108+
109+ if height > 0 {
110+ logger .Debugf ("Starting deliver with block [%d]" , height )
111+ if err := d .seekLatestFromCommitter (height ); err != nil {
112+ return err
113+ }
114+
115+ } else {
116+ logger .Debug ("Starting deliver with olders block" )
117+ if err := d .seekOldest (); err != nil {
118+ return err
119+ }
120+
121+ }
122+
97123 d .readUntilClose ()
124+
125+ return nil
126+ }
127+
128+ func (d * DeliverService ) initDeliver () error {
129+ opts := []grpc.DialOption {grpc .WithInsecure (), grpc .WithTimeout (3 * time .Second ), grpc .WithBlock ()}
130+ endpoint := viper .GetString ("peer.committer.ledger.orderer" )
131+ conn , err := grpc .Dial (endpoint , opts ... )
132+ if err != nil {
133+ logger .Errorf ("Cannot dial to %s, because of %s" , endpoint , err )
134+ return err
135+ }
136+ var abc orderer.AtomicBroadcast_DeliverClient
137+ abc , err = orderer .NewAtomicBroadcastClient (conn ).Deliver (context .TODO ())
138+ if err != nil {
139+ logger .Errorf ("Unable to initialize atomic broadcast, due to %s" , err )
140+ return err
141+ }
142+
143+ // Atomic Broadcast Deliver Client
144+ d .client = abc
145+ d .conn = conn
146+ return nil
147+
148+ }
149+
150+ func (d * DeliverService ) stopDeliver () {
151+ if d .conn != nil {
152+ d .conn .Close ()
153+ }
154+ }
155+
156+ func (d * DeliverService ) initStateProvider (address string , grpcServer * grpc.Server ) error {
157+ bootstrap := viper .GetStringSlice ("peer.gossip.bootstrap" )
158+ logger .Debug ("Initializing state provideer, endpoint = " , address , " bootstrap set = " , bootstrap )
159+
160+ gossip , gossipComm := integration .NewGossipComponent (address , grpcServer , bootstrap ... )
161+
162+ d .gossip = gossip
163+ d .stateProvider = state .NewGossipStateProvider (gossip , gossipComm , d .committer )
98164 return nil
99165}
100166
167+ // Start the delivery service to read the block via delivery
168+ // protocol from the orderers
169+ func (d * DeliverService ) Start () {
170+ go d .checkLeaderAndRunDeliver ()
171+ }
172+
173+ // Stop all service and release resources
174+ func (d * DeliverService ) Stop () {
175+ d .stopChan <- true
176+ d .stateProvider .Stop ()
177+ d .gossip .Stop ()
178+ }
179+
180+ func (d * DeliverService ) checkLeaderAndRunDeliver () {
181+
182+ isLeader := viper .GetBool ("peer.gossip.orgLeader" )
183+
184+ if isLeader {
185+ d .startDeliver ()
186+ } else {
187+ <- d .stopChan
188+ }
189+ }
190+
101191func (d * DeliverService ) seekOldest () error {
102192 return d .client .Send (& orderer.DeliverUpdate {
103193 Type : & orderer.DeliverUpdate_Seek {
@@ -109,6 +199,18 @@ func (d *DeliverService) seekOldest() error {
109199 })
110200}
111201
202+ func (d * DeliverService ) seekLatestFromCommitter (height uint64 ) error {
203+ return d .client .Send (& orderer.DeliverUpdate {
204+ Type : & orderer.DeliverUpdate_Seek {
205+ Seek : & orderer.SeekInfo {
206+ Start : orderer .SeekInfo_SPECIFIED ,
207+ WindowSize : d .windowSize ,
208+ SpecifiedNumber : height ,
209+ },
210+ },
211+ })
212+ }
213+
112214func (d * DeliverService ) readUntilClose () {
113215 for {
114216 msg , err := d .client .Recv ()
@@ -119,12 +221,13 @@ func (d *DeliverService) readUntilClose() {
119221 switch t := msg .Type .(type ) {
120222 case * orderer.DeliverResponse_Error :
121223 if t .Error == common .Status_SUCCESS {
122- fmt . Println ("ERROR! Received success in error field" )
224+ logger . Warning ("ERROR! Received success in error field" )
123225 return
124226 }
125- fmt . Println ("Got error " , t )
227+ logger . Warning ("Got error " , t )
126228 case * orderer.DeliverResponse_Block :
127229 block := & pb.Block2 {}
230+ seqNum := t .Block .Header .Number
128231 for _ , d := range t .Block .Data .Data {
129232 if d != nil {
130233 if env , err := putils .GetEnvelopeFromBlock (d ); err != nil {
@@ -137,7 +240,9 @@ func (d *DeliverService) readUntilClose() {
137240 // job for VSCC below
138241 _ , err := peer .ValidateTransaction (env )
139242 if err != nil {
140- // TODO: this code needs to receive a bit more attention and discussion: it's not clear what it means if a transaction which causes a failure in validation is just dropped on the floor
243+ // TODO: this code needs to receive a bit more attention and discussion:
244+ // it's not clear what it means if a transaction which causes a failure
245+ // in validation is just dropped on the floor
141246 logger .Errorf ("Invalid transaction, error %s" , err )
142247 } else {
143248 // TODO: call VSCC now
@@ -148,24 +253,30 @@ func (d *DeliverService) readUntilClose() {
148253 }
149254 }
150255 } else {
151- fmt . Printf ("Nil tx from block\n " )
256+ logger . Warning ("Nil tx from block" )
152257 }
153258 }
154259 }
155- // Once block is constructed need to commit into the ledger
156- if err = d .committer .CommitBlock (block ); err != nil {
157- fmt .Printf ("Got error while committing(%s)\n " , err )
158- } else {
159- fmt .Printf ("Commit success, created a block!\n " )
160- }
260+
261+ numberOfPeers := len (d .gossip .GetPeers ())
262+ // Create payload with a block received
263+ payload := createPayload (seqNum , block )
264+ // Use payload to create gossip message
265+ gossipMsg := createGossipMsg (payload )
266+ logger .Debugf ("Adding payload locally, buffer seqNum = [%d], peers number [%d]" , seqNum , numberOfPeers )
267+ // Add payload to local state payloads buffer
268+ d .stateProvider .AddPayload (payload )
269+ // Gossip messages with other nodes
270+ logger .Debugf ("Gossiping block [%d], peers number [%d]" , seqNum , numberOfPeers )
271+ d .gossip .Gossip (gossipMsg )
161272
162273 d .unAcknowledged ++
163274 if d .unAcknowledged >= d .windowSize / 2 {
164- fmt . Println ("Sending acknowledgement" )
275+ logger . Warningf ("Sending acknowledgement [%d]" , t . Block . Header . Number )
165276 err = d .client .Send (& orderer.DeliverUpdate {
166277 Type : & orderer.DeliverUpdate_Acknowledgement {
167278 Acknowledgement : & orderer.Acknowledgement {
168- Number : t . Block . Header . Number ,
279+ Number : seqNum ,
169280 },
170281 },
171282 })
@@ -175,8 +286,28 @@ func (d *DeliverService) readUntilClose() {
175286 d .unAcknowledged = 0
176287 }
177288 default :
178- fmt . Println ("Received unknown: " , t )
289+ logger . Warning ("Received unknown: " , t )
179290 return
180291 }
181292 }
182293}
294+
295+ func createGossipMsg (payload * gossip_proto.Payload ) * gossip_proto.GossipMessage {
296+ gossipMsg := & gossip_proto.GossipMessage {
297+ Nonce : 0 ,
298+ Content : & gossip_proto.GossipMessage_DataMsg {
299+ DataMsg : & gossip_proto.DataMessage {
300+ Payload : payload ,
301+ },
302+ },
303+ }
304+ return gossipMsg
305+ }
306+
307+ func createPayload (seqNum uint64 , block2 * pb.Block2 ) * gossip_proto.Payload {
308+ marshaledBlock , _ := proto .Marshal (block2 )
309+ return & gossip_proto.Payload {
310+ Data : marshaledBlock ,
311+ SeqNum : seqNum ,
312+ }
313+ }
0 commit comments