44
44
import java .util .concurrent .Semaphore ;
45
45
import java .util .concurrent .TimeUnit ;
46
46
import java .util .concurrent .atomic .AtomicIntegerFieldUpdater ;
47
+ import javax .naming .AuthenticationException ;
47
48
import lombok .AccessLevel ;
48
49
import lombok .Getter ;
49
50
import org .apache .commons .lang3 .exception .ExceptionUtils ;
@@ -253,8 +254,25 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
253
254
} else {
254
255
log .info ("{} Connected through proxy to target broker at {}" , ctx .channel (), proxyToTargetBrokerAddress );
255
256
}
256
- // Send CONNECT command
257
- ctx .writeAndFlush (newConnectCommand ())
257
+ completeActive ();
258
+ }
259
+
260
+ protected void completeActive () throws Exception {
261
+ sendConnectCommand (null , null , null );
262
+ }
263
+
264
+ protected final void sendConnectCommand (String originalPrincipal , AuthData originalAuthData ,
265
+ String originalAuthMethod ) throws Exception {
266
+ // mutual authentication is to auth between `remoteHostName` and this client for this channel.
267
+ // each channel will have a mutual client/server pair, mutual client evaluateChallenge with init data,
268
+ // and return authData to server.
269
+ authenticationDataProvider = authentication .getAuthData (remoteHostName );
270
+ AuthData authData = authenticationDataProvider .authenticate (AuthData .INIT_AUTH_DATA );
271
+
272
+ ByteBuf byteBuf = Commands .newConnect (authentication .getAuthMethodName (), authData , this .protocolVersion ,
273
+ PulsarVersion .getVersion (), proxyToTargetBrokerAddress , originalPrincipal , originalAuthData ,
274
+ originalAuthMethod );
275
+ ctx .writeAndFlush (byteBuf )
258
276
.addListener (future -> {
259
277
if (future .isSuccess ()) {
260
278
if (log .isDebugEnabled ()) {
@@ -268,16 +286,6 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
268
286
});
269
287
}
270
288
271
- protected ByteBuf newConnectCommand () throws Exception {
272
- // mutual authentication is to auth between `remoteHostName` and this client for this channel.
273
- // each channel will have a mutual client/server pair, mutual client evaluateChallenge with init data,
274
- // and return authData to server.
275
- authenticationDataProvider = authentication .getAuthData (remoteHostName );
276
- AuthData authData = authenticationDataProvider .authenticate (AuthData .INIT_AUTH_DATA );
277
- return Commands .newConnect (authentication .getAuthMethodName (), authData , this .protocolVersion ,
278
- PulsarVersion .getVersion (), proxyToTargetBrokerAddress , null , null , null );
279
- }
280
-
281
289
@ Override
282
290
public void channelInactive (ChannelHandlerContext ctx ) throws Exception {
283
291
super .channelInactive (ctx );
@@ -361,51 +369,54 @@ protected void handleConnected(CommandConnected connected) {
361
369
state = State .Ready ;
362
370
}
363
371
364
- @ Override
365
- protected void handleAuthChallenge ( CommandAuthChallenge authChallenge ) {
366
- checkArgument ( authChallenge . hasChallenge () );
367
- checkArgument ( authChallenge . getChallenge (). hasAuthData ());
372
+ protected final void sendMutualAuthCommand ( String authMethod , AuthData authData ) {
373
+ if ( log . isDebugEnabled () ) {
374
+ log . debug ( "{} Mutual auth {}" , ctx . channel (), authMethod );
375
+ }
368
376
377
+ ByteBuf request = Commands .newAuthResponse (authMethod ,
378
+ authData ,
379
+ this .protocolVersion ,
380
+ PulsarVersion .getVersion ());
381
+
382
+ ctx .writeAndFlush (request ).addListener (writeFuture -> {
383
+ if (!writeFuture .isSuccess ()) {
384
+ log .warn ("{} Failed to send request for mutual auth to broker: {}" , ctx .channel (),
385
+ writeFuture .cause ().getMessage ());
386
+ close (writeFuture .cause ());
387
+ }
388
+ });
389
+ }
390
+
391
+ protected void prepareMutualAuth (CommandAuthChallenge authChallenge ) throws AuthenticationException {
369
392
if (Arrays .equals (AuthData .REFRESH_AUTH_DATA_BYTES , authChallenge .getChallenge ().getAuthData ())) {
370
393
try {
371
394
authenticationDataProvider = authentication .getAuthData (remoteHostName );
372
395
} catch (PulsarClientException e ) {
373
396
log .error ("{} Error when refreshing authentication data provider: {}" , ctx .channel (), e );
374
- connectionFuture . completeExceptionally (e );
397
+ close (e );
375
398
return ;
376
399
}
377
400
}
378
-
379
401
// mutual authn. If auth not complete, continue auth; if auth complete, complete connectionFuture.
380
- try {
381
- AuthData authData = authenticationDataProvider
382
- .authenticate (AuthData .of (authChallenge .getChallenge ().getAuthData ()));
383
-
384
- checkState (!authData .isComplete ());
385
-
386
- ByteBuf request = Commands .newAuthResponse (authentication .getAuthMethodName (),
387
- authData ,
388
- this .protocolVersion ,
389
- PulsarVersion .getVersion ());
390
-
391
- if (log .isDebugEnabled ()) {
392
- log .debug ("{} Mutual auth {}" , ctx .channel (), authentication .getAuthMethodName ());
393
- }
394
-
395
- ctx .writeAndFlush (request ).addListener (writeFuture -> {
396
- if (!writeFuture .isSuccess ()) {
397
- log .warn ("{} Failed to send request for mutual auth to broker: {}" , ctx .channel (),
398
- writeFuture .cause ().getMessage ());
399
- connectionFuture .completeExceptionally (writeFuture .cause ());
400
- }
401
- });
402
+ AuthData authData =
403
+ authenticationDataProvider .authenticate (AuthData .of (authChallenge .getChallenge ().getAuthData ()));
404
+ checkState (!authData .isComplete ());
405
+ sendMutualAuthCommand (authentication .getAuthMethodName (), authData );
406
+ }
402
407
408
+ @ Override
409
+ protected void handleAuthChallenge (CommandAuthChallenge authChallenge ) {
410
+ checkArgument (authChallenge .hasChallenge ());
411
+ checkArgument (authChallenge .getChallenge ().hasAuthData ());
412
+ try {
413
+ prepareMutualAuth (authChallenge );
403
414
if (state == State .SentConnectFrame ) {
404
415
state = State .Connecting ;
405
416
}
406
417
} catch (Exception e ) {
407
418
log .error ("{} Error mutual verify: {}" , ctx .channel (), e );
408
- connectionFuture . completeExceptionally (e );
419
+ close (e );
409
420
return ;
410
421
}
411
422
}
@@ -1243,6 +1254,13 @@ public void close() {
1243
1254
}
1244
1255
}
1245
1256
1257
+ public void close (Throwable e ) {
1258
+ if (ctx != null ) {
1259
+ ctx .close ();
1260
+ connectionFuture .completeExceptionally (e );
1261
+ }
1262
+ }
1263
+
1246
1264
private void checkRequestTimeout () {
1247
1265
while (!requestTimeoutQueue .isEmpty ()) {
1248
1266
RequestTime request = requestTimeoutQueue .peek ();
0 commit comments