Skip to content

Commit 7dcfd45

Browse files
authored
Improve threadpool usage and error handling for API key validation (#58090)
The PR introduces following two changes: Move API key validation into a new separate threadpool. The new threadpool is created separately with half of the available processors and 1000 in queue size. We could combine it with the existing TokenService's threadpool. Technically it is straightforward, but I am not sure whether it could be a rushed optimization since I am not clear about potential impact on the token service. On threadpoool saturation, it now fails with EsRejectedExecutionException which in turns gives back a 429, instead of 401 status code to users.
1 parent cda2f9b commit 7dcfd45

File tree

6 files changed

+241
-32
lines changed

6 files changed

+241
-32
lines changed

x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.elasticsearch.common.settings.SettingsFilter;
3636
import org.elasticsearch.common.util.BigArrays;
3737
import org.elasticsearch.common.util.PageCacheRecycler;
38+
import org.elasticsearch.common.util.concurrent.EsExecutors;
3839
import org.elasticsearch.common.util.concurrent.ThreadContext;
3940
import org.elasticsearch.common.util.set.Sets;
4041
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@@ -289,6 +290,8 @@
289290
public class Security extends Plugin implements SystemIndexPlugin, IngestPlugin, NetworkPlugin, ClusterPlugin,
290291
DiscoveryPlugin, MapperPlugin, ExtensiblePlugin {
291292

293+
public static final String SECURITY_CRYPTO_THREAD_POOL_NAME = XPackField.SECURITY + "-crypto";
294+
292295
private static final Logger logger = LogManager.getLogger(Security.class);
293296

294297
private final Settings settings;
@@ -1005,9 +1008,14 @@ public UnaryOperator<RestHandler> getRestHandlerWrapper(ThreadContext threadCont
10051008
@Override
10061009
public List<ExecutorBuilder<?>> getExecutorBuilders(final Settings settings) {
10071010
if (enabled) {
1008-
return Collections.singletonList(
1009-
new FixedExecutorBuilder(settings, TokenService.THREAD_POOL_NAME, 1, 1000, "xpack.security.authc.token.thread_pool",
1010-
false));
1011+
final int allocatedProcessors = EsExecutors.allocatedProcessors(settings);
1012+
return List.of(
1013+
new FixedExecutorBuilder(settings, TokenService.THREAD_POOL_NAME, 1, 1000,
1014+
"xpack.security.authc.token.thread_pool", false),
1015+
new FixedExecutorBuilder(settings, SECURITY_CRYPTO_THREAD_POOL_NAME,
1016+
(allocatedProcessors + 1) / 2, 1000,
1017+
"xpack.security.authc.api_key.thread_pool", false)
1018+
);
10111019
}
10121020
return Collections.emptyList();
10131021
}

x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ApiKeyService.java

Lines changed: 50 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.ExceptionsHelper;
1515
import org.elasticsearch.Version;
1616
import org.elasticsearch.action.ActionListener;
17+
import org.elasticsearch.action.ActionRunnable;
1718
import org.elasticsearch.action.DocWriteResponse;
1819
import org.elasticsearch.action.bulk.BulkItemResponse;
1920
import org.elasticsearch.action.bulk.BulkRequestBuilder;
@@ -42,6 +43,7 @@
4243
import org.elasticsearch.common.settings.Setting.Property;
4344
import org.elasticsearch.common.settings.Settings;
4445
import org.elasticsearch.common.unit.TimeValue;
46+
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
4547
import org.elasticsearch.common.util.concurrent.FutureUtils;
4648
import org.elasticsearch.common.util.concurrent.ListenableFuture;
4749
import org.elasticsearch.common.util.concurrent.ThreadContext;
@@ -104,6 +106,7 @@
104106
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
105107
import static org.elasticsearch.xpack.core.security.authc.Authentication.AuthenticationType;
106108
import static org.elasticsearch.xpack.core.security.index.RestrictedIndicesNames.SECURITY_MAIN_ALIAS;
109+
import static org.elasticsearch.xpack.security.Security.SECURITY_CRYPTO_THREAD_POOL_NAME;
107110

108111
public class ApiKeyService {
109112

@@ -328,14 +331,26 @@ private void loadApiKeyAndValidateCredentials(ThreadContext ctx, ApiKeyCredentia
328331
executeAsyncWithOrigin(ctx, SECURITY_ORIGIN, getRequest, ActionListener.<GetResponse>wrap(response -> {
329332
if (response.isExists()) {
330333
final Map<String, Object> source = response.getSource();
331-
validateApiKeyCredentials(docId, source, credentials, clock, listener);
334+
validateApiKeyCredentials(docId, source, credentials, clock, ActionListener.delegateResponse(listener, (l, e) -> {
335+
if (ExceptionsHelper.unwrapCause(e) instanceof EsRejectedExecutionException) {
336+
listener.onResponse(AuthenticationResult.terminate("server is too busy to respond", e));
337+
} else {
338+
listener.onFailure(e);
339+
}
340+
}));
332341
} else {
333342
listener.onResponse(
334343
AuthenticationResult.unsuccessful("unable to find apikey with id " + credentials.getId(), null));
335344
}
336345
},
337-
e -> listener.onResponse(AuthenticationResult.unsuccessful(
338-
"apikey authentication for id " + credentials.getId() + " encountered a failure", e))),
346+
e -> {
347+
if (ExceptionsHelper.unwrapCause(e) instanceof EsRejectedExecutionException) {
348+
listener.onResponse(AuthenticationResult.terminate("server is too busy to respond", e));
349+
} else {
350+
listener.onResponse(AuthenticationResult.unsuccessful(
351+
"apikey authentication for id " + credentials.getId() + " encountered a failure",e));
352+
}
353+
}),
339354
client::get);
340355
}
341356

@@ -468,23 +483,31 @@ void validateApiKeyCredentials(String docId, Map<String, Object> source, ApiKeyC
468483
}, listener::onFailure),
469484
threadPool.generic(), threadPool.getThreadContext());
470485
} else {
471-
final boolean verified = verifyKeyAgainstHash(apiKeyHash, credentials);
472-
listenableCacheEntry.onResponse(new CachedApiKeyHashResult(verified, credentials.getKey()));
473-
if (verified) {
474-
// move on
475-
validateApiKeyExpiration(source, credentials, clock, listener);
476-
} else {
477-
listener.onResponse(AuthenticationResult.unsuccessful("invalid credentials", null));
478-
}
486+
verifyKeyAgainstHash(apiKeyHash, credentials, ActionListener.wrap(
487+
verified -> {
488+
listenableCacheEntry.onResponse(new CachedApiKeyHashResult(verified, credentials.getKey()));
489+
if (verified) {
490+
// move on
491+
validateApiKeyExpiration(source, credentials, clock, listener);
492+
} else {
493+
listener.onResponse(AuthenticationResult.unsuccessful("invalid credentials", null));
494+
}
495+
}, listener::onFailure
496+
));
479497
}
480498
} else {
481-
final boolean verified = verifyKeyAgainstHash(apiKeyHash, credentials);
482-
if (verified) {
483-
// move on
484-
validateApiKeyExpiration(source, credentials, clock, listener);
485-
} else {
486-
listener.onResponse(AuthenticationResult.unsuccessful("invalid credentials", null));
487-
}
499+
verifyKeyAgainstHash(apiKeyHash, credentials, ActionListener.wrap(
500+
verified -> {
501+
if (verified) {
502+
// move on
503+
validateApiKeyExpiration(source, credentials, clock, listener);
504+
} else {
505+
listener.onResponse(AuthenticationResult.unsuccessful("invalid credentials", null));
506+
}
507+
},
508+
listener::onFailure
509+
));
510+
488511
}
489512
}
490513
}
@@ -552,14 +575,16 @@ static ApiKeyCredentials getCredentialsFromHeader(ThreadContext threadContext) {
552575
}
553576

554577
// Protected instance method so this can be mocked
555-
protected boolean verifyKeyAgainstHash(String apiKeyHash, ApiKeyCredentials credentials) {
556-
final char[] apiKeyHashChars = apiKeyHash.toCharArray();
557-
try {
578+
protected void verifyKeyAgainstHash(String apiKeyHash, ApiKeyCredentials credentials, ActionListener<Boolean> listener) {
579+
threadPool.executor(SECURITY_CRYPTO_THREAD_POOL_NAME).execute(ActionRunnable.supply(listener, () -> {
558580
Hasher hasher = Hasher.resolveFromHash(apiKeyHash.toCharArray());
559-
return hasher.verify(credentials.getKey(), apiKeyHashChars);
560-
} finally {
561-
Arrays.fill(apiKeyHashChars, (char) 0);
562-
}
581+
final char[] apiKeyHashChars = apiKeyHash.toCharArray();
582+
try {
583+
return hasher.verify(credentials.getKey(), apiKeyHashChars);
584+
} finally {
585+
Arrays.fill(apiKeyHashChars, (char) 0);
586+
}
587+
}));
563588
}
564589

565590
private Instant getApiKeyExpiration(Instant now, CreateApiKeyRequest request) {

x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/ApiKeyIntegTests.java

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,25 @@
88

99
import org.elasticsearch.ElasticsearchSecurityException;
1010
import org.elasticsearch.action.DocWriteResponse;
11+
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
1112
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
1213
import org.elasticsearch.action.admin.indices.refresh.RefreshRequestBuilder;
1314
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
1415
import org.elasticsearch.action.support.PlainActionFuture;
1516
import org.elasticsearch.action.support.WriteRequest;
1617
import org.elasticsearch.action.update.UpdateResponse;
1718
import org.elasticsearch.client.Client;
19+
import org.elasticsearch.client.Request;
1820
import org.elasticsearch.client.RequestOptions;
21+
import org.elasticsearch.client.ResponseException;
22+
import org.elasticsearch.client.RestClient;
1923
import org.elasticsearch.client.RestHighLevelClient;
2024
import org.elasticsearch.client.security.AuthenticateResponse;
2125
import org.elasticsearch.common.Strings;
2226
import org.elasticsearch.common.collect.Tuple;
2327
import org.elasticsearch.common.settings.Settings;
2428
import org.elasticsearch.common.unit.TimeValue;
29+
import org.elasticsearch.common.util.concurrent.EsExecutors;
2530
import org.elasticsearch.common.util.set.Sets;
2631
import org.elasticsearch.rest.RestStatus;
2732
import org.elasticsearch.test.SecurityIntegTestCase;
@@ -47,6 +52,7 @@
4752
import org.junit.After;
4853
import org.junit.Before;
4954

55+
import java.io.IOException;
5056
import java.nio.charset.StandardCharsets;
5157
import java.time.Instant;
5258
import java.time.temporal.ChronoUnit;
@@ -57,12 +63,16 @@
5763
import java.util.List;
5864
import java.util.Map;
5965
import java.util.Set;
66+
import java.util.concurrent.CountDownLatch;
6067
import java.util.concurrent.ExecutionException;
68+
import java.util.concurrent.ExecutorService;
6169
import java.util.concurrent.TimeUnit;
6270
import java.util.stream.Collectors;
71+
import java.util.stream.IntStream;
6372
import java.util.stream.Stream;
6473

6574
import static org.elasticsearch.xpack.core.security.index.RestrictedIndicesNames.SECURITY_MAIN_ALIAS;
75+
import static org.elasticsearch.xpack.security.Security.SECURITY_CRYPTO_THREAD_POOL_NAME;
6676
import static org.hamcrest.Matchers.containsInAnyOrder;
6777
import static org.hamcrest.Matchers.containsString;
6878
import static org.hamcrest.Matchers.equalTo;
@@ -823,6 +833,63 @@ public void testDerivedKeys() throws ExecutionException, InterruptedException {
823833
assertApiKeyNotCreated(client, "key-5");
824834
}
825835

836+
public void testAuthenticationReturns429WhenThreadPoolIsSaturated() throws IOException, InterruptedException {
837+
final String nodeName = randomFrom(internalCluster().getNodeNames());
838+
final Settings settings = internalCluster().getInstance(Settings.class, nodeName);
839+
final ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, nodeName);
840+
841+
final RoleDescriptor descriptor = new RoleDescriptor("auth_only", new String[] { }, null, null);
842+
final Client client = client().filterWithHeader(Collections.singletonMap("Authorization",
843+
UsernamePasswordToken.basicAuthHeaderValue(SecuritySettingsSource.TEST_SUPERUSER,
844+
SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING)));
845+
final CreateApiKeyResponse createApiKeyResponse = new CreateApiKeyRequestBuilder(client)
846+
.setName("auth only key")
847+
.setRoleDescriptors(Collections.singletonList(descriptor))
848+
.get();
849+
850+
assertNotNull(createApiKeyResponse.getId());
851+
assertNotNull(createApiKeyResponse.getKey());
852+
853+
final List<NodeInfo> nodeInfos = client().admin().cluster().prepareNodesInfo().get().getNodes().stream()
854+
.filter(nodeInfo -> nodeInfo.getNode().getName().equals(nodeName))
855+
.collect(Collectors.toList());
856+
857+
final ExecutorService executorService = threadPool.executor(SECURITY_CRYPTO_THREAD_POOL_NAME);
858+
final int allocatedProcessors = EsExecutors.allocatedProcessors(settings);
859+
final int numberOfThreads = (allocatedProcessors + 1) / 2;
860+
final CountDownLatch blockingLatch = new CountDownLatch(1);
861+
final CountDownLatch readyLatch = new CountDownLatch(numberOfThreads);
862+
863+
for (int i = 0; i < numberOfThreads; i++) {
864+
executorService.submit(() -> {
865+
readyLatch.countDown();
866+
try {
867+
blockingLatch.await();
868+
} catch (InterruptedException e) {
869+
throw new RuntimeException(e);
870+
}
871+
});
872+
}
873+
// Fill the whole queue for the crypto thread pool
874+
final int queueSize = 1000;
875+
IntStream.range(0, queueSize).forEach(i -> executorService.submit(() -> {}));
876+
readyLatch.await();
877+
878+
try (RestClient restClient = createRestClient(nodeInfos, null, "http")) {
879+
final String base64ApiKeyKeyValue = Base64.getEncoder().encodeToString(
880+
(createApiKeyResponse.getId() + ":" + createApiKeyResponse.getKey().toString()).getBytes(StandardCharsets.UTF_8));
881+
882+
final Request authRequest = new Request("GET", "_security/_authenticate");
883+
authRequest.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader(
884+
"Authorization", "ApiKey " + base64ApiKeyKeyValue).build());
885+
final ResponseException responseException = expectThrows(ResponseException.class, () -> restClient.performRequest(authRequest));
886+
assertThat(responseException.getMessage(), containsString("429 Too Many Requests"));
887+
assertThat(responseException.getResponse().getStatusLine().getStatusCode(), is(429));
888+
} finally {
889+
blockingLatch.countDown();
890+
}
891+
}
892+
826893
private void assertApiKeyNotCreated(Client client, String keyName) throws ExecutionException, InterruptedException {
827894
new RefreshRequestBuilder(client, RefreshAction.INSTANCE).setIndices(SECURITY_MAIN_ALIAS).execute().get();
828895
assertEquals(0, client.execute(GetApiKeyAction.INSTANCE,

0 commit comments

Comments
 (0)