@@ -39,12 +39,18 @@ type EventHub interface {
39
39
SetPeerAddr (peerURL string , certificate string , serverHostOverride string )
40
40
IsConnected () bool
41
41
Connect () error
42
- RegisterChaincodeEvent (ccid string , eventname string , callback func (* pb. ChaincodeEvent )) * ChainCodeCBE
42
+ RegisterChaincodeEvent (ccid string , eventname string , callback func (* ChaincodeEvent )) * ChainCodeCBE
43
43
UnregisterChaincodeEvent (cbe * ChainCodeCBE )
44
44
RegisterTxEvent (txID string , callback func (string , error ))
45
45
UnregisterTxEvent (txID string )
46
46
}
47
47
48
+ // The EventHubExt interface allows extensions of the SDK to add functionality to EventHub overloads.
49
+ type EventHubExt interface {
50
+ SetInterests (block bool , rejection bool )
51
+ AddChaincodeInterest (ChaincodeID string , EventName string )
52
+ }
53
+
48
54
type eventHub struct {
49
55
// Protects chaincodeRegistrants, blockRegistrants and txRegistrants
50
56
mtx sync.RWMutex
@@ -68,6 +74,14 @@ type eventHub struct {
68
74
interestedEvents []* pb.Interest
69
75
}
70
76
77
+ // ChaincodeEvent contains the current event data for the event handler
78
+ type ChaincodeEvent struct {
79
+ ChaincodeId string
80
+ TxId string
81
+ EventName string
82
+ Payload []byte
83
+ }
84
+
71
85
// ChainCodeCBE ...
72
86
/**
73
87
* The ChainCodeCBE is used internal to the EventHub to hold chaincode
@@ -79,7 +93,7 @@ type ChainCodeCBE struct {
79
93
// event name regex filter
80
94
EventNameFilter string
81
95
// callback function to invoke on successful filter match
82
- CallbackFunc func (* pb. ChaincodeEvent )
96
+ CallbackFunc func (* ChaincodeEvent )
83
97
}
84
98
85
99
// NewEventHub ...
@@ -88,15 +102,40 @@ func NewEventHub() EventHub {
88
102
blockRegistrants := make ([]func (* common.Block ), 0 )
89
103
txRegistrants := make (map [string ]func (string , error ))
90
104
105
+ eventHub := & eventHub {chaincodeRegistrants : chaincodeRegistrants , blockRegistrants : blockRegistrants , txRegistrants : txRegistrants , interestedEvents : nil }
91
106
// default interested events
92
- // TODO: set interestedEvents based on handler registration
93
- interestedEvents := []* pb.Interest {{EventType : pb .EventType_BLOCK }}
94
-
95
- eventHub := & eventHub {chaincodeRegistrants : chaincodeRegistrants , blockRegistrants : blockRegistrants , txRegistrants : txRegistrants , interestedEvents : interestedEvents }
107
+ eventHub .SetInterests (true , true )
96
108
97
109
return eventHub
98
110
}
99
111
112
+ // SetInterests clears all interests and sets the interests for BLOCK and REJECTION type of events.
113
+ func (eventHub * eventHub ) SetInterests (block bool , rejection bool ) {
114
+ eventHub .mtx .Lock ()
115
+ defer eventHub .mtx .Unlock ()
116
+
117
+ eventHub .interestedEvents = nil
118
+ if block {
119
+ eventHub .interestedEvents = append (eventHub .interestedEvents , & pb.Interest {EventType : pb .EventType_BLOCK })
120
+ }
121
+ if rejection {
122
+ eventHub .interestedEvents = append (eventHub .interestedEvents , & pb.Interest {EventType : pb .EventType_REJECTION })
123
+ }
124
+ }
125
+
126
+ // AddChaincodeInterest adds interest for specific CHAINCODE events.
127
+ func (eventHub * eventHub ) AddChaincodeInterest (ChaincodeID string , EventName string ) {
128
+ eventHub .interestedEvents = append (eventHub .interestedEvents , & pb.Interest {
129
+ EventType : pb .EventType_CHAINCODE ,
130
+ RegInfo : & pb.Interest_ChaincodeRegInfo {
131
+ ChaincodeRegInfo : & pb.ChaincodeReg {
132
+ ChaincodeId : ChaincodeID ,
133
+ EventName : EventName ,
134
+ },
135
+ },
136
+ })
137
+ }
138
+
100
139
// SetPeerAddr ...
101
140
/**
102
141
* Set peer url for event source<p>
@@ -149,6 +188,11 @@ func (eventHub *eventHub) Connect() error {
149
188
return nil
150
189
}
151
190
191
+ //SetInterestedEvents set events that client is interested in
192
+ func (eventHub * eventHub ) SetInterestedEvents (events []* pb.Interest ) {
193
+ eventHub .interestedEvents = events
194
+ }
195
+
152
196
//GetInterestedEvents implements consumer.EventAdapter interface for registering interested events
153
197
func (eventHub * eventHub ) GetInterestedEvents () ([]* pb.Interest , error ) {
154
198
return eventHub .interestedEvents , nil
@@ -180,7 +224,12 @@ func (eventHub *eventHub) Recv(msg *pb.Event) (bool, error) {
180
224
if v .EventNameFilter == ccEvent .ChaincodeEvent .EventName {
181
225
callback := v .CallbackFunc
182
226
if callback != nil {
183
- callback (ccEvent .ChaincodeEvent )
227
+ callback (& ChaincodeEvent {
228
+ ChaincodeId : ccEvent .ChaincodeEvent .ChaincodeId ,
229
+ TxId : ccEvent .ChaincodeEvent .TxId ,
230
+ EventName : ccEvent .ChaincodeEvent .EventName ,
231
+ Payload : ccEvent .ChaincodeEvent .Payload ,
232
+ })
184
233
}
185
234
}
186
235
}
@@ -217,14 +266,12 @@ func (eventHub *eventHub) Disconnected(err error) {
217
266
* @returns {object} ChainCodeCBE object that should be treated as an opaque
218
267
* handle used to unregister (see unregisterChaincodeEvent)
219
268
*/
220
- func (eventHub * eventHub ) RegisterChaincodeEvent (ccid string , eventname string , callback func (* pb.ChaincodeEvent )) * ChainCodeCBE {
221
- if ! eventHub .connected {
222
- return nil
223
- }
224
-
269
+ func (eventHub * eventHub ) RegisterChaincodeEvent (ccid string , eventname string , callback func (* ChaincodeEvent )) * ChainCodeCBE {
225
270
eventHub .mtx .Lock ()
226
271
defer eventHub .mtx .Unlock ()
227
272
273
+ eventHub .AddChaincodeInterest (ccid , eventname )
274
+
228
275
cbe := ChainCodeCBE {CCID : ccid , EventNameFilter : eventname , CallbackFunc : callback }
229
276
cbeArray := eventHub .chaincodeRegistrants [ccid ]
230
277
if cbeArray == nil && len (cbeArray ) <= 0 {
@@ -245,10 +292,6 @@ func (eventHub *eventHub) RegisterChaincodeEvent(ccid string, eventname string,
245
292
* registerChaincodeEvent.
246
293
*/
247
294
func (eventHub * eventHub ) UnregisterChaincodeEvent (cbe * ChainCodeCBE ) {
248
- if ! eventHub .connected {
249
- return
250
- }
251
-
252
295
eventHub .mtx .Lock ()
253
296
defer eventHub .mtx .Unlock ()
254
297
0 commit comments