From 1314eb62115b3a4b7a92e5bf45727595c794ff07 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Tue, 22 Jun 2021 21:59:58 +1000 Subject: [PATCH] Move hashing on API key creation to crypto thread pool (#74165) The changes in #74106 make API keys cached on creation time. It helps avoid the expensive hashing operation on initial authentication when a request using the key hits the same node that creates the key. Since the more expensive hashing on authentication time is handled by a dedicated "crypto" thread pool (#58090), it is expected that usage of the "crypto" thread pool to be reduced. This PR moves the hashing on creation time to the "crypto" thread pool so that a similar (before #74106) usage level of "crypto" thread pool is maintained. It also has the benefit to avoid costly operations in the transport_worker thread, which is generally preferred. Relates: #74106 --- .../security/authc/ApiKeyIntegTests.java | 16 +++-- .../xpack/security/authc/ApiKeyService.java | 64 ++++++++++--------- .../security/authc/ApiKeyServiceTests.java | 24 ++++++- 3 files changed, 69 insertions(+), 35 deletions(-) diff --git a/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/authc/ApiKeyIntegTests.java b/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/authc/ApiKeyIntegTests.java index 1ba91096a11fb..60f4042dc51f4 100644 --- a/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/authc/ApiKeyIntegTests.java +++ b/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/authc/ApiKeyIntegTests.java @@ -998,7 +998,7 @@ public void testDerivedKeys() throws ExecutionException, InterruptedException { assertApiKeyNotCreated(client,"key-5"); } - public void testAuthenticationReturns429WhenThreadPoolIsSaturated() throws IOException, InterruptedException, ExecutionException { + public void testCreationAndAuthenticationReturns429WhenThreadPoolIsSaturated() throws Exception { final String nodeName = randomFrom(internalCluster().getNodeNames()); final Settings settings = internalCluster().getInstance(Settings.class, nodeName); final int allocatedProcessors = EsExecutors.allocatedProcessors(settings); @@ -1059,9 +1059,17 @@ public void testAuthenticationReturns429WhenThreadPoolIsSaturated() throws IOExc final Request authRequest = new Request("GET", "_security/_authenticate"); authRequest.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader( "Authorization", "ApiKey " + base64ApiKeyKeyValue).build()); - final ResponseException responseException = expectThrows(ResponseException.class, () -> restClient.performRequest(authRequest)); - assertThat(responseException.getMessage(), containsString("429 Too Many Requests")); - assertThat(responseException.getResponse().getStatusLine().getStatusCode(), is(429)); + final ResponseException e1 = expectThrows(ResponseException.class, () -> restClient.performRequest(authRequest)); + assertThat(e1.getMessage(), containsString("429 Too Many Requests")); + assertThat(e1.getResponse().getStatusLine().getStatusCode(), is(429)); + + final Request createApiKeyRequest = new Request("POST", "_security/api_key"); + createApiKeyRequest.setJsonEntity("{\"name\":\"key\"}"); + createApiKeyRequest.setOptions(createApiKeyRequest.getOptions().toBuilder() + .addHeader("Authorization", basicAuthHeaderValue(TEST_SUPERUSER, TEST_PASSWORD_SECURE_STRING))); + final ResponseException e2 = expectThrows(ResponseException.class, () -> restClient.performRequest(createApiKeyRequest)); + assertThat(e2.getMessage(), containsString("429 Too Many Requests")); + assertThat(e2.getResponse().getStatusLine().getStatusCode(), is(429)); } finally { blockingLatch.countDown(); if (lastTaskFuture != null) { diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ApiKeyService.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ApiKeyService.java index 34b8e61c6b2f6..db0d96761f270 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ApiKeyService.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ApiKeyService.java @@ -286,37 +286,41 @@ private void createApiKeyAndIndexIt(Authentication authentication, CreateApiKeyR Version.V_6_7_0); } - try (XContentBuilder builder = newDocument(apiKey, request.getName(), authentication, - roleDescriptorSet, created, expiration, - request.getRoleDescriptors(), version, request.getMetadata())) { - - final IndexRequest indexRequest = - client.prepareIndex(SECURITY_MAIN_ALIAS, SINGLE_MAPPING_NAME) - .setSource(builder) - .setRefreshPolicy(request.getRefreshPolicy()) - .request(); - final BulkRequest bulkRequest = toSingleItemBulkRequest(indexRequest); - - securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () -> - executeAsyncWithOrigin(client, SECURITY_ORIGIN, BulkAction.INSTANCE, bulkRequest, - TransportSingleItemBulkWriteAction.wrapBulkResponse(ActionListener.wrap( - indexResponse -> { - final ListenableFuture listenableFuture = new ListenableFuture<>(); - listenableFuture.onResponse(new CachedApiKeyHashResult(true, apiKey)); - apiKeyAuthCache.put(indexResponse.getId(), listenableFuture); - listener.onResponse( - new CreateApiKeyResponse(request.getName(), indexResponse.getId(), apiKey, expiration)); - }, - listener::onFailure)))); - } catch (IOException e) { - listener.onFailure(e); - } + computeHashForApiKey(apiKey, listener.delegateFailure((l, apiKeyHashChars) -> { + try (XContentBuilder builder = newDocument(apiKeyHashChars, request.getName(), authentication, + roleDescriptorSet, created, expiration, + request.getRoleDescriptors(), version, request.getMetadata())) { + + final IndexRequest indexRequest = + client.prepareIndex(SECURITY_MAIN_ALIAS, SINGLE_MAPPING_NAME) + .setSource(builder) + .setRefreshPolicy(request.getRefreshPolicy()) + .request(); + final BulkRequest bulkRequest = toSingleItemBulkRequest(indexRequest); + + securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () -> + executeAsyncWithOrigin(client, SECURITY_ORIGIN, BulkAction.INSTANCE, bulkRequest, + TransportSingleItemBulkWriteAction.wrapBulkResponse(ActionListener.wrap( + indexResponse -> { + final ListenableFuture listenableFuture = new ListenableFuture<>(); + listenableFuture.onResponse(new CachedApiKeyHashResult(true, apiKey)); + apiKeyAuthCache.put(indexResponse.getId(), listenableFuture); + listener.onResponse( + new CreateApiKeyResponse(request.getName(), indexResponse.getId(), apiKey, expiration)); + }, + listener::onFailure)))); + } catch (IOException e) { + listener.onFailure(e); + } finally { + Arrays.fill(apiKeyHashChars, (char) 0); + } + })); } /** * package-private for testing */ - XContentBuilder newDocument(SecureString apiKey, String name, Authentication authentication, Set userRoles, + XContentBuilder newDocument(char[] apiKeyHashChars, String name, Authentication authentication, Set userRoles, Instant created, Instant expiration, List keyRoles, Version version, @Nullable Map metadata) throws IOException { XContentBuilder builder = XContentFactory.jsonBuilder(); @@ -328,15 +332,13 @@ XContentBuilder newDocument(SecureString apiKey, String name, Authentication aut byte[] utf8Bytes = null; - final char[] keyHash = hasher.hash(apiKey); try { - utf8Bytes = CharArrays.toUtf8Bytes(keyHash); + utf8Bytes = CharArrays.toUtf8Bytes(apiKeyHashChars); builder.field("api_key_hash").utf8Value(utf8Bytes, 0, utf8Bytes.length); } finally { if (utf8Bytes != null) { Arrays.fill(utf8Bytes, (byte) 0); } - Arrays.fill(keyHash, (char) 0); } // Save role_descriptors @@ -772,6 +774,10 @@ public static boolean isApiKeyAuthentication(Authentication authentication) { } } + void computeHashForApiKey(SecureString apiKey, ActionListener listener) { + threadPool.executor(SECURITY_CRYPTO_THREAD_POOL_NAME).execute(ActionRunnable.supply(listener, () -> hasher.hash(apiKey))); + } + // Protected instance method so this can be mocked protected void verifyKeyAgainstHash(String apiKeyHash, ApiKeyCredentials credentials, ActionListener listener) { threadPool.executor(SECURITY_CRYPTO_THREAD_POOL_NAME).execute(ActionRunnable.supply(listener, () -> { diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/ApiKeyServiceTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/ApiKeyServiceTests.java index 336c6459f14f4..6b473fa4599fa 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/ApiKeyServiceTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/ApiKeyServiceTests.java @@ -399,7 +399,8 @@ Version.CURRENT, randomFrom(AuthenticationType.REALM, AuthenticationType.TOKEN, AuthenticationType.ANONYMOUS), Collections.emptyMap()); } final Map metadata = ApiKeyTests.randomMetadata(); - XContentBuilder docSource = service.newDocument(new SecureString(key.toCharArray()), "test", authentication, + XContentBuilder docSource = service.newDocument( + getFastStoredHashAlgoForTests().hash(new SecureString(key.toCharArray())),"test", authentication, Collections.singleton(SUPERUSER_ROLE_DESCRIPTOR), Instant.now(), Instant.now().plus(expiry), keyRoles, Version.CURRENT, metadata); if (invalidated) { @@ -976,6 +977,25 @@ public void testAuthWillTerminateIfHashingThreadPoolIsSaturated() throws IOExcep assertThat(authenticationResult.getMessage(), containsString("server is too busy to respond")); } + public void testCreationWillFailIfHashingThreadPoolIsSaturated() { + final EsRejectedExecutionException rejectedExecutionException = new EsRejectedExecutionException("rejected"); + final ExecutorService mockExecutorService = mock(ExecutorService.class); + when(threadPool.executor(SECURITY_CRYPTO_THREAD_POOL_NAME)).thenReturn(mockExecutorService); + Mockito.doAnswer(invocationOnMock -> { + final AbstractRunnable actionRunnable = (AbstractRunnable) invocationOnMock.getArguments()[0]; + actionRunnable.onRejection(rejectedExecutionException); + return null; + }).when(mockExecutorService).execute(any(Runnable.class)); + + final Authentication authentication = mock(Authentication.class); + final CreateApiKeyRequest createApiKeyRequest = new CreateApiKeyRequest(randomAlphaOfLengthBetween(3, 8), null, null); + ApiKeyService service = createApiKeyService(Settings.EMPTY); + final PlainActionFuture future = new PlainActionFuture<>(); + service.createApiKey(authentication, createApiKeyRequest, org.elasticsearch.core.Set.of(), future); + final EsRejectedExecutionException e = expectThrows(EsRejectedExecutionException.class, future::actionGet); + assertThat(e, is(rejectedExecutionException)); + } + public void testCachedApiKeyValidationWillNotBeBlockedByUnCachedApiKey() throws IOException, ExecutionException, InterruptedException { final String apiKey1 = randomAlphaOfLength(16); final ApiKeyCredentials creds = new ApiKeyCredentials(randomAlphaOfLength(12), new SecureString(apiKey1.toCharArray())); @@ -1130,7 +1150,7 @@ public static Authentication createApiKeyAuthentication(ApiKeyService apiKeyServ List keyRoles, Version version) throws Exception { XContentBuilder keyDocSource = apiKeyService.newDocument( - new SecureString(randomAlphaOfLength(16).toCharArray()), "test", authentication, + getFastStoredHashAlgoForTests().hash(new SecureString(randomAlphaOfLength(16).toCharArray())), "test", authentication, userRoles, Instant.now(), Instant.now().plus(Duration.ofSeconds(3600)), keyRoles, Version.CURRENT, randomBoolean() ? null : org.elasticsearch.core.Map.of( randomAlphaOfLengthBetween(3, 8), randomAlphaOfLengthBetween(3, 8)));