44
44
import java .util .concurrent .Semaphore ;
45
45
import java .util .concurrent .TimeUnit ;
46
46
import java .util .concurrent .atomic .AtomicIntegerFieldUpdater ;
47
+ import javax .naming .AuthenticationException ;
47
48
import lombok .AccessLevel ;
48
49
import lombok .Getter ;
49
50
import org .apache .commons .lang3 .exception .ExceptionUtils ;
@@ -176,6 +177,9 @@ public class ClientCnx extends PulsarHandler {
176
177
@ Getter
177
178
private final ClientCnxIdleState idleState ;
178
179
180
+ @ Getter
181
+ private long lastDisconnectedTimestamp ;
182
+
179
183
enum State {
180
184
None , SentConnectFrame , Ready , Failed , Connecting
181
185
}
@@ -253,34 +257,53 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
253
257
} else {
254
258
log .info ("{} Connected through proxy to target broker at {}" , ctx .channel (), proxyToTargetBrokerAddress );
255
259
}
256
- // Send CONNECT command
257
- ctx . writeAndFlush ( newConnectCommand ())
258
- . addListener ( future -> {
259
- if ( future . isSuccess () ) {
260
- if ( log . isDebugEnabled ()) {
261
- log . debug ( "Complete: {}" , future . isSuccess ());
262
- }
263
- state = State . SentConnectFrame ;
264
- } else {
265
- log . warn ( "Error during handshake" , future . cause ());
266
- ctx . close ();
267
- }
268
- }) ;
260
+ handleChannelActive ();
261
+ }
262
+
263
+ protected String getOriginalAuthRole ( ) {
264
+ return null ;
265
+ }
266
+
267
+ protected CompletableFuture < AuthData > getOriginalAuthDataSupplier ( boolean isRefresh ) {
268
+ return CompletableFuture . completedFuture ( null );
269
+ }
270
+
271
+ protected String getOriginalAuthMethod () {
272
+ return null ;
269
273
}
270
274
271
- protected ByteBuf newConnectCommand () throws Exception {
275
+ protected void handleChannelActive () throws Exception {
272
276
// mutual authentication is to auth between `remoteHostName` and this client for this channel.
273
277
// each channel will have a mutual client/server pair, mutual client evaluateChallenge with init data,
274
278
// and return authData to server.
275
279
authenticationDataProvider = authentication .getAuthData (remoteHostName );
276
280
AuthData authData = authenticationDataProvider .authenticate (AuthData .INIT_AUTH_DATA );
277
- return Commands .newConnect (authentication .getAuthMethodName (), authData , this .protocolVersion ,
278
- PulsarVersion .getVersion (), proxyToTargetBrokerAddress , null , null , null );
281
+ getOriginalAuthDataSupplier (false ).thenAccept (originalAuthData -> {
282
+ ByteBuf byteBuf = Commands .newConnect (authentication .getAuthMethodName (), authData , this .protocolVersion ,
283
+ PulsarVersion .getVersion (), proxyToTargetBrokerAddress , getOriginalAuthRole (), originalAuthData ,
284
+ getOriginalAuthMethod ());
285
+ ctx .writeAndFlush (byteBuf ).addListener (future -> {
286
+ if (future .isSuccess ()) {
287
+ if (log .isDebugEnabled ()) {
288
+ log .debug ("Complete: {}" , future .isSuccess ());
289
+ }
290
+ state = State .SentConnectFrame ;
291
+ } else {
292
+ closeWithException (future .cause ());
293
+ log .error ("Error during handshake" , future .cause ());
294
+ }
295
+ });
296
+ }).exceptionally (e -> {
297
+ closeWithException (e );
298
+ log .error ("Error during handshake" , e );
299
+ return null ;
300
+ });
279
301
}
280
302
281
303
@ Override
282
304
public void channelInactive (ChannelHandlerContext ctx ) throws Exception {
283
305
super .channelInactive (ctx );
306
+ lastDisconnectedTimestamp = System .currentTimeMillis ();
284
307
log .info ("{} Disconnected" , ctx .channel ());
285
308
if (!connectionFuture .isDone ()) {
286
309
connectionFuture .completeExceptionally (new PulsarClientException ("Connection already closed" ));
@@ -361,6 +384,19 @@ protected void handleConnected(CommandConnected connected) {
361
384
state = State .Ready ;
362
385
}
363
386
387
+ protected CompletableFuture <AuthData > getMutualAuthData (CommandAuthChallenge authChallenge ) {
388
+ try {
389
+ return CompletableFuture .completedFuture (
390
+ authenticationDataProvider .authenticate (AuthData .of (authChallenge .getChallenge ().getAuthData ())));
391
+ } catch (AuthenticationException e ) {
392
+ return FutureUtil .failedFuture (e );
393
+ }
394
+ }
395
+
396
+ protected String getMutualAuthMethod () {
397
+ return authentication .getAuthMethodName ();
398
+ }
399
+
364
400
@ Override
365
401
protected void handleAuthChallenge (CommandAuthChallenge authChallenge ) {
366
402
checkArgument (authChallenge .hasChallenge ());
@@ -371,42 +407,39 @@ protected void handleAuthChallenge(CommandAuthChallenge authChallenge) {
371
407
authenticationDataProvider = authentication .getAuthData (remoteHostName );
372
408
} catch (PulsarClientException e ) {
373
409
log .error ("{} Error when refreshing authentication data provider: {}" , ctx .channel (), e );
374
- connectionFuture . completeExceptionally (e );
410
+ closeWithException (e );
375
411
return ;
376
412
}
377
413
}
378
414
379
- // mutual authn. If auth not complete, continue auth; if auth complete, complete connectionFuture.
380
415
try {
381
- AuthData authData = authenticationDataProvider
382
- .authenticate (AuthData .of (authChallenge .getChallenge ().getAuthData ()));
383
-
384
- checkState (!authData .isComplete ());
385
-
386
- ByteBuf request = Commands .newAuthResponse (authentication .getAuthMethodName (),
387
- authData ,
388
- this .protocolVersion ,
389
- PulsarVersion .getVersion ());
390
-
391
- if (log .isDebugEnabled ()) {
392
- log .debug ("{} Mutual auth {}" , ctx .channel (), authentication .getAuthMethodName ());
393
- }
416
+ getMutualAuthData (authChallenge ).thenAccept (authData ->{
417
+ checkState (!authData .isComplete ());
418
+ String authMethod = getMutualAuthMethod ();
419
+ if (log .isDebugEnabled ()) {
420
+ log .debug ("{} Mutual auth {}" , ctx .channel (), authMethod );
421
+ }
394
422
395
- ctx .writeAndFlush (request ).addListener (writeFuture -> {
396
- if (!writeFuture .isSuccess ()) {
397
- log .warn ("{} Failed to send request for mutual auth to broker: {}" , ctx .channel (),
398
- writeFuture .cause ().getMessage ());
399
- connectionFuture .completeExceptionally (writeFuture .cause ());
423
+ ByteBuf request =
424
+ Commands .newAuthResponse (authMethod , authData , protocolVersion , PulsarVersion .getVersion ());
425
+ ctx .writeAndFlush (request ).addListener (writeFuture -> {
426
+ if (!writeFuture .isSuccess ()) {
427
+ log .warn ("{} Failed to send request for mutual auth to broker: {}" , ctx .channel (),
428
+ writeFuture .cause ().getMessage ());
429
+ closeWithException (writeFuture .cause ());
430
+ }
431
+ });
432
+ if (state == State .SentConnectFrame ) {
433
+ state = State .Connecting ;
400
434
}
435
+ }).exceptionally (e ->{
436
+ log .error ("{} Error mutual verify: {}" , ctx .channel (), e );
437
+ closeWithException (e );
438
+ return null ;
401
439
});
402
-
403
- if (state == State .SentConnectFrame ) {
404
- state = State .Connecting ;
405
- }
406
440
} catch (Exception e ) {
407
441
log .error ("{} Error mutual verify: {}" , ctx .channel (), e );
408
- connectionFuture .completeExceptionally (e );
409
- return ;
442
+ closeWithException (e );
410
443
}
411
444
}
412
445
@@ -1243,6 +1276,13 @@ public void close() {
1243
1276
}
1244
1277
}
1245
1278
1279
+ protected void closeWithException (Throwable e ) {
1280
+ if (ctx != null ) {
1281
+ ctx .close ();
1282
+ connectionFuture .completeExceptionally (e );
1283
+ }
1284
+ }
1285
+
1246
1286
private void checkRequestTimeout () {
1247
1287
while (!requestTimeoutQueue .isEmpty ()) {
1248
1288
RequestTime request = requestTimeoutQueue .peek ();
0 commit comments