
Retry bulk request if all errors are configured retry error codes & Split large bulk requests #4489

Merged: 1 commit, merged Jun 24, 2024
1 change: 1 addition & 0 deletions docs/configs/janusgraph-cfg.md
@@ -150,6 +150,7 @@ Elasticsearch index configuration

| Name | Description | Datatype | Default Value | Mutability |
| ---- | ---- | ---- | ---- | ---- |
| index.[X].elasticsearch.bulk-chunk-size-limit-bytes | The total size limit in bytes of a bulk request. Mutation batches in excess of this limit will be chunked to this size. | Integer | 100000000 | LOCAL |
| index.[X].elasticsearch.bulk-refresh | Elasticsearch bulk API refresh setting used to control when changes made by this request are made visible to search | String | false | MASKABLE |
| index.[X].elasticsearch.client-keep-alive | Set a keep-alive timeout (in milliseconds) | Long | (no default value) | GLOBAL_OFFLINE |
| index.[X].elasticsearch.connect-timeout | Sets the maximum connection timeout (in milliseconds). | Integer | 1000 | MASKABLE |
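
As a usage sketch (not part of this PR's diff), the new option can be set like any other index configuration option. A minimal example, where the index name `search`, the storage backend, and the hostname are assumptions for illustration:

```java
import org.janusgraph.core.JanusGraph;
import org.janusgraph.core.JanusGraphFactory;

public class BulkChunkSizeConfigExample {
    public static void main(String[] args) {
        // Hypothetical setup: "search" is an illustrative index name, not something this PR defines.
        JanusGraph graph = JanusGraphFactory.build()
                .set("storage.backend", "inmemory")
                .set("index.search.backend", "elasticsearch")
                .set("index.search.hostname", "127.0.0.1")
                // Keep this at or below http.max_content_length on the Elasticsearch servers.
                .set("index.search.elasticsearch.bulk-chunk-size-limit-bytes", 50_000_000)
                .open();
        graph.close();
    }
}
```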
@@ -324,6 +324,11 @@ public class ElasticSearchIndex implements IndexProvider {
"Comma separated list of Elasticsearch REST client ResponseException error codes to retry. " +
"E.g. \"408,429\"", ConfigOption.Type.LOCAL, String[].class, new String[0]);

public static final ConfigOption<Integer> BULK_CHUNK_SIZE_LIMIT_BYTES =
new ConfigOption<>(ELASTICSEARCH_NS, "bulk-chunk-size-limit-bytes",
"The total size limit in bytes of a bulk request. Mutation batches in excess of this limit will be " +
"chunked to this size.", ConfigOption.Type.LOCAL, Integer.class, 100_000_000);

Comment on lines +327 to +331

Member:

I think it would be good to add an explanation, with a link, to let users know why the default size is 100MB.
I.e. it could be something like this:

Ensure that this limit is always less than or equal to the configured limit of `http.max_content_length` on the Elasticsearch servers. For more information, refer to the [Elasticsearch documentation](https://www.elastic.co/guide/en/elasticsearch/reference/8.14/modules-network.html).

Also, I think it would be great to tell users that in case a chunk is bigger than the size specified here, it will silently be skipped, which may result in permanent inconsistency between the storage backend and the mixed index.

Contributor Author:

 public static final ConfigOption<Integer> BULK_CHUNK_SIZE_LIMIT_BYTES =
        new ConfigOption<>(ELASTICSEARCH_NS, "bulk-chunk-size-limit-bytes",
            "The total size limit in bytes of a bulk request. Mutation batches in excess of this limit will be " +
                "chunked to this size. Ensure that this limit is always less than or equal to the configured limit of " +
                "`http.max_content_length` on the Elasticsearch servers. For more information, refer to the " +
                "[Elasticsearch documentation](https://www.elastic.co/guide/en/elasticsearch/reference/master/modules-network.html).",
            ConfigOption.Type.LOCAL, Integer.class, 100_000_000);

Got it ready for the follow-up👍

Member:

Thanks! Would you be able to also add, here or somewhere in the documentation, a description of what happens when part of the request produces a chunk which is greater than the limit specified here?
I think this information will let users know that it's dangerous to reduce this chunk size to very small values.

Contributor Author:

Added "If a single bulk item exceeds this limit an exception will be thrown after the smaller bulk items are submitted." to the follow-up PR 👍.

Figured we can continue the conversation about any further tweaking over there.
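
For readers who want to verify the server-side ceiling discussed above, here is a hedged sketch using the Elasticsearch low-level REST client; the host, port, and filter_path value are assumptions for illustration, not part of this PR:

```java
import org.apache.http.HttpHost;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

public class MaxContentLengthCheck {
    public static void main(String[] args) throws Exception {
        try (RestClient client = RestClient.builder(new HttpHost("127.0.0.1", 9200, "http")).build()) {
            Request request = new Request("GET", "/_cluster/settings");
            request.addParameter("include_defaults", "true");
            // Narrow the response to the relevant setting; Elasticsearch defaults it to 100mb.
            request.addParameter("filter_path", "**.http.max_content_length");
            Response response = client.performRequest(request);
            System.out.println(EntityUtils.toString(response.getEntity()));
        }
    }
}
```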

public static final int HOST_PORT_DEFAULT = 9200;

/**
@@ -81,8 +81,9 @@ public ElasticSearchClient connect(Configuration config) throws IOException {
long retryMaxWaitMs = config.getOrDefault(ElasticSearchIndex.RETRY_MAX_WAIT);
Set<Integer> errorCodesToRetry = Arrays.stream(config.getOrDefault(ElasticSearchIndex.RETRY_ERROR_CODES))
.mapToInt(Integer::parseInt).boxed().collect(Collectors.toSet());
int bulkChunkLimitBytes = config.getOrDefault(ElasticSearchIndex.BULK_CHUNK_SIZE_LIMIT_BYTES);
final RestElasticSearchClient client = getElasticSearchClient(rc, scrollKeepAlive, useMappingTypesForES7,
retryLimit, errorCodesToRetry, retryInitialWaitMs, retryMaxWaitMs);
retryLimit, errorCodesToRetry, retryInitialWaitMs, retryMaxWaitMs, bulkChunkLimitBytes);
if (config.has(ElasticSearchIndex.BULK_REFRESH)) {
client.setBulkRefresh(config.get(ElasticSearchIndex.BULK_REFRESH));
}
@@ -115,9 +116,9 @@ protected RestClientBuilder getRestClientBuilder(HttpHost[] hosts) {

protected RestElasticSearchClient getElasticSearchClient(RestClient rc, int scrollKeepAlive, boolean useMappingTypesForES7,
int retryAttemptLimit, Set<Integer> retryOnErrorCodes, long retryInitialWaitMs,
long retryMaxWaitMs) {
long retryMaxWaitMs, int bulkChunkSerializedLimit) {
return new RestElasticSearchClient(rc, scrollKeepAlive, useMappingTypesForES7, retryAttemptLimit, retryOnErrorCodes,
retryInitialWaitMs, retryMaxWaitMs);
retryInitialWaitMs, retryMaxWaitMs, bulkChunkSerializedLimit);
}

/**
janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java
@@ -16,11 +16,15 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
import org.apache.http.HttpEntity;
import org.apache.http.HttpStatus;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.ContentType;
import org.apache.tinkerpop.shaded.jackson.annotation.JsonIgnoreProperties;
import org.apache.tinkerpop.shaded.jackson.core.JsonParseException;
import org.apache.tinkerpop.shaded.jackson.core.JsonProcessingException;
import org.apache.tinkerpop.shaded.jackson.core.type.TypeReference;
import org.apache.tinkerpop.shaded.jackson.databind.JsonMappingException;
import org.apache.tinkerpop.shaded.jackson.databind.ObjectMapper;
@@ -39,16 +43,21 @@
import org.janusgraph.diskstorage.es.mapping.IndexMapping;
import org.janusgraph.diskstorage.es.mapping.TypedIndexMappings;
import org.janusgraph.diskstorage.es.mapping.TypelessIndexMappings;
import org.janusgraph.diskstorage.es.rest.RestBulkResponse.RestBulkItemResponse;
import org.janusgraph.diskstorage.es.script.ESScriptResponse;
import org.javatuples.Pair;
import org.javatuples.Triplet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -121,9 +130,11 @@

private final long retryMaxWaitMs;

private final int bulkChunkSerializedLimitBytes;

public RestElasticSearchClient(RestClient delegate, int scrollKeepAlive, boolean useMappingTypesForES7,
int retryAttemptLimit, Set<Integer> retryOnErrorCodes, long retryInitialWaitMs,
long retryMaxWaitMs) {
long retryMaxWaitMs, int bulkChunkSerializedLimitBytes) {
this.delegate = delegate;
majorVersion = getMajorVersion();
this.scrollKeepAlive = scrollKeepAlive+"s";
Expand All @@ -134,6 +145,7 @@
this.retryOnErrorCodes = Collections.unmodifiableSet(retryOnErrorCodes);
this.retryInitialWaitMs = retryInitialWaitMs;
this.retryMaxWaitMs = retryMaxWaitMs;
this.bulkChunkSerializedLimitBytes = bulkChunkSerializedLimitBytes;
}

@Override
@@ -241,7 +253,7 @@

final Response response = e.getResponse();

if(e.getResponse().getStatusLine().getStatusCode() == 404){
if(e.getResponse().getStatusLine().getStatusCode() == HttpStatus.SC_NOT_FOUND){
ESScriptResponse esScriptResponse = new ESScriptResponse();
esScriptResponse.setFound(false);
return esScriptResponse;
@@ -380,10 +392,11 @@
}
}

@Override
public void bulkRequest(List<ElasticSearchMutation> requests, String ingestPipeline) throws IOException {
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
for (final ElasticSearchMutation request : requests) {
private class RequestBytes {
final byte [] requestBytes;
final byte [] requestSource;

private RequestBytes(final ElasticSearchMutation request) throws JsonProcessingException {
Map<String, Object> requestData = new HashMap<>();
if (useMappingTypes) {
requestData.put("_index", request.getIndex());
Expand All @@ -398,15 +411,39 @@
requestData.put(retryOnConflictKey, retryOnConflict);
}

outputStream.write(mapWriter.writeValueAsBytes(
ImmutableMap.of(request.getRequestType().name().toLowerCase(), requestData))
);
outputStream.write(NEW_LINE_BYTES);
this.requestBytes = mapWriter.writeValueAsBytes(ImmutableMap.of(request.getRequestType().name().toLowerCase(), requestData));
if (request.getSource() != null) {
outputStream.write(mapWriter.writeValueAsBytes(request.getSource()));
this.requestSource = mapWriter.writeValueAsBytes(request.getSource());
} else {
this.requestSource = null;
}
}

private int getSerializedSize() {
int serializedSize = this.requestBytes.length;
serializedSize+= 1; //For follow-up NEW_LINE_BYTES
if (this.requestSource != null) {
serializedSize += this.requestSource.length;
serializedSize+= 1; //For follow-up NEW_LINE_BYTES
}
return serializedSize;
}

private void writeTo(OutputStream outputStream) throws IOException {
outputStream.write(this.requestBytes);
outputStream.write(NEW_LINE_BYTES);
if (this.requestSource != null) {
outputStream.write(requestSource);
outputStream.write(NEW_LINE_BYTES);
}
}
}

private Pair<String, byte[]> buildBulkRequestInput(List<RequestBytes> requests, String ingestPipeline) throws IOException {
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
for (final RequestBytes request : requests) {
request.writeTo(outputStream);
}

final StringBuilder builder = new StringBuilder();
if (ingestPipeline != null) {
@@ -416,17 +453,107 @@
APPEND_OP.apply(builder).append("refresh=").append(bulkRefresh);
}
builder.insert(0, REQUEST_SEPARATOR + "_bulk");
Member:

Not related to your PR, but it seems it makes sense to move this line up (to be the next line after the StringBuilder is created) to avoid the array shifting operation. I.e. just:

final StringBuilder builder = new StringBuilder();
builder.append(REQUEST_SEPARATOR).append("_bulk");

Contributor Author:

Looked into this for my follow-up, but it seems like this would change behavior?

private static final Function<StringBuilder, StringBuilder> APPEND_OP = sb -> sb.append(sb.length() == 0 ? REQUEST_PARAM_BEGINNING : REQUEST_PARAM_SEPARATOR);

APPEND_OP appends a ? when the builder is empty and a & otherwise. So as currently written I believe it's producing paths like /_bulk?pipeline=foo&refresh=true.

If we put builder.append(REQUEST_SEPARATOR).append("_bulk"); further up, I think it'd produce paths like /_bulk&pipeline=foo&refresh=true, which wouldn't format the query parameters properly anymore.

How does this look if we just use the string builder for the query parameters, and then directly build the path? I originally used another StringBuilder for the path variable, but IntelliJ's linter suggested just doing String concatenation.

final StringBuilder bulkRequestQueryParameters = new StringBuilder();
if (ingestPipeline != null) {
    APPEND_OP.apply(bulkRequestQueryParameters).append("pipeline=").append(ingestPipeline);
}
if (bulkRefreshEnabled) {
    APPEND_OP.apply(bulkRequestQueryParameters).append("refresh=").append(bulkRefresh);
}
final String bulkRequestPath = REQUEST_SEPARATOR + "_bulk" + bulkRequestQueryParameters;
return Pair.with(bulkRequestPath, outputStream.toByteArray());

Member:

Oh, I missed that! I'm OK with leaving it as it is right now or updating it to the way you are suggesting. Both ways are OK by me.
btw. A final String concatenation is usually compiled to reuse StringBuilder by all modern Java compilers, so I believe that's why IntelliJ suggested just using String concatenation here.
Anyways, the shift is probably too small to produce any meaningful performance difference, so I'm OK with either way, as you prefer.

return Pair.with(builder.toString(), outputStream.toByteArray());
}

//Bulk API is documented to return bulk item responses in the same order of submission
//https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#bulk-api-response-body
//As such we only need to retry elements that failed
private List<Triplet<Object, Integer, RequestBytes>> pairErrorsWithSubmittedMutation(
    final List<Map<String, RestBulkResponse.RestBulkItemResponse>> bulkResponseItems,
    final List<RequestBytes> submittedBulkRequestItems) {
final List<Triplet<Object, Integer, RequestBytes>> errors = new ArrayList<>(bulkResponseItems.size());
for (int itemIndex = 0; itemIndex < bulkResponseItems.size(); itemIndex++) {
Collection<RestBulkResponse.RestBulkItemResponse> bulkResponseItem = bulkResponseItems.get(itemIndex).values();
if (bulkResponseItem.size() > 1) {
throw new IllegalStateException("There should only be a single item per bulk response item entry");
}
RestBulkResponse.RestBulkItemResponse item = bulkResponseItem.iterator().next();
if (item.getError() != null && item.getStatus() != HttpStatus.SC_NOT_FOUND) {
errors.add(Triplet.with(item.getError(), item.getStatus(), submittedBulkRequestItems.get(itemIndex)));
}
}
return errors;
}

final Response response = performRequest(REQUEST_TYPE_POST, builder.toString(), outputStream.toByteArray());
try (final InputStream inputStream = response.getEntity().getContent()) {
final RestBulkResponse bulkResponse = mapper.readValue(inputStream, RestBulkResponse.class);
final List<Object> errors = bulkResponse.getItems().stream()
.flatMap(item -> item.values().stream())
.filter(item -> item.getError() != null && item.getStatus() != 404)
.map(RestBulkItemResponse::getError).collect(Collectors.toList());
if (!errors.isEmpty()) {
errors.forEach(error -> log.error("Failed to execute ES query: {}", error));
throw new IOException("Failure(s) in Elasticsearch bulk request: " + errors);
private class BulkRequestChunker implements Iterator<List<RequestBytes>> {
//By default, Elasticsearch writes are limited to 100mb, so chunk a given batch of requests so they stay under
//the specified limit

//https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#docs-bulk-api-desc
//There is no "correct" number of actions to perform in a single bulk request. Experiment with different
// settings to find the optimal size for your particular workload. Note that Elasticsearch limits the maximum
// size of a HTTP request to 100mb by default
private final PeekingIterator<RequestBytes> requestIterator;

private BulkRequestChunker(List<ElasticSearchMutation> requests) throws JsonProcessingException {
List<RequestBytes> serializedRequests = new ArrayList<>(requests.size());
for (ElasticSearchMutation request : requests) {
serializedRequests.add(new RequestBytes(request));
}
this.requestIterator = Iterators.peekingIterator(serializedRequests.iterator());
}

@Override
public boolean hasNext() {
return requestIterator.hasNext();
}

@Override
public List<RequestBytes> next() {
List<RequestBytes> serializedRequests = new ArrayList<>();
int chunkSerializedTotal = 0;
while (requestIterator.hasNext()) {
RequestBytes peeked = requestIterator.peek();
int requestSerializedSize = peeked.getSerializedSize();
if (requestSerializedSize + chunkSerializedTotal <= bulkChunkSerializedLimitBytes) {
chunkSerializedTotal += requestSerializedSize;
serializedRequests.add(requestIterator.next());
} else if (requestSerializedSize > bulkChunkSerializedLimitBytes) {
//we've encountered an element we cannot send to Elasticsearch given the configured limit
throw new IllegalArgumentException(String.format(
"Bulk request item is larger than permitted chunk limit. Limit is %s. Serialized item size was %s",
bulkChunkSerializedLimitBytes, requestSerializedSize));
} else {
//Adding this element would exceed the limit, so return the chunk
return serializedRequests;
}
}
//All remaining requests fit in this chunk
return serializedRequests;
}
}
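
To make the chunker's greedy peek-then-pack behaviour concrete, here is a self-contained sketch (not JanusGraph code; the 10-byte limit and the item sizes are made up for illustration):

```java
import java.util.ArrayList;
import java.util.List;

public class ChunkerDemo {
    // Greedily packs items into chunks of at most limitBytes, mirroring BulkRequestChunker's loop.
    static List<List<byte[]>> chunk(List<byte[]> items, int limitBytes) {
        List<List<byte[]>> chunks = new ArrayList<>();
        List<byte[]> current = new ArrayList<>();
        int total = 0;
        for (byte[] item : items) {
            if (item.length > limitBytes) {
                // Same contract as the PR: a single oversized item cannot be split, so fail loudly.
                throw new IllegalArgumentException("Item of " + item.length + " bytes exceeds limit " + limitBytes);
            }
            if (total + item.length > limitBytes) {
                chunks.add(current); // close the current chunk and start a new one
                current = new ArrayList<>();
                total = 0;
            }
            current.add(item);
            total += item.length;
        }
        if (!current.isEmpty()) {
            chunks.add(current);
        }
        return chunks;
    }

    public static void main(String[] args) {
        // With a 10-byte limit, items of 4, 4, 4, and 9 bytes pack into chunks [4, 4], [4], [9].
        List<byte[]> items = List.of(new byte[4], new byte[4], new byte[4], new byte[9]);
        chunk(items, 10).forEach(c -> System.out.println("chunk items: " + c.size()));
    }
}
```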

@Override
public void bulkRequest(final List<ElasticSearchMutation> requests, String ingestPipeline) throws IOException {
BulkRequestChunker bulkRequestChunker = new BulkRequestChunker(requests);
while (bulkRequestChunker.hasNext()) {
List<RequestBytes> bulkRequestChunk = bulkRequestChunker.next();
int retryCount = 0;
while (true) {
final Pair<String, byte[]> bulkRequestInput = buildBulkRequestInput(bulkRequestChunk, ingestPipeline);
final Response response = performRequest(REQUEST_TYPE_POST, bulkRequestInput.getValue0(), bulkRequestInput.getValue1());
try (final InputStream inputStream = response.getEntity().getContent()) {
final RestBulkResponse bulkResponse = mapper.readValue(inputStream, RestBulkResponse.class);
List<Triplet<Object, Integer, RequestBytes>> bulkItemsThatFailed = pairErrorsWithSubmittedMutation(bulkResponse.getItems(), bulkRequestChunk);
if (!bulkItemsThatFailed.isEmpty()) {
//Only retry the bulk request if *all* the bulk response item error codes are retry error codes
final Set<Integer> errorCodes = bulkItemsThatFailed.stream().map(Triplet::getValue1).collect(Collectors.toSet());
if (retryCount < retryAttemptLimit && retryOnErrorCodes.containsAll(errorCodes)) {
//Build up the next request batch, of only the failed mutations
bulkRequestChunk = bulkItemsThatFailed.stream().map(Triplet::getValue2).collect(Collectors.toList());
performRetryWait(retryCount);
retryCount++;
} else {
final List<Object> errorItems = bulkItemsThatFailed.stream().map(Triplet::getValue0).collect(Collectors.toList());
errorItems.forEach(error -> log.error("Failed to execute ES query: {}", error));
throw new IOException("Failure(s) in Elasticsearch bulk request: " + errorItems);
}
} else {
//The entire bulk request was successful, leave the loop
break;
}
}
}
}
}
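
The retry decision above hinges on containsAll: the chunk is retried only when every failed item's status code is configured as retryable. A quick illustration (the codes 408 and 429 come from the RETRY_ERROR_CODES docstring example; the failure mixes are made up):

```java
import java.util.Set;

public class RetryDecisionDemo {
    public static void main(String[] args) {
        Set<Integer> retryOnErrorCodes = Set.of(408, 429); // from the "408,429" docstring example
        // Every failed item returned 429, so all codes are retryable and the chunk is retried.
        System.out.println(retryOnErrorCodes.containsAll(Set.of(429)));      // true
        // One item returned 400, which is not retryable, so an IOException is thrown instead.
        System.out.println(retryOnErrorCodes.containsAll(Set.of(429, 400))); // false
    }
}
```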
@@ -571,19 +698,22 @@
if (!retryOnErrorCodes.contains(e.getResponse().getStatusLine().getStatusCode()) || retryCount >= retryAttemptLimit) {
throw e;
}
//Wait before trying again
long waitDurationMs = Math.min((long) (retryInitialWaitMs * Math.pow(10, retryCount)), retryMaxWaitMs);
log.warn("Retrying Elasticsearch request in {} ms. Attempt {} of {}", waitDurationMs, retryCount, retryAttemptLimit);
try {
Thread.sleep(waitDurationMs);
} catch (InterruptedException interruptedException) {
throw new RuntimeException(String.format("Thread interrupted while waiting for retry attempt %d of %d", retryCount, retryAttemptLimit), interruptedException);
}
performRetryWait(retryCount);
}
retryCount++;
}
}

private void performRetryWait(int retryCount) {
long waitDurationMs = Math.min((long) (retryInitialWaitMs * Math.pow(10, retryCount)), retryMaxWaitMs);
log.warn("Retrying Elasticsearch request in {} ms. Attempt {} of {}", waitDurationMs, retryCount, retryAttemptLimit);
try {
Thread.sleep(waitDurationMs);
} catch (InterruptedException interruptedException) {
throw new RuntimeException(String.format("Thread interrupted while waiting for retry attempt %d of %d", retryCount, retryAttemptLimit), interruptedException);
}
}
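
The extracted helper implements capped exponential backoff with a base factor of 10. A standalone sketch of the resulting wait progression, using an illustrative 100 ms initial wait and 60 s cap (placeholder values, not project defaults):

```java
public class BackoffDemo {
    public static void main(String[] args) {
        long retryInitialWaitMs = 100;   // illustrative stand-in for ElasticSearchIndex.RETRY_INITIAL_WAIT
        long retryMaxWaitMs = 60_000;    // illustrative stand-in for ElasticSearchIndex.RETRY_MAX_WAIT
        for (int retryCount = 0; retryCount < 5; retryCount++) {
            // Same formula as performRetryWait: initial wait * 10^retryCount, capped at the max.
            long waitDurationMs = Math.min((long) (retryInitialWaitMs * Math.pow(10, retryCount)), retryMaxWaitMs);
            System.out.printf("attempt %d -> wait %d ms%n", retryCount, waitDurationMs);
        }
        // Prints 100, 1000, 10000, then 60000 twice (capped).
    }
}
```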

private Response performRequest(Request request, byte[] requestData) throws IOException {

final HttpEntity entity = requestData != null ? new ByteArrayEntity(requestData, ContentType.APPLICATION_JSON) : null;