From c30dd4e3d00e0249e020626d39c683ae6595be5f Mon Sep 17 00:00:00 2001
From: Allan Clements
Date: Wed, 29 May 2024 22:31:38 -0500
Subject: [PATCH] Closes #4488 Retry bulk request if all errors are configured
 retry error codes

Only retry bulk request items that failed
Implemented bulk retry test and splitting up of bulk requests that exceed the configured size
Added exception if a bulk request item is too large to send, even as a single item
Use HttpStatus.SC_NOT_FOUND instead of 404 literal
Added pseudo test to facilitate manual observation of chunking of bulk writes
Added test asserting silent failure of write to ES due to chunk size limit causing the vertex to be omitted from the ES mixed index

Signed-off-by: Allan Clements
---
 docs/configs/janusgraph-cfg.md                |   1 +
 .../diskstorage/es/ElasticSearchIndex.java    |   5 +
 .../diskstorage/es/rest/RestClientSetup.java  |   7 +-
 .../es/rest/RestElasticSearchClient.java      | 190 +++++++++++++++---
 .../es/ElasticsearchJanusGraphIndexTest.java  |  54 +++++
 .../es/rest/RestClientBulkRequestsTest.java   | 122 +++++++++++
 .../es/rest/RestClientRetryTest.java          |  76 ++++++-
 .../es/rest/RestClientSetupTest.java          |  15 +-
 8 files changed, 427 insertions(+), 43 deletions(-)
 create mode 100644 janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientBulkRequestsTest.java

diff --git a/docs/configs/janusgraph-cfg.md b/docs/configs/janusgraph-cfg.md
index ff08908267..f5fd722f77 100644
--- a/docs/configs/janusgraph-cfg.md
+++ b/docs/configs/janusgraph-cfg.md
@@ -150,6 +150,7 @@ Elasticsearch index configuration

 | Name | Description | Datatype | Default Value | Mutability |
 | ---- | ---- | ---- | ---- | ---- |
+| index.[X].elasticsearch.bulk-chunk-size-limit-bytes | The total size limit in bytes of a bulk request. Mutation batches in excess of this limit will be chunked to this size. | Integer | 100000000 | LOCAL |
 | index.[X].elasticsearch.bulk-refresh | Elasticsearch bulk API refresh setting used to control when changes made by this request are made visible to search | String | false | MASKABLE |
 | index.[X].elasticsearch.client-keep-alive | Set a keep-alive timeout (in milliseconds) | Long | (no default value) | GLOBAL_OFFLINE |
 | index.[X].elasticsearch.connect-timeout | Sets the maximum connection timeout (in milliseconds). | Integer | 1000 | MASKABLE |
diff --git a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/ElasticSearchIndex.java b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/ElasticSearchIndex.java
index 7479979761..e357fd20ad 100644
--- a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/ElasticSearchIndex.java
+++ b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/ElasticSearchIndex.java
@@ -324,6 +324,11 @@ public class ElasticSearchIndex implements IndexProvider {
             "Comma separated list of Elasticsearch REST client ResponseException error codes to retry. " +
             "E.g. \"408,429\"", ConfigOption.Type.LOCAL, String[].class, new String[0]);

+    public static final ConfigOption<Integer> BULK_CHUNK_SIZE_LIMIT_BYTES =
+        new ConfigOption<>(ELASTICSEARCH_NS, "bulk-chunk-size-limit-bytes",
+            "The total size limit in bytes of a bulk request. Mutation batches in excess of this limit will be " +
+            "chunked to this size.", ConfigOption.Type.LOCAL, Integer.class, 100_000_000);
+
     public static final int HOST_PORT_DEFAULT = 9200;

     /**
diff --git a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestClientSetup.java b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestClientSetup.java
index bbfac23d6b..355bb7cc44 100644
--- a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestClientSetup.java
+++ b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestClientSetup.java
@@ -81,8 +81,9 @@ public ElasticSearchClient connect(Configuration config) throws IOException {
         long retryMaxWaitMs = config.getOrDefault(ElasticSearchIndex.RETRY_MAX_WAIT);
         Set<Integer> errorCodesToRetry = Arrays.stream(config.getOrDefault(ElasticSearchIndex.RETRY_ERROR_CODES))
             .mapToInt(Integer::parseInt).boxed().collect(Collectors.toSet());
+        int bulkChunkLimitBytes = config.getOrDefault(ElasticSearchIndex.BULK_CHUNK_SIZE_LIMIT_BYTES);
         final RestElasticSearchClient client = getElasticSearchClient(rc, scrollKeepAlive, useMappingTypesForES7,
-            retryLimit, errorCodesToRetry, retryInitialWaitMs, retryMaxWaitMs);
+            retryLimit, errorCodesToRetry, retryInitialWaitMs, retryMaxWaitMs, bulkChunkLimitBytes);
         if (config.has(ElasticSearchIndex.BULK_REFRESH)) {
             client.setBulkRefresh(config.get(ElasticSearchIndex.BULK_REFRESH));
         }
@@ -115,9 +116,9 @@ protected RestClientBuilder getRestClientBuilder(HttpHost[] hosts) {

     protected RestElasticSearchClient getElasticSearchClient(RestClient rc, int scrollKeepAlive, boolean useMappingTypesForES7,
                                                              int retryAttemptLimit, Set<Integer> retryOnErrorCodes, long retryInitialWaitMs,
-                                                             long retryMaxWaitMs) {
+                                                             long retryMaxWaitMs, int bulkChunkSerializedLimit) {
         return new RestElasticSearchClient(rc, scrollKeepAlive, useMappingTypesForES7, retryAttemptLimit, retryOnErrorCodes,
-            retryInitialWaitMs, retryMaxWaitMs);
+            retryInitialWaitMs, retryMaxWaitMs, bulkChunkSerializedLimit);
     }

     /**
diff --git a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java
index 64e577724e..59fa0af314 100644
--- a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java
+++ b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java
@@ -16,11 +16,15 @@

 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
 import org.apache.http.HttpEntity;
+import org.apache.http.HttpStatus;
 import org.apache.http.entity.ByteArrayEntity;
 import org.apache.http.entity.ContentType;
 import org.apache.tinkerpop.shaded.jackson.annotation.JsonIgnoreProperties;
 import org.apache.tinkerpop.shaded.jackson.core.JsonParseException;
+import org.apache.tinkerpop.shaded.jackson.core.JsonProcessingException;
 import org.apache.tinkerpop.shaded.jackson.core.type.TypeReference;
 import org.apache.tinkerpop.shaded.jackson.databind.JsonMappingException;
 import org.apache.tinkerpop.shaded.jackson.databind.ObjectMapper;
@@ -39,16 +43,21 @@
 import org.janusgraph.diskstorage.es.mapping.IndexMapping;
 import org.janusgraph.diskstorage.es.mapping.TypedIndexMappings;
 import org.janusgraph.diskstorage.es.mapping.TypelessIndexMappings;
-import org.janusgraph.diskstorage.es.rest.RestBulkResponse.RestBulkItemResponse;
 import org.janusgraph.diskstorage.es.script.ESScriptResponse;
+import org.javatuples.Pair;
+import org.javatuples.Triplet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -121,9 +130,11 @@ public class RestElasticSearchClient implements ElasticSearchClient {

     private final long retryMaxWaitMs;

+    private final int bulkChunkSerializedLimitBytes;
+
     public RestElasticSearchClient(RestClient delegate, int scrollKeepAlive, boolean useMappingTypesForES7,
                                    int retryAttemptLimit, Set<Integer> retryOnErrorCodes, long retryInitialWaitMs,
-                                   long retryMaxWaitMs) {
+                                   long retryMaxWaitMs, int bulkChunkSerializedLimitBytes) {
         this.delegate = delegate;
         majorVersion = getMajorVersion();
         this.scrollKeepAlive = scrollKeepAlive+"s";
@@ -134,6 +145,7 @@ public RestElasticSearchClient(RestClient delegate, int scrollKeepAlive, boolean
         this.retryOnErrorCodes = Collections.unmodifiableSet(retryOnErrorCodes);
         this.retryInitialWaitMs = retryInitialWaitMs;
         this.retryMaxWaitMs = retryMaxWaitMs;
+        this.bulkChunkSerializedLimitBytes = bulkChunkSerializedLimitBytes;
     }

     @Override
@@ -241,7 +253,7 @@ public ESScriptResponse getStoredScript(String scriptName) throws IOException {

         final Response response = e.getResponse();

-        if(e.getResponse().getStatusLine().getStatusCode() == 404){
+        if(e.getResponse().getStatusLine().getStatusCode() == HttpStatus.SC_NOT_FOUND){
             ESScriptResponse esScriptResponse = new ESScriptResponse();
             esScriptResponse.setFound(false);
             return esScriptResponse;
@@ -380,10 +392,11 @@ public void clearStore(String indexName, String storeName) throws IOException {
         }
     }

-    @Override
-    public void bulkRequest(List<ElasticSearchMutation> requests, String ingestPipeline) throws IOException {
-        final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        for (final ElasticSearchMutation request : requests) {
+    private class RequestBytes {
+        final byte[] requestBytes;
+        final byte[] requestSource;
+
+        private RequestBytes(final ElasticSearchMutation request) throws JsonProcessingException {
             Map<String, Object> requestData = new HashMap<>();
             if (useMappingTypes) {
                 requestData.put("_index", request.getIndex());
@@ -398,15 +411,39 @@ public void bulkRequest(List<ElasticSearchMutation> requests, String ingestPipel
                 requestData.put(retryOnConflictKey, retryOnConflict);
             }

-            outputStream.write(mapWriter.writeValueAsBytes(
-                ImmutableMap.of(request.getRequestType().name().toLowerCase(), requestData))
-            );
-            outputStream.write(NEW_LINE_BYTES);
+            this.requestBytes = mapWriter.writeValueAsBytes(ImmutableMap.of(request.getRequestType().name().toLowerCase(), requestData));
             if (request.getSource() != null) {
-                outputStream.write(mapWriter.writeValueAsBytes(request.getSource()));
+                this.requestSource = mapWriter.writeValueAsBytes(request.getSource());
+            } else {
+                this.requestSource = null;
+            }
+        }
+
+        private int getSerializedSize() {
+            int serializedSize = this.requestBytes.length;
+            serializedSize += 1; //For follow-up NEW_LINE_BYTES
+            if (this.requestSource != null) {
+                serializedSize += this.requestSource.length;
+                serializedSize += 1; //For follow-up NEW_LINE_BYTES
+            }
+            return serializedSize;
+        }
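+
+        //Editorial worked example (hypothetical sizes, not from the patch): an action line of 47 bytes
+        //plus a source document of 120 bytes gives getSerializedSize() = 47 + 1 + 120 + 1 = 169 bytes,
+        //which is exactly the number of bytes the writeTo() method below emits once the newline
+        //terminators required by the _bulk body format are counted.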
+
+        private void writeTo(OutputStream outputStream) throws IOException {
+            outputStream.write(this.requestBytes);
+            outputStream.write(NEW_LINE_BYTES);
+            if (this.requestSource != null) {
+                outputStream.write(requestSource);
                 outputStream.write(NEW_LINE_BYTES);
             }
         }
+    }
+
+    private Pair<String, byte[]> buildBulkRequestInput(List<RequestBytes> requests, String ingestPipeline) throws IOException {
+        final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        for (final RequestBytes request : requests) {
+            request.writeTo(outputStream);
+        }

         final StringBuilder builder = new StringBuilder();
         if (ingestPipeline != null) {
@@ -416,17 +453,107 @@ public void bulkRequest(List<ElasticSearchMutation> requests, String ingestPipel
             APPEND_OP.apply(builder).append("refresh=").append(bulkRefresh);
         }
         builder.insert(0, REQUEST_SEPARATOR + "_bulk");
+        return Pair.with(builder.toString(), outputStream.toByteArray());
+    }
+
+    private List<Triplet<Object, Integer, RequestBytes>> pairErrorsWithSubmittedMutation(
+        //Bulk API is documented to return bulk item responses in the same order of submission
+        //https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#bulk-api-response-body
+        //As such we only need to retry elements that failed
+        final List<Map<String, RestBulkResponse.RestBulkItemResponse>> bulkResponseItems,
+        final List<RequestBytes> submittedBulkRequestItems) {
+        final List<Triplet<Object, Integer, RequestBytes>> errors = new ArrayList<>(bulkResponseItems.size());
+        for (int itemIndex = 0; itemIndex < bulkResponseItems.size(); itemIndex++) {
+            Collection<RestBulkResponse.RestBulkItemResponse> bulkResponseItem = bulkResponseItems.get(itemIndex).values();
+            if (bulkResponseItem.size() > 1) {
+                throw new IllegalStateException("There should only be a single item per bulk response item entry");
+            }
+            RestBulkResponse.RestBulkItemResponse item = bulkResponseItem.iterator().next();
+            if (item.getError() != null && item.getStatus() != HttpStatus.SC_NOT_FOUND) {
+                errors.add(Triplet.with(item.getError(), item.getStatus(), submittedBulkRequestItems.get(itemIndex)));
+            }
+        }
+        return errors;
+    }

-        final Response response = performRequest(REQUEST_TYPE_POST, builder.toString(), outputStream.toByteArray());
-        try (final InputStream inputStream = response.getEntity().getContent()) {
-            final RestBulkResponse bulkResponse = mapper.readValue(inputStream, RestBulkResponse.class);
-            final List<Object> errors = bulkResponse.getItems().stream()
-                .flatMap(item -> item.values().stream())
-                .filter(item -> item.getError() != null && item.getStatus() != 404)
-                .map(RestBulkItemResponse::getError).collect(Collectors.toList());
-            if (!errors.isEmpty()) {
-                errors.forEach(error -> log.error("Failed to execute ES query: {}", error));
-                throw new IOException("Failure(s) in Elasticsearch bulk request: " + errors);
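+    //Editorial illustration (hypothetical index/id values, not from the patch): buildBulkRequestInput
+    //above produces the newline-delimited JSON body the _bulk endpoint expects, one action line per
+    //mutation, each followed by a source line when the mutation carries a document:
+    //  {"index":{"_index":"some_index","_id":"some_doc_id1"}}
+    //  {"someKey":"someValue"}
+    //  {"delete":{"_index":"some_index","_id":"some_doc_id2"}}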
+    private class BulkRequestChunker implements Iterator<List<RequestBytes>> {
+        //By default, Elasticsearch writes are limited to 100mb, so chunk a given batch of requests so they stay under
+        //the specified limit
+
+        //https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#docs-bulk-api-desc
+        //There is no "correct" number of actions to perform in a single bulk request. Experiment with different
+        // settings to find the optimal size for your particular workload. Note that Elasticsearch limits the maximum
+        // size of an HTTP request to 100mb by default
+        private final PeekingIterator<RequestBytes> requestIterator;
+
+        private BulkRequestChunker(List<ElasticSearchMutation> requests) throws JsonProcessingException {
+            List<RequestBytes> serializedRequests = new ArrayList<>(requests.size());
+            for (ElasticSearchMutation request : requests) {
+                serializedRequests.add(new RequestBytes(request));
+            }
+            this.requestIterator = Iterators.peekingIterator(serializedRequests.iterator());
+        }
+
+        @Override
+        public boolean hasNext() {
+            return requestIterator.hasNext();
+        }
+
+        @Override
+        public List<RequestBytes> next() {
+            List<RequestBytes> serializedRequests = new ArrayList<>();
+            int chunkSerializedTotal = 0;
+            while (requestIterator.hasNext()) {
+                RequestBytes peeked = requestIterator.peek();
+                int requestSerializedSize = peeked.getSerializedSize();
+                if (requestSerializedSize + chunkSerializedTotal <= bulkChunkSerializedLimitBytes) {
+                    chunkSerializedTotal += requestSerializedSize;
+                    serializedRequests.add(requestIterator.next());
+                } else if (requestSerializedSize > bulkChunkSerializedLimitBytes) {
+                    //we've encountered an element we cannot send to Elasticsearch given the configured limit
+                    throw new IllegalArgumentException(String.format(
+                        "Bulk request item is larger than permitted chunk limit. Limit is %s. Serialized item size was %s",
+                        bulkChunkSerializedLimitBytes, requestSerializedSize));
+                } else {
+                    //Adding this element would exceed the limit, so return the chunk
+                    return serializedRequests;
+                }
+            }
+            //All remaining requests fit in this chunk
+            return serializedRequests;
+        }
+    }
+
+    @Override
+    public void bulkRequest(final List<ElasticSearchMutation> requests, String ingestPipeline) throws IOException {
+        BulkRequestChunker bulkRequestChunker = new BulkRequestChunker(requests);
+        while (bulkRequestChunker.hasNext()) {
+            List<RequestBytes> bulkRequestChunk = bulkRequestChunker.next();
+            int retryCount = 0;
+            while (true) {
+                final Pair<String, byte[]> bulkRequestInput = buildBulkRequestInput(bulkRequestChunk, ingestPipeline);
+                final Response response = performRequest(REQUEST_TYPE_POST, bulkRequestInput.getValue0(), bulkRequestInput.getValue1());
+                try (final InputStream inputStream = response.getEntity().getContent()) {
+                    final RestBulkResponse bulkResponse = mapper.readValue(inputStream, RestBulkResponse.class);
+                    List<Triplet<Object, Integer, RequestBytes>> bulkItemsThatFailed = pairErrorsWithSubmittedMutation(bulkResponse.getItems(), bulkRequestChunk);
+                    if (!bulkItemsThatFailed.isEmpty()) {
+                        //Only retry the bulk request if *all* the bulk response item error codes are retry error codes
+                        final Set<Integer> errorCodes = bulkItemsThatFailed.stream().map(Triplet::getValue1).collect(Collectors.toSet());
+                        if (retryCount < retryAttemptLimit && retryOnErrorCodes.containsAll(errorCodes)) {
+                            //Build up the next request batch, of only the failed mutations
+                            bulkRequestChunk = bulkItemsThatFailed.stream().map(Triplet::getValue2).collect(Collectors.toList());
+                            performRetryWait(retryCount);
+                            retryCount++;
+                        } else {
+                            final List<Object> errorItems = bulkItemsThatFailed.stream().map(Triplet::getValue0).collect(Collectors.toList());
+                            errorItems.forEach(error -> log.error("Failed to execute ES query: {}", error));
+                            throw new IOException("Failure(s) in Elasticsearch bulk request: " + errorItems);
+                        }
+                    } else {
+                        //The entire bulk request was successful, leave the loop
+                        break;
+                    }
+                }
+            }
         }
     }
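+    //Editorial walk-through of bulkRequest above (hypothetical sizes, not from the patch): with
+    //bulk-chunk-size-limit-bytes set to 800 and three mutations of 400 serialized bytes each, the
+    //chunker yields the chunks [m1, m2] and [m3]. If the first _bulk response reports only m2 as
+    //failed with status 429, and 429 is among the configured retry-error-codes, the next attempt
+    //resubmits just m2 after the backoff wait; any non-retryable status fails the whole call with
+    //an IOException instead.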
@@ -571,19 +698,22 @@ private Response performRequestWithRetry(Request request) throws IOException {
                 if (!retryOnErrorCodes.contains(e.getResponse().getStatusLine().getStatusCode()) || retryCount >= retryAttemptLimit) {
                     throw e;
                 }
-                //Wait before trying again
-                long waitDurationMs = Math.min((long) (retryInitialWaitMs * Math.pow(10, retryCount)), retryMaxWaitMs);
-                log.warn("Retrying Elasticsearch request in {} ms. Attempt {} of {}", waitDurationMs, retryCount, retryAttemptLimit);
-                try {
-                    Thread.sleep(waitDurationMs);
-                } catch (InterruptedException interruptedException) {
-                    throw new RuntimeException(String.format("Thread interrupted while waiting for retry attempt %d of %d", retryCount, retryAttemptLimit), interruptedException);
-                }
+                performRetryWait(retryCount);
             }
             retryCount++;
         }
     }

+    private void performRetryWait(int retryCount) {
+        long waitDurationMs = Math.min((long) (retryInitialWaitMs * Math.pow(10, retryCount)), retryMaxWaitMs);
+        log.warn("Retrying Elasticsearch request in {} ms. Attempt {} of {}", waitDurationMs, retryCount, retryAttemptLimit);
+        try {
+            Thread.sleep(waitDurationMs);
+        } catch (InterruptedException interruptedException) {
+            throw new RuntimeException(String.format("Thread interrupted while waiting for retry attempt %d of %d", retryCount, retryAttemptLimit), interruptedException);
+        }
+    }
+
     private Response performRequest(Request request, byte[] requestData) throws IOException {
         final HttpEntity entity = requestData != null
             ? new ByteArrayEntity(requestData, ContentType.APPLICATION_JSON)
             : null;
diff --git a/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/ElasticsearchJanusGraphIndexTest.java b/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/ElasticsearchJanusGraphIndexTest.java
index 8d1088f292..e2877e2a16 100644
--- a/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/ElasticsearchJanusGraphIndexTest.java
+++ b/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/ElasticsearchJanusGraphIndexTest.java
@@ -15,6 +15,7 @@
 package org.janusgraph.diskstorage.es;

 import org.apache.commons.lang3.mutable.MutableBoolean;
+import org.apache.tinkerpop.gremlin.process.traversal.P;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.janusgraph.core.Cardinality;
 import org.janusgraph.core.JanusGraphTransaction;
@@ -33,10 +34,12 @@
 import org.janusgraph.graphdb.JanusGraphIndexTest;
 import org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.testcontainers.junit.jupiter.Container;

 import java.time.Duration;
+import java.util.stream.IntStream;

 import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.FORCE_INDEX_USAGE;
 import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.INDEX_NAME;
@@ -105,6 +108,57 @@ public void indexShouldNotExistAfterDeletion() throws Exception {
         assertFalse(esr.indexExists(expectedIndexName));
     }

+    @Test
+    public void writingAnItemLargerThanPermittedChunkLimitFails() {
+        PropertyKey key = mgmt.makePropertyKey("some-field").dataType(Integer.class).make();
+        mgmt.buildIndex("bulkTooLargeWriteTestIndex", Vertex.class).addKey(key).buildMixedIndex(INDEX);
+        mgmt.buildIndex("equalityLookupIndex", Vertex.class).addKey(key).buildCompositeIndex();
+        mgmt.commit();
+
+        //Confirm we're able to successfully write initially
+        Vertex initiallyWrittenVertex = graph.traversal().addV().property(key.name(), 1).next();
+        graph.tx().commit();
+
+        //Retrieve the vertex again via the composite index and then the mixed index, confirming it's present in both
+        Vertex initialVertexEqualityLookup = graph.traversal().V().has(key.name(), P.eq(1)).next();
+        Vertex initialVertexRangeLookup = graph.traversal().V().has(key.name(), P.gt(0)).next();
+
+        Assertions.assertEquals(initiallyWrittenVertex.id(), initialVertexEqualityLookup.id(),
+            "Should have returned the same vertex");
+        Assertions.assertEquals(initiallyWrittenVertex.id(), initialVertexRangeLookup.id(),
+            "Should have returned the same vertex");
+
+        //Now write a second vertex, but with a limit that prevents the mixed index write from succeeding.
+        //Writes to mixed indices are "best effort", so a write that fails to reach a mixed index still
+        //reports success. However, lookups via the mixed index's predicates will now be blind to the vertex
+        clopen(option(ElasticSearchIndex.BULK_CHUNK_SIZE_LIMIT_BYTES, INDEX), 1);
+        Vertex secondWriteAttemptVertex = graph.traversal().addV().property(key.name(), 2).next();
+        graph.tx().commit();
+        Vertex secondVertexEqualityLookup = graph.traversal().V().has(key.name(), P.eq(2)).next();
+        boolean secondVertexRangeLookup = graph.traversal().V().has(key.name(), P.gt(1)).hasNext();
+
+        Assertions.assertEquals(secondWriteAttemptVertex.id(), secondVertexEqualityLookup.id(),
+            "Should have returned the same vertex");
+        Assertions.assertFalse(secondVertexRangeLookup, "The lookup for the second vertex using the mixed index " +
+            "predicate should fail to find it, because the chunk size limit caused the mixed index mutation " +
+            "to fail silently, omitting the vertex from the ES mixed index");
+    }
+
+    @Test
+    //Disabled so it does not slow down CI/CD builds, given there's nothing to assert;
+    //intended for manual observation of how bulk writes are chunked when sent to Elasticsearch
+    @Disabled
+    public void manuallyObserveBulkWritingChunking() {
+        clopen(option(ElasticSearchIndex.BULK_CHUNK_SIZE_LIMIT_BYTES, INDEX), 100);
+        PropertyKey key = mgmt.makePropertyKey("some-field").dataType(String.class).make();
+        mgmt.buildIndex("testChunkingIndex", Vertex.class).addKey(key).buildMixedIndex(INDEX);
+        mgmt.commit();
+
+        //Write 10 vertices that will each individually be split into their own chunk due to the configured limit
+        IntStream.range(0, 10).forEach(i -> graph.traversal().addV().property(key.name(), "foobar").toList());
+        graph.tx().commit();
+    }
+
     @Override
     public boolean supportsLuceneStyleQueries() {
         return true;
diff --git a/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientBulkRequestsTest.java b/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientBulkRequestsTest.java
new file mode 100644
index 0000000000..87f4cc1c3f
--- /dev/null
+++ b/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientBulkRequestsTest.java
@@ -0,0 +1,122 @@
+// Copyright 2024 JanusGraph Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+ +package org.janusgraph.diskstorage.es.rest; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.http.HttpEntity; +import org.apache.http.StatusLine; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.RestClient; +import org.janusgraph.diskstorage.es.ElasticSearchMutation; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.stream.IntStream; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class RestClientBulkRequestsTest { + @Mock + private RestClient restClientMock; + + @Mock + private Response response; + + @Mock + private StatusLine statusLine; + + @Captor + private ArgumentCaptor requestCaptor; + + RestElasticSearchClient createClient(int bulkChunkSerializedLimitBytes) throws IOException { + //Just throw an exception when there's an attempt to look up the ES version during instantiation + when(restClientMock.performRequest(any())).thenThrow(new IOException()); + + RestElasticSearchClient clientUnderTest = new RestElasticSearchClient(restClientMock, 0, false, + 0, Collections.emptySet(), 0, 0, bulkChunkSerializedLimitBytes); + //There's an initial query to get the ES version we need to accommodate, and then reset for the actual test + Mockito.reset(restClientMock); + return clientUnderTest; + } + + @Test + public void testSplittingOfLargeBulkItems() throws IOException { + ObjectMapper mapper = new ObjectMapper(); + when(statusLine.getStatusCode()).thenReturn(200); + + //In both cases return a "success" + RestBulkResponse singletonBulkItemResponseSuccess = new RestBulkResponse(); + singletonBulkItemResponseSuccess.setItems( + Collections.singletonList(Collections.singletonMap("index", new RestBulkResponse.RestBulkItemResponse()))); + byte [] singletonBulkItemResponseSuccessBytes = mapper.writeValueAsBytes(singletonBulkItemResponseSuccess); + HttpEntity singletonBulkItemHttpEntityMock = mock(HttpEntity.class); + when(singletonBulkItemHttpEntityMock.getContent()) + .thenReturn(new ByteArrayInputStream(singletonBulkItemResponseSuccessBytes)) + //Have to setup a second input stream because it will have been consumed by the first pass + .thenReturn(new ByteArrayInputStream(singletonBulkItemResponseSuccessBytes)); + when(response.getEntity()).thenReturn(singletonBulkItemHttpEntityMock); + when(response.getStatusLine()).thenReturn(statusLine); + + int bulkLimit = 800; + try (RestElasticSearchClient restClientUnderTest = createClient(bulkLimit)) { + //prime the restClientMock again after it's reset after creation + when(restClientMock.performRequest(any())).thenReturn(response).thenReturn(response); + StringBuilder payloadBuilder = new StringBuilder(); + IntStream.range(0, bulkLimit - 100).forEach(value -> payloadBuilder.append("a")); + String largePayload = payloadBuilder.toString(); + restClientUnderTest.bulkRequest(Arrays.asList( + //There should be enough characters in the payload that they can't both be 
sent in a single bulk call + ElasticSearchMutation.createIndexRequest("some_index", "some_type", "some_doc_id1", + Collections.singletonMap("someKey", largePayload)), + ElasticSearchMutation.createIndexRequest("some_index", "some_type", "some_doc_id2", + Collections.singletonMap("someKey", largePayload)) + ), null); + //Verify that despite only calling bulkRequest() once, we had 2 calls to the underlying rest client's + //perform request (due to the mutations being split across 2 calls) + verify(restClientMock, times(2)).performRequest(requestCaptor.capture()); + } + } + + @Test + public void testThrowingIfSingleBulkItemIsLargerThanLimit() throws IOException { + int bulkLimit = 800; + try (RestElasticSearchClient restClientUnderTest = createClient(bulkLimit)) { + StringBuilder payloadBuilder = new StringBuilder(); + //This payload is too large to send given the set limit, since it is a single item we can't split it + IntStream.range(0, bulkLimit * 10).forEach(value -> payloadBuilder.append("a")); + Assertions.assertThrows(IllegalArgumentException.class, () -> restClientUnderTest.bulkRequest( + Collections.singletonList( + ElasticSearchMutation.createIndexRequest("some_index", "some_type", "some_doc_id", + Collections.singletonMap("someKey", payloadBuilder.toString())) + ), null), "Should have thrown due to bulk request item being too large"); + } + } +} diff --git a/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientRetryTest.java b/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientRetryTest.java index f49f675c65..9c9e101977 100644 --- a/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientRetryTest.java +++ b/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientRetryTest.java @@ -14,12 +14,15 @@ package org.janusgraph.diskstorage.es.rest; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Sets; +import org.apache.http.HttpEntity; import org.apache.http.StatusLine; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseException; import org.elasticsearch.client.RestClient; +import org.janusgraph.diskstorage.es.ElasticSearchMutation; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -29,9 +32,13 @@ import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; +import java.io.ByteArrayInputStream; import java.io.IOException; +import java.util.Arrays; import java.util.Collections; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doReturn; @@ -62,12 +69,63 @@ RestElasticSearchClient createClient(int retryAttemptLimit, Set retryEr when(restClientMock.performRequest(any())).thenThrow(new IOException()); RestElasticSearchClient clientUnderTest = new RestElasticSearchClient(restClientMock, 0, false, - retryAttemptLimit, retryErrorCodes, 0, 0); + retryAttemptLimit, retryErrorCodes, 0, 0, 100_000_000); //There's an initial query to get the ES version we need to accommodate, and then reset for the actual test Mockito.reset(restClientMock); return clientUnderTest; } + @Test + public void testRetryOfIndividuallyFailedBulkItems() throws IOException { + ObjectMapper mapper = new ObjectMapper(); + int retryErrorCode = 429; + //A bulk response will still return a success despite an underlying item having 
an error + when(statusLine.getStatusCode()).thenReturn(200); + + //The initial bulk request will have one element that failed in its response + RestBulkResponse.RestBulkItemResponse initialRequestFailedItem = new RestBulkResponse.RestBulkItemResponse(); + initialRequestFailedItem.setError("An Error"); + initialRequestFailedItem.setStatus(retryErrorCode); + RestBulkResponse initialBulkResponse = new RestBulkResponse(); + initialBulkResponse.setItems( + Stream.of( + Collections.singletonMap("index", new RestBulkResponse.RestBulkItemResponse()), + Collections.singletonMap("index", initialRequestFailedItem), + Collections.singletonMap("index", new RestBulkResponse.RestBulkItemResponse()) + ).collect(Collectors.toList()) + ); + HttpEntity initialHttpEntityMock = mock(HttpEntity.class); + when(initialHttpEntityMock.getContent()).thenReturn(new ByteArrayInputStream( + mapper.writeValueAsBytes(initialBulkResponse))); + Response initialResponseMock = mock(Response.class); + when(initialResponseMock.getEntity()).thenReturn(initialHttpEntityMock); + when(initialResponseMock.getStatusLine()).thenReturn(statusLine); + + //The retry should then only have a single item that succeeded + RestBulkResponse retriedBulkResponse = new RestBulkResponse(); + retriedBulkResponse.setItems( + Collections.singletonList(Collections.singletonMap("index", new RestBulkResponse.RestBulkItemResponse()))); + HttpEntity retriedHttpEntityMock = mock(HttpEntity.class); + when(retriedHttpEntityMock.getContent()).thenReturn(new ByteArrayInputStream( + mapper.writeValueAsBytes(retriedBulkResponse))); + Response retriedResponseMock = mock(Response.class); + when(retriedResponseMock.getEntity()).thenReturn(retriedHttpEntityMock); + when(retriedResponseMock.getStatusLine()).thenReturn(statusLine); + + try (RestElasticSearchClient restClientUnderTest = createClient(1, Sets.newHashSet(retryErrorCode))) { + //prime the restClientMock again after it's reset after creation + when(restClientMock.performRequest(any())).thenReturn(initialResponseMock).thenReturn(retriedResponseMock); + restClientUnderTest.bulkRequest(Arrays.asList( + ElasticSearchMutation.createDeleteRequest("some_index", "some_type", "some_doc_id1"), + ElasticSearchMutation.createDeleteRequest("some_index", "some_type", "some_doc_id2"), + ElasticSearchMutation.createDeleteRequest("some_index", "some_type", "some_doc_id3") + ), null); + //Verify that despite only calling bulkRequest once, we had 2 calls to the underlying rest client's + //perform request (due to the retried failure) + verify(restClientMock, times(2)).performRequest(requestCaptor.capture()); + } + } + @Test public void testRetryOnConfiguredErrorStatus() throws IOException { Integer retryCode = 429; @@ -85,7 +143,9 @@ public void testRetryOnConfiguredErrorStatus() throws IOException { when(restClientMock.performRequest(any())) .thenThrow(responseException) .thenThrow(expectedFinalException); - restClientUnderTest.bulkRequest(Collections.emptyList(), null); + restClientUnderTest.bulkRequest(Collections.singletonList( + ElasticSearchMutation.createDeleteRequest("some_index", "some_type", "some_doc_id")), + null); Assertions.fail("Should have thrown the expected exception after retry"); } catch (Exception actualException) { Assertions.assertSame(expectedFinalException, actualException); @@ -113,7 +173,9 @@ public void testRetriesExhaustedReturnsLastRetryException() throws IOException { .thenThrow(responseException); - restClientUnderTest.bulkRequest(Collections.emptyList(), null); + 
restClientUnderTest.bulkRequest(Collections.singletonList( + ElasticSearchMutation.createDeleteRequest("some_index", "some_type", "some_doc_id")), + null); Assertions.fail("Should have thrown the expected exception after retry"); } catch (Exception e) { Assertions.assertSame(responseException, e); @@ -132,7 +194,9 @@ public void testNonRetryErrorCodeException() throws IOException { //prime the restClientMock again after it's reset after creation when(restClientMock.performRequest(any())) .thenThrow(responseException); - restClientUnderTest.bulkRequest(Collections.emptyList(), null); + restClientUnderTest.bulkRequest(Collections.singletonList( + ElasticSearchMutation.createDeleteRequest("some_index", "some_type", "some_doc_id")), + null); Assertions.fail("Should have thrown the expected exception"); } catch (Exception e) { Assertions.assertSame(responseException, e); @@ -146,7 +210,9 @@ public void testNonResponseExceptionErrorThrown() throws IOException { when(restClientMock.performRequest(any())) .thenThrow(differentExceptionType); try (RestElasticSearchClient restClientUnderTest = createClient(0, Collections.emptySet())) { - restClientUnderTest.bulkRequest(Collections.emptyList(), null); + restClientUnderTest.bulkRequest(Collections.singletonList( + ElasticSearchMutation.createDeleteRequest("some_index", "some_type", "some_doc_id")), + null); Assertions.fail("Should have thrown the expected exception"); } catch (Exception e) { Assertions.assertSame(differentExceptionType, e); diff --git a/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientSetupTest.java b/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientSetupTest.java index 97c876ed7c..0f8d6fb17e 100644 --- a/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientSetupTest.java +++ b/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientSetupTest.java @@ -123,6 +123,9 @@ public class RestClientSetupTest { @Captor ArgumentCaptor> retryErrorCodesCaptor; + @Captor + ArgumentCaptor bulkChunkSerializedLimitCaptor; + @Spy private RestClientSetup restClientSetup = new RestClientSetup(); @@ -158,7 +161,7 @@ private ElasticSearchClient baseConfigTest(Map extraConfigValues when(restClientSetup).getRestClientBuilder(any()); doReturn(restElasticSearchClientMock).when(restClientSetup). 
getElasticSearchClient(any(RestClient.class), anyInt(), anyBoolean(), - anyInt(), anySet(), anyLong(), anyLong()); + anyInt(), anySet(), anyLong(), anyLong(), anyInt()); return restClientSetup.connect(config.restrictTo(INDEX_NAME)); } @@ -190,7 +193,7 @@ public void testConnectBasicHttpConfigurationSingleHost() throws Exception { assertEquals(ElasticSearchIndex.HOST_PORT_DEFAULT, host0.getPort()); verify(restClientSetup).getElasticSearchClient(same(restClientMock), scrollKACaptor.capture(), anyBoolean(), - anyInt(), anySet(), anyLong(), anyLong()); + anyInt(), anySet(), anyLong(), anyLong(), anyInt()); assertEquals(ElasticSearchIndex.ES_SCROLL_KEEP_ALIVE.getDefaultValue().intValue(), scrollKACaptor.getValue().intValue()); @@ -218,7 +221,7 @@ public void testConnectBasicHttpConfigurationMultiHost() throws Exception { verify(restClientSetup).getElasticSearchClient(same(restClientMock), scrollKACaptor.capture(), anyBoolean(), retryAttemptLimitCaptor.capture(), retryErrorCodesCaptor.capture(), retryInitialWaitCaptor.capture(), - retryMaxWaitCaptor.capture()); + retryMaxWaitCaptor.capture(), bulkChunkSerializedLimitCaptor.capture()); assertEquals(ElasticSearchIndex.ES_SCROLL_KEEP_ALIVE.getDefaultValue().intValue(), scrollKACaptor.getValue().intValue()); @@ -238,6 +241,7 @@ public void testConnectBasicHttpConfigurationAllOptions() throws Exception { put("index." + INDEX_NAME + ".elasticsearch.retry-initial-wait", String.valueOf(RETRY_INITIAL_WAIT)). put("index." + INDEX_NAME + ".elasticsearch.retry-max-wait", String.valueOf(RETRY_MAX_WAIT)). put("index." + INDEX_NAME + ".elasticsearch.retry-error-codes", "408,429"). + put("index." + INDEX_NAME + ".elasticsearch.bulk-chunk-size-limit-bytes", "1000000"). build()); assertNotNull(hostsConfigured); @@ -250,7 +254,7 @@ public void testConnectBasicHttpConfigurationAllOptions() throws Exception { verify(restClientSetup).getElasticSearchClient(same(restClientMock), scrollKACaptor.capture(), anyBoolean(), retryAttemptLimitCaptor.capture(), retryErrorCodesCaptor.capture(), retryInitialWaitCaptor.capture(), - retryMaxWaitCaptor.capture()); + retryMaxWaitCaptor.capture(), bulkChunkSerializedLimitCaptor.capture()); assertEquals(ES_SCROLL_KA, scrollKACaptor.getValue().intValue()); assertEquals(RETRY_LIMIT, @@ -261,6 +265,7 @@ public void testConnectBasicHttpConfigurationAllOptions() throws Exception { retryInitialWaitCaptor.getValue().longValue()); assertEquals(RETRY_MAX_WAIT, retryMaxWaitCaptor.getValue().longValue()); + assertEquals(1_000_000, bulkChunkSerializedLimitCaptor.getValue().intValue()); verify(restElasticSearchClientMock).setBulkRefresh(eq(ES_BULK_REFRESH)); verify(restElasticSearchClientMock).setRetryOnConflict(eq(RETRY_ON_CONFLICT)); @@ -283,7 +288,7 @@ public void testConnectBasicHttpsConfigurationSingleHost() throws Exception { assertEquals(ElasticSearchIndex.HOST_PORT_DEFAULT, host0.getPort()); verify(restClientSetup).getElasticSearchClient(same(restClientMock), scrollKACaptor.capture(), anyBoolean(), - anyInt(), anySet(), anyLong(), anyLong()); + anyInt(), anySet(), anyLong(), anyLong(), anyInt()); assertEquals(ElasticSearchIndex.ES_SCROLL_KEEP_ALIVE.getDefaultValue().intValue(), scrollKACaptor.getValue().intValue());
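
Editorial usage sketch (not part of the patch; the index name "search", the inmemory backend, and the
50 MB value are placeholder choices): once this change is applied, the new limit is tuned like any other
per-index Elasticsearch option, e.g. index.search.elasticsearch.bulk-chunk-size-limit-bytes=50000000 in
a properties file, or programmatically:

    //Hypothetical bootstrap; JanusGraph and JanusGraphFactory come from org.janusgraph.core
    JanusGraph graph = JanusGraphFactory.build()
        .set("storage.backend", "inmemory")
        .set("index.search.backend", "elasticsearch")
        .set("index.search.hostname", "127.0.0.1")
        .set("index.search.elasticsearch.bulk-chunk-size-limit-bytes", 50_000_000)
        .open();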