Closes #4488
Retry bulk request if all errors are configured retry error codes

Only retry bulk request items that failed

Implemented bulk retry test and implemented splitting up a bulk request into chunks when it is large enough

Added exception if a bulk request item is too large to send, even as a single item

Use HttpStatus.SC_NOT_FOUND instead of 404 literal

Added pseudo test to facilitate manual observation of chunking of bulk writes

Added test asserting silent failure of write to ES due to chunk size limit causing the vertex to be omitted from the ES mixed index

Signed-off-by: Allan Clements <criminosis@gmail.com>
criminosis authored and porunov committed Jun 24, 2024
1 parent 55880ea commit d255396
Showing 8 changed files with 427 additions and 43 deletions.
1 change: 1 addition & 0 deletions docs/configs/janusgraph-cfg.md
@@ -150,6 +150,7 @@ Elasticsearch index configuration

| Name | Description | Datatype | Default Value | Mutability |
| ---- | ---- | ---- | ---- | ---- |
| index.[X].elasticsearch.bulk-chunk-size-limit-bytes | The total size limit in bytes of a bulk request. Mutation batches in excess of this limit will be chunked to this size. | Integer | 100000000 | LOCAL |
| index.[X].elasticsearch.bulk-refresh | Elasticsearch bulk API refresh setting used to control when changes made by this request are made visible to search | String | false | MASKABLE |
| index.[X].elasticsearch.client-keep-alive | Set a keep-alive timeout (in milliseconds) | Long | (no default value) | GLOBAL_OFFLINE |
| index.[X].elasticsearch.connect-timeout | Sets the maximum connection timeout (in milliseconds). | Integer | 1000 | MASKABLE |
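For illustration, a minimal sketch of overriding this limit when opening a graph; the `cql` backend, the mixed index name `search`, and the 50 MB value are hypothetical choices, not part of this change:

```java
import org.janusgraph.core.JanusGraph;
import org.janusgraph.core.JanusGraphFactory;

public class BulkChunkLimitExample {
    public static void main(String[] args) {
        // Hypothetical backend settings; only the bulk-chunk-size-limit-bytes
        // override is the subject of this sketch.
        JanusGraph graph = JanusGraphFactory.build()
            .set("storage.backend", "cql")
            .set("index.search.backend", "elasticsearch")
            // Chunk bulk writes at roughly 50 MB instead of the 100 MB default
            .set("index.search.elasticsearch.bulk-chunk-size-limit-bytes", 50_000_000)
            .open();
        graph.close();
    }
}
```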
@@ -324,6 +324,11 @@ public class ElasticSearchIndex implements IndexProvider {
"Comma separated list of Elasticsearch REST client ResponseException error codes to retry. " +
"E.g. \"408,429\"", ConfigOption.Type.LOCAL, String[].class, new String[0]);

public static final ConfigOption<Integer> BULK_CHUNK_SIZE_LIMIT_BYTES =
new ConfigOption<>(ELASTICSEARCH_NS, "bulk-chunk-size-limit-bytes",
"The total size limit in bytes of a bulk request. Mutation batches in excess of this limit will be " +
"chunked to this size.", ConfigOption.Type.LOCAL, Integer.class, 100_000_000);

public static final int HOST_PORT_DEFAULT = 9200;

/**
@@ -81,8 +81,9 @@ public ElasticSearchClient connect(Configuration config) throws IOException {
long retryMaxWaitMs = config.getOrDefault(ElasticSearchIndex.RETRY_MAX_WAIT);
Set<Integer> errorCodesToRetry = Arrays.stream(config.getOrDefault(ElasticSearchIndex.RETRY_ERROR_CODES))
.mapToInt(Integer::parseInt).boxed().collect(Collectors.toSet());
int bulkChunkLimitBytes = config.getOrDefault(ElasticSearchIndex.BULK_CHUNK_SIZE_LIMIT_BYTES);
final RestElasticSearchClient client = getElasticSearchClient(rc, scrollKeepAlive, useMappingTypesForES7,
retryLimit, errorCodesToRetry, retryInitialWaitMs, retryMaxWaitMs);
retryLimit, errorCodesToRetry, retryInitialWaitMs, retryMaxWaitMs, bulkChunkLimitBytes);
if (config.has(ElasticSearchIndex.BULK_REFRESH)) {
client.setBulkRefresh(config.get(ElasticSearchIndex.BULK_REFRESH));
}
@@ -115,9 +116,9 @@ protected RestClientBuilder getRestClientBuilder(HttpHost[] hosts) {

protected RestElasticSearchClient getElasticSearchClient(RestClient rc, int scrollKeepAlive, boolean useMappingTypesForES7,
int retryAttemptLimit, Set<Integer> retryOnErrorCodes, long retryInitialWaitMs,
long retryMaxWaitMs) {
long retryMaxWaitMs, int bulkChunkSerializedLimit) {
return new RestElasticSearchClient(rc, scrollKeepAlive, useMappingTypesForES7, retryAttemptLimit, retryOnErrorCodes,
retryInitialWaitMs, retryMaxWaitMs);
retryInitialWaitMs, retryMaxWaitMs, bulkChunkSerializedLimit);
}

/**
@@ -16,11 +16,15 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
import org.apache.http.HttpEntity;
import org.apache.http.HttpStatus;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.ContentType;
import org.apache.tinkerpop.shaded.jackson.annotation.JsonIgnoreProperties;
import org.apache.tinkerpop.shaded.jackson.core.JsonParseException;
import org.apache.tinkerpop.shaded.jackson.core.JsonProcessingException;
import org.apache.tinkerpop.shaded.jackson.core.type.TypeReference;
import org.apache.tinkerpop.shaded.jackson.databind.JsonMappingException;
import org.apache.tinkerpop.shaded.jackson.databind.ObjectMapper;
@@ -39,16 +43,21 @@
import org.janusgraph.diskstorage.es.mapping.IndexMapping;
import org.janusgraph.diskstorage.es.mapping.TypedIndexMappings;
import org.janusgraph.diskstorage.es.mapping.TypelessIndexMappings;
import org.janusgraph.diskstorage.es.rest.RestBulkResponse.RestBulkItemResponse;
import org.janusgraph.diskstorage.es.script.ESScriptResponse;
import org.javatuples.Pair;
import org.javatuples.Triplet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -121,9 +130,11 @@ public class RestElasticSearchClient implements ElasticSearchClient {

private final long retryMaxWaitMs;

private final int bulkChunkSerializedLimitBytes;

public RestElasticSearchClient(RestClient delegate, int scrollKeepAlive, boolean useMappingTypesForES7,
int retryAttemptLimit, Set<Integer> retryOnErrorCodes, long retryInitialWaitMs,
long retryMaxWaitMs) {
long retryMaxWaitMs, int bulkChunkSerializedLimitBytes) {
this.delegate = delegate;
majorVersion = getMajorVersion();
this.scrollKeepAlive = scrollKeepAlive+"s";
@@ -134,6 +145,7 @@ public RestElasticSearchClient(RestClient delegate, int scrollKeepAlive, boolean
this.retryOnErrorCodes = Collections.unmodifiableSet(retryOnErrorCodes);
this.retryInitialWaitMs = retryInitialWaitMs;
this.retryMaxWaitMs = retryMaxWaitMs;
this.bulkChunkSerializedLimitBytes = bulkChunkSerializedLimitBytes;
}

@Override
@@ -241,7 +253,7 @@ public ESScriptResponse getStoredScript(String scriptName) throws IOException {

final Response response = e.getResponse();

if(e.getResponse().getStatusLine().getStatusCode() == 404){
if(e.getResponse().getStatusLine().getStatusCode() == HttpStatus.SC_NOT_FOUND){
ESScriptResponse esScriptResponse = new ESScriptResponse();
esScriptResponse.setFound(false);
return esScriptResponse;
@@ -380,10 +392,11 @@ public void clearStore(String indexName, String storeName) throws IOException {
}
}

@Override
public void bulkRequest(List<ElasticSearchMutation> requests, String ingestPipeline) throws IOException {
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
for (final ElasticSearchMutation request : requests) {
private class RequestBytes {
final byte [] requestBytes;
final byte [] requestSource;

private RequestBytes(final ElasticSearchMutation request) throws JsonProcessingException {
Map<String, Object> requestData = new HashMap<>();
if (useMappingTypes) {
requestData.put("_index", request.getIndex());
@@ -398,15 +411,39 @@ public void bulkRequest(List<ElasticSearchMutation> requests, String ingestPipel
requestData.put(retryOnConflictKey, retryOnConflict);
}

outputStream.write(mapWriter.writeValueAsBytes(
ImmutableMap.of(request.getRequestType().name().toLowerCase(), requestData))
);
outputStream.write(NEW_LINE_BYTES);
this.requestBytes = mapWriter.writeValueAsBytes(ImmutableMap.of(request.getRequestType().name().toLowerCase(), requestData));
if (request.getSource() != null) {
outputStream.write(mapWriter.writeValueAsBytes(request.getSource()));
this.requestSource = mapWriter.writeValueAsBytes(request.getSource());
} else {
this.requestSource = null;
}
}

private int getSerializedSize() {
int serializedSize = this.requestBytes.length;
serializedSize += 1; //For follow-up NEW_LINE_BYTES
if (this.requestSource != null) {
serializedSize += this.requestSource.length;
serializedSize += 1; //For follow-up NEW_LINE_BYTES
}
return serializedSize;
}

private void writeTo(OutputStream outputStream) throws IOException {
outputStream.write(this.requestBytes);
outputStream.write(NEW_LINE_BYTES);
if (this.requestSource != null) {
outputStream.write(requestSource);
outputStream.write(NEW_LINE_BYTES);
}
}
}

private Pair<String, byte[]> buildBulkRequestInput(List<RequestBytes> requests, String ingestPipeline) throws IOException {
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
for (final RequestBytes request : requests) {
request.writeTo(outputStream);
}

final StringBuilder builder = new StringBuilder();
if (ingestPipeline != null) {
@@ -416,17 +453,107 @@ public void bulkRequest(List<ElasticSearchMutation> requests, String ingestPipel
APPEND_OP.apply(builder).append("refresh=").append(bulkRefresh);
}
builder.insert(0, REQUEST_SEPARATOR + "_bulk");
return Pair.with(builder.toString(), outputStream.toByteArray());
}
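As a sketch of what buildBulkRequestInput assembles: the bulk body is NDJSON, one action line per mutation, optionally followed by a source line, each newline-terminated. The index name, document ids, and fields below are hypothetical:

```java
// Hypothetical NDJSON bulk body for two mutations (an index and a delete).
// The action line carries the metadata; only the index action has a source line.
String bulkBody =
    "{\"index\":{\"_index\":\"janusgraph_mixed\",\"_id\":\"v101\"}}\n" +
    "{\"name\":\"hercules\",\"age\":30}\n" +
    "{\"delete\":{\"_index\":\"janusgraph_mixed\",\"_id\":\"v102\"}}\n";
// POSTed to "<path>/_bulk", with "?pipeline=..." and "?refresh=..." appended when configured.
```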

//Bulk API is documented to return bulk item responses in the same order of submission
//https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#bulk-api-response-body
//As such we only need to retry elements that failed
private List<Triplet<Object, Integer, RequestBytes>> pairErrorsWithSubmittedMutation(
final List<Map<String, RestBulkResponse.RestBulkItemResponse>> bulkResponseItems,
final List<RequestBytes> submittedBulkRequestItems) {
final List<Triplet<Object, Integer, RequestBytes>> errors = new ArrayList<>(bulkResponseItems.size());
for (int itemIndex = 0; itemIndex < bulkResponseItems.size(); itemIndex++) {
Collection<RestBulkResponse.RestBulkItemResponse> bulkResponseItem = bulkResponseItems.get(itemIndex).values();
if (bulkResponseItem.size() > 1) {
throw new IllegalStateException("There should only be a single item per bulk reponse item entry");
}
RestBulkResponse.RestBulkItemResponse item = bulkResponseItem.iterator().next();
if (item.getError() != null && item.getStatus() != HttpStatus.SC_NOT_FOUND) {
errors.add(Triplet.with(item.getError(), item.getStatus(), submittedBulkRequestItems.get(itemIndex)));
}
}
return errors;
}

final Response response = performRequest(REQUEST_TYPE_POST, builder.toString(), outputStream.toByteArray());
try (final InputStream inputStream = response.getEntity().getContent()) {
final RestBulkResponse bulkResponse = mapper.readValue(inputStream, RestBulkResponse.class);
final List<Object> errors = bulkResponse.getItems().stream()
.flatMap(item -> item.values().stream())
.filter(item -> item.getError() != null && item.getStatus() != 404)
.map(RestBulkItemResponse::getError).collect(Collectors.toList());
if (!errors.isEmpty()) {
errors.forEach(error -> log.error("Failed to execute ES query: {}", error));
throw new IOException("Failure(s) in Elasticsearch bulk request: " + errors);
private class BulkRequestChunker implements Iterator<List<RequestBytes>> {
//By default, Elasticsearch writes are limited to 100mb, so chunk a given batch of requests so they stay under
//the specified limit

//https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#docs-bulk-api-desc
//There is no "correct" number of actions to perform in a single bulk request. Experiment with different
// settings to find the optimal size for your particular workload. Note that Elasticsearch limits the maximum
// size of a HTTP request to 100mb by default
private final PeekingIterator<RequestBytes> requestIterator;

private BulkRequestChunker(List<ElasticSearchMutation> requests) throws JsonProcessingException {
List<RequestBytes> serializedRequests = new ArrayList<>(requests.size());
for (ElasticSearchMutation request : requests) {
serializedRequests.add(new RequestBytes(request));
}
this.requestIterator = Iterators.peekingIterator(serializedRequests.iterator());
}

@Override
public boolean hasNext() {
return requestIterator.hasNext();
}

@Override
public List<RequestBytes> next() {
List<RequestBytes> serializedRequests = new ArrayList<>();
int chunkSerializedTotal = 0;
while (requestIterator.hasNext()) {
RequestBytes peeked = requestIterator.peek();
int requestSerializedSize = peeked.getSerializedSize();
if (requestSerializedSize + chunkSerializedTotal <= bulkChunkSerializedLimitBytes) {
chunkSerializedTotal += requestSerializedSize;
serializedRequests.add(requestIterator.next());
} else if (requestSerializedSize > bulkChunkSerializedLimitBytes) {
//we've encountered an element we cannot send to Elasticsearch given the configured limit
throw new IllegalArgumentException(String.format(
"Bulk request item is larger than permitted chunk limit. Limit is %s. Serialized item size was %s",
bulkChunkSerializedLimitBytes, requestSerializedSize));
} else {
//Adding this element would exceed the limit, so return the chunk
return serializedRequests;
}
}
//All remaining requests fit in this chunk
return serializedRequests;
}
}
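A standalone sketch of the greedy rule above, using hypothetical item sizes and a toy limit of 100 bytes in place of the configured byte limit (not the JanusGraph API):

```java
import java.util.ArrayList;
import java.util.List;

public class ChunkingSketch {
    public static void main(String[] args) {
        int limit = 100;                       // toy stand-in for bulkChunkSerializedLimitBytes
        int[] sizes = {60, 30, 50};            // hypothetical serialized item sizes
        List<List<Integer>> chunks = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        int total = 0;
        for (int size : sizes) {
            if (size > limit) {                // mirrors the IllegalArgumentException above
                throw new IllegalArgumentException("Item of size " + size + " exceeds limit " + limit);
            }
            if (total + size > limit) {        // adding would overflow: close this chunk
                chunks.add(current);
                current = new ArrayList<>();
                total = 0;
            }
            current.add(size);
            total += size;
        }
        if (!current.isEmpty()) {
            chunks.add(current);
        }
        System.out.println(chunks);            // prints [[60, 30], [50]]
    }
}
```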

@Override
public void bulkRequest(final List<ElasticSearchMutation> requests, String ingestPipeline) throws IOException {
BulkRequestChunker bulkRequestChunker = new BulkRequestChunker(requests);
while (bulkRequestChunker.hasNext()) {
List<RequestBytes> bulkRequestChunk = bulkRequestChunker.next();
int retryCount = 0;
while (true) {
final Pair<String, byte[]> bulkRequestInput = buildBulkRequestInput(bulkRequestChunk, ingestPipeline);
final Response response = performRequest(REQUEST_TYPE_POST, bulkRequestInput.getValue0(), bulkRequestInput.getValue1());
try (final InputStream inputStream = response.getEntity().getContent()) {
final RestBulkResponse bulkResponse = mapper.readValue(inputStream, RestBulkResponse.class);
List<Triplet<Object, Integer, RequestBytes>> bulkItemsThatFailed = pairErrorsWithSubmittedMutation(bulkResponse.getItems(), bulkRequestChunk);
if (!bulkItemsThatFailed.isEmpty()) {
//Only retry the bulk request if *all* the bulk response item error codes are retry error codes
final Set<Integer> errorCodes = bulkItemsThatFailed.stream().map(Triplet::getValue1).collect(Collectors.toSet());
if (retryCount < retryAttemptLimit && retryOnErrorCodes.containsAll(errorCodes)) {
//Build up the next request batch, of only the failed mutations
bulkRequestChunk = bulkItemsThatFailed.stream().map(Triplet::getValue2).collect(Collectors.toList());
performRetryWait(retryCount);
retryCount++;
} else {
final List<Object> errorItems = bulkItemsThatFailed.stream().map(Triplet::getValue0).collect(Collectors.toList());
errorItems.forEach(error -> log.error("Failed to execute ES query: {}", error));
throw new IOException("Failure(s) in Elasticsearch bulk request: " + errorItems);
}
} else {
//The entire bulk request was successful, leave the loop
break;
}
}
}
}
}
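The retry gate above reduces to a set-containment check over the failed items' status codes; a minimal sketch with hypothetical values, assuming 408 and 429 are the configured retry codes:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class RetryGateSketch {
    public static void main(String[] args) {
        Set<Integer> retryOnErrorCodes = new HashSet<>(Arrays.asList(408, 429));
        // Case 1: every failed item returned a retryable status -> the chunk is retried
        Set<Integer> failedStatuses = new HashSet<>(Arrays.asList(429));
        System.out.println(retryOnErrorCodes.containsAll(failedStatuses)); // true
        // Case 2: one non-retryable status fails the whole chunk fast
        failedStatuses = new HashSet<>(Arrays.asList(429, 400));
        System.out.println(retryOnErrorCodes.containsAll(failedStatuses)); // false
    }
}
```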
@@ -571,19 +698,22 @@ private Response performRequestWithRetry(Request request) throws IOException {
if (!retryOnErrorCodes.contains(e.getResponse().getStatusLine().getStatusCode()) || retryCount >= retryAttemptLimit) {
throw e;
}
//Wait before trying again
long waitDurationMs = Math.min((long) (retryInitialWaitMs * Math.pow(10, retryCount)), retryMaxWaitMs);
log.warn("Retrying Elasticsearch request in {} ms. Attempt {} of {}", waitDurationMs, retryCount, retryAttemptLimit);
try {
Thread.sleep(waitDurationMs);
} catch (InterruptedException interruptedException) {
throw new RuntimeException(String.format("Thread interrupted while waiting for retry attempt %d of %d", retryCount, retryAttemptLimit), interruptedException);
}
performRetryWait(retryCount);
}
retryCount++;
}
}

private void performRetryWait(int retryCount) {
long waitDurationMs = Math.min((long) (retryInitialWaitMs * Math.pow(10, retryCount)), retryMaxWaitMs);
log.warn("Retrying Elasticsearch request in {} ms. Attempt {} of {}", waitDurationMs, retryCount, retryAttemptLimit);
try {
Thread.sleep(waitDurationMs);
} catch (InterruptedException interruptedException) {
throw new RuntimeException(String.format("Thread interrupted while waiting for retry attempt %d of %d", retryCount, retryAttemptLimit), interruptedException);
}
}
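The wait grows by a factor of ten per attempt from the initial wait, capped at the maximum; a sketch assuming hypothetical settings retryInitialWaitMs = 100 and retryMaxWaitMs = 10000:

```java
public class BackoffSketch {
    public static void main(String[] args) {
        long retryInitialWaitMs = 100L;        // hypothetical configured initial wait
        long retryMaxWaitMs = 10_000L;         // hypothetical configured cap
        for (int retryCount = 0; retryCount < 4; retryCount++) {
            long waitMs = Math.min((long) (retryInitialWaitMs * Math.pow(10, retryCount)), retryMaxWaitMs);
            System.out.println("attempt " + retryCount + " -> wait " + waitMs + " ms");
        }
        // attempt 0 -> 100 ms, attempt 1 -> 1000 ms, attempt 2 -> 10000 ms, attempt 3 -> 10000 ms (capped)
    }
}
```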

private Response performRequest(Request request, byte[] requestData) throws IOException {

final HttpEntity entity = requestData != null ? new ByteArrayEntity(requestData, ContentType.APPLICATION_JSON) : null;

1 comment on commit d255396

@github-actions


Benchmark

| Benchmark suite | Current: d255396 | Previous: d35fc64 | Ratio |
| ---- | ---- | ---- | ---- |
| org.janusgraph.JanusGraphSpeedBenchmark.basicAddAndDelete | 13037.066610559546 ms/op | 12468.444225291154 ms/op | 1.05 |
| org.janusgraph.GraphCentricQueryBenchmark.getVertices | 904.8334929847939 ms/op | 918.3423996712321 ms/op | 0.99 |
| org.janusgraph.MgmtOlapJobBenchmark.runClearIndex | 216.15113134855073 ms/op | 216.65935547028985 ms/op | 1.00 |
| org.janusgraph.MgmtOlapJobBenchmark.runReindex | 339.15988539023806 ms/op | 336.6219472083333 ms/op | 1.01 |
| org.janusgraph.JanusGraphSpeedBenchmark.basicCount | 211.23345408465195 ms/op | 212.17318113485976 ms/op | 1.00 |
| org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesAllPropertiesWithAllMultiQuerySlicesUnderMaxRequestsPerConnection | 4931.693804153965 ms/op | 4934.862177070978 ms/op | 1.00 |
| org.janusgraph.CQLMultiQueryBenchmark.getElementsWithUsingEmitRepeatSteps | 16798.592706644642 ms/op | 17119.84681319412 ms/op | 0.98 |
| org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesMultiplePropertiesWithSmallBatch | 20120.169043790705 ms/op | 20987.51313056889 ms/op | 0.96 |
| org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.vertexCentricPropertiesFetching | 57363.86306676666 ms/op | 57050.64306146667 ms/op | 1.01 |
| org.janusgraph.CQLMultiQueryDropBenchmark.dropVertices | 1518.2833554927352 ms/op | 1559.969298241005 ms/op | 0.97 |
| org.janusgraph.CQLMultiQueryBenchmark.getAllElementsTraversedFromOuterVertex | 8221.340985996741 ms/op | 8318.72392928847 ms/op | 0.99 |
| org.janusgraph.CQLMultiQueryBenchmark.getVerticesWithDoubleUnion | 382.37273965120175 ms/op | 392.59846986303853 ms/op | 0.97 |
| org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesAllPropertiesWithUnlimitedBatch | 4210.67255364639 ms/op | 4240.712364445486 ms/op | 0.99 |
| org.janusgraph.CQLMultiQueryBenchmark.getNames | 8305.742310268175 ms/op | 8406.331584402715 ms/op | 0.99 |
| org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesThreePropertiesWithAllMultiQuerySlicesUnderMaxRequestsPerConnection | 5909.118475708231 ms/op | 5987.64859878139 ms/op | 0.99 |
| org.janusgraph.CQLMultiQueryBenchmark.getLabels | 7039.962671466222 ms/op | 7290.028061798635 ms/op | 0.97 |
| org.janusgraph.CQLMultiQueryBenchmark.getVerticesFilteredByAndStep | 437.4498647372582 ms/op | 435.76237709334333 ms/op | 1.00 |
| org.janusgraph.CQLMultiQueryBenchmark.getVerticesFromMultiNestedRepeatStepStartingFromSingleVertex | 12372.71565066 ms/op | 12783.126217693469 ms/op | 0.97 |
| org.janusgraph.CQLMultiQueryBenchmark.getVerticesWithCoalesceUsage | 358.7801587838268 ms/op | 363.9668594437082 ms/op | 0.99 |
| org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesMultiplePropertiesWithAllMultiQuerySlicesUnderMaxRequestsPerConnection | 14304.014393286765 ms/op | 15378.720663330263 ms/op | 0.93 |
| org.janusgraph.CQLMultiQueryBenchmark.getIdToOutVerticesProjection | 250.5548471669961 ms/op | 253.97183167148802 ms/op | 0.99 |
| org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesMultiplePropertiesWithUnlimitedBatch | 15266.85787542638 ms/op | 14617.308247215036 ms/op | 1.04 |
| org.janusgraph.CQLMultiQueryBenchmark.getNeighborNames | 8292.784849794427 ms/op | 8414.490379754512 ms/op | 0.99 |
| org.janusgraph.CQLMultiQueryBenchmark.getElementsWithUsingRepeatUntilSteps | 9085.570613425429 ms/op | 9340.049792997386 ms/op | 0.97 |
| org.janusgraph.CQLMultiQueryBenchmark.getAdjacentVerticesLocalCounts | 8611.76857230251 ms/op | 8726.780923948296 ms/op | 0.99 |

This comment was automatically generated by workflow using github-action-benchmark.
