Skip to content

Commit

Permalink
Fix message body corruption on republish
Browse files Browse the repository at this point in the history
  • Loading branch information
JatinSanghvi committed Oct 14, 2022
1 parent a46b8df commit dddf571
Showing 1 changed file with 22 additions and 15 deletions.
37 changes: 22 additions & 15 deletions extension/WebJobs.Extensions.RabbitMQ/Trigger/RabbitMQListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,38 +85,45 @@ public Task StartAsync(CancellationToken cancellationToken)
this.rabbitMQModel.BasicQos(0, this.prefetchCount, false); // Non zero prefetchSize doesn't work (tested upto 5.2.0) and will throw NOT_IMPLEMENTED exception
this.consumer = new EventingBasicConsumer(this.rabbitMQModel.Model);

this.consumer.Received += async (model, ea) =>
this.consumer.Received += async (model, args) =>
{
using Activity activity = StartActivity(ea);

var input = new TriggeredFunctionData() { TriggerValue = ea };
// The RabbitMQ client rents an array from the ArrayPool to hold a copy the message body, and passes it to
// the listener. Once all event handlers are executed, the array is returned back to the pool so that the
// memory can be reused for future messages for that connection. However, since our event handler is async,
// the very first await statement i.e. the call to TryExecuteAsync ends the event handler invocation and
// let the RabbitMQ client reclaim the memory. This led to issue with message corruption on republish (see:
// https://github.com/Azure/azure-functions-rabbitmq-extension/issues/211).
//
// Since the same args argument is passed to all event handlers, replacing it with a local copy (with
// message body copied) will ensure that the other event handlers (in case they are present) will receive
// exactly the same args as it was composed by the RabbitMQ client.
args = new BasicDeliverEventArgs(args.ConsumerTag, args.DeliveryTag, args.Redelivered, args.Exchange, args.RoutingKey, args.BasicProperties, args.Body.ToArray());

using Activity activity = StartActivity(args);

var input = new TriggeredFunctionData() { TriggerValue = args };
FunctionResult result = await this.executor.TryExecuteAsync(input, cancellationToken).ConfigureAwait(false);

if (!result.Succeeded)
{
ea.BasicProperties.Headers ??= new Dictionary<string, object>();
ea.BasicProperties.Headers.TryGetValue(RequeueCountHeaderName, out object headerValue);
args.BasicProperties.Headers ??= new Dictionary<string, object>();
args.BasicProperties.Headers.TryGetValue(RequeueCountHeaderName, out object headerValue);
int requeueCount = Convert.ToInt32(headerValue, CultureInfo.InvariantCulture) + 1;

if (requeueCount >= 5)
{
// Add message to dead letter exchange.
this.logger.LogDebug("Requeue count exceeded: rejecting message");
this.rabbitMQModel.BasicReject(ea.DeliveryTag, false);
this.rabbitMQModel.BasicReject(args.DeliveryTag, false);
return;
}

this.logger.LogDebug("Republishing message");
ea.BasicProperties.Headers[RequeueCountHeaderName] = requeueCount;

// RabbitMQ client library seems to be reusing the memory pointed by 'ea.Body' for subsequent
// message-received events. This led to https://github.com/Azure/azure-functions-rabbitmq-extension/issues/211.
// Hence, pass a copy of 'ea.Body' to method 'BasicPublish' instead of the object itself to prevent
// sharing of the memory and possibility of memory corruption.
this.rabbitMQModel.BasicPublish(exchange: string.Empty, routingKey: this.queueName, ea.BasicProperties, ea.Body.ToArray());
args.BasicProperties.Headers[RequeueCountHeaderName] = requeueCount;
this.rabbitMQModel.BasicPublish(exchange: string.Empty, routingKey: this.queueName, args.BasicProperties, args.Body);
}

this.rabbitMQModel.BasicAck(ea.DeliveryTag, false);
this.rabbitMQModel.BasicAck(args.DeliveryTag, false);
};

this.consumerTag = this.rabbitMQModel.BasicConsume(queue: this.queueName, autoAck: false, consumer: this.consumer);
Expand Down

0 comments on commit dddf571

Please sign in to comment.