Retry bulk request if all errors are configured retry error codes & Split large bulk requests #4489
Conversation
Had an idea for an improvement this morning to only retry the bulk items that returned an error. This is predicated upon the Bulk API docs stating the response's returned bulk item order is the same as the submission order.
Wasn't sure if that would be preferable though, so I left it as an unsquashed addition to the PR in case it'd be better to revert it. If it looks good, happy to squash it down to true up the PR for merging.
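For illustration, a minimal sketch of that idea (the record types and helper below are invented for this example, not JanusGraph's actual classes; the only behavior assumed from the Bulk API docs is that response items come back in submission order):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Illustrative stand-ins for the real request/response item types.
record Item(String payload) {}
record ItemResult(int status, boolean failed) {}

public class PartialBulkRetrySketch {
    // Error codes treated as retriable, e.g. 429 Too Many Requests.
    static final Set<Integer> RETRY_CODES = Set.of(429);

    static List<Item> itemsToRetry(List<Item> submitted, List<ItemResult> results) {
        // The Bulk API returns one result per item, in submission order,
        // so index i of the response maps to index i of the request.
        List<Item> retries = new ArrayList<>();
        for (int i = 0; i < results.size(); i++) {
            ItemResult r = results.get(i);
            if (r.failed() && RETRY_CODES.contains(r.status())) {
                retries.add(submitted.get(i));
            }
        }
        return retries;
    }
}
```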
Added a test for bulk retry in 81b07ac. While I was working on that, though, I noticed the ES docs state there is a size limit for bulk requests.
I've bumped into this in my JG deployment; ES pushes back saying the payload is too large. I didn't have a meaningful knob to turn from what I could tell, so I opted to work around the issue by more aggressively splitting up calls on my JG client side. Since I was already working in the bulk request logic, I implemented a simple request chunker: if a bulk request has a payload that is too large for a single call, it'll split it up into multiple bulk request calls. Like last time, I'm keeping things unsquashed for discussion in case there's no desire for the change.
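A rough sketch of what such a chunker might look like (standalone and simplified, not the PR's actual code; a real implementation must also keep each bulk action line and its document line in the same chunk, which this toy version glosses over by treating each element as one pre-paired line):

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class BulkChunkerSketch {
    /**
     * Splits serialized bulk-request lines into chunks whose total size stays
     * under limitBytes. Throws if a single line alone exceeds the limit, since
     * it could never be sent even as its own request.
     */
    static List<List<byte[]>> chunk(List<String> bulkLines, int limitBytes) {
        List<List<byte[]>> chunks = new ArrayList<>();
        List<byte[]> current = new ArrayList<>();
        int currentSize = 0;
        for (String line : bulkLines) {
            byte[] encoded = (line + "\n").getBytes(StandardCharsets.UTF_8);
            if (encoded.length > limitBytes) {
                throw new IllegalArgumentException(
                        "Single bulk item exceeds the configured chunk size limit");
            }
            if (currentSize + encoded.length > limitBytes && !current.isEmpty()) {
                chunks.add(current); // close out the full chunk
                current = new ArrayList<>();
                currentSize = 0;
            }
            current.add(encoded);
            currentSize += encoded.length;
        }
        if (!current.isEmpty()) {
            chunks.add(current);
        }
        return chunks;
    }
}
```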
@li-boxuan / @porunov this might interest you since you reviewed the last one? I accidentally left the job half finished; didn't realize bulk request errors were nested at the item level.
Thank you! This looks good to me in general! I wonder if you could add an integration test to ElasticsearchJanusGraphIndexTest that demonstrates this chunking behavior? You could set BULK_CHUNK_SIZE_LIMIT_BYTES to different numbers to test different scenarios. I would imagine the following:

- A very small `BULK_CHUNK_SIZE_LIMIT_BYTES` that would fail the request, demonstrating the "we've encountered an element we cannot send to Elasticsearch given the configured limit" scenario.
- A relatively small `BULK_CHUNK_SIZE_LIMIT_BYTES` that would make the request split into chunks. We probably have no way to enforce that in the test (we lack observability), but I guess we could at least use an IDE debugger to verify that manually.
@li-boxuan I don't think it's possible to write such a test because JanusGraph doesn't treat such a scenario as a failure, in code or in documentation. So the response from the traversal comes back as a success. I guess I could assert that a query that uses the mixed index stored in ES fails to return the vertex, because now that index is blind to the property... but that feels like a weird thing to test for, albeit congruent with intended behavior. I've bumped into this asymmetric situation where a query using an equality predicate (which uses the storage backend's composite index) returns the expected vertex, but because the write to the mixed index failed (successfully), a range predicate over the same value won't find it.
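For what it's worth, a hedged sketch of what that assertion could look like (the property name, setup, and the assumption that the equality predicate is served by a composite index while the range predicate hits the ES mixed index are all illustrative):

```java
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import org.apache.tinkerpop.gremlin.process.traversal.P;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;

public class SilentFailureAssertionSketch {
    // Graph setup omitted; assumes a vertex whose mixed-index write was
    // dropped because it exceeded the bulk chunk size limit.
    void assertIndexAsymmetry(GraphTraversalSource g, String value) {
        // Equality predicate: served by the composite index, vertex is found.
        assertTrue(g.V().has("name", value).hasNext());
        // Range predicate over the same property: served by the ES mixed
        // index, which never received the write, so the vertex is invisible.
        assertFalse(g.V().has("name", P.gte(value)).hasNext());
    }
}
```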
Took a stab at this in af88b6a, set it as …

Force-pushed from af88b6a to dfb6e53
@li-boxuan sounded like there wasn't anything objectionable about the additions, so I went ahead and squashed & signed off on the commit so the PR is back in a merge-ready state pending further feedback.
A much more cumbersome approach is to read the transaction recovery log... but I won't set this as acceptance criteria. I do think what you mentioned here seems promising. Doesn't seem weird to me :) we probably already have tests doing something similar.
Squashed commits:

- Retry bulk request if all errors are configured retry error codes
- Only retry bulk request items that failed
- Implemented bulk retry test & implemented splitting up a bulk request if large enough
- Added exception if a bulk request item is too large to send, even as a single item
- Use HttpStatus.SC_NOT_FOUND instead of 404 literal
- Added pseudo test to facilitate manual observation of chunking of bulk writes
- Added test asserting silent failure of write to ES due to chunk size limit causing the vertex to be omitted from the ES mixed index

Signed-off-by: Allan Clements <criminosis@gmail.com>
Force-pushed from dfb6e53 to c30dd4e
@li-boxuan An alternative idea I had: rather than abort the bulk write at the time that is observed, maybe still write the other elements that aren't in excess of the limit. Get in at least what could go in rather than have that data be lost too. In other words, given a batch of mutations with an exceptionally large mutation followed by some number of small ones, fail the first, but don't throw an exception until later.
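Sketched out, that deferred-failure idea might look something like this (names and structure are hypothetical, not the follow-up PR's actual code):

```java
import java.util.ArrayList;
import java.util.List;

public class DeferredOversizeFailureSketch {
    record BulkItem(byte[] payload) {}

    /**
     * Submits every item that fits within the limit, remembers the ones that
     * don't, and only throws once the salvageable writes have been sent.
     */
    static void submitWhatFits(List<BulkItem> items, int limitBytes) {
        List<BulkItem> oversized = new ArrayList<>();
        List<BulkItem> sendable = new ArrayList<>();
        for (BulkItem item : items) {
            if (item.payload().length > limitBytes) {
                oversized.add(item); // defer the failure until the end
            } else {
                sendable.add(item);
            }
        }
        send(sendable); // salvage the small mutations first
        if (!oversized.isEmpty()) {
            throw new IllegalStateException(
                    oversized.size() + " bulk item(s) exceeded the chunk size limit");
        }
    }

    static void send(List<BulkItem> items) {
        // stand-in for the chunked _bulk calls
    }
}
```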
I like that idea! Do you wanna open a follow-up PR for that 😉? Maybe a config to control it ;)
Yeah, that'd probably be best done as a follow-up PR.
Thank you for continuously investing in this field!
Happy to, @li-boxuan. JanusGraph has been a critical piece of what I'm working on, hence why I've been bumping into these sharp edges 😅. Looking forward to getting a nightly build into my environment and putting this change through its paces whenever @porunov has time to give their review.
Thank you @criminosis for this great contribution and sorry for the late review.
This PR looks great to me. I have added small nitpicks, but I think it's OK to fix them in follow-up PRs.
```java
public static final ConfigOption<Integer> BULK_CHUNK_SIZE_LIMIT_BYTES =
    new ConfigOption<>(ELASTICSEARCH_NS, "bulk-chunk-size-limit-bytes",
        "The total size limit in bytes of a bulk request. Mutation batches in excess of this limit will be " +
        "chunked to this size.", ConfigOption.Type.LOCAL, Integer.class, 100_000_000);
```
I think it would be good to add an explanation with a link to let users know why the default size is 100MB. I.e., it could be something like this:

Ensure that this limit is always less than or equal to the configured limit of `http.max_content_length` on the Elasticsearch servers. For more information, refer to the [Elasticsearch documentation](https://www.elastic.co/guide/en/elasticsearch/reference/8.14/modules-network.html).

Also, I think it would be great to tell users that in case a chunk is bigger than the size specified here, it will silently be skipped, which may result in permanent inconsistency between the storage backend and the mixed index.
```java
public static final ConfigOption<Integer> BULK_CHUNK_SIZE_LIMIT_BYTES =
    new ConfigOption<>(ELASTICSEARCH_NS, "bulk-chunk-size-limit-bytes",
        "The total size limit in bytes of a bulk request. Mutation batches in excess of this limit will be " +
        "chunked to this size. Ensure that this limit is always less than or equal to the configured limit of " +
        "`http.max_content_length` on the Elasticsearch servers. For more information, refer to the " +
        "[Elasticsearch documentation](https://www.elastic.co/guide/en/elasticsearch/reference/master/modules-network.html).",
        ConfigOption.Type.LOCAL, Integer.class, 100_000_000);
```
Got it ready for the follow-up 👍
Thanks! Would you be able to also add, here or somewhere in the documentation, a description of what happens when part of the request produces a chunk which is greater than the limit specified here? I think this information will let users know that it's dangerous to reduce this chunk size to very small values.
Added "If a single bulk item exceeds this limit an exception will be thrown after the smaller bulk items are submitted." to the follow-up PR 👍.
Figured we can continue the conversation about any further tweaking over there.
In `RestElasticSearchClient.bulkRequest(List<ElasticSearchMutation> requests, String ingestPipeline, ...)`:

```java
    APPEND_OP.apply(builder).append("refresh=").append(bulkRefresh);
}
builder.insert(0, REQUEST_SEPARATOR + "_bulk");
```
Not related to your PR, but it seems it makes sense to move this line up (to be the next line after the StringBuilder is created) to avoid the array shifting operation. I.e., just:

```java
final StringBuilder builder = new StringBuilder();
builder.append(REQUEST_SEPARATOR).append("_bulk");
```
Looked into this for my follow-up, but it seems like this would change behavior?

Line 105 in d255396:

```java
private static final Function<StringBuilder, StringBuilder> APPEND_OP = sb -> sb.append(sb.length() == 0 ? REQUEST_PARAM_BEGINNING : REQUEST_PARAM_SEPARATOR);
```

APPEND_OP prefixes a `?` if the builder is empty, or a `&` otherwise. So as currently written I believe it's producing paths like `/_bulk?pipeline=foo&refresh=true`. If we put `builder.append(REQUEST_SEPARATOR).append("_bulk");` further up, I think it'd produce paths like `/_bulk&pipeline=foo&refresh=true`, which wouldn't be properly formatted query parameters anymore.
How does this look if we just use the StringBuilder for the query parameters, and then directly build the path? I originally used another StringBuilder for the path variable, but IntelliJ's linter suggested just doing String concatenation.
```java
final StringBuilder bulkRequestQueryParameters = new StringBuilder();
if (ingestPipeline != null) {
    APPEND_OP.apply(bulkRequestQueryParameters).append("pipeline=").append(ingestPipeline);
}
if (bulkRefreshEnabled) {
    APPEND_OP.apply(bulkRequestQueryParameters).append("refresh=").append(bulkRefresh);
}
final String bulkRequestPath = REQUEST_SEPARATOR + "_bulk" + bulkRequestQueryParameters;
return Pair.with(bulkRequestPath, outputStream.toByteArray());
```
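As a quick sanity check, inlining APPEND_OP's conditional shows the rearranged builder still yields correctly delimited parameters (the constant values `"?"`, `"&"`, and `"/"` are assumed here for the sake of a runnable snippet):

```java
public class PathBuildingCheck {
    public static void main(String[] args) {
        // APPEND_OP with its constants inlined (assumed values:
        // REQUEST_PARAM_BEGINNING = "?", REQUEST_PARAM_SEPARATOR = "&",
        // REQUEST_SEPARATOR = "/").
        StringBuilder qp = new StringBuilder();
        qp.append(qp.length() == 0 ? "?" : "&").append("pipeline=").append("foo");
        qp.append(qp.length() == 0 ? "?" : "&").append("refresh=").append("true");
        System.out.println("/" + "_bulk" + qp); // prints /_bulk?pipeline=foo&refresh=true
    }
}
```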
Oh, I missed that! I'm OK with leaving it as it is right now or updating it to the way you are suggesting. Both ways are OK by me.

btw, `final` String concatenation is usually compiled directly to reuse `StringBuilder` by all modern Java compilers, so I believe that's why IntelliJ suggested just using String concatenation here. Anyway, the shift is probably too small to actually produce any meaningful performance difference, so I'm OK with either way, as you prefer.
@porunov I'd be happy to do this in the follow-up you mentioned. There's already a tentative PR in mind that I was discussing with @li-boxuan above; figured I'd get rolling on those after this merges. Since you mentioned them as follow-ups, is this PR valid for merging? I saw you tagged it for 1.1.0, so I wasn't sure if that meant it was going to hang out unmerged for a period of time, since there's also a live 1.0.1 milestone.
@criminosis Just merged this PR into the ‘master’ branch. Thus, these changes will be available in the 1.1.0 release and in the following “commit” releases. However, it won't be backported to the 1.0.1 release since it's based on a feature which isn't available in the 1.0.0 release. If you want to port these changes to the 1.0.1 release for any reason, feel free to open a PR targeting the ‘v1.0’ branch with these changes and your previous changes.
Thanks @porunov! Having it in 1.1.0 and the next commit release is perfectly fine. I was asking just out of curiosity around merging logistics. Excited to get this into my environment soon, as I've been bumping into what prompted this PR fairly often. I'll get rolling on the follow-ups. 👍
Closes #4488
Reused the retry logic from #4409 to also apply to bulk requests if an inner bulk item request failed.