14
14
15
15
package org .hyperledger .fabric .sdk ;
16
16
17
+ import java .io .IOException ;
18
+ import java .io .ObjectInputStream ;
17
19
import java .io .Serializable ;
18
20
import java .util .ArrayList ;
19
21
import java .util .Properties ;
52
54
53
55
public class EventHub implements Serializable {
54
56
private static final long serialVersionUID = 2882609588201108148L ;
55
- private static final Log logger = LogFactory .getLog (EventHub .class );
56
57
private static final Config config = Config .getConfig ();
58
+ private transient String id = config .getNextID ();
59
+ private static final Log logger = LogFactory .getLog (EventHub .class );
60
+ private static final boolean IS_TRACE_LEVEL = logger .isTraceEnabled ();
61
+
57
62
private static final long EVENTHUB_CONNECTION_WAIT_TIME = config .getEventHubConnectionWaitTime ();
58
63
private static final long EVENTHUB_RECONNECTION_WARNING_RATE = config .getEventHubReconnectionWarningRate ();
59
64
@@ -78,6 +83,7 @@ public class EventHub implements Serializable {
78
83
private transient long reconnectCount ;
79
84
private transient long lastBlockNumber ;
80
85
private transient BlockEvent lastBlockEvent ;
86
+ private String channelName ;
81
87
82
88
/**
83
89
* Get disconnected time.
@@ -99,6 +105,22 @@ public boolean isConnected() {
99
105
return connected ;
100
106
}
101
107
108
+ String getStatus () {
109
+
110
+ StringBuilder sb = new StringBuilder (1000 );
111
+ sb .append (toString ()).append (", connected: " ).append (connected );
112
+ ManagedChannel lmanagedChannel = managedChannel ;
113
+ if (lmanagedChannel == null ) {
114
+ sb .append ("managedChannel: null" );
115
+ } else {
116
+ sb .append (", isShutdown: " ).append (lmanagedChannel .isShutdown ());
117
+ sb .append (", isTerminated: " ).append (lmanagedChannel .isTerminated ());
118
+ sb .append (", state: " ).append ("" + lmanagedChannel .getState (false ));
119
+ }
120
+
121
+ return sb .toString ();
122
+ }
123
+
102
124
/**
103
125
* Get last connect time.
104
126
*
@@ -136,6 +158,7 @@ public long getLastConnectedAttempt() {
136
158
this .name = name ;
137
159
this .executorService = executorService ;
138
160
this .properties = properties == null ? null : (Properties ) properties .clone (); //keep our own copy.
161
+ logger .debug ("Created " + toString ());
139
162
}
140
163
141
164
/**
@@ -186,7 +209,7 @@ synchronized boolean connect(final TransactionContext transactionContext, final
186
209
187
210
final CountDownLatch finishLatch = new CountDownLatch (1 );
188
211
189
- logger .debug (format ("EventHub %s is connecting." , name ));
212
+ logger .debug (format ("%s is connecting." , toString () ));
190
213
191
214
lastConnectedAttempt = System .currentTimeMillis ();
192
215
@@ -203,12 +226,14 @@ synchronized boolean connect(final TransactionContext transactionContext, final
203
226
@ Override
204
227
public void onNext (PeerEvents .Event event ) {
205
228
206
- logger .debug (format ("EventHub %s got event type: %s" , EventHub .this .name , event .getEventCase ().name ()));
229
+ logger .debug (format ("%s got event type: %s" , EventHub .this .toString () , event .getEventCase ().name ()));
207
230
208
231
if (event .getEventCase () == PeerEvents .Event .EventCase .BLOCK ) {
209
232
try {
210
233
211
234
BlockEvent blockEvent = new BlockEvent (EventHub .this , event );
235
+
236
+ logger .trace (format ("%s got block number: %d" , EventHub .this .toString (), blockEvent .getBlockNumber ()));
212
237
setLastBlockSeen (blockEvent );
213
238
214
239
eventQue .addBEvent (blockEvent ); //add to channel queue
@@ -220,14 +245,17 @@ public void onNext(PeerEvents.Event event) {
220
245
} else if (event .getEventCase () == PeerEvents .Event .EventCase .REGISTER ) {
221
246
222
247
if (reconnectCount > 1 ) {
223
- logger .info (format ("Eventhub %s has reconnecting after %d attempts" , name , reconnectCount ));
248
+ logger .info (format ("%s has reconnecting after %d attempts" , EventHub . this . toString () , reconnectCount ));
224
249
}
225
250
226
251
connected = true ;
227
252
connectedTime = System .currentTimeMillis ();
228
253
reconnectCount = 0L ;
229
254
230
255
finishLatch .countDown ();
256
+ } else {
257
+ logger .error (format ("%s got a unexpected block type: %s" ,
258
+ EventHub .this .toString (), event .getEventCase ().name ()));
231
259
}
232
260
}
233
261
@@ -271,7 +299,7 @@ public void onError(Throwable t) {
271
299
try {
272
300
reconnect ();
273
301
} catch (Exception e ) {
274
- logger .warn (format ("Eventhub %s Failed shutdown msg: %s" , EventHub .this .name , e .getMessage ()));
302
+ logger .warn (format ("%s Failed shutdown msg: %s" , EventHub .this .toString () , e .getMessage ()));
275
303
}
276
304
277
305
}
@@ -300,17 +328,17 @@ public void onCompleted() {
300
328
301
329
if (!reconnection && !finishLatch .await (EVENTHUB_CONNECTION_WAIT_TIME , TimeUnit .MILLISECONDS )) {
302
330
303
- logger .warn (format ("EventHub %s failed to connect in %s ms." , name , EVENTHUB_CONNECTION_WAIT_TIME ));
331
+ logger .warn (format ("%s failed to connect in %s ms." , toString () , EVENTHUB_CONNECTION_WAIT_TIME ));
304
332
305
333
} else {
306
- logger .trace (format ("Eventhub %s Done waiting for reply!" , name ));
334
+ logger .trace (format ("%s done waiting for reply!" , toString () ));
307
335
}
308
336
309
337
} catch (InterruptedException e ) {
310
338
logger .error (e );
311
339
}
312
340
313
- logger .debug (format ("Eventhub %s connect is done with connect status: %b " , name , connected ));
341
+ logger .debug (format ("%s connect is done with connect status: %b " , toString () , connected ));
314
342
315
343
if (connected ) {
316
344
eventStream = eventStreamLocal ;
@@ -383,16 +411,19 @@ void setEventQue(Channel.ChannelEventQue eventQue) {
383
411
384
412
@ Override
385
413
public String toString () {
386
- return "EventHub: " + getName ();
414
+ return "EventHub{" + "id: " + id + ", name: " + getName () + ", channelName: " + channelName + ", url: " + getUrl () + "}" ;
387
415
}
388
416
389
- public void shutdown () {
417
+ public synchronized void shutdown () {
418
+ if (shutdown ) {
419
+ return ;
420
+ }
421
+ logger .trace (toString () + " being shutdown." );
390
422
shutdown = true ;
391
423
lastBlockEvent = null ;
392
424
lastBlockNumber = 0 ;
393
425
connected = false ;
394
426
disconnectedHandler = null ;
395
- channel = null ;
396
427
eventStream = null ;
397
428
final ManagedChannel lmanagedChannel = managedChannel ;
398
429
managedChannel = null ;
@@ -406,20 +437,23 @@ void setChannel(Channel channel) throws InvalidArgumentException {
406
437
throw new InvalidArgumentException ("setChannel Channel can not be null" );
407
438
}
408
439
409
- if (null != this . channel ) {
440
+ if (null != channelName ) {
410
441
throw new InvalidArgumentException (format ("Can not add event hub %s to channel %s because it already belongs to channel %s." ,
411
- name , channel .getName (), this . channel . getName () ));
442
+ name , channel .getName (), channelName ));
412
443
}
413
-
414
- this .channel = channel ;
444
+ logger . debug ( toString () + " set to channel: " + channel );
445
+ this .channelName = channel . getName () ;
415
446
}
416
447
417
- synchronized void setLastBlockSeen (BlockEvent lastBlockSeen ) {
448
+ private synchronized void setLastBlockSeen (BlockEvent lastBlockSeen ) {
418
449
long newLastBlockNumber = lastBlockSeen .getBlockNumber ();
419
450
// overkill but make sure.
420
451
if (lastBlockNumber < newLastBlockNumber ) {
421
452
lastBlockNumber = newLastBlockNumber ;
422
453
this .lastBlockEvent = lastBlockSeen ;
454
+ if (IS_TRACE_LEVEL ) {
455
+ logger .trace (toString () + " last block seen: " + lastBlockNumber );
456
+ }
423
457
}
424
458
}
425
459
@@ -446,7 +480,7 @@ public interface EventHubDisconnected {
446
480
@ Override
447
481
public synchronized void disconnected (final EventHub eventHub ) {
448
482
if (reconnectCount == 1 ) {
449
- logger .warn (format ("Channel %s detected disconnect on event hub %s (%s) " , channel . getName (), eventHub .toString (), url ));
483
+ logger .warn (format ("%s detected disconnect. " , eventHub .toString ()));
450
484
}
451
485
452
486
executorService .execute (() -> {
@@ -455,7 +489,7 @@ public synchronized void disconnected(final EventHub eventHub) {
455
489
Thread .sleep (500 );
456
490
457
491
if (transactionContext == null ) {
458
- logger .warn ("Eventhub reconnect failed with no user context" );
492
+ logger .warn (EventHub . this . toString () + " reconnect failed with no user context" );
459
493
return ;
460
494
}
461
495
@@ -485,4 +519,16 @@ public EventHubDisconnected setEventHubDisconnectedHandler(EventHubDisconnected
485
519
return ret ;
486
520
}
487
521
522
+ private void readObject (ObjectInputStream in ) throws IOException , ClassNotFoundException {
523
+ in .defaultReadObject ();
524
+ id = config .getNextID ();
525
+ }
526
+
527
+ public void finalize () throws Throwable {
528
+ logger .trace (format ("%s finalized" , toString ()));
529
+ shutdown ();
530
+ super .finalize ();
531
+
532
+ }
533
+
488
534
}
0 commit comments