diff --git a/docs/configs/janusgraph-cfg.md b/docs/configs/janusgraph-cfg.md
index ff08908267..f5fd722f77 100644
--- a/docs/configs/janusgraph-cfg.md
+++ b/docs/configs/janusgraph-cfg.md
@@ -150,6 +150,7 @@ Elasticsearch index configuration

 | Name | Description | Datatype | Default Value | Mutability |
 | ---- | ---- | ---- | ---- | ---- |
+| index.[X].elasticsearch.bulk-chunk-size-limit-bytes | The total size limit in bytes of a bulk request. Mutation batches in excess of this limit will be chunked to this size. | Integer | 100000000 | LOCAL |
 | index.[X].elasticsearch.bulk-refresh | Elasticsearch bulk API refresh setting used to control when changes made by this request are made visible to search | String | false | MASKABLE |
 | index.[X].elasticsearch.client-keep-alive | Set a keep-alive timeout (in milliseconds) | Long | (no default value) | GLOBAL_OFFLINE |
 | index.[X].elasticsearch.connect-timeout | Sets the maximum connection timeout (in milliseconds). | Integer | 1000 | MASKABLE |
diff --git a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/ElasticSearchIndex.java b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/ElasticSearchIndex.java
index 7479979761..e357fd20ad 100644
--- a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/ElasticSearchIndex.java
+++ b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/ElasticSearchIndex.java
@@ -324,6 +324,11 @@ public class ElasticSearchIndex implements IndexProvider {
         "Comma separated list of Elasticsearch REST client ResponseException error codes to retry. " +
             "E.g. \"408,429\"", ConfigOption.Type.LOCAL, String[].class, new String[0]);

+    public static final ConfigOption<Integer> BULK_CHUNK_SIZE_LIMIT_BYTES =
+        new ConfigOption<>(ELASTICSEARCH_NS, "bulk-chunk-size-limit-bytes",
+            "The total size limit in bytes of a bulk request. Mutation batches in excess of this limit will be " +
+                "chunked to this size.", ConfigOption.Type.LOCAL, Integer.class, 100_000_000);
+
     public static final int HOST_PORT_DEFAULT = 9200;

     /**
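For reviewers wanting to try the new option: a minimal sketch of how it would be set in `janusgraph.properties`, assuming a mixed index named `search` (the 50 MB figure is an arbitrary illustration, not a recommendation):

```properties
index.search.backend=elasticsearch
index.search.hostname=127.0.0.1
# Split any mutation batch whose serialized size would exceed ~50 MB into multiple bulk requests
index.search.elasticsearch.bulk-chunk-size-limit-bytes=50000000
```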
diff --git a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestClientSetup.java b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestClientSetup.java
index bbfac23d6b..355bb7cc44 100644
--- a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestClientSetup.java
+++ b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestClientSetup.java
@@ -81,8 +81,9 @@ public ElasticSearchClient connect(Configuration config) throws IOException {
         long retryMaxWaitMs = config.getOrDefault(ElasticSearchIndex.RETRY_MAX_WAIT);
         Set<Integer> errorCodesToRetry = Arrays.stream(config.getOrDefault(ElasticSearchIndex.RETRY_ERROR_CODES))
             .mapToInt(Integer::parseInt).boxed().collect(Collectors.toSet());
+        int bulkChunkLimitBytes = config.getOrDefault(ElasticSearchIndex.BULK_CHUNK_SIZE_LIMIT_BYTES);
         final RestElasticSearchClient client = getElasticSearchClient(rc, scrollKeepAlive, useMappingTypesForES7,
-            retryLimit, errorCodesToRetry, retryInitialWaitMs, retryMaxWaitMs);
+            retryLimit, errorCodesToRetry, retryInitialWaitMs, retryMaxWaitMs, bulkChunkLimitBytes);
         if (config.has(ElasticSearchIndex.BULK_REFRESH)) {
             client.setBulkRefresh(config.get(ElasticSearchIndex.BULK_REFRESH));
         }
@@ -115,9 +116,9 @@ protected RestClientBuilder getRestClientBuilder(HttpHost[] hosts) {

     protected RestElasticSearchClient getElasticSearchClient(RestClient rc, int scrollKeepAlive, boolean useMappingTypesForES7,
                                                              int retryAttemptLimit, Set<Integer> retryOnErrorCodes, long retryInitialWaitMs,
-                                                             long retryMaxWaitMs) {
+                                                             long retryMaxWaitMs, int bulkChunkSerializedLimit) {
         return new RestElasticSearchClient(rc, scrollKeepAlive, useMappingTypesForES7, retryAttemptLimit, retryOnErrorCodes,
-            retryInitialWaitMs, retryMaxWaitMs);
+            retryInitialWaitMs, retryMaxWaitMs, bulkChunkSerializedLimit);
     }

     /**
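The same knob can also be supplied programmatically when opening the graph. A hedged sketch using the public `JanusGraphFactory` builder (the storage backend and index name here are assumptions for illustration only):

```java
import org.janusgraph.core.JanusGraph;
import org.janusgraph.core.JanusGraphFactory;

public class BulkChunkLimitExample {
    public static void main(String[] args) {
        // "search" must match the index name the schema's mixed indexes use
        JanusGraph graph = JanusGraphFactory.build()
            .set("storage.backend", "inmemory") // assumption: any storage backend works here
            .set("index.search.backend", "elasticsearch")
            .set("index.search.hostname", "127.0.0.1")
            .set("index.search.elasticsearch.bulk-chunk-size-limit-bytes", 50_000_000)
            .open();
        graph.close();
    }
}
```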
diff --git a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java
index 64e577724e..59fa0af314 100644
--- a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java
+++ b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java
@@ -16,11 +16,15 @@

 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
 import org.apache.http.HttpEntity;
+import org.apache.http.HttpStatus;
 import org.apache.http.entity.ByteArrayEntity;
 import org.apache.http.entity.ContentType;
 import org.apache.tinkerpop.shaded.jackson.annotation.JsonIgnoreProperties;
 import org.apache.tinkerpop.shaded.jackson.core.JsonParseException;
+import org.apache.tinkerpop.shaded.jackson.core.JsonProcessingException;
 import org.apache.tinkerpop.shaded.jackson.core.type.TypeReference;
 import org.apache.tinkerpop.shaded.jackson.databind.JsonMappingException;
 import org.apache.tinkerpop.shaded.jackson.databind.ObjectMapper;
@@ -39,16 +43,21 @@
 import org.janusgraph.diskstorage.es.mapping.IndexMapping;
 import org.janusgraph.diskstorage.es.mapping.TypedIndexMappings;
 import org.janusgraph.diskstorage.es.mapping.TypelessIndexMappings;
-import org.janusgraph.diskstorage.es.rest.RestBulkResponse.RestBulkItemResponse;
 import org.janusgraph.diskstorage.es.script.ESScriptResponse;
+import org.javatuples.Pair;
+import org.javatuples.Triplet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -121,9 +130,11 @@ public class RestElasticSearchClient implements ElasticSearchClient {

     private final long retryMaxWaitMs;

+    private final int bulkChunkSerializedLimitBytes;
+
     public RestElasticSearchClient(RestClient delegate, int scrollKeepAlive, boolean useMappingTypesForES7,
                                    int retryAttemptLimit, Set<Integer> retryOnErrorCodes, long retryInitialWaitMs,
-                                   long retryMaxWaitMs) {
+                                   long retryMaxWaitMs, int bulkChunkSerializedLimitBytes) {
         this.delegate = delegate;
         majorVersion = getMajorVersion();
         this.scrollKeepAlive = scrollKeepAlive+"s";
@@ -134,6 +145,7 @@ public RestElasticSearchClient(RestClient delegate, int scrollKeepAlive, boolean
         this.retryOnErrorCodes = Collections.unmodifiableSet(retryOnErrorCodes);
         this.retryInitialWaitMs = retryInitialWaitMs;
         this.retryMaxWaitMs = retryMaxWaitMs;
+        this.bulkChunkSerializedLimitBytes = bulkChunkSerializedLimitBytes;
     }

     @Override
@@ -241,7 +253,7 @@ public ESScriptResponse getStoredScript(String scriptName) throws IOException {

             final Response response = e.getResponse();

-            if(e.getResponse().getStatusLine().getStatusCode() == 404){
+            if(e.getResponse().getStatusLine().getStatusCode() == HttpStatus.SC_NOT_FOUND){
                 ESScriptResponse esScriptResponse = new ESScriptResponse();
                 esScriptResponse.setFound(false);
                 return esScriptResponse;
@@ -380,10 +392,11 @@ public void clearStore(String indexName, String storeName) throws IOException {
         }
     }

-    @Override
-    public void bulkRequest(List<ElasticSearchMutation> requests, String ingestPipeline) throws IOException {
-        final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        for (final ElasticSearchMutation request : requests) {
+    private class RequestBytes {
+        final byte[] requestBytes;
+        final byte[] requestSource;
+
+        private RequestBytes(final ElasticSearchMutation request) throws JsonProcessingException {
             Map<String, Object> requestData = new HashMap<>();
             if (useMappingTypes) {
                 requestData.put("_index", request.getIndex());
@@ -398,15 +411,39 @@ public void bulkRequest(List<ElasticSearchMutation> requests, String ingestPipel
                 requestData.put(retryOnConflictKey, retryOnConflict);
             }

-            outputStream.write(mapWriter.writeValueAsBytes(
-                ImmutableMap.of(request.getRequestType().name().toLowerCase(), requestData))
-            );
-            outputStream.write(NEW_LINE_BYTES);
+            this.requestBytes = mapWriter.writeValueAsBytes(ImmutableMap.of(request.getRequestType().name().toLowerCase(), requestData));
             if (request.getSource() != null) {
-                outputStream.write(mapWriter.writeValueAsBytes(request.getSource()));
+                this.requestSource = mapWriter.writeValueAsBytes(request.getSource());
+            } else {
+                this.requestSource = null;
+            }
+        }
+
+        private int getSerializedSize() {
+            int serializedSize = this.requestBytes.length;
+            serializedSize += 1; //For follow-up NEW_LINE_BYTES
+            if (this.requestSource != null) {
+                serializedSize += this.requestSource.length;
+                serializedSize += 1; //For follow-up NEW_LINE_BYTES
+            }
+            return serializedSize;
+        }
+
+        private void writeTo(OutputStream outputStream) throws IOException {
+            outputStream.write(this.requestBytes);
+            outputStream.write(NEW_LINE_BYTES);
+            if (this.requestSource != null) {
+                outputStream.write(requestSource);
                 outputStream.write(NEW_LINE_BYTES);
             }
         }
+    }
+
+    private Pair<String, byte[]> buildBulkRequestInput(List<RequestBytes> requests, String ingestPipeline) throws IOException {
+        final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        for (final RequestBytes request : requests) {
+            request.writeTo(outputStream);
+        }

         final StringBuilder builder = new StringBuilder();
         if (ingestPipeline != null) {
@@ -416,17 +453,107 @@
             APPEND_OP.apply(builder).append("refresh=").append(bulkRefresh);
         }
         builder.insert(0, REQUEST_SEPARATOR + "_bulk");
+        return Pair.with(builder.toString(), outputStream.toByteArray());
+    }
+
+    private List<Triplet<Object, Integer, RequestBytes>> pairErrorsWithSubmittedMutation(
+        //Bulk API is documented to return bulk item responses in the same order of submission
+        //https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#bulk-api-response-body
+        //As such we only need to retry elements that failed
+        final List<Map<String, RestBulkResponse.RestBulkItemResponse>> bulkResponseItems,
+        final List<RequestBytes> submittedBulkRequestItems) {
+        final List<Triplet<Object, Integer, RequestBytes>> errors = new ArrayList<>(bulkResponseItems.size());
+        for (int itemIndex = 0; itemIndex < bulkResponseItems.size(); itemIndex++) {
+            Collection<RestBulkResponse.RestBulkItemResponse> bulkResponseItem = bulkResponseItems.get(itemIndex).values();
+            if (bulkResponseItem.size() > 1) {
+                throw new IllegalStateException("There should only be a single item per bulk response item entry");
+            }
+            RestBulkResponse.RestBulkItemResponse item = bulkResponseItem.iterator().next();
+            if (item.getError() != null && item.getStatus() != HttpStatus.SC_NOT_FOUND) {
+                errors.add(Triplet.with(item.getError(), item.getStatus(), submittedBulkRequestItems.get(itemIndex)));
+            }
+        }
+        return errors;
+    }

-        final Response response = performRequest(REQUEST_TYPE_POST, builder.toString(), outputStream.toByteArray());
-        try (final InputStream inputStream = response.getEntity().getContent()) {
-            final RestBulkResponse bulkResponse = mapper.readValue(inputStream, RestBulkResponse.class);
-            final List<Object> errors = bulkResponse.getItems().stream()
-                .flatMap(item -> item.values().stream())
-                .filter(item -> item.getError() != null && item.getStatus() != 404)
-                .map(RestBulkItemResponse::getError).collect(Collectors.toList());
-            if (!errors.isEmpty()) {
-                errors.forEach(error -> log.error("Failed to execute ES query: {}", error));
-                throw new IOException("Failure(s) in Elasticsearch bulk request: " + errors);
Note that Elasticsearch limits the maximum + // size of a HTTP request to 100mb by default + private final PeekingIterator requestIterator; + + private BulkRequestChunker(List requests) throws JsonProcessingException { + List serializedRequests = new ArrayList<>(requests.size()); + for (ElasticSearchMutation request : requests) { + serializedRequests.add(new RequestBytes(request)); + } + this.requestIterator = Iterators.peekingIterator(serializedRequests.iterator()); + } + + @Override + public boolean hasNext() { + return requestIterator.hasNext(); + } + + @Override + public List next() { + List serializedRequests = new ArrayList<>(); + int chunkSerializedTotal = 0; + while (requestIterator.hasNext()) { + RequestBytes peeked = requestIterator.peek(); + int requestSerializedSize = peeked.getSerializedSize(); + if (requestSerializedSize + chunkSerializedTotal <= bulkChunkSerializedLimitBytes) { + chunkSerializedTotal += requestSerializedSize; + serializedRequests.add(requestIterator.next()); + } else if (requestSerializedSize > bulkChunkSerializedLimitBytes) { + //we've encountered an element we cannot send to Elasticsearch given the configured limit + throw new IllegalArgumentException(String.format( + "Bulk request item is larger than permitted chunk limit. Limit is %s. Serialized item size was %s", + bulkChunkSerializedLimitBytes, requestSerializedSize)); + } else { + //Adding this element would exceed the limit, so return the chunk + return serializedRequests; + } + } + //All remaining requests fit in this chunk + return serializedRequests; + } + } + + @Override + public void bulkRequest(final List requests, String ingestPipeline) throws IOException { + BulkRequestChunker bulkRequestChunker = new BulkRequestChunker(requests); + while (bulkRequestChunker.hasNext()) { + List bulkRequestChunk = bulkRequestChunker.next(); + int retryCount = 0; + while (true) { + final Pair bulkRequestInput = buildBulkRequestInput(bulkRequestChunk, ingestPipeline); + final Response response = performRequest(REQUEST_TYPE_POST, bulkRequestInput.getValue0(), bulkRequestInput.getValue1()); + try (final InputStream inputStream = response.getEntity().getContent()) { + final RestBulkResponse bulkResponse = mapper.readValue(inputStream, RestBulkResponse.class); + List> bulkItemsThatFailed = pairErrorsWithSubmittedMutation(bulkResponse.getItems(), bulkRequestChunk); + if (!bulkItemsThatFailed.isEmpty()) { + //Only retry the bulk request if *all* the bulk response item error codes are retry error codes + final Set errorCodes = bulkItemsThatFailed.stream().map(Triplet::getValue1).collect(Collectors.toSet()); + if (retryCount < retryAttemptLimit && retryOnErrorCodes.containsAll(errorCodes)) { + //Build up the next request batch, of only the failed mutations + bulkRequestChunk = bulkItemsThatFailed.stream().map(Triplet::getValue2).collect(Collectors.toList()); + performRetryWait(retryCount); + retryCount++; + } else { + final List errorItems = bulkItemsThatFailed.stream().map(Triplet::getValue0).collect(Collectors.toList()); + errorItems.forEach(error -> log.error("Failed to execute ES query: {}", error)); + throw new IOException("Failure(s) in Elasticsearch bulk request: " + errorItems); + } + } else { + //The entire bulk request was successful, leave the loop + break; + } + } } } } @@ -571,19 +698,22 @@ private Response performRequestWithRetry(Request request) throws IOException { if (!retryOnErrorCodes.contains(e.getResponse().getStatusLine().getStatusCode()) || retryCount >= retryAttemptLimit) { throw e; } - //Wait 
@@ -571,19 +698,22 @@ private Response performRequestWithRetry(Request request) throws IOException {
                 if (!retryOnErrorCodes.contains(e.getResponse().getStatusLine().getStatusCode()) || retryCount >= retryAttemptLimit) {
                     throw e;
                 }
-                //Wait before trying again
-                long waitDurationMs = Math.min((long) (retryInitialWaitMs * Math.pow(10, retryCount)), retryMaxWaitMs);
-                log.warn("Retrying Elasticsearch request in {} ms. Attempt {} of {}", waitDurationMs, retryCount, retryAttemptLimit);
-                try {
-                    Thread.sleep(waitDurationMs);
-                } catch (InterruptedException interruptedException) {
-                    throw new RuntimeException(String.format("Thread interrupted while waiting for retry attempt %d of %d", retryCount, retryAttemptLimit), interruptedException);
-                }
+                performRetryWait(retryCount);
             }
             retryCount++;
         }
     }

+    private void performRetryWait(int retryCount) {
+        long waitDurationMs = Math.min((long) (retryInitialWaitMs * Math.pow(10, retryCount)), retryMaxWaitMs);
+        log.warn("Retrying Elasticsearch request in {} ms. Attempt {} of {}", waitDurationMs, retryCount, retryAttemptLimit);
+        try {
+            Thread.sleep(waitDurationMs);
+        } catch (InterruptedException interruptedException) {
+            throw new RuntimeException(String.format("Thread interrupted while waiting for retry attempt %d of %d", retryCount, retryAttemptLimit), interruptedException);
+        }
+    }
+
     private Response performRequest(Request request, byte[] requestData) throws IOException {
         final HttpEntity entity = requestData != null
             ? new ByteArrayEntity(requestData, ContentType.APPLICATION_JSON)
             : null;
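To make the size accounting above concrete: each `RequestBytes` holds the two NDJSON lines the bulk API consumes, an action line plus an optional document line (deletes have none), and `getSerializedSize()` adds one byte per trailing newline. A chunk is just these lines concatenated and POSTed to `_bulk`. Roughly (shape per the Elasticsearch bulk docs; index, type, and id values here are made up):

```text
{"index":{"_index":"janusgraph_mixed","_type":"_doc","_id":"4120"}}
{"someKey":"some value"}
{"delete":{"_index":"janusgraph_mixed","_type":"_doc","_id":"8224"}}
```

The matching response reports one status per item, in submission order, which is what lets `pairErrorsWithSubmittedMutation` line failures up with their original mutations purely by index (again, shape per the Elasticsearch docs; values illustrative):

```json
{
  "took": 30,
  "errors": true,
  "items": [
    {"index": {"_id": "4120", "status": 201}},
    {"delete": {"_id": "8224", "status": 429, "error": {"type": "es_rejected_execution_exception", "reason": "..."}}}
  ]
}
```

Note also that `performRetryWait` backs off by powers of ten: with a `retry-initial-wait` of 10 ms and a `retry-max-wait` of 30000 ms (illustrative values), successive waits would be 10 ms, 100 ms, 1 s, 10 s, then capped at 30 s.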
diff --git a/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/ElasticsearchJanusGraphIndexTest.java b/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/ElasticsearchJanusGraphIndexTest.java
index 8d1088f292..e2877e2a16 100644
--- a/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/ElasticsearchJanusGraphIndexTest.java
+++ b/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/ElasticsearchJanusGraphIndexTest.java
@@ -15,6 +15,7 @@
 package org.janusgraph.diskstorage.es;

 import org.apache.commons.lang3.mutable.MutableBoolean;
+import org.apache.tinkerpop.gremlin.process.traversal.P;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.janusgraph.core.Cardinality;
 import org.janusgraph.core.JanusGraphTransaction;
@@ -33,10 +34,12 @@
 import org.janusgraph.graphdb.JanusGraphIndexTest;
 import org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.testcontainers.junit.jupiter.Container;

 import java.time.Duration;
+import java.util.stream.IntStream;

 import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.FORCE_INDEX_USAGE;
 import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.INDEX_NAME;
@@ -105,6 +108,57 @@ public void indexShouldNotExistAfterDeletion() throws Exception {
         assertFalse(esr.indexExists(expectedIndexName));
     }

+    @Test
+    public void writingAnItemLargerThanPermittedChunkLimitFails() {
+        PropertyKey key = mgmt.makePropertyKey("some-field").dataType(Integer.class).make();
+        mgmt.buildIndex("bulkTooLargeWriteTestIndex", Vertex.class).addKey(key).buildMixedIndex(INDEX);
+        mgmt.buildIndex("equalityLookupIndex", Vertex.class).addKey(key).buildCompositeIndex();
+        mgmt.commit();
+
+        //Confirm we're able to successfully write initially
+        Vertex initiallyWrittenVertex = graph.traversal().addV().property(key.name(), 1).next();
+        graph.tx().commit();
+
+        //Retrieve the vertex again, first via the composite index and then via the mixed index, confirming it's in both
+        Vertex initialVertexEqualityLookup = graph.traversal().V().has(key.name(), P.eq(1)).next();
+        Vertex initialVertexRangeLookup = graph.traversal().V().has(key.name(), P.gt(0)).next();
+
+        Assertions.assertEquals(initiallyWrittenVertex.id(), initialVertexEqualityLookup.id(),
+            "Should have returned the same vertex");
+        Assertions.assertEquals(initiallyWrittenVertex.id(), initialVertexRangeLookup.id(),
+            "Should have returned the same vertex");
+
+        //Now write a second vertex, but with a limit that prevents the mixed index write from succeeding
+        //Writes to mixed indices are "best effort", so a "successful" write that failed to write to a mixed index
+        //is still a success. However, lookups via the mixed index's predicates will now be blind to the vertex
+        clopen(option(ElasticSearchIndex.BULK_CHUNK_SIZE_LIMIT_BYTES, INDEX), 1);
+        Vertex secondWriteAttemptVertex = graph.traversal().addV().property(key.name(), 2).next();
+        graph.tx().commit();
+        Vertex secondVertexEqualityLookup = graph.traversal().V().has(key.name(), P.eq(2)).next();
+        boolean secondVertexRangeLookup = graph.traversal().V().has(key.name(), P.gt(1)).hasNext();
+
+        Assertions.assertEquals(secondWriteAttemptVertex.id(), secondVertexEqualityLookup.id(),
+            "Should have returned the same vertex");
+        Assertions.assertFalse(secondVertexRangeLookup, "The lookup for the second vertex using the mixed index " +
+            "predicate should have failed to find it, because the chunk size limit caused a silent mutation " +
+            "failure in the mixed index");
+    }
+
+    @Test
+    //Disabled to not slow down CI/CD builds given there's nothing to assert;
+    //intended for manual observation of the bulk chunking to Elasticsearch
+    @Disabled
+    public void manuallyObserveBulkWritingChunking() {
+        clopen(option(ElasticSearchIndex.BULK_CHUNK_SIZE_LIMIT_BYTES, INDEX), 100);
+        PropertyKey key = mgmt.makePropertyKey("some-field").dataType(String.class).make();
+        mgmt.buildIndex("testChunkingIndex", Vertex.class).addKey(key).buildMixedIndex(INDEX);
+        mgmt.commit();
+
+        //Write 10 vertices that will each individually be split up into their own chunks due to the limit configured
+        IntStream.range(0, 10).forEach(i -> graph.traversal().addV().property(key.name(), "foobar").toList());
+        graph.tx().commit();
+    }
+
     @Override
     public boolean supportsLuceneStyleQueries() {
         return true;
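Since the first test above deliberately leaves the mixed index silently missing a vertex, it is worth noting the standard remediation: JanusGraph's reindex machinery. A hedged sketch (API per JanusGraph's index-management docs; the index name matches the test's, and an open `graph` handle is assumed):

```java
import org.janusgraph.core.schema.JanusGraphManagement;
import org.janusgraph.core.schema.SchemaAction;

// Rebuild the mixed index so that documents dropped by the chunk limit are restored
JanusGraphManagement mgmt = graph.openManagement();
mgmt.updateIndex(mgmt.getGraphIndex("bulkTooLargeWriteTestIndex"), SchemaAction.REINDEX).get();
mgmt.commit();
```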
diff --git a/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientBulkRequestsTest.java b/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientBulkRequestsTest.java
new file mode 100644
index 0000000000..87f4cc1c3f
--- /dev/null
+++ b/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientBulkRequestsTest.java
@@ -0,0 +1,122 @@
+// Copyright 2024 JanusGraph Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.janusgraph.diskstorage.es.rest;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.http.HttpEntity;
+import org.apache.http.StatusLine;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.RestClient;
+import org.janusgraph.diskstorage.es.ElasticSearchMutation;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.stream.IntStream;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+public class RestClientBulkRequestsTest {
+    @Mock
+    private RestClient restClientMock;
+
+    @Mock
+    private Response response;
+
+    @Mock
+    private StatusLine statusLine;
+
+    @Captor
+    private ArgumentCaptor<Request> requestCaptor;
+
+    RestElasticSearchClient createClient(int bulkChunkSerializedLimitBytes) throws IOException {
+        //Just throw an exception when there's an attempt to look up the ES version during instantiation
+        when(restClientMock.performRequest(any())).thenThrow(new IOException());
+
+        RestElasticSearchClient clientUnderTest = new RestElasticSearchClient(restClientMock, 0, false,
+            0, Collections.emptySet(), 0, 0, bulkChunkSerializedLimitBytes);
+        //There's an initial query to get the ES version we need to accommodate, and then reset for the actual test
+        Mockito.reset(restClientMock);
+        return clientUnderTest;
+    }
+
+    @Test
+    public void testSplittingOfLargeBulkItems() throws IOException {
+        ObjectMapper mapper = new ObjectMapper();
+        when(statusLine.getStatusCode()).thenReturn(200);
+
+        //In both cases return a "success"
+        RestBulkResponse singletonBulkItemResponseSuccess = new RestBulkResponse();
+        singletonBulkItemResponseSuccess.setItems(
+            Collections.singletonList(Collections.singletonMap("index", new RestBulkResponse.RestBulkItemResponse())));
+        byte[] singletonBulkItemResponseSuccessBytes = mapper.writeValueAsBytes(singletonBulkItemResponseSuccess);
+        HttpEntity singletonBulkItemHttpEntityMock = mock(HttpEntity.class);
+        when(singletonBulkItemHttpEntityMock.getContent())
+            .thenReturn(new ByteArrayInputStream(singletonBulkItemResponseSuccessBytes))
+            //Have to set up a second input stream because it will have been consumed by the first pass
+            .thenReturn(new ByteArrayInputStream(singletonBulkItemResponseSuccessBytes));
+        when(response.getEntity()).thenReturn(singletonBulkItemHttpEntityMock);
+        when(response.getStatusLine()).thenReturn(statusLine);
+
+        int bulkLimit = 800;
+        try (RestElasticSearchClient restClientUnderTest = createClient(bulkLimit)) {
+            //prime the restClientMock again after it's reset after creation
+            when(restClientMock.performRequest(any())).thenReturn(response).thenReturn(response);
+            StringBuilder payloadBuilder = new StringBuilder();
+            IntStream.range(0, bulkLimit - 100).forEach(value -> payloadBuilder.append("a"));
+            String largePayload = payloadBuilder.toString();
+            restClientUnderTest.bulkRequest(Arrays.asList(
+                //There should be enough characters in the payload that they can't both be sent in a single bulk call
+                ElasticSearchMutation.createIndexRequest("some_index", "some_type", "some_doc_id1",
+                    Collections.singletonMap("someKey", largePayload)),
+                ElasticSearchMutation.createIndexRequest("some_index", "some_type", "some_doc_id2",
+                    Collections.singletonMap("someKey", largePayload))
+            ), null);
+            //Verify that despite only calling bulkRequest() once, we had 2 calls to the underlying rest client's
+            //perform request (due to the mutations being split across 2 calls)
+            verify(restClientMock, times(2)).performRequest(requestCaptor.capture());
+        }
+    }
+
+    @Test
+    public void testThrowingIfSingleBulkItemIsLargerThanLimit() throws IOException {
+        int bulkLimit = 800;
+        try (RestElasticSearchClient restClientUnderTest = createClient(bulkLimit)) {
+            StringBuilder payloadBuilder = new StringBuilder();
+            //This payload is too large to send given the set limit; since it is a single item, we can't split it
+            IntStream.range(0, bulkLimit * 10).forEach(value -> payloadBuilder.append("a"));
+            Assertions.assertThrows(IllegalArgumentException.class, () -> restClientUnderTest.bulkRequest(
+                Collections.singletonList(
+                    ElasticSearchMutation.createIndexRequest("some_index", "some_type", "some_doc_id",
+                        Collections.singletonMap("someKey", payloadBuilder.toString()))
+                ), null), "Should have thrown due to bulk request item being too large");
+        }
+    }
+}
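For readers who want the chunking policy in isolation: a self-contained sketch of the same greedy strategy `BulkRequestChunker` implements (the names and the `main` demo are mine, not JanusGraph's). With a limit of 800 and two ~700-byte items, each item lands in its own chunk, which is exactly what `testSplittingOfLargeBulkItems` asserts via the two `performRequest` calls:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class GreedyChunkingSketch {
    //Pack item sizes into chunks whose totals stay <= limit; throw if a single item alone exceeds it
    static List<List<Integer>> chunk(List<Integer> itemSizes, int limit) {
        List<List<Integer>> chunks = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        int total = 0;
        for (int size : itemSizes) {
            if (size > limit) {
                //mirrors the IllegalArgumentException thrown by BulkRequestChunker.next()
                throw new IllegalArgumentException("Item of size " + size + " exceeds limit " + limit);
            }
            if (total + size > limit) {
                chunks.add(current); //current chunk is full; start a new one
                current = new ArrayList<>();
                total = 0;
            }
            current.add(size);
            total += size;
        }
        if (!current.isEmpty()) {
            chunks.add(current);
        }
        return chunks;
    }

    public static void main(String[] args) {
        System.out.println(chunk(Arrays.asList(700, 700), 800)); //prints [[700], [700]]
    }
}
```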
diff --git a/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientRetryTest.java b/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientRetryTest.java
index f49f675c65..9c9e101977 100644
--- a/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientRetryTest.java
+++ b/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientRetryTest.java
@@ -14,12 +14,15 @@

 package org.janusgraph.diskstorage.es.rest;

+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Sets;
+import org.apache.http.HttpEntity;
 import org.apache.http.StatusLine;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.Response;
 import org.elasticsearch.client.ResponseException;
 import org.elasticsearch.client.RestClient;
+import org.janusgraph.diskstorage.es.ElasticSearchMutation;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -29,9 +32,13 @@
 import org.mockito.Mockito;
 import org.mockito.junit.jupiter.MockitoExtension;

+import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;

 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doReturn;
@@ -62,12 +69,63 @@ RestElasticSearchClient createClient(int retryAttemptLimit, Set<Integer> retryEr
         when(restClientMock.performRequest(any())).thenThrow(new IOException());

         RestElasticSearchClient clientUnderTest = new RestElasticSearchClient(restClientMock, 0, false,
-            retryAttemptLimit, retryErrorCodes, 0, 0);
+            retryAttemptLimit, retryErrorCodes, 0, 0, 100_000_000);
         //There's an initial query to get the ES version we need to accommodate, and then reset for the actual test
         Mockito.reset(restClientMock);
         return clientUnderTest;
     }

+    @Test
+    public void testRetryOfIndividuallyFailedBulkItems() throws IOException {
+        ObjectMapper mapper = new ObjectMapper();
+        int retryErrorCode = 429;
+        //A bulk response will still return a success despite an underlying item having an error
+        when(statusLine.getStatusCode()).thenReturn(200);
+
+        //The initial bulk request will have one element that failed in its response
+        RestBulkResponse.RestBulkItemResponse initialRequestFailedItem = new RestBulkResponse.RestBulkItemResponse();
+        initialRequestFailedItem.setError("An Error");
+        initialRequestFailedItem.setStatus(retryErrorCode);
+        RestBulkResponse initialBulkResponse = new RestBulkResponse();
+        initialBulkResponse.setItems(
+            Stream.of(
+                Collections.singletonMap("index", new RestBulkResponse.RestBulkItemResponse()),
+                Collections.singletonMap("index", initialRequestFailedItem),
+                Collections.singletonMap("index", new RestBulkResponse.RestBulkItemResponse())
+            ).collect(Collectors.toList())
+        );
+        HttpEntity initialHttpEntityMock = mock(HttpEntity.class);
+        when(initialHttpEntityMock.getContent()).thenReturn(new ByteArrayInputStream(
+            mapper.writeValueAsBytes(initialBulkResponse)));
+        Response initialResponseMock = mock(Response.class);
+        when(initialResponseMock.getEntity()).thenReturn(initialHttpEntityMock);
+        when(initialResponseMock.getStatusLine()).thenReturn(statusLine);
+
+        //The retry should then only have a single item that succeeded
+        RestBulkResponse retriedBulkResponse = new RestBulkResponse();
+        retriedBulkResponse.setItems(
+            Collections.singletonList(Collections.singletonMap("index", new RestBulkResponse.RestBulkItemResponse())));
+        HttpEntity retriedHttpEntityMock = mock(HttpEntity.class);
+        when(retriedHttpEntityMock.getContent()).thenReturn(new ByteArrayInputStream(
+            mapper.writeValueAsBytes(retriedBulkResponse)));
+        Response retriedResponseMock = mock(Response.class);
+        when(retriedResponseMock.getEntity()).thenReturn(retriedHttpEntityMock);
+        when(retriedResponseMock.getStatusLine()).thenReturn(statusLine);
+
+        try (RestElasticSearchClient restClientUnderTest = createClient(1, Sets.newHashSet(retryErrorCode))) {
+            //prime the restClientMock again after it's reset after creation
+            when(restClientMock.performRequest(any())).thenReturn(initialResponseMock).thenReturn(retriedResponseMock);
+            restClientUnderTest.bulkRequest(Arrays.asList(
+                ElasticSearchMutation.createDeleteRequest("some_index", "some_type", "some_doc_id1"),
+                ElasticSearchMutation.createDeleteRequest("some_index", "some_type", "some_doc_id2"),
+                ElasticSearchMutation.createDeleteRequest("some_index", "some_type", "some_doc_id3")
+            ), null);
+            //Verify that despite only calling bulkRequest once, we had 2 calls to the underlying rest client's
+            //perform request (due to the retried failure)
+            verify(restClientMock, times(2)).performRequest(requestCaptor.capture());
+        }
+    }
+
     @Test
     public void testRetryOnConfiguredErrorStatus() throws IOException {
         Integer retryCode = 429;
@@ -85,7 +143,9 @@ public void testRetryOnConfiguredErrorStatus() throws IOException {
             when(restClientMock.performRequest(any()))
                 .thenThrow(responseException)
                 .thenThrow(expectedFinalException);
-            restClientUnderTest.bulkRequest(Collections.emptyList(), null);
+            restClientUnderTest.bulkRequest(Collections.singletonList(
+                ElasticSearchMutation.createDeleteRequest("some_index", "some_type", "some_doc_id")),
+                null);
             Assertions.fail("Should have thrown the expected exception after retry");
         } catch (Exception actualException) {
             Assertions.assertSame(expectedFinalException, actualException);
@@ -113,7 +173,9 @@ public void testRetriesExhaustedReturnsLastRetryException() throws IOException {
                 .thenThrow(responseException);

-            restClientUnderTest.bulkRequest(Collections.emptyList(), null);
+            restClientUnderTest.bulkRequest(Collections.singletonList(
+                ElasticSearchMutation.createDeleteRequest("some_index", "some_type", "some_doc_id")),
+                null);
             Assertions.fail("Should have thrown the expected exception after retry");
         } catch (Exception e) {
             Assertions.assertSame(responseException, e);
@@ -132,7 +194,9 @@ public void testNonRetryErrorCodeException() throws IOException {
             //prime the restClientMock again after it's reset after creation
             when(restClientMock.performRequest(any()))
                 .thenThrow(responseException);
-            restClientUnderTest.bulkRequest(Collections.emptyList(), null);
+            restClientUnderTest.bulkRequest(Collections.singletonList(
+                ElasticSearchMutation.createDeleteRequest("some_index", "some_type", "some_doc_id")),
+                null);
             Assertions.fail("Should have thrown the expected exception");
         } catch (Exception e) {
             Assertions.assertSame(responseException, e);
@@ -146,7 +210,9 @@ public void testNonResponseExceptionErrorThrown() throws IOException {
         when(restClientMock.performRequest(any()))
             .thenThrow(differentExceptionType);
         try (RestElasticSearchClient restClientUnderTest = createClient(0, Collections.emptySet())) {
-            restClientUnderTest.bulkRequest(Collections.emptyList(), null);
+            restClientUnderTest.bulkRequest(Collections.singletonList(
+                ElasticSearchMutation.createDeleteRequest("some_index", "some_type", "some_doc_id")),
+                null);
             Assertions.fail("Should have thrown the expected exception");
         } catch (Exception e) {
             Assertions.assertSame(differentExceptionType, e);
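One subtlety the retry test pins down: a chunk is retried only when *every* failed item's status is in the configured retry set; a single non-retryable status fails the whole request immediately. A tiny standalone illustration of that `containsAll` predicate (my code, not JanusGraph's):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class RetryPredicateSketch {
    public static void main(String[] args) {
        Set<Integer> retryOnErrorCodes = new HashSet<>(Arrays.asList(429));
        Set<Integer> allRetryable = new HashSet<>(Arrays.asList(429));
        Set<Integer> mixedFailures = new HashSet<>(Arrays.asList(429, 400));
        System.out.println(retryOnErrorCodes.containsAll(allRetryable));  // true  -> retry the failed items
        System.out.println(retryOnErrorCodes.containsAll(mixedFailures)); // false -> throw IOException
    }
}
```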
diff --git a/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientSetupTest.java b/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientSetupTest.java
index 97c876ed7c..0f8d6fb17e 100644
--- a/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientSetupTest.java
+++ b/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientSetupTest.java
@@ -123,6 +123,9 @@ public class RestClientSetupTest {
     @Captor
     ArgumentCaptor<Set<Integer>> retryErrorCodesCaptor;

+    @Captor
+    ArgumentCaptor<Integer> bulkChunkSerializedLimitCaptor;
+
     @Spy
     private RestClientSetup restClientSetup = new RestClientSetup();

@@ -158,7 +161,7 @@ private ElasticSearchClient baseConfigTest(Map<String, String> extraConfigValues
             when(restClientSetup).getRestClientBuilder(any());
         doReturn(restElasticSearchClientMock).when(restClientSetup).
             getElasticSearchClient(any(RestClient.class), anyInt(), anyBoolean(),
-                anyInt(), anySet(), anyLong(), anyLong());
+                anyInt(), anySet(), anyLong(), anyLong(), anyInt());

         return restClientSetup.connect(config.restrictTo(INDEX_NAME));
     }
@@ -190,7 +193,7 @@ public void testConnectBasicHttpConfigurationSingleHost() throws Exception {
         assertEquals(ElasticSearchIndex.HOST_PORT_DEFAULT, host0.getPort());

         verify(restClientSetup).getElasticSearchClient(same(restClientMock), scrollKACaptor.capture(), anyBoolean(),
-            anyInt(), anySet(), anyLong(), anyLong());
+            anyInt(), anySet(), anyLong(), anyLong(), anyInt());
         assertEquals(ElasticSearchIndex.ES_SCROLL_KEEP_ALIVE.getDefaultValue().intValue(),
             scrollKACaptor.getValue().intValue());

@@ -218,7 +221,7 @@ public void testConnectBasicHttpConfigurationMultiHost() throws Exception {

         verify(restClientSetup).getElasticSearchClient(same(restClientMock), scrollKACaptor.capture(), anyBoolean(),
             retryAttemptLimitCaptor.capture(), retryErrorCodesCaptor.capture(), retryInitialWaitCaptor.capture(),
-            retryMaxWaitCaptor.capture());
+            retryMaxWaitCaptor.capture(), bulkChunkSerializedLimitCaptor.capture());
         assertEquals(ElasticSearchIndex.ES_SCROLL_KEEP_ALIVE.getDefaultValue().intValue(),
             scrollKACaptor.getValue().intValue());

@@ -238,6 +241,7 @@ public void testConnectBasicHttpConfigurationAllOptions() throws Exception {
                 put("index." + INDEX_NAME + ".elasticsearch.retry-initial-wait", String.valueOf(RETRY_INITIAL_WAIT)).
                 put("index." + INDEX_NAME + ".elasticsearch.retry-max-wait", String.valueOf(RETRY_MAX_WAIT)).
                 put("index." + INDEX_NAME + ".elasticsearch.retry-error-codes", "408,429").
+                put("index." + INDEX_NAME + ".elasticsearch.bulk-chunk-size-limit-bytes", "1000000").
                 build());

         assertNotNull(hostsConfigured);
@@ -250,7 +254,7 @@ public void testConnectBasicHttpConfigurationAllOptions() throws Exception {

         verify(restClientSetup).getElasticSearchClient(same(restClientMock), scrollKACaptor.capture(), anyBoolean(),
             retryAttemptLimitCaptor.capture(), retryErrorCodesCaptor.capture(), retryInitialWaitCaptor.capture(),
-            retryMaxWaitCaptor.capture());
+            retryMaxWaitCaptor.capture(), bulkChunkSerializedLimitCaptor.capture());
         assertEquals(ES_SCROLL_KA,
             scrollKACaptor.getValue().intValue());
         assertEquals(RETRY_LIMIT,
@@ -261,6 +265,7 @@ public void testConnectBasicHttpConfigurationAllOptions() throws Exception {
             retryInitialWaitCaptor.getValue().longValue());
         assertEquals(RETRY_MAX_WAIT,
             retryMaxWaitCaptor.getValue().longValue());
+        assertEquals(1_000_000, bulkChunkSerializedLimitCaptor.getValue().intValue());

         verify(restElasticSearchClientMock).setBulkRefresh(eq(ES_BULK_REFRESH));
         verify(restElasticSearchClientMock).setRetryOnConflict(eq(RETRY_ON_CONFLICT));
@@ -283,7 +288,7 @@ public void testConnectBasicHttpsConfigurationSingleHost() throws Exception {
         assertEquals(ElasticSearchIndex.HOST_PORT_DEFAULT, host0.getPort());

         verify(restClientSetup).getElasticSearchClient(same(restClientMock), scrollKACaptor.capture(), anyBoolean(),
-            anyInt(), anySet(), anyLong(), anyLong());
+            anyInt(), anySet(), anyLong(), anyLong(), anyInt());
         assertEquals(ElasticSearchIndex.ES_SCROLL_KEEP_ALIVE.getDefaultValue().intValue(),
             scrollKACaptor.getValue().intValue());