Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix message body corruption on republish #218

Merged
merged 1 commit into from
Oct 17, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 21 additions & 15 deletions extension/WebJobs.Extensions.RabbitMQ/Trigger/RabbitMQListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,38 +85,44 @@ public Task StartAsync(CancellationToken cancellationToken)
this.rabbitMQModel.BasicQos(0, this.prefetchCount, false); // Non zero prefetchSize doesn't work (tested upto 5.2.0) and will throw NOT_IMPLEMENTED exception
this.consumer = new EventingBasicConsumer(this.rabbitMQModel.Model);

this.consumer.Received += async (model, ea) =>
this.consumer.Received += async (model, args) =>
{
using Activity activity = StartActivity(ea);

var input = new TriggeredFunctionData() { TriggerValue = ea };
// The RabbitMQ client rents an array from the ArrayPool to hold a copy of the message body, and passes it
// to the listener. Once all event handlers are executed, the array is returned back to the pool so that the
// memory can be reused for future messages for that connection. However, since our event handler is async,
// the very first await statement i.e. the call to TryExecuteAsync below causes the event handler invocation
// to complete and lets the RabbitMQ client release the memory. This led to message body corruption when the
// message is republished (see: https://github.com/Azure/azure-functions-rabbitmq-extension/issues/211).
//
// We chose to copy the message body instead of having a new 'args' object as there is only one event
// handler registered for the consumer so there should be no side-effects.
args.Body = args.Body.ToArray();

using Activity activity = StartActivity(args);

var input = new TriggeredFunctionData() { TriggerValue = args };
FunctionResult result = await this.executor.TryExecuteAsync(input, cancellationToken).ConfigureAwait(false);

if (!result.Succeeded)
{
ea.BasicProperties.Headers ??= new Dictionary<string, object>();
ea.BasicProperties.Headers.TryGetValue(RequeueCountHeaderName, out object headerValue);
args.BasicProperties.Headers ??= new Dictionary<string, object>();
args.BasicProperties.Headers.TryGetValue(RequeueCountHeaderName, out object headerValue);
int requeueCount = Convert.ToInt32(headerValue, CultureInfo.InvariantCulture) + 1;

if (requeueCount >= 5)
{
// Add message to dead letter exchange.
this.logger.LogDebug("Requeue count exceeded: rejecting message");
this.rabbitMQModel.BasicReject(ea.DeliveryTag, false);
this.rabbitMQModel.BasicReject(args.DeliveryTag, false);
return;
}

this.logger.LogDebug("Republishing message");
ea.BasicProperties.Headers[RequeueCountHeaderName] = requeueCount;

// RabbitMQ client library seems to be reusing the memory pointed by 'ea.Body' for subsequent
// message-received events. This led to https://github.com/Azure/azure-functions-rabbitmq-extension/issues/211.
// Hence, pass a copy of 'ea.Body' to method 'BasicPublish' instead of the object itself to prevent
// sharing of the memory and possibility of memory corruption.
this.rabbitMQModel.BasicPublish(exchange: string.Empty, routingKey: this.queueName, ea.BasicProperties, ea.Body.ToArray());
args.BasicProperties.Headers[RequeueCountHeaderName] = requeueCount;
this.rabbitMQModel.BasicPublish(exchange: string.Empty, routingKey: this.queueName, args.BasicProperties, args.Body);
}

this.rabbitMQModel.BasicAck(ea.DeliveryTag, false);
this.rabbitMQModel.BasicAck(args.DeliveryTag, false);
};

this.consumerTag = this.rabbitMQModel.BasicConsume(queue: this.queueName, autoAck: false, consumer: this.consumer);
Expand Down