From e84aed7c0851048df473276864b59b0a17210d46 Mon Sep 17 00:00:00 2001 From: Jatin Sanghvi <20547963+JatinSanghvi@users.noreply.github.com> Date: Mon, 17 Oct 2022 14:25:39 +0530 Subject: [PATCH] Fix message body corruption on republish --- .../Trigger/RabbitMQListener.cs | 36 +++++++++++-------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/extension/WebJobs.Extensions.RabbitMQ/Trigger/RabbitMQListener.cs b/extension/WebJobs.Extensions.RabbitMQ/Trigger/RabbitMQListener.cs index a437732..7ed8fd8 100644 --- a/extension/WebJobs.Extensions.RabbitMQ/Trigger/RabbitMQListener.cs +++ b/extension/WebJobs.Extensions.RabbitMQ/Trigger/RabbitMQListener.cs @@ -85,38 +85,44 @@ public Task StartAsync(CancellationToken cancellationToken) this.rabbitMQModel.BasicQos(0, this.prefetchCount, false); // Non zero prefetchSize doesn't work (tested upto 5.2.0) and will throw NOT_IMPLEMENTED exception this.consumer = new EventingBasicConsumer(this.rabbitMQModel.Model); - this.consumer.Received += async (model, ea) => + this.consumer.Received += async (model, args) => { - using Activity activity = StartActivity(ea); - - var input = new TriggeredFunctionData() { TriggerValue = ea }; + // The RabbitMQ client rents an array from the ArrayPool to hold a copy of the message body, and passes it + // to the listener. Once all event handlers are executed, the array is returned back to the pool so that the + // memory can be reused for future messages for that connection. However, since our event handler is async, + // the very first await statement i.e. the call to TryExecuteAsync below causes the event handler invocation + // to complete and lets the RabbitMQ client release the memory. This led to message body corruption when the + // message is republished (see: https://github.com/Azure/azure-functions-rabbitmq-extension/issues/211). + // + // We chose to copy the message body instead of having a new 'args' object as there is only one event + // handler registered for the consumer so there should be no side-effects. + args.Body = args.Body.ToArray(); + + using Activity activity = StartActivity(args); + + var input = new TriggeredFunctionData() { TriggerValue = args }; FunctionResult result = await this.executor.TryExecuteAsync(input, cancellationToken).ConfigureAwait(false); if (!result.Succeeded) { - ea.BasicProperties.Headers ??= new Dictionary(); - ea.BasicProperties.Headers.TryGetValue(RequeueCountHeaderName, out object headerValue); + args.BasicProperties.Headers ??= new Dictionary(); + args.BasicProperties.Headers.TryGetValue(RequeueCountHeaderName, out object headerValue); int requeueCount = Convert.ToInt32(headerValue, CultureInfo.InvariantCulture) + 1; if (requeueCount >= 5) { // Add message to dead letter exchange. this.logger.LogDebug("Requeue count exceeded: rejecting message"); - this.rabbitMQModel.BasicReject(ea.DeliveryTag, false); + this.rabbitMQModel.BasicReject(args.DeliveryTag, false); return; } this.logger.LogDebug("Republishing message"); - ea.BasicProperties.Headers[RequeueCountHeaderName] = requeueCount; - - // RabbitMQ client library seems to be reusing the memory pointed by 'ea.Body' for subsequent - // message-received events. This led to https://github.com/Azure/azure-functions-rabbitmq-extension/issues/211. - // Hence, pass a copy of 'ea.Body' to method 'BasicPublish' instead of the object itself to prevent - // sharing of the memory and possibility of memory corruption. - this.rabbitMQModel.BasicPublish(exchange: string.Empty, routingKey: this.queueName, ea.BasicProperties, ea.Body.ToArray()); + args.BasicProperties.Headers[RequeueCountHeaderName] = requeueCount; + this.rabbitMQModel.BasicPublish(exchange: string.Empty, routingKey: this.queueName, args.BasicProperties, args.Body); } - this.rabbitMQModel.BasicAck(ea.DeliveryTag, false); + this.rabbitMQModel.BasicAck(args.DeliveryTag, false); }; this.consumerTag = this.rabbitMQModel.BasicConsume(queue: this.queueName, autoAck: false, consumer: this.consumer);