Skip to content

Commit

Permalink
Fix/routing key to queue name (#172)
Browse files Browse the repository at this point in the history
  • Loading branch information
pat-goins authored Feb 11, 2022
1 parent ad53e12 commit f3ccff2
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 10 deletions.
4 changes: 2 additions & 2 deletions src/Trigger/RabbitMQListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ internal void CreateHeadersAndRepublish(BasicDeliverEventArgs ea)

ea.BasicProperties.Headers[Constants.RequeueCount] = 0;
_logger.LogDebug("Republishing message");
_rabbitMQModel.BasicPublish(exchange: string.Empty, routingKey: ea.RoutingKey, basicProperties: ea.BasicProperties, body: ea.Body);
_rabbitMQModel.BasicPublish(exchange: string.Empty, routingKey: _queueName, basicProperties: ea.BasicProperties, body: ea.Body);
_rabbitMQModel.BasicAck(ea.DeliveryTag, false);
}

Expand All @@ -173,7 +173,7 @@ internal void RepublishMessages(BasicDeliverEventArgs ea)
if (Convert.ToInt32(ea.BasicProperties.Headers[Constants.RequeueCount]) < 5)
{
_logger.LogDebug("Republishing message");
_rabbitMQModel.BasicPublish(exchange: string.Empty, routingKey: ea.RoutingKey, basicProperties: ea.BasicProperties, body: ea.Body);
_rabbitMQModel.BasicPublish(exchange: string.Empty, routingKey: _queueName, basicProperties: ea.BasicProperties, body: ea.Body);
_rabbitMQModel.BasicAck(ea.DeliveryTag, false); // Manually ACK'ing, but ack after resend
}
else
Expand Down
17 changes: 9 additions & 8 deletions test/WebJobs.Extensions.RabbitMQ.Tests/RabbitMQListenerTests.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using Microsoft.Azure.WebJobs.Extensions.RabbitMQ;
Expand Down Expand Up @@ -46,30 +46,31 @@ public RabbitMQListenerTests()
[Fact]
public void CreatesHeadersAndRepublishes()
{
var queueName = "blah";
_mockService.Setup(m => m.RabbitMQModel).Returns(_mockModel.Object);

RabbitMQListener listener = new RabbitMQListener(_mockExecutor.Object, _mockService.Object, "blah", _mockLogger.Object, _mockDescriptor.Object, 30);
RabbitMQListener listener = new RabbitMQListener(_mockExecutor.Object, _mockService.Object, queueName, _mockLogger.Object, _mockDescriptor.Object, 30);

var properties = Mock.Of<IBasicProperties>();
BasicDeliverEventArgs args = new BasicDeliverEventArgs("tag", 1, false, "", "queue", properties, Encoding.UTF8.GetBytes("hello world"));
BasicDeliverEventArgs args = new BasicDeliverEventArgs("tag", 1, false, "", "routingKey", properties, Encoding.UTF8.GetBytes("hello world"));
listener.CreateHeadersAndRepublish(args);

_mockModel.Verify(m => m.BasicAck(It.IsAny<ulong>(), false), Times.Exactly(1));
_mockModel.Verify(m => m.BasicPublish(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<IBasicProperties>(), It.IsAny<ReadOnlyMemory<byte>>()), Times.Exactly(1));
_mockModel.Verify(m => m.BasicPublish(It.IsAny<string>(), queueName, It.IsAny<IBasicProperties>(), It.IsAny<ReadOnlyMemory<byte>>()), Times.Exactly(1));
}

[Fact]
public void RepublishesMessages()
{
var queueName = "blah";
_mockService.Setup(m => m.RabbitMQModel).Returns(_mockModel.Object);
RabbitMQListener listener = new RabbitMQListener(_mockExecutor.Object, _mockService.Object, "blah", _mockLogger.Object, _mockDescriptor.Object, 30);
RabbitMQListener listener = new RabbitMQListener(_mockExecutor.Object, _mockService.Object, queueName, _mockLogger.Object, _mockDescriptor.Object, 30);

var properties = Mock.Of<IBasicProperties>(property => property.Headers == new Dictionary<string, object>() { { "requeueCount", 1 } });
BasicDeliverEventArgs args = new BasicDeliverEventArgs("tag", 1, false, "", "queue", properties, Encoding.UTF8.GetBytes("hello world"));
BasicDeliverEventArgs args = new BasicDeliverEventArgs("tag", 1, false, "", "routingKey", properties, Encoding.UTF8.GetBytes("hello world"));
listener.RepublishMessages(args);

_mockModel.Verify(m => m.BasicAck(It.IsAny<ulong>(), false), Times.Exactly(1));
_mockModel.Verify(m => m.BasicPublish(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<IBasicProperties>(), It.IsAny<ReadOnlyMemory<byte>>()), Times.Exactly(1));
_mockModel.Verify(m => m.BasicPublish(It.IsAny<string>(), queueName, It.IsAny<IBasicProperties>(), It.IsAny<ReadOnlyMemory<byte>>()), Times.Exactly(1));
}

[Fact]
Expand Down

0 comments on commit f3ccff2

Please sign in to comment.