|
22 | 22 | import java.nio.charset.StandardCharsets;
|
23 | 23 | import java.util.Arrays;
|
24 | 24 | import java.util.Set;
|
| 25 | +import java.util.concurrent.ExecutionException; |
| 26 | +import java.util.concurrent.TimeUnit; |
| 27 | +import java.util.concurrent.TimeoutException; |
25 | 28 | import javax.naming.AuthenticationException;
|
26 | 29 | import javax.security.sasl.SaslException;
|
27 | 30 | import javax.security.sasl.SaslServer;
|
@@ -79,8 +82,10 @@ public byte[] evaluateResponse(byte[] response) throws SaslException {
|
79 | 82 | }
|
80 | 83 |
|
81 | 84 | try {
|
82 |
| - final AuthenticationState authState = authenticationProvider.newAuthState( |
83 |
| - AuthData.of(saslAuth.getAuthData().getBytes(StandardCharsets.UTF_8)), null, null); |
| 85 | + final AuthData authData = AuthData.of(saslAuth.getAuthData().getBytes(StandardCharsets.UTF_8)); |
| 86 | + final AuthenticationState authState = authenticationProvider.newAuthState(authData, null, null); |
| 87 | + // TODO: Use the request.timeout.ms config from the Kafka client as the timeout |
| 88 | + authState.authenticateAsync(authData).get(10, TimeUnit.SECONDS); |
84 | 89 | final String role = authState.getAuthRole();
|
85 | 90 | if (StringUtils.isEmpty(role)) {
|
86 | 91 | throw new AuthenticationException("Role cannot be empty.");
|
@@ -109,7 +114,7 @@ public byte[] evaluateResponse(byte[] response) throws SaslException {
|
109 | 114 | }
|
110 | 115 | complete = true;
|
111 | 116 | return new byte[0];
|
112 |
| - } catch (AuthenticationException e) { |
| 117 | + } catch (AuthenticationException | ExecutionException | InterruptedException | TimeoutException e) { |
113 | 118 | throw new SaslException(e.getMessage());
|
114 | 119 | }
|
115 | 120 | }
|
|
0 commit comments