BLOB STORAGE: Pooled connection observed an error reactor.netty.http.client.HttpClientOperations$PrematureCloseException: Connection prematurely closed BEFORE response #5180
Comments
Hi, @vmaheshw. We will be working to add more samples before we GA. We are aware that they need to be filled out more; it's just a matter of balancing feature work with samples. But as I said, they'll be more complete before GA. Assuming your blockSize is the same as the size of your byteArray, I'm a bit surprised you're hitting this exception. Are you able to reproduce this consistently? And are you able to share more of the stack trace?
I saw this in a different path this time. Basically, the sample code I pasted above does not work with the internal retry logic when a network-related issue occurs. So there are two things to investigate: the async example and the exception itself.
@rickle-msft: Should I create another bug ticket for the crash, to track the async example and the exception separately?
Sure. It might be better to rename this one for the crash, since there's already context here, and then open a new one for the examples that references this issue, since we've already acknowledged that ask.
@vmaheshw I've run the code snippet you gave me locally and forced an IO error by unplugging my network connection. I also reran our test for retrying network errors from the commit for this release, and in both cases it behaved as expected. I have a suspicion that this line is actually underlying most of what's going on: IllegalReferenceCountException extends IllegalStateException, which is what we check for in the retry policy to give that extended error message, and we don't retry an IllegalStateException, which explains why you're seeing it fail so quickly. I also think that would explain why the connection gets closed prematurely: we're failing to read from your ByteBuf, so we cancel the operation. The next question, then, is why your ByteBuf has a refCount of 0. Could you try setting a watch point on that value for your ByteBuf object and see where it's getting decremented?
@rickle-msft: Here is the log line and the call:

```
2019/08/30 18:21:54.198 INFO com.azure.storage.blob.BlockBlobAsyncClient@ Upload block start for blob: 00057 for block size:10524791.
```

```java
_blobClient.stageBlock(blockIdEncoded, Flux.just(buf), blockSize).block();
```
@vmaheshw Sorry for not being clear. I wasn't suggesting that the length of the block is zero. ByteBufs are explicitly reference counted and will throw this exception if someone tries to read from the buffer after the reference count has been decremented to zero, because at that point the buffer has effectively been marked for deallocation. I was asking if you'd be able to set a watch point on this reference count value to determine when it changes (probably somewhere internal to the SDK). Something worth noting here: in preview 3, which should be out in about a week, we are changing this API signature to accept ByteBuffer instead of ByteBuf, so hopefully that will fix this issue for free if you'd rather wait for the next preview instead of spending more cycles debugging this now. @Jianghao It's interesting that this is happening only after a 12-hour run. Did you ever encounter anything like this in your stress testing? (TL;DR: it seems that something is trying to read a ByteBuf with refCount 0 and throwing.)
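To make the reference-counting behavior concrete without Netty on the classpath, here is a minimal plain-Java sketch; the `RefCountedBuf` class below is hypothetical and only mimics the relevant semantics of `io.netty.buffer.ByteBuf`. Once `release()` drops the count to zero, any read throws, which is the `IllegalReferenceCountException` (an `IllegalStateException` subclass) described above.

```java
// Hypothetical plain-Java sketch of Netty-style reference counting.
// RefCountedBuf stands in for io.netty.buffer.ByteBuf: once release()
// drops the count to zero, any read throws IllegalStateException,
// mirroring Netty's IllegalReferenceCountException.
import java.util.concurrent.atomic.AtomicInteger;

class RefCountedBuf {
    private final byte[] data;
    private final AtomicInteger refCnt = new AtomicInteger(1);

    RefCountedBuf(byte[] data) { this.data = data; }

    int refCnt() { return refCnt.get(); }

    void release() {
        refCnt.decrementAndGet();
    }

    byte read(int index) {
        if (refCnt.get() <= 0) {
            // Netty throws IllegalReferenceCountException here.
            throw new IllegalStateException("refCnt: 0");
        }
        return data[index];
    }
}

public class RefCountDemo {
    public static void main(String[] args) {
        RefCountedBuf buf = new RefCountedBuf(new byte[] {1, 2, 3});
        System.out.println(buf.read(0)); // fine while refCnt == 1
        buf.release();                   // first send "consumes" the buffer
        try {
            buf.read(0);                 // a retry would land here
        } catch (IllegalStateException e) {
            System.out.println("read after release: " + e.getMessage());
        }
    }
}
```

A watch point on the real `refCnt` field, as suggested above, would show exactly where the count reaches zero before the failing read.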
Assigning to @jianghaolu as this appears to be azure-core related. CC: @alzimmermsft
Can this be validated against the latest azure-core that brings in the updated Reactor/Netty dependencies?
I think there is an issue with the code below:

```java
byte[] byteArray;
ByteBuf buf = Unpooled.wrappedBuffer(byteArray, 0, blockSize);
_blobClient.stageBlock(blockId, Flux.just(buf), blockSize).block();
```

My gut feeling is that, though the Flux itself can be resubscribed, the single wrapped ByteBuf cannot be consumed again. Deferring the buffer creation should give each subscriber a fresh ByteBuf:

```java
Flux<ByteBuf> inputFlux = Flux.defer(() -> {
    byte[] byteArray = // init byte[];
    ByteBuf buf = Unpooled.wrappedBuffer(byteArray, 0, blockSize);
    return Flux.just(buf);
});

_blobClient.stageBlock(blockId, inputFlux, blockSize).block();
```

This way, using Flux.defer, a new ByteBuf is created for each subscription (i.e., for each retry).
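The same replayability distinction can be shown with only the JDK, as a loose analogy for `Flux.just` vs `Flux.defer` (Reactor itself is not used here): a pre-built `java.util.stream.Stream`, like an already-consumed ByteBuf, cannot be consumed twice, while deferring creation behind a `Supplier` gives every consumer a fresh instance.

```java
// Stdlib analogy for Flux.just vs Flux.defer: a Stream, like a released
// ByteBuf, cannot be consumed twice, but deferring creation behind a
// Supplier gives every consumer (every retry) a fresh instance.
import java.util.function.Supplier;
import java.util.stream.Stream;

public class DeferDemo {
    public static void main(String[] args) {
        // Eager: the single Stream instance is exhausted by the first consumer.
        Stream<byte[]> eager = Stream.of(new byte[] {1, 2, 3});
        eager.count();                   // first "subscription" consumes it
        try {
            eager.count();               // a retry would fail here
        } catch (IllegalStateException e) {
            System.out.println("eager replay failed: " + e.getMessage());
        }

        // Deferred: each consumer gets a brand-new Stream.
        Supplier<Stream<byte[]>> deferred = () -> Stream.of(new byte[] {1, 2, 3});
        System.out.println(deferred.get().count()); // 1
        System.out.println(deferred.get().count()); // 1 again: replay works
    }
}
```

The deferred form is exactly what `Flux.defer` provides in the snippet above: the side-effecting creation step runs once per subscription instead of once per program.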
OK, I can see that the storage blob code (line 92 in commit 1b31221):

```java
Flux<ByteBuf> bufferedBody =
    (context.httpRequest().body() == null)
        ? null
        : context.httpRequest().body().map(ByteBuf::duplicate);
```

does indeed call duplicate. I guess that with preview 2, the Flux.defer approach proposed above should work.
Merging the info from my last two comments and simplifying them into a single flow, I think this is the likely sequence causing the error: the retry logic duplicates the ByteBuf, but the duplicate shares the original's reference count; when the first attempt's pipeline releases the buffer, the count drops to zero, and the retry's read of the duplicate then throws IllegalReferenceCountException.
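The flow above can be sketched in plain Java; the `SharedCountBuf` class is hypothetical and only models the one property that matters here, namely that `ByteBuf.duplicate()` creates a view sharing the original's reference count.

```java
// Hypothetical sketch of the failure flow: duplicate() creates a view
// that SHARES the reference count, so once the first attempt releases
// the original buffer, the retry's duplicate is unreadable too.
import java.util.concurrent.atomic.AtomicInteger;

class SharedCountBuf {
    private final byte[] data;
    private final AtomicInteger refCnt; // shared between a buffer and its duplicates

    SharedCountBuf(byte[] data) { this(data, new AtomicInteger(1)); }

    private SharedCountBuf(byte[] data, AtomicInteger refCnt) {
        this.data = data;
        this.refCnt = refCnt;
    }

    // Like ByteBuf.duplicate(): independent read indices, shared refCnt.
    SharedCountBuf duplicate() { return new SharedCountBuf(data, refCnt); }

    void release() { refCnt.decrementAndGet(); }

    byte read(int i) {
        if (refCnt.get() <= 0) {
            // IllegalReferenceCountException in Netty.
            throw new IllegalStateException("refCnt: 0");
        }
        return data[i];
    }
}

public class RetryFlowDemo {
    public static void main(String[] args) {
        SharedCountBuf original = new SharedCountBuf(new byte[] {42});
        SharedCountBuf retryCopy = original.duplicate(); // what the retry policy maps to

        original.read(0);   // attempt 1 sends the data...
        original.release(); // ...and the pipeline releases the buffer

        try {
            retryCopy.read(0); // attempt 2: refCnt is already 0
        } catch (IllegalStateException e) {
            System.out.println("retry failed: " + e.getMessage());
        }
    }
}
```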
@JonathanGiles I will double-check the code to see whether the current ByteBuffer-based core/storage has a similar flaw in other places.
Thanks @anuchandy |
@anuchandy Thanks for investigating this. I suspect that since we moved back to accepting ByteBuffers in preview 3, the logic in the retry policy (duplicate) should work now, right? |
@rickle-msft right, with ByteBuffer the duplicate-based logic in the retry policy should work, since ByteBuffer.duplicate() creates an independent view with no reference counting involved.
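This is easy to demonstrate with plain java.nio: `ByteBuffer.duplicate()` gives each consumer its own position and limit over the same content, so repeated "attempts" can each read the full payload.

```java
// java.nio.ByteBuffer.duplicate(): each duplicate has its own position
// and limit, and there is no reference counting, so repeated reads
// (simulated retries) all succeed.
import java.nio.ByteBuffer;

public class ByteBufferRetryDemo {
    public static void main(String[] args) {
        ByteBuffer source = ByteBuffer.wrap(new byte[] {10, 20, 30});

        // Simulate two attempts, each reading from its own duplicate.
        for (int attempt = 1; attempt <= 2; attempt++) {
            ByteBuffer view = source.duplicate(); // fresh position for every attempt
            int total = 0;
            while (view.hasRemaining()) {
                total += view.get();
            }
            System.out.println("attempt " + attempt + " read total: " + total);
        }
        // Both attempts read the full 60 bytes' worth of content:
        // the source's position was never advanced.
    }
}
```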
@anuchandy @rickle-msft Are there unit tests to verify that the RetryPolicy is working correctly? I'm not able to figure out a way to trigger the RetryPolicy other than waiting for an error from the storage account.
@vmaheshw There are unit tests for the RetryPolicy here. I'm not sure I understood the second part of your question, though. What are you trying to achieve? And what do you mean by "enforce RetryPolicy"? If you are still seeing this error after switching to preview 3, we'd love to get more information on that.
@rickle-msft I want to be sure that the RetryPolicy is working as described. I could not find a way to simulate a failure that would automatically trigger the default retry.
I am going to close this issue, as I believe it was addressed by the switch to accepting ByteBuffers, and we have not heard anything to the contrary from the customer. Please feel free to reopen or post again if you continue to hit this or similar issues.
Is your feature request related to a problem? Please describe.
Query/Question
There are no examples for the async API. I'm currently most interested in BlockBlob stageBlock and making the Flux replayable.
I'm seeing this in my code.
```
java.lang.IllegalStateException: The request failed because the size of the contents of the provided Flux did not match the provided data size upon attempting to retry. This is likely caused by the Flux not being replayable. To support retries, all Fluxes must produce the same data for each subscriber. Please ensure this behavior.
```
Code Snippet:

```java
byte[] byteArray;
ByteBuf buf = Unpooled.wrappedBuffer(byteArray, 0, blockSize);
_blobClient.stageBlock(blockId, Flux.just(buf), blockSize).block();
```
Describe the solution you'd like
There should be sample code for the async API.