27
27
import static org .apache .pulsar .broker .loadbalance .extensions .channel .ServiceUnitStateChannelImpl .ChannelState .Constructed ;
28
28
import static org .apache .pulsar .broker .loadbalance .extensions .channel .ServiceUnitStateChannelImpl .ChannelState .LeaderElectionServiceStarted ;
29
29
import static org .apache .pulsar .broker .loadbalance .extensions .channel .ServiceUnitStateChannelImpl .ChannelState .Started ;
30
+ import static org .apache .pulsar .broker .loadbalance .extensions .channel .ServiceUnitStateChannelImpl .EventType .Assign ;
31
+ import static org .apache .pulsar .broker .loadbalance .extensions .channel .ServiceUnitStateChannelImpl .EventType .Split ;
32
+ import static org .apache .pulsar .broker .loadbalance .extensions .channel .ServiceUnitStateChannelImpl .EventType .Unload ;
30
33
import static org .apache .pulsar .broker .loadbalance .extensions .channel .ServiceUnitStateChannelImpl .MetadataState .Jittery ;
31
34
import static org .apache .pulsar .broker .loadbalance .extensions .channel .ServiceUnitStateChannelImpl .MetadataState .Stable ;
32
35
import static org .apache .pulsar .broker .loadbalance .extensions .channel .ServiceUnitStateChannelImpl .MetadataState .Unstable ;
44
47
import java .util .concurrent .TimeUnit ;
45
48
import java .util .concurrent .TimeoutException ;
46
49
import java .util .concurrent .atomic .AtomicLong ;
50
+ import lombok .AllArgsConstructor ;
51
+ import lombok .Getter ;
47
52
import lombok .extern .slf4j .Slf4j ;
48
53
import org .apache .commons .lang3 .StringUtils ;
49
54
import org .apache .commons .lang3 .mutable .MutableInt ;
@@ -107,6 +112,40 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
107
112
private long totalCleanupCancelledCnt = 0 ;
108
113
private volatile ChannelState channelState ;
109
114
115
+ enum EventType {
116
+ Assign ,
117
+ Split ,
118
+ Unload
119
+ }
120
+
121
+ @ Getter
122
+ @ AllArgsConstructor
123
+ static class Counters {
124
+ private AtomicLong total ;
125
+ private AtomicLong failure ;
126
+ }
127
+
128
+ // operation metrics
129
+ final Map <ServiceUnitState , AtomicLong > ownerLookUpCounters = Map .of (
130
+ Owned , new AtomicLong (),
131
+ Assigned , new AtomicLong (),
132
+ Released , new AtomicLong (),
133
+ Splitting , new AtomicLong (),
134
+ Free , new AtomicLong ()
135
+ );
136
+ final Map <EventType , Counters > eventCounters = Map .of (
137
+ Assign , new Counters (new AtomicLong (), new AtomicLong ()),
138
+ Split , new Counters (new AtomicLong (), new AtomicLong ()),
139
+ Unload , new Counters (new AtomicLong (), new AtomicLong ())
140
+ );
141
+ final Map <ServiceUnitState , Counters > handlerCounters = Map .of (
142
+ Owned , new Counters (new AtomicLong (), new AtomicLong ()),
143
+ Assigned , new Counters (new AtomicLong (), new AtomicLong ()),
144
+ Released , new Counters (new AtomicLong (), new AtomicLong ()),
145
+ Splitting , new Counters (new AtomicLong (), new AtomicLong ()),
146
+ Free , new Counters (new AtomicLong (), new AtomicLong ())
147
+ );
148
+
110
149
enum ChannelState {
111
150
Closed (0 ),
112
151
Constructed (1 ),
@@ -279,67 +318,112 @@ private boolean isChannelOwner() {
279
318
public CompletableFuture <String > getOwnerAsync (String serviceUnit ) {
280
319
validateChannelState (Started , true );
281
320
ServiceUnitStateData data = tableview .get (serviceUnit );
282
- if (data == null ) {
283
- return CompletableFuture .completedFuture (null );
284
- }
285
- switch (data .state ()) {
321
+ ServiceUnitState state = data == null ? Free : data .state ();
322
+ ownerLookUpCounters .get (state ).incrementAndGet ();
323
+ switch (state ) {
286
324
case Owned , Splitting -> {
287
325
return CompletableFuture .completedFuture (data .broker ());
288
326
}
289
327
case Assigned , Released -> {
290
328
return deferGetOwnerRequest (serviceUnit );
291
329
}
330
+ case Free -> {
331
+ return CompletableFuture .completedFuture (null );
332
+ }
292
333
default -> {
293
- return null ;
334
+ throw new IllegalStateException ( "Invalid service unit state:" + data . state ()) ;
294
335
}
295
336
}
296
337
}
297
338
298
339
public CompletableFuture <String > publishAssignEventAsync (String serviceUnit , String broker ) {
299
- CompletableFuture <String > getOwnerRequest = deferGetOwnerRequest (serviceUnit );
300
- pubAsync (serviceUnit , new ServiceUnitStateData (Assigned , broker ))
301
- .whenComplete ((__ , ex ) -> {
302
- if (ex != null ) {
303
- getOwnerRequests .remove (serviceUnit , getOwnerRequest );
304
- if (!getOwnerRequest .isCompletedExceptionally ()) {
305
- getOwnerRequest .completeExceptionally (ex );
340
+ EventType eventType = Assign ;
341
+ eventCounters .get (eventType ).getTotal ().incrementAndGet ();
342
+ try {
343
+ CompletableFuture <String > getOwnerRequest = deferGetOwnerRequest (serviceUnit );
344
+ pubAsync (serviceUnit , new ServiceUnitStateData (Assigned , broker ))
345
+ .whenComplete ((__ , ex ) -> {
346
+ if (ex != null ) {
347
+ getOwnerRequests .remove (serviceUnit , getOwnerRequest );
348
+ if (!getOwnerRequest .isCompletedExceptionally ()) {
349
+ getOwnerRequest .completeExceptionally (ex );
350
+ }
351
+ eventCounters .get (eventType ).getFailure ().incrementAndGet ();
306
352
}
307
- }
308
- });
309
-
310
- return getOwnerRequest ;
353
+ });
354
+ return getOwnerRequest ;
355
+ } catch (Throwable e ) {
356
+ log .error ("Failed to publish assign event. serviceUnit:{}, broker:{}, assignPublishFailureCount:{}" ,
357
+ serviceUnit , broker , eventCounters .get (eventType ).getFailure ().incrementAndGet (), e );
358
+ throw e ;
359
+ }
311
360
}
312
361
313
362
public CompletableFuture <Void > publishUnloadEventAsync (Unload unload ) {
314
- String serviceUnit = unload .serviceUnit ();
315
- if (isTransferCommand (unload )) {
316
- ServiceUnitStateData next = new ServiceUnitStateData (Assigned ,
317
- unload .destBroker ().get (), unload .sourceBroker ());
318
- return pubAsync (serviceUnit , next ).thenApply (__ -> null );
363
+ EventType eventType = Unload ;
364
+ eventCounters .get (eventType ).getTotal ().incrementAndGet ();
365
+ try {
366
+ String serviceUnit = unload .serviceUnit ();
367
+ CompletableFuture <MessageId > future ;
368
+ if (isTransferCommand (unload )) {
369
+ ServiceUnitStateData next = new ServiceUnitStateData (Assigned ,
370
+ unload .destBroker ().get (), unload .sourceBroker ());
371
+ future = pubAsync (serviceUnit , next );
372
+ } else {
373
+ future = tombstoneAsync (serviceUnit );
374
+ }
375
+
376
+ return future .whenComplete ((__ , ex ) -> {
377
+ if (ex != null ) {
378
+ eventCounters .get (eventType ).getFailure ().incrementAndGet ();
379
+ }
380
+ }).thenApply (__ -> null );
381
+ } catch (Throwable e ) {
382
+ log .error ("Failed to publish unload event. unload:{}. unloadPublishFailureCount:{}" ,
383
+ unload , eventCounters .get (eventType ).getFailure ().incrementAndGet (), e );
384
+ throw e ;
319
385
}
320
- return tombstoneAsync (serviceUnit ).thenApply (__ -> null );
321
386
}
322
387
323
388
public CompletableFuture <Void > publishSplitEventAsync (Split split ) {
324
- String serviceUnit = split .serviceUnit ();
325
- ServiceUnitStateData next = new ServiceUnitStateData (Splitting , split .sourceBroker ());
326
- return pubAsync (serviceUnit , next ).thenApply (__ -> null );
389
+ EventType eventType = Split ;
390
+ eventCounters .get (eventType ).getTotal ().incrementAndGet ();
391
+ try {
392
+ String serviceUnit = split .serviceUnit ();
393
+ ServiceUnitStateData next = new ServiceUnitStateData (Splitting , split .sourceBroker ());
394
+ return pubAsync (serviceUnit , next ).whenComplete ((__ , ex ) -> {
395
+ if (ex != null ) {
396
+ eventCounters .get (eventType ).getFailure ().incrementAndGet ();
397
+ }
398
+ }).thenApply (__ -> null );
399
+ } catch (Throwable e ) {
400
+ log .error ("Failed to publish split event. split:{}, splitPublishFailureCount:{}" ,
401
+ split , eventCounters .get (eventType ).getFailure ().incrementAndGet (), e );
402
+ throw e ;
403
+ }
327
404
}
328
405
329
406
private void handle (String serviceUnit , ServiceUnitStateData data ) {
407
+ long totalHandledRequests = getHandlerTotalCounter (data ).incrementAndGet ();
330
408
if (log .isDebugEnabled ()) {
331
- log .info ("{} received a handle request for serviceUnit:{}, data:{}" ,
332
- lookupServiceAddress , serviceUnit , data );
409
+ log .info ("{} received a handle request for serviceUnit:{}, data:{}. totalHandledRequests:{} " ,
410
+ lookupServiceAddress , serviceUnit , data , totalHandledRequests );
333
411
}
334
412
335
413
ServiceUnitState state = data == null ? Free : data .state ();
336
- switch (state ) {
337
- case Owned -> handleOwnEvent (serviceUnit , data );
338
- case Assigned -> handleAssignEvent (serviceUnit , data );
339
- case Released -> handleReleaseEvent (serviceUnit , data );
340
- case Splitting -> handleSplitEvent (serviceUnit , data );
341
- case Free -> handleFreeEvent (serviceUnit );
342
- default -> throw new IllegalStateException ("Failed to handle channel data:" + data );
414
+ try {
415
+ switch (state ) {
416
+ case Owned -> handleOwnEvent (serviceUnit , data );
417
+ case Assigned -> handleAssignEvent (serviceUnit , data );
418
+ case Released -> handleReleaseEvent (serviceUnit , data );
419
+ case Splitting -> handleSplitEvent (serviceUnit , data );
420
+ case Free -> handleFreeEvent (serviceUnit );
421
+ default -> throw new IllegalStateException ("Failed to handle channel data:" + data );
422
+ }
423
+ } catch (Throwable e ){
424
+ getHandlerFailureCounter (data ).incrementAndGet ();
425
+ log .error ("Failed to handle the event. serviceUnit:{}, data:{}" , serviceUnit , data , e );
426
+ throw e ;
343
427
}
344
428
}
345
429
@@ -359,19 +443,46 @@ private static String getLogEventTag(ServiceUnitStateData data) {
359
443
isTransferCommand (data ) ? "Transfer:" + data .state () : data .state ().toString ();
360
444
}
361
445
446
+ private AtomicLong getHandlerTotalCounter (ServiceUnitStateData data ) {
447
+ return getHandlerCounter (data , true );
448
+ }
449
+
450
+ private AtomicLong getHandlerFailureCounter (ServiceUnitStateData data ) {
451
+ return getHandlerCounter (data , false );
452
+ }
453
+
454
+ private AtomicLong getHandlerCounter (ServiceUnitStateData data , boolean total ) {
455
+ var state = data .state () == null ? Free : data .state ();
456
+ var counter = total
457
+ ? handlerCounters .get (state ).getTotal () : handlerCounters .get (state ).getFailure ();
458
+ if (counter == null ) {
459
+ throw new IllegalStateException ("Unknown state:" + state );
460
+ }
461
+ return counter ;
462
+ }
463
+
362
464
private void log (Throwable e , String serviceUnit , ServiceUnitStateData data , ServiceUnitStateData next ) {
363
465
if (e == null ) {
364
466
if (log .isDebugEnabled () || isTransferCommand (data )) {
365
- log .info ("{} handled {} event for serviceUnit:{}, cur:{}, next:{}" ,
467
+ long handlerTotalCount = getHandlerTotalCounter (data ).get ();
468
+ long handlerFailureCount = getHandlerFailureCounter (data ).get ();
469
+ log .info ("{} handled {} event for serviceUnit:{}, cur:{}, next:{}, "
470
+ + "totalHandledRequests{}, totalFailedRequests:{}" ,
366
471
lookupServiceAddress , getLogEventTag (data ), serviceUnit ,
367
472
data == null ? "" : data ,
368
- next == null ? "" : next );
473
+ next == null ? "" : next ,
474
+ handlerTotalCount , handlerFailureCount
475
+ );
369
476
}
370
477
} else {
371
- log .error ("{} failed to handle {} event for serviceUnit:{}, cur:{}, next:{}" ,
478
+ long handlerTotalCount = getHandlerTotalCounter (data ).get ();
479
+ long handlerFailureCount = getHandlerFailureCounter (data ).incrementAndGet ();
480
+ log .error ("{} failed to handle {} event for serviceUnit:{}, cur:{}, next:{}, "
481
+ + "totalHandledRequests{}, totalFailedRequests:{}" ,
372
482
lookupServiceAddress , getLogEventTag (data ), serviceUnit ,
373
483
data == null ? "" : data ,
374
484
next == null ? "" : next ,
485
+ handlerTotalCount , handlerFailureCount ,
375
486
e );
376
487
}
377
488
}
@@ -384,7 +495,6 @@ private void handleOwnEvent(String serviceUnit, ServiceUnitStateData data) {
384
495
if (isTargetBroker (data .broker ())) {
385
496
log (null , serviceUnit , data , null );
386
497
}
387
-
388
498
}
389
499
390
500
private void handleAssignEvent (String serviceUnit , ServiceUnitStateData data ) {
@@ -398,7 +508,6 @@ private void handleAssignEvent(String serviceUnit, ServiceUnitStateData data) {
398
508
}
399
509
400
510
private void handleReleaseEvent (String serviceUnit , ServiceUnitStateData data ) {
401
-
402
511
if (isTargetBroker (data .sourceBroker ())) {
403
512
ServiceUnitStateData next = new ServiceUnitStateData (Owned , data .broker (), data .sourceBroker ());
404
513
// TODO: when close, pass message to clients to connect to the new broker
0 commit comments