@@ -172,12 +172,14 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
172
172
private final TopicListService topicListService ;
173
173
private State state ;
174
174
private volatile boolean isActive = true ;
175
+ private SSLSession sslSession ;
175
176
private String authRole = null ;
176
177
private volatile AuthenticationDataSource authenticationData ;
177
178
private AuthenticationProvider authenticationProvider ;
178
179
private AuthenticationState authState ;
179
180
// In case of proxy, if the authentication credentials are forwardable,
180
181
// it will hold the credentials of the original client
182
+ private AuthenticationProvider originalAuthenticationProvider ;
181
183
private AuthenticationState originalAuthState ;
182
184
private AuthenticationDataSource originalAuthData ;
183
185
private boolean pendingAuthChallengeResponse = false ;
@@ -676,73 +678,90 @@ private void completeConnect(int clientProtoVersion, String clientVersion, boole
676
678
}
677
679
678
680
// According to auth result, send newConnected or newAuthChallenge command.
679
- private State doAuthentication (AuthData clientData ,
680
- int clientProtocolVersion ,
681
- String clientVersion ) throws Exception {
681
+ private CompletableFuture <Void > doAuthentication (AuthData clientData , int clientProtocolVersion ,
682
+ String clientVersion ) throws AuthenticationException {
683
+ boolean useOriginalAuthState = (originalAuthState != null );
684
+ if (state == State .Connected ) {
685
+ // For auth challenge, the authentication state requires to be updated.
686
+ if (log .isDebugEnabled ()) {
687
+ log .debug ("Refreshing authenticate state, original auth state: {}, original auth role: {}, "
688
+ + "auth role: {}" ,
689
+ useOriginalAuthState , originalPrincipal , authRole );
690
+ }
691
+ if (useOriginalAuthState ) {
692
+ originalAuthState = originalAuthenticationProvider .newAuthState (clientData , remoteAddress , sslSession );
693
+ } else {
694
+ authState = authenticationProvider .newAuthState (clientData , remoteAddress , sslSession );
695
+ }
696
+ }
682
697
683
698
// The original auth state can only be set on subsequent auth attempts (and only
684
699
// in presence of a proxy and if the proxy is forwarding the credentials).
685
700
// In this case, the re-validation needs to be done against the original client
686
701
// credentials.
687
- boolean useOriginalAuthState = (originalAuthState != null );
688
- AuthenticationState authState = useOriginalAuthState ? originalAuthState : this .authState ;
702
+ AuthenticationState authState = useOriginalAuthState ? originalAuthState : this .authState ;
689
703
String authRole = useOriginalAuthState ? originalPrincipal : this .authRole ;
690
- AuthData brokerData = authState .authenticate (clientData );
691
-
692
- if (log .isDebugEnabled ()) {
693
- log .debug ("Authenticate using original auth state : {}, role = {}" , useOriginalAuthState , authRole );
694
- }
695
704
696
- if (authState .isComplete ()) {
697
- // Authentication has completed. It was either:
698
- // 1. the 1st time the authentication process was done, in which case we'll send
699
- // a `CommandConnected` response
700
- // 2. an authentication refresh, in which case we need to refresh authenticationData
701
-
702
- String newAuthRole = authState .getAuthRole ();
703
-
704
- // Refresh the auth data.
705
- this .authenticationData = authState .getAuthDataSource ();
706
- if (log .isDebugEnabled ()) {
707
- log .debug ("[{}] Auth data refreshed for role={}" , remoteAddress , this .authRole );
708
- }
709
-
710
- if (!useOriginalAuthState ) {
711
- this .authRole = newAuthRole ;
712
- }
713
-
714
- if (log .isDebugEnabled ()) {
715
- log .debug ("[{}] Client successfully authenticated with {} role {} and originalPrincipal {}" ,
716
- remoteAddress , authMethod , this .authRole , originalPrincipal );
717
- }
705
+ CompletableFuture <AuthData > authFuture ;
706
+ if (!authState .isComplete ()) {
707
+ authFuture = authState .authenticateAsync (clientData );
708
+ } else {
709
+ authFuture = CompletableFuture .completedFuture (null );
710
+ }
711
+ return authFuture .thenCompose (nextAuthData -> {
712
+ if (nextAuthData == null ) {
713
+ // Authentication has completed. It was either:
714
+ // 1. the 1st time the authentication process was done, in which case we'll send
715
+ // a `CommandConnected` response
716
+ // 2. an authentication refresh, in which case we need to refresh authenticationData and role
717
+ String newAuthRole ;
718
+ try {
719
+ newAuthRole = authState .getAuthRole ();
720
+ } catch (AuthenticationException e ) {
721
+ return CompletableFuture .failedFuture (e );
722
+ }
723
+ AuthenticationDataSource newAuthDataSource = authState .getAuthDataSource ();
724
+ if (useOriginalAuthState ) {
725
+ this .originalAuthData = newAuthDataSource ;
726
+ } else {
727
+ this .authRole = newAuthRole ;
728
+ this .authenticationData = newAuthDataSource ;
729
+ }
718
730
719
- if (state != State .Connected ) {
720
- // First time authentication is done
721
- completeConnect (clientProtocolVersion , clientVersion , enableSubscriptionPatternEvaluation );
722
- } else {
723
- // If the connection was already ready, it means we're doing a refresh
724
- if (!StringUtils .isEmpty (authRole )) {
725
- if (!authRole .equals (newAuthRole )) {
726
- log .warn ("[{}] Principal cannot change during an authentication refresh expected={} got={}" ,
727
- remoteAddress , authRole , newAuthRole );
728
- ctx .close ();
729
- } else {
730
- log .info ("[{}] Refreshed authentication credentials for role {}" , remoteAddress , authRole );
731
+ if (log .isDebugEnabled ()) {
732
+ log .debug ("[{}] Client successfully authenticated with {} role {} and originalPrincipal {}, "
733
+ + "using original auth state: {}" ,
734
+ remoteAddress , authMethod , this .authRole , originalPrincipal , originalAuthState );
735
+ }
736
+ if (state != State .Connected ) {
737
+ // First time authentication is done
738
+ completeConnect (clientProtocolVersion , clientVersion , enableSubscriptionPatternEvaluation );
739
+ } else {
740
+ // If the connection was already ready, it means we're doing a refresh
741
+ if (!StringUtils .isEmpty (authRole )) {
742
+ if (!authRole .equals (newAuthRole )) {
743
+ return CompletableFuture .failedFuture (new AuthenticationException (String .format (
744
+ "Principal cannot change during an authentication refresh expected=%s got=%s" ,
745
+ authRole , newAuthRole )));
746
+ } else {
747
+ log .info ("[{}] Refreshed authentication credentials for role {}" , remoteAddress , authRole );
748
+ }
731
749
}
750
+ state = State .Connected ;
751
+ }
752
+ } else {
753
+ // auth not complete, continue auth with client side.
754
+ ctx .writeAndFlush (
755
+ Commands .newAuthChallenge (authMethod , nextAuthData , clientProtocolVersion ));
756
+ if (log .isDebugEnabled ()) {
757
+ log .debug ("[{}] Authentication in progress client by method {}, using original auth state: {}" ,
758
+ remoteAddress , authMethod , useOriginalAuthState );
759
+ log .debug ("[{}] connect state change to : [{}]" , remoteAddress , State .Connecting .name ());
732
760
}
761
+ state = State .Connecting ;
733
762
}
734
-
735
- return State .Connected ;
736
- }
737
-
738
- // auth not complete, continue auth with client side.
739
- ctx .writeAndFlush (Commands .newAuthChallenge (authMethod , brokerData , clientProtocolVersion ));
740
- if (log .isDebugEnabled ()) {
741
- log .debug ("[{}] Authentication in progress client by method {}." ,
742
- remoteAddress , authMethod );
743
- log .debug ("[{}] connect state change to : [{}]" , remoteAddress , State .Connecting .name ());
744
- }
745
- return State .Connecting ;
763
+ return CompletableFuture .completedFuture (null );
764
+ });
746
765
}
747
766
748
767
public void refreshAuthenticationCredentials () {
@@ -870,72 +889,62 @@ protected void handleConnect(CommandConnect connect) {
870
889
871
890
// init authState and other var
872
891
ChannelHandler sslHandler = ctx .channel ().pipeline ().get (PulsarChannelInitializer .TLS_HANDLER );
873
- SSLSession sslSession = null ;
874
892
if (sslHandler != null ) {
875
893
sslSession = ((SslHandler ) sslHandler ).engine ().getSession ();
876
894
}
877
895
878
896
authState = authenticationProvider .newAuthState (clientData , remoteAddress , sslSession );
897
+ doAuthentication (clientData , clientProtocolVersion , clientVersion ).thenCompose (__ -> {
898
+ // This will fail the check if:
899
+ // 1. client is coming through a proxy
900
+ // 2. we require to validate the original credentials
901
+ // 3. no credentials were passed
902
+ if (connect .hasOriginalPrincipal ()
903
+ && service .getPulsar ().getConfig ().isAuthenticateOriginalAuthData ()) {
904
+ // init authentication
905
+ String originalAuthMethod ;
906
+ if (connect .hasOriginalAuthMethod ()) {
907
+ originalAuthMethod = connect .getOriginalAuthMethod ();
908
+ } else {
909
+ originalAuthMethod = "none" ;
910
+ }
879
911
880
- if (log .isDebugEnabled ()) {
881
- String role = "" ;
882
- if (authState != null && authState .isComplete ()) {
883
- role = authState .getAuthRole ();
884
- } else {
885
- role = "authentication incomplete or null" ;
886
- }
887
- log .debug ("[{}] Authenticate role : {}" , remoteAddress , role );
888
- }
889
-
890
- state = doAuthentication (clientData , clientProtocolVersion , clientVersion );
891
-
892
- // This will fail the check if:
893
- // 1. client is coming through a proxy
894
- // 2. we require to validate the original credentials
895
- // 3. no credentials were passed
896
- if (connect .hasOriginalPrincipal () && service .getPulsar ().getConfig ().isAuthenticateOriginalAuthData ()) {
897
- // init authentication
898
- String originalAuthMethod ;
899
- if (connect .hasOriginalAuthMethod ()) {
900
- originalAuthMethod = connect .getOriginalAuthMethod ();
901
- } else {
902
- originalAuthMethod = "none" ;
903
- }
904
-
905
- AuthenticationProvider originalAuthenticationProvider = getBrokerService ()
906
- .getAuthenticationService ()
907
- .getAuthenticationProvider (originalAuthMethod );
912
+ originalAuthenticationProvider =
913
+ getBrokerService ().getAuthenticationService ().getAuthenticationProvider (originalAuthMethod );
908
914
909
- if (originalAuthenticationProvider == null ) {
910
- throw new AuthenticationException (
911
- String . format ( "Can't find AuthenticationProvider for original role"
912
- + " using auth method [%s] is not available" , originalAuthMethod ));
913
- }
915
+ if (originalAuthenticationProvider == null ) {
916
+ return CompletableFuture . failedFuture ( new AuthenticationException ( String . format (
917
+ "Can't find AuthenticationProvider for original role"
918
+ + " using auth method [%s] is not available" , originalAuthMethod ) ));
919
+ }
914
920
915
- originalAuthState = originalAuthenticationProvider .newAuthState (
916
- AuthData .of (connect .getOriginalAuthData ().getBytes ()),
917
- remoteAddress ,
918
- sslSession );
919
- originalAuthData = originalAuthState .getAuthDataSource ();
920
- originalPrincipal = originalAuthState .getAuthRole ();
921
+ try {
922
+ originalAuthState = originalAuthenticationProvider .newAuthState (
923
+ AuthData .of (connect .getOriginalAuthData ().getBytes ()), remoteAddress , sslSession );
924
+ originalAuthData = originalAuthState .getAuthDataSource ();
925
+ originalPrincipal = originalAuthState .getAuthRole ();
921
926
922
- if (log .isDebugEnabled ()) {
923
- log .debug ("[{}] Authenticate original role : {}" , remoteAddress , originalPrincipal );
924
- }
925
- } else {
926
- originalPrincipal = connect .hasOriginalPrincipal () ? connect .getOriginalPrincipal () : null ;
927
+ if (log .isDebugEnabled ()) {
928
+ log .debug ("[{}] Authenticate original role : {}" , remoteAddress , originalPrincipal );
929
+ }
930
+ } catch (AuthenticationException e ) {
931
+ return CompletableFuture .failedFuture (e );
932
+ }
933
+ } else {
934
+ originalPrincipal = connect .hasOriginalPrincipal () ? connect .getOriginalPrincipal () : null ;
927
935
928
- if (log .isDebugEnabled ()) {
929
- log .debug ("[{}] Authenticate original role (forwarded from proxy): {}" ,
930
- remoteAddress , originalPrincipal );
936
+ if (log .isDebugEnabled ()) {
937
+ log .debug ("[{}] Authenticate original role (forwarded from proxy): {}" , remoteAddress ,
938
+ originalPrincipal );
939
+ }
931
940
}
932
- }
941
+ return CompletableFuture .completedFuture (null );
942
+ }).exceptionally (e -> {
943
+ closeWithAuthException ("connect" , e );
944
+ return null ;
945
+ });
933
946
} catch (Exception e ) {
934
- service .getPulsarStats ().recordConnectionCreateFail ();
935
- logAuthException (remoteAddress , "connect" , getPrincipal (), Optional .empty (), e );
936
- String msg = "Unable to authenticate" ;
937
- ctx .writeAndFlush (Commands .newError (-1 , ServerError .AuthenticationError , msg ));
938
- close ();
947
+ closeWithAuthException ("connect" , e );
939
948
}
940
949
}
941
950
@@ -954,18 +963,14 @@ protected void handleAuthResponse(CommandAuthResponse authResponse) {
954
963
try {
955
964
AuthData clientData = AuthData .of (authResponse .getResponse ().getAuthData ());
956
965
doAuthentication (clientData , authResponse .getProtocolVersion (),
957
- authResponse .hasClientVersion () ? authResponse .getClientVersion () : EMPTY );
958
- } catch ( AuthenticationException e ) {
959
- service . getPulsarStats (). recordConnectionCreateFail ();
960
- log . warn ( "[{}] Authentication failed: {} " , remoteAddress , e . getMessage () );
961
- ctx . writeAndFlush ( Commands . newError (- 1 , ServerError . AuthenticationError , e . getMessage ()));
962
- close ( );
966
+ authResponse .hasClientVersion () ? authResponse .getClientVersion () : EMPTY ). whenComplete (
967
+ ( __ , e ) -> {
968
+ if ( e != null ) {
969
+ closeWithAuthException ( "authResponse" , e );
970
+ }
971
+ } );
963
972
} catch (Exception e ) {
964
- service .getPulsarStats ().recordConnectionCreateFail ();
965
- String msg = "Unable to handleAuthResponse" ;
966
- log .warn ("[{}] {} " , remoteAddress , msg , e );
967
- ctx .writeAndFlush (Commands .newError (-1 , ServerError .UnknownError , msg ));
968
- close ();
973
+ closeWithAuthException ("authResponse" , e );
969
974
}
970
975
}
971
976
@@ -3075,11 +3080,18 @@ public String clientSourceAddress() {
3075
3080
}
3076
3081
}
3077
3082
3083
+ private void closeWithAuthException (String operation , Throwable e ) {
3084
+ service .getPulsarStats ().recordConnectionCreateFail ();
3085
+ logAuthException (remoteAddress , operation , getPrincipal (), Optional .empty (), e );
3086
+ ctx .writeAndFlush (Commands .newError (-1 , ServerError .AuthenticationError , e .getMessage ()));
3087
+ close ();
3088
+ }
3089
+
3078
3090
private static void logAuthException (SocketAddress remoteAddress , String operation ,
3079
3091
String principal , Optional <TopicName > topic , Throwable ex ) {
3080
3092
String topicString = topic .map (t -> ", topic=" + t .toString ()).orElse ("" );
3081
3093
if (ex instanceof AuthenticationException ) {
3082
- log .info ("[{}] Failed to authenticate: operation={}, principal={}{}, reason={}" ,
3094
+ log .error ("[{}] Failed to authenticate: operation={}, principal={}{}, reason={}" ,
3083
3095
remoteAddress , operation , principal , topicString , ex .getMessage ());
3084
3096
} else {
3085
3097
log .error ("[{}] Error trying to authenticate: operation={}, principal={}{}" ,
0 commit comments