
ERROR: corrupted messages continues on 2.0.1 #211

Closed
aaguilartablada opened this issue Sep 22, 2022 · 20 comments · Fixed by #215 or #218

Comments

@aaguilartablada
Contributor

Hello.

The problem described in #207 still occurs. I tested the same dummy function with version 2.0.1 and I still see corrupted messages.

I tested it on Function Host 4.1.1 and 4.11.0, with the same result on both.

@M3LiNdRu

After further investigation, I noticed that the message is already malformed before it is published to the queue, so the RabbitMQ client does not seem to be related to the issue.

Something odd happens here, in Microsoft.Azure.WebJobs.Extensions.RabbitMQ.RabbitMQListener:

this.consumer.Received += async (model, ea) =>
{
    // ea.Body is healthy here
    var input = new TriggeredFunctionData() { TriggerValue = ea };
    FunctionResult result = await this.executor.TryExecuteAsync(input, cancellationToken).ConfigureAwait(false);

    if (result.Succeeded)
    {
        this.rabbitMQModel.BasicAck(ea.DeliveryTag, false);
    }
    else
    {
        // ea.Body is already corrupted here
        if (ea.BasicProperties.Headers == null || !ea.BasicProperties.Headers.ContainsKey(Constants.RequeueCount))
        {
            this.CreateHeadersAndRepublish(ea);
        }
        else
        {
            this.RepublishMessages(ea);
        }
    }
};

After TryExecuteAsync has executed, ea.Body comes back corrupted.

@aaguilartablada
Contributor Author

Exactly, @M3LiNdRu. We tested as far as we could by adding debug logging to RabbitMQListener, and we saw the same thing you describe.

@JatinSanghvi
Contributor

Sorry, I have been busy this week with a Microsoft Hackathon project. In my testing (details are in the other issue), the corruption happened outside the extension code, and after upgrading to the latest client library I could not reproduce the issue despite repeated testing. I also did another round of testing before creating release 2.0.1. I will try to come back to this tomorrow if time permits.

@aaguilartablada
Contributor Author

Hello @JatinSanghvi. I've just created a PR (#212) that solves the issue. It is a workaround, but it works. Please take a look when you have time. Thanks!

@JatinSanghvi
Contributor

Apologies, this was an error on my part. I used a string of 8192 characters for testing, and it seems the issue never reproduces for strings whose lengths are multiples of 512 bytes. I am not clear on why the message was found corrupted only when it was received the next time. I will go through @M3LiNdRu's message, try to debug the issue inside the WebJobs SDK code, and approve the associated PR if it looks like a correct fix or workaround for the issue. I will post my findings on this GitHub issue as I have them.

@JatinSanghvi
Contributor

In reply to @M3LiNdRu's observation that ea.Body is corrupted after TryExecuteAsync runs:

My observation is different, and it matches what I saw during initial testing. I wrapped the call to BasicPublish inside the RepublishMessages method in RabbitMQListener.cs with logging, as below:

this.logger.LogCritical($"RepublishMessages (1): {ea.Body.Span[0]}");
this.rabbitMQModel.BasicPublish(exchange: string.Empty, routingKey: this.queueName, basicProperties: ea.BasicProperties, body: ea.Body);
this.logger.LogCritical($"RepublishMessages (2): {ea.Body.Span[0]}");
this.rabbitMQModel.BasicAck(ea.DeliveryTag, false); // Manually ACK'ing, but ack after resend
this.logger.LogCritical($"RepublishMessages (3): {ea.Body.Span[0]}");

This intermittently produces the following output:

crit: Host.Triggers.RabbitMQ[0]
      RepublishMessages (1): 97
crit: Host.Triggers.RabbitMQ[0]
      RepublishMessages (2): 1
crit: Host.Triggers.RabbitMQ[0]
      RepublishMessages (3): 1

This means the body is being modified while it is sent back to the RabbitMQ queue: its first byte changes from 97 to 1 across the BasicPublish call. From then on, the listener for the this.consumer.Received event receives the same corrupted string. Somehow the error does not reproduce when debugging, which might be due to release vs. debug DLLs.

@JatinSanghvi
Contributor

JatinSanghvi commented Sep 29, 2022

I think I understand the reason for the observation above. The call to BasicPublish quickly triggers another this.consumer.Received event before the handler for the previous event has finished logging the message prefixed with RepublishMessages (2) in the code above. It seems the RabbitMQ client internally does an unsafe operation and reuses the same chunk of memory to pass the body to the next handlers. It is not enough that the previous handler holds a reference to ea.Body: the RabbitMQ client is free to prepare that chunk of memory for the next event.

The comment in the RabbitMQ client source code here, which says handlers must copy the delivery body, partly points to this. Reading through the client's full source code should clear up any remaining doubts.

This explains the corrupted message logged at the end of the previous invocation, but not the corruption in later invocations. My guess is that the BasicPublish call (when republishing the message) is not atomic for messages larger than 512 bytes, so it has not finished before the client starts overwriting bytes in the body's memory. There are a few more unknowns, but I hope I now know enough to come up with a proper fix.

@tijmenamsing

Hi @JatinSanghvi, great to see that you've found a solution. When do you expect to release it?

@JatinSanghvi
Contributor

In reply to @tijmenamsing's question about when the fix will be released:

This is scheduled for Thursday this week. The release needs to be approved and the approvers are on vacation :)

@JatinSanghvi
Contributor

JatinSanghvi commented Oct 7, 2022

We have released version 2.0.2 that should have this issue fixed. Thanks to everyone on this issue thread.

@tijmenamsing

@JatinSanghvi We have updated to v2.0.2, but we still see corrupted request bodies, and the error isn't always the same. We send a JSON object of type ProfileRequest, which is deserialized correctly the first time but not on subsequent deliveries:

  • Unable to deserialize. Newtonsoft.Json.JsonReaderException: Error parsing comment. Expected: *, got e. Path '', line 1, position 1.

  • Unable to deserialize. Newtonsoft.Json.JsonSerializationException: Cannot deserialize the current JSON array (e.g. [1,2,3]) into type 'ProfileRequest' because the type requires a JSON object (e.g. {"name":"value"}) to deserialize correctly. To fix this error either change the JSON to a JSON object (e.g. {"name":"value"}) or change the deserialized type to an array or a type that implements a collection interface (e.g. ICollection, IList) like List<T> that can be deserialized from a JSON array. JsonArrayAttribute can also be added to the type to force it to deserialize from a JSON array. Path '', line 1, position 1.

@JatinSanghvi
Contributor

Hi @tijmenamsing, if possible, can you share a function that will reproduce the error?

@yvanoers

yvanoers commented Oct 13, 2022

I've traced this to a combination of two things: how the RabbitMQ client manages the memory of the body passed to the Received handler, and the event handler being async and awaiting TryExecuteAsync.

The RabbitMQ client (in ConcurrentConsumerDispatcher.cs in method HandleBasicDeliver(...)) rents an array from an ArrayPool to hold a copy of the body for the listeners to use.
That array is returned to the pool in a finally block after all event handlers are finished.
However, the event handler here is async. As soon as the handler reaches its first await (on TryExecuteAsync), the calling function continues and frees the memory.
The memory can then be reused for other messages, corrupting whatever the handler is still working with.

The problem can be fixed by creating a copy of ea.Body before calling TryExecuteAsync and using that copy in the call to BasicPublish.
I can provide a PR if desirable.
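The mechanism and the fix above can be reproduced without RabbitMQ. Below is a minimal, self-contained sketch, assuming .NET 6 or later with C# top-level statements; FakeDispatcher, Demo and HandleAsync are hypothetical names, not part of the extension or the client. FakeDispatcher writes every delivery into one reused buffer (a deterministic simplification of renting and returning an ArrayPool array) and raises a synchronous event, the same way an async lambda attached to EventingBasicConsumer.Received is seen by the client as a plain EventHandler. The handler awaits a gate that stands in for TryExecuteAsync, so both messages are delivered before either handler reads its body.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

// Same two deliveries, first without and then with a copy taken before the await.
Console.WriteLine(string.Join(",", await Demo.RunAsync(copyBeforeAwait: false))); // world,world
Console.WriteLine(string.Join(",", await Demo.RunAsync(copyBeforeAwait: true)));  // hello,world

static class Demo
{
    public static async Task<List<string>> RunAsync(bool copyBeforeAwait)
    {
        var dispatcher = new FakeDispatcher();
        // Stands in for TryExecuteAsync: it stays incomplete until both messages are delivered.
        var gate = new TaskCompletionSource<bool>();
        var pending = new List<Task<string>>();

        async Task<string> HandleAsync(ReadOnlyMemory<byte> body)
        {
            // The fix: take a private copy while the buffer still holds this message.
            ReadOnlyMemory<byte> safeBody = copyBeforeAwait ? body.ToArray() : body;
            await gate.Task; // control returns to Deliver here, which then treats the buffer as free
            return Encoding.UTF8.GetString(safeBody.Span);
        }

        // Like an async lambda on EventingBasicConsumer.Received, the event only sees a
        // synchronous delegate, which returns as soon as the handler hits its first await.
        dispatcher.Received += body => pending.Add(HandleAsync(body));

        dispatcher.Deliver("hello");
        dispatcher.Deliver("world"); // overwrites the buffer the first handler still points at
        gate.SetResult(true);
        return new List<string>(await Task.WhenAll(pending));
    }
}

// Hypothetical stand-in for the client's dispatcher: one reused buffer instead of a
// rented ArrayPool array, so the reuse is deterministic.
sealed class FakeDispatcher
{
    private readonly byte[] buffer = new byte[64];

    public event Action<ReadOnlyMemory<byte>>? Received;

    public void Deliver(string message)
    {
        int length = Encoding.UTF8.GetBytes(message, 0, message.Length, buffer, 0);
        Received?.Invoke(new ReadOnlyMemory<byte>(buffer, 0, length));
        // The real client returns the rented array to the pool at this point.
    }
}
```

Without the copy, both handlers read "world", because the first handler's ReadOnlyMemory<byte> still points at the shared buffer. Copying with ToArray() before the first await gives each handler its own bytes, which is what copying ea.Body before TryExecuteAsync achieves in the listener.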

This is somewhat of a concurrency problem, so to reproduce it, the Azure Function needs to receive multiple messages at once. Sending only one message will never (or almost never) cause memory corruption.
I've been able to reproduce the problem in Debug mode with 5 messages sent to our function.
Our function in this scenario has some work to do (calling a web service) and eventually throws an exception.

Edit:
I reckon PR #212 would actually have fixed the problem, because in that PR the copy does happen before TryExecuteAsync.

@JatinSanghvi
Contributor

JatinSanghvi commented Oct 14, 2022

Hi @yvanoers, this is a great finding. As part of refactoring the listener class, I had actually already committed a change similar to what you suggested, just to be on the safe side, though I did not know that async was at play here. If possible, please check the highlighted lines there and let me know whether you think they fix the issue.

EDIT: The comment needs to be corrected there.

JatinSanghvi reopened this Oct 14, 2022
@yvanoers

yvanoers commented Oct 14, 2022

Hi @JatinSanghvi,
I tested the change you linked and it does fix the problem, but I would avoid reassigning the Body property of the ea object. Although this does not currently change the content or introduce a problem, it could have unexpected side effects if another event handler is ever added that expects the property to hold the exact value the event was raised with.

The same can be said for ea.BasicProperties.Headers in that method, by the way.

I would suggest using a local variable to hold the copy of ea.Body to eliminate any and all present or future unintended changes to it.

@yvanoers

@JatinSanghvi
Awesome that the fix has been merged!
Is it possible to release it soon? We're eager to update our library.

@JatinSanghvi
Contributor

I should get the new release out in a day.

@JatinSanghvi
Contributor

The new version is released now. Thanks everyone for your help and support towards getting the issue fixed.

@JatinSanghvi
Contributor

JatinSanghvi commented Nov 3, 2022

It seems that the AsyncEventingBasicConsumer class from the RabbitMQ client eliminates the issue, so there is no need to copy the message body. See https://gist.github.com/JatinSanghvi/0993825524957bb3f36f207ac7d4a676. I will add a commit for a future release that replaces EventingBasicConsumer with AsyncEventingBasicConsumer.

The AsyncEventHandlerExtensions implementation and the definition of AsyncEventHandler in the gist are copied from RabbitMQ dotnet client 6.x source code.
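Why an async consumer removes the need for a copy can be sketched the same way. In the hypothetical FakeAsyncDispatcher below (an illustration assuming .NET 6 or later, not the client's dispatcher code), the handler is a Func<ReadOnlyMemory<byte>, Task>, in the spirit of the AsyncEventHandler delegate mentioned above but not its actual definition. The dispatcher awaits the returned Task before it treats the buffer as free, so the second delivery cannot overwrite the buffer while the first handler is still running, even though no copy is made.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

Console.WriteLine(string.Join(",", await AsyncDemo.RunAsync())); // hello,world

static class AsyncDemo
{
    public static async Task<List<string>> RunAsync()
    {
        var dispatcher = new FakeAsyncDispatcher();
        var seen = new List<string>();

        // Mirrors an async consumer's handler: it returns a Task, and the body is NOT copied.
        dispatcher.Received = async body =>
        {
            await Task.Yield(); // simulated TryExecuteAsync
            seen.Add(Encoding.UTF8.GetString(body.Span));
        };

        await dispatcher.DeliverAsync("hello");
        await dispatcher.DeliverAsync("world");
        return seen;
    }
}

// Hypothetical async-aware dispatcher that reuses one buffer, like the earlier sketch.
sealed class FakeAsyncDispatcher
{
    private readonly byte[] buffer = new byte[64];

    public Func<ReadOnlyMemory<byte>, Task>? Received { get; set; }

    public async Task DeliverAsync(string message)
    {
        int length = Encoding.UTF8.GetBytes(message, 0, message.Length, buffer, 0);
        if (Received is not null)
        {
            // Awaiting the handler keeps the buffer reserved until the handler has finished.
            await Received(new ReadOnlyMemory<byte>(buffer, 0, length));
        }
        // Only at this point would a pooled buffer be returned and reused.
    }
}
```

The trade-off is that deliveries to one handler are processed one after another in this sketch; the copy-based fix lets handlers overlap at the cost of one allocation per message.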

@yvanoers

yvanoers commented Nov 3, 2022

That looks like the intended way - it should be the actual fix.
Excellent!
