@@ -172,12 +172,14 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
172
172
private final TopicListService topicListService ;
173
173
private State state ;
174
174
private volatile boolean isActive = true ;
175
+ private SSLSession sslSession ;
175
176
private String authRole = null ;
176
177
private volatile AuthenticationDataSource authenticationData ;
177
178
private AuthenticationProvider authenticationProvider ;
178
179
private AuthenticationState authState ;
179
180
// In case of proxy, if the authentication credentials are forwardable,
180
181
// it will hold the credentials of the original client
182
+ private AuthenticationProvider originalAuthenticationProvider ;
181
183
private AuthenticationState originalAuthState ;
182
184
private AuthenticationDataSource originalAuthData ;
183
185
private boolean pendingAuthChallengeResponse = false ;
@@ -676,73 +678,93 @@ private void completeConnect(int clientProtoVersion, String clientVersion, boole
676
678
}
677
679
678
680
// According to auth result, send newConnected or newAuthChallenge command.
679
- private State doAuthentication (AuthData clientData ,
680
- int clientProtocolVersion ,
681
- String clientVersion ) throws Exception {
681
+ private CompletableFuture <Void > doAuthenticationAsync (AuthData clientData , int clientProtocolVersion ,
682
+ String clientVersion ) {
683
+ boolean useOriginalAuthState = (originalAuthState != null );
684
+ if (state == State .Connected ) {
685
+ // For auth challenge, the authentication state requires to be updated.
686
+ if (log .isDebugEnabled ()) {
687
+ log .debug ("Refreshing authenticate state, original auth state: {}, original auth role: {}, "
688
+ + "auth role: {}" ,
689
+ useOriginalAuthState , originalPrincipal , authRole );
690
+ }
691
+ try {
692
+ if (useOriginalAuthState ) {
693
+ originalAuthState =
694
+ originalAuthenticationProvider .newAuthState (clientData , remoteAddress , sslSession );
695
+ } else {
696
+ authState = authenticationProvider .newAuthState (clientData , remoteAddress , sslSession );
697
+ }
698
+ } catch (AuthenticationException e ) {
699
+ return CompletableFuture .failedFuture (e );
700
+ }
701
+ }
682
702
683
703
// The original auth state can only be set on subsequent auth attempts (and only
684
704
// in presence of a proxy and if the proxy is forwarding the credentials).
685
705
// In this case, the re-validation needs to be done against the original client
686
706
// credentials.
687
- boolean useOriginalAuthState = (originalAuthState != null );
688
- AuthenticationState authState = useOriginalAuthState ? originalAuthState : this .authState ;
707
+ AuthenticationState authState = useOriginalAuthState ? originalAuthState : this .authState ;
689
708
String authRole = useOriginalAuthState ? originalPrincipal : this .authRole ;
690
- AuthData brokerData = authState .authenticate (clientData );
691
709
692
- if (log .isDebugEnabled ()) {
693
- log .debug ("Authenticate using original auth state : {}, role = {}" , useOriginalAuthState , authRole );
694
- }
695
-
696
- if (authState .isComplete ()) {
697
- // Authentication has completed. It was either:
698
- // 1. the 1st time the authentication process was done, in which case we'll send
699
- // a `CommandConnected` response
700
- // 2. an authentication refresh, in which case we need to refresh authenticationData
701
-
702
- String newAuthRole = authState .getAuthRole ();
703
-
704
- // Refresh the auth data.
705
- this .authenticationData = authState .getAuthDataSource ();
706
- if (log .isDebugEnabled ()) {
707
- log .debug ("[{}] Auth data refreshed for role={}" , remoteAddress , this .authRole );
708
- }
709
-
710
- if (!useOriginalAuthState ) {
711
- this .authRole = newAuthRole ;
712
- }
713
-
714
- if (log .isDebugEnabled ()) {
715
- log .debug ("[{}] Client successfully authenticated with {} role {} and originalPrincipal {}" ,
716
- remoteAddress , authMethod , this .authRole , originalPrincipal );
717
- }
710
+ CompletableFuture <AuthData > authFuture = CompletableFuture .completedFuture (null );
711
+ if (!authState .isComplete ()) {
712
+ authFuture = authState .authenticateAsync (clientData );
713
+ }
714
+ return authFuture .thenCompose (nextAuthData -> {
715
+ if (nextAuthData == null ) {
716
+ // Authentication has completed. It was either:
717
+ // 1. the 1st time the authentication process was done, in which case we'll send
718
+ // a `CommandConnected` response
719
+ // 2. an authentication refresh, in which case we need to refresh authenticationData and role
720
+ String newAuthRole ;
721
+ try {
722
+ newAuthRole = authState .getAuthRole ();
723
+ } catch (AuthenticationException e ) {
724
+ return CompletableFuture .failedFuture (e );
725
+ }
726
+ AuthenticationDataSource newAuthDataSource = authState .getAuthDataSource ();
727
+ if (useOriginalAuthState ) {
728
+ this .originalAuthData = newAuthDataSource ;
729
+ } else {
730
+ this .authRole = newAuthRole ;
731
+ this .authenticationData = newAuthDataSource ;
732
+ }
718
733
719
- if (state != State .Connected ) {
720
- // First time authentication is done
721
- completeConnect (clientProtocolVersion , clientVersion , enableSubscriptionPatternEvaluation );
722
- } else {
723
- // If the connection was already ready, it means we're doing a refresh
724
- if (!StringUtils .isEmpty (authRole )) {
725
- if (!authRole .equals (newAuthRole )) {
726
- log .warn ("[{}] Principal cannot change during an authentication refresh expected={} got={}" ,
727
- remoteAddress , authRole , newAuthRole );
728
- ctx .close ();
729
- } else {
730
- log .info ("[{}] Refreshed authentication credentials for role {}" , remoteAddress , authRole );
734
+ if (log .isDebugEnabled ()) {
735
+ log .debug ("[{}] Client successfully authenticated with {} role {} and originalPrincipal {}, "
736
+ + "using original auth state: {}" ,
737
+ remoteAddress , authMethod , this .authRole , originalPrincipal , originalAuthState );
738
+ }
739
+ if (state != State .Connected ) {
740
+ // First time authentication is done
741
+ completeConnect (clientProtocolVersion , clientVersion , enableSubscriptionPatternEvaluation );
742
+ } else {
743
+ // If the connection was already ready, it means we're doing a refresh
744
+ if (!StringUtils .isEmpty (authRole )) {
745
+ if (!authRole .equals (newAuthRole )) {
746
+ return CompletableFuture .failedFuture (new AuthenticationException (String .format (
747
+ "Principal cannot change during an authentication refresh expected=%s got=%s" ,
748
+ authRole , newAuthRole )));
749
+ } else {
750
+ log .info ("[{}] Refreshed authentication credentials for role {}" , remoteAddress , authRole );
751
+ }
731
752
}
753
+ state = State .Connected ;
754
+ }
755
+ } else {
756
+ // auth not complete, continue auth with client side.
757
+ ctx .writeAndFlush (
758
+ Commands .newAuthChallenge (authMethod , nextAuthData , clientProtocolVersion ));
759
+ if (log .isDebugEnabled ()) {
760
+ log .debug ("[{}] Authentication in progress client by method {}, using original auth state: {}" ,
761
+ remoteAddress , authMethod , useOriginalAuthState );
762
+ log .debug ("[{}] connect state change to : [{}]" , remoteAddress , State .Connecting .name ());
732
763
}
764
+ state = State .Connecting ;
733
765
}
734
-
735
- return State .Connected ;
736
- }
737
-
738
- // auth not complete, continue auth with client side.
739
- ctx .writeAndFlush (Commands .newAuthChallenge (authMethod , brokerData , clientProtocolVersion ));
740
- if (log .isDebugEnabled ()) {
741
- log .debug ("[{}] Authentication in progress client by method {}." ,
742
- remoteAddress , authMethod );
743
- log .debug ("[{}] connect state change to : [{}]" , remoteAddress , State .Connecting .name ());
744
- }
745
- return State .Connecting ;
766
+ return CompletableFuture .completedFuture (null );
767
+ });
746
768
}
747
769
748
770
public void refreshAuthenticationCredentials () {
@@ -870,72 +892,62 @@ protected void handleConnect(CommandConnect connect) {
870
892
871
893
// init authState and other var
872
894
ChannelHandler sslHandler = ctx .channel ().pipeline ().get (PulsarChannelInitializer .TLS_HANDLER );
873
- SSLSession sslSession = null ;
874
895
if (sslHandler != null ) {
875
896
sslSession = ((SslHandler ) sslHandler ).engine ().getSession ();
876
897
}
877
898
878
899
authState = authenticationProvider .newAuthState (clientData , remoteAddress , sslSession );
900
+ doAuthenticationAsync (clientData , clientProtocolVersion , clientVersion ).thenCompose (__ -> {
901
+ // This will fail the check if:
902
+ // 1. client is coming through a proxy
903
+ // 2. we require to validate the original credentials
904
+ // 3. no credentials were passed
905
+ if (connect .hasOriginalPrincipal ()
906
+ && service .getPulsar ().getConfig ().isAuthenticateOriginalAuthData ()) {
907
+ // init authentication
908
+ String originalAuthMethod ;
909
+ if (connect .hasOriginalAuthMethod ()) {
910
+ originalAuthMethod = connect .getOriginalAuthMethod ();
911
+ } else {
912
+ originalAuthMethod = "none" ;
913
+ }
879
914
880
- if (log .isDebugEnabled ()) {
881
- String role = "" ;
882
- if (authState != null && authState .isComplete ()) {
883
- role = authState .getAuthRole ();
884
- } else {
885
- role = "authentication incomplete or null" ;
886
- }
887
- log .debug ("[{}] Authenticate role : {}" , remoteAddress , role );
888
- }
889
-
890
- state = doAuthentication (clientData , clientProtocolVersion , clientVersion );
891
-
892
- // This will fail the check if:
893
- // 1. client is coming through a proxy
894
- // 2. we require to validate the original credentials
895
- // 3. no credentials were passed
896
- if (connect .hasOriginalPrincipal () && service .getPulsar ().getConfig ().isAuthenticateOriginalAuthData ()) {
897
- // init authentication
898
- String originalAuthMethod ;
899
- if (connect .hasOriginalAuthMethod ()) {
900
- originalAuthMethod = connect .getOriginalAuthMethod ();
901
- } else {
902
- originalAuthMethod = "none" ;
903
- }
904
-
905
- AuthenticationProvider originalAuthenticationProvider = getBrokerService ()
906
- .getAuthenticationService ()
907
- .getAuthenticationProvider (originalAuthMethod );
915
+ originalAuthenticationProvider =
916
+ getBrokerService ().getAuthenticationService ().getAuthenticationProvider (originalAuthMethod );
908
917
909
- if (originalAuthenticationProvider == null ) {
910
- throw new AuthenticationException (
911
- String . format ( "Can't find AuthenticationProvider for original role"
912
- + " using auth method [%s] is not available" , originalAuthMethod ));
913
- }
918
+ if (originalAuthenticationProvider == null ) {
919
+ return CompletableFuture . failedFuture ( new AuthenticationException ( String . format (
920
+ "Can't find AuthenticationProvider for original role"
921
+ + " using auth method [%s] is not available" , originalAuthMethod ) ));
922
+ }
914
923
915
- originalAuthState = originalAuthenticationProvider .newAuthState (
916
- AuthData .of (connect .getOriginalAuthData ().getBytes ()),
917
- remoteAddress ,
918
- sslSession );
919
- originalAuthData = originalAuthState .getAuthDataSource ();
920
- originalPrincipal = originalAuthState .getAuthRole ();
924
+ try {
925
+ originalAuthState = originalAuthenticationProvider .newAuthState (
926
+ AuthData .of (connect .getOriginalAuthData ().getBytes ()), remoteAddress , sslSession );
927
+ originalAuthData = originalAuthState .getAuthDataSource ();
928
+ originalPrincipal = originalAuthState .getAuthRole ();
921
929
922
- if (log .isDebugEnabled ()) {
923
- log .debug ("[{}] Authenticate original role : {}" , remoteAddress , originalPrincipal );
924
- }
925
- } else {
926
- originalPrincipal = connect .hasOriginalPrincipal () ? connect .getOriginalPrincipal () : null ;
930
+ if (log .isDebugEnabled ()) {
931
+ log .debug ("[{}] Authenticate original role : {}" , remoteAddress , originalPrincipal );
932
+ }
933
+ } catch (AuthenticationException e ) {
934
+ return CompletableFuture .failedFuture (e );
935
+ }
936
+ } else {
937
+ originalPrincipal = connect .hasOriginalPrincipal () ? connect .getOriginalPrincipal () : null ;
927
938
928
- if (log .isDebugEnabled ()) {
929
- log .debug ("[{}] Authenticate original role (forwarded from proxy): {}" ,
930
- remoteAddress , originalPrincipal );
939
+ if (log .isDebugEnabled ()) {
940
+ log .debug ("[{}] Authenticate original role (forwarded from proxy): {}" , remoteAddress ,
941
+ originalPrincipal );
942
+ }
931
943
}
932
- }
944
+ return CompletableFuture .completedFuture (null );
945
+ }).exceptionally (e -> {
946
+ closeWithAuthException ("connect" , e );
947
+ return null ;
948
+ });
933
949
} catch (Exception e ) {
934
- service .getPulsarStats ().recordConnectionCreateFail ();
935
- logAuthException (remoteAddress , "connect" , getPrincipal (), Optional .empty (), e );
936
- String msg = "Unable to authenticate" ;
937
- ctx .writeAndFlush (Commands .newError (-1 , ServerError .AuthenticationError , msg ));
938
- close ();
950
+ closeWithAuthException ("connect" , e );
939
951
}
940
952
}
941
953
@@ -951,22 +963,14 @@ protected void handleAuthResponse(CommandAuthResponse authResponse) {
951
963
remoteAddress , authResponse .getResponse ().getAuthMethodName ());
952
964
}
953
965
954
- try {
955
- AuthData clientData = AuthData .of (authResponse .getResponse ().getAuthData ());
956
- doAuthentication (clientData , authResponse .getProtocolVersion (),
957
- authResponse .hasClientVersion () ? authResponse .getClientVersion () : EMPTY );
958
- } catch (AuthenticationException e ) {
959
- service .getPulsarStats ().recordConnectionCreateFail ();
960
- log .warn ("[{}] Authentication failed: {} " , remoteAddress , e .getMessage ());
961
- ctx .writeAndFlush (Commands .newError (-1 , ServerError .AuthenticationError , e .getMessage ()));
962
- close ();
963
- } catch (Exception e ) {
964
- service .getPulsarStats ().recordConnectionCreateFail ();
965
- String msg = "Unable to handleAuthResponse" ;
966
- log .warn ("[{}] {} " , remoteAddress , msg , e );
967
- ctx .writeAndFlush (Commands .newError (-1 , ServerError .UnknownError , msg ));
968
- close ();
969
- }
966
+ AuthData clientData = AuthData .of (authResponse .getResponse ().getAuthData ());
967
+ String clientVersion = authResponse .hasClientVersion () ? authResponse .getClientVersion () : EMPTY ;
968
+ doAuthenticationAsync (clientData , authResponse .getProtocolVersion (), clientVersion )
969
+ .whenComplete ((__ , e ) -> {
970
+ if (e != null ) {
971
+ closeWithAuthException ("authResponse" , e );
972
+ }
973
+ });
970
974
}
971
975
972
976
@ Override
@@ -3075,11 +3079,19 @@ public String clientSourceAddress() {
3075
3079
}
3076
3080
}
3077
3081
3082
+ private void closeWithAuthException (String operation , Throwable e ) {
3083
+ Throwable unwrapEx = FutureUtil .unwrapCompletionException (e );
3084
+ service .getPulsarStats ().recordConnectionCreateFail ();
3085
+ logAuthException (remoteAddress , operation , getPrincipal (), Optional .empty (), unwrapEx );
3086
+ ctx .writeAndFlush (Commands .newError (-1 , ServerError .AuthenticationError , unwrapEx .getMessage ()));
3087
+ close ();
3088
+ }
3089
+
3078
3090
private static void logAuthException (SocketAddress remoteAddress , String operation ,
3079
3091
String principal , Optional <TopicName > topic , Throwable ex ) {
3080
3092
String topicString = topic .map (t -> ", topic=" + t .toString ()).orElse ("" );
3081
3093
if (ex instanceof AuthenticationException ) {
3082
- log .info ("[{}] Failed to authenticate: operation={}, principal={}{}, reason={}" ,
3094
+ log .error ("[{}] Failed to authenticate: operation={}, principal={}{}, reason={}" ,
3083
3095
remoteAddress , operation , principal , topicString , ex .getMessage ());
3084
3096
} else {
3085
3097
log .error ("[{}] Error trying to authenticate: operation={}, principal={}{}" ,
0 commit comments